You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by bg...@apache.org on 2015/12/07 02:22:16 UTC
reef git commit: [REEF-1004] Define Avro schema for Vortex messages
Repository: reef
Updated Branches:
refs/heads/master 7d5a14169 -> 71951320e
[REEF-1004] Define Avro schema for Vortex messages
This addressed the issue by
* Defining Avro schema for Vortex messages.
* Adding VortexAvroUtils to serialize/deserialize messages.
* Replacing SerializationUtils with Avro.
* Adding TODO comments for user function, input, and output.
JIRA:
[REEF-1004](https://issues.apache.org/jira/browse/REEF-1004)
Pull Request:
Closes #677
Project: http://git-wip-us.apache.org/repos/asf/reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/71951320
Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/71951320
Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/71951320
Branch: refs/heads/master
Commit: 71951320e12c1341f0b8c9b774636f7054455312
Parents: 7d5a141
Author: Yunseong Lee <yu...@apache.org>
Authored: Mon Nov 16 12:13:08 2015 +0800
Committer: Byung-Gon Chun <bg...@apache.org>
Committed: Mon Dec 7 10:19:40 2015 +0900
----------------------------------------------------------------------
lang/java/reef-applications/reef-vortex/pom.xml | 42 ++++
.../src/main/avro/VortexRequest.avsc | 39 ++++
.../reef-vortex/src/main/avro/WorkerReport.avsc | 57 +++++
.../vortex/common/TaskletExecutionRequest.java | 14 ++
.../reef/vortex/common/VortexAvroUtils.java | 215 +++++++++++++++++++
.../apache/reef/vortex/driver/VortexDriver.java | 11 +-
.../reef/vortex/driver/VortexRequestor.java | 4 +-
.../reef/vortex/evaluator/VortexWorker.java | 48 ++---
8 files changed, 398 insertions(+), 32 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/reef/blob/71951320/lang/java/reef-applications/reef-vortex/pom.xml
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/reef-vortex/pom.xml b/lang/java/reef-applications/reef-vortex/pom.xml
index c82a856..5c11242 100644
--- a/lang/java/reef-applications/reef-vortex/pom.xml
+++ b/lang/java/reef-applications/reef-vortex/pom.xml
@@ -50,11 +50,53 @@ under the License.
<artifactId>reef-runtime-local</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro</artifactId>
+ </dependency>
</dependencies>
<build>
<plugins>
<plugin>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro-maven-plugin</artifactId>
+ <executions>
+ <execution>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>schema</goal>
+ </goals>
+ <configuration>
+ <sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory>
+ <outputDirectory>${project.basedir}/target/generated-sources/avro/</outputDirectory>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>build-helper-maven-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>add-source</id>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>add-source</goal>
+ </goals>
+ <configuration>
+ <sources>
+ <source>target/generated-sources/avro</source>
+ </sources>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro-maven-plugin</artifactId>
+ </plugin>
+ <plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<executions>
http://git-wip-us.apache.org/repos/asf/reef/blob/71951320/lang/java/reef-applications/reef-vortex/src/main/avro/VortexRequest.avsc
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/reef-vortex/src/main/avro/VortexRequest.avsc b/lang/java/reef-applications/reef-vortex/src/main/avro/VortexRequest.avsc
new file mode 100644
index 0000000..2453761
--- /dev/null
+++ b/lang/java/reef-applications/reef-vortex/src/main/avro/VortexRequest.avsc
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+[
+ {
+ "namespace": "org.apache.reef.vortex.common.avro",
+ "type": "record",
+ "name": "AvroTaskletExecutionRequest",
+ "fields": [
+ {"name": "taskletId", "type": "int"},
+ {"name": "serializedUserFunction", "type": "bytes"},
+ {"name": "serializedInput", "type": "bytes"}
+ ]
+ },
+ {
+ "namespace": "org.apache.reef.vortex.common.avro",
+ "type": "record",
+ "name": "AvroVortexRequest",
+ "fields": [
+ {"name": "requestType", "type": {"type": "enum", "name": "AvroRequestType", "symbols": ["ExecuteTasklet"]}},
+ {"name": "taskletExecutionRequest", "type": ["null", "AvroTaskletExecutionRequest"], "default": null}
+ ]
+ }
+]
http://git-wip-us.apache.org/repos/asf/reef/blob/71951320/lang/java/reef-applications/reef-vortex/src/main/avro/WorkerReport.avsc
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/reef-vortex/src/main/avro/WorkerReport.avsc b/lang/java/reef-applications/reef-vortex/src/main/avro/WorkerReport.avsc
new file mode 100644
index 0000000..19a655d
--- /dev/null
+++ b/lang/java/reef-applications/reef-vortex/src/main/avro/WorkerReport.avsc
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+[
+ {
+ "namespace": "org.apache.reef.vortex.common.avro",
+ "type": "record",
+ "name": "AvroTaskletResultReport",
+ "fields": [
+ {"name": "taskletId", "type": "int"},
+ {"name": "serializedOutput", "type": "bytes"}
+ ]
+ },
+ {
+ "namespace": "org.apache.reef.vortex.common.avro",
+ "type": "record",
+ "name": "AvroTaskletFailureReport",
+ "fields": [
+ {"name": "taskletId", "type": "int"},
+ {"name": "serializedException", "type": "bytes"}
+ ]
+ },
+ {
+ "namespace": "org.apache.reef.vortex.common.avro",
+ "type": "record",
+ "name": "AvroWorkerReport",
+ "fields": [
+ {
+ "name": "reportType",
+ "type": {"type": "enum", "name": "AvroReportType", "symbols": ["TaskletResult", "TaskletFailure"]}
+ },
+ {
+ "name": "taskletResult",
+ "type": ["null", "AvroTaskletResultReport"], "default": null
+ },
+ {
+ "name": "taskletFailure",
+ "type": ["null", "AvroTaskletFailureReport"], "default": null
+ }
+ ]
+ }
+]
http://git-wip-us.apache.org/repos/asf/reef/blob/71951320/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletExecutionRequest.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletExecutionRequest.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletExecutionRequest.java
index c540572..961b574 100644
--- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletExecutionRequest.java
+++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletExecutionRequest.java
@@ -65,4 +65,18 @@ public final class TaskletExecutionRequest<TInput extends Serializable, TOutput
public int getTaskletId() {
return taskletId;
}
+
+ /**
+ * Get function of the tasklet.
+ */
+ public VortexFunction getFunction() {
+ return userFunction;
+ }
+
+ /**
+ * Get input of the tasklet.
+ */
+ public TInput getInput() {
+ return input;
+ }
}
http://git-wip-us.apache.org/repos/asf/reef/blob/71951320/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/VortexAvroUtils.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/VortexAvroUtils.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/VortexAvroUtils.java
new file mode 100644
index 0000000..c3d01de
--- /dev/null
+++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/VortexAvroUtils.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.vortex.common;
+
+import org.apache.avro.io.*;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.avro.specific.SpecificDatumWriter;
+import org.apache.commons.lang.SerializationUtils;
+import org.apache.reef.vortex.api.VortexFunction;
+import org.apache.reef.vortex.common.avro.*;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+
+/**
+ * Serialize and deserialize Vortex message to/from byte array.
+ */
+public final class VortexAvroUtils {
+ /**
+ * Serialize VortexRequest to byte array.
+ * @param vortexRequest Vortex request message to serialize.
+ * @return Serialized byte array.
+ */
+ public static byte[] toBytes(final VortexRequest vortexRequest) {
+ // Convert VortexRequest message to Avro message.
+ final AvroVortexRequest avroVortexRequest;
+ switch (vortexRequest.getType()) {
+ case ExecuteTasklet:
+ final TaskletExecutionRequest taskletExecutionRequest = (TaskletExecutionRequest) vortexRequest;
+ // The following TODOs are sub-issues of cleaning up Serializable in Vortex (REEF-504).
+ // The purpose is to reduce serialization cost, which leads to bottleneck in Master.
+ // Temporarily those are left as TODOs, but will be addressed in separate PRs.
+ // TODO[REEF-1005]: Allow custom codecs for input/output data in Vortex.
+ final byte[] serializedInput = SerializationUtils.serialize(taskletExecutionRequest.getInput());
+ // TODO[REEF-1003]: Use reflection instead of serialization when launching VortexFunction
+ final byte[] serializedFunction = SerializationUtils.serialize(taskletExecutionRequest.getFunction());
+ avroVortexRequest = AvroVortexRequest.newBuilder()
+ .setRequestType(AvroRequestType.ExecuteTasklet)
+ .setTaskletExecutionRequest(
+ AvroTaskletExecutionRequest.newBuilder()
+ .setTaskletId(taskletExecutionRequest.getTaskletId())
+ .setSerializedInput(ByteBuffer.wrap(serializedInput))
+ .setSerializedUserFunction(ByteBuffer.wrap(serializedFunction))
+ .build())
+ .build();
+ break;
+ default:
+ throw new RuntimeException("Undefined message type");
+ }
+
+ // Serialize the Avro message to byte array.
+ return toBytes(avroVortexRequest, AvroVortexRequest.class);
+ }
+
+ /**
+ * Serialize WorkerReport to byte array.
+ * @param workerReport Worker report message to serialize.
+ * @return Serialized byte array.
+ */
+ public static byte[] toBytes(final WorkerReport workerReport) {
+ // Convert WorkerReport message to Avro message.
+ final AvroWorkerReport avroWorkerReport;
+ switch (workerReport.getType()) {
+ case TaskletResult:
+ final TaskletResultReport taskletResultReport = (TaskletResultReport) workerReport;
+ // TODO[REEF-1005]: Allow custom codecs for input/output data in Vortex.
+ final byte[] serializedOutput = SerializationUtils.serialize(taskletResultReport.getResult());
+ avroWorkerReport = AvroWorkerReport.newBuilder()
+ .setReportType(AvroReportType.TaskletResult)
+ .setTaskletResult(
+ AvroTaskletResultReport.newBuilder()
+ .setTaskletId(taskletResultReport.getTaskletId())
+ .setSerializedOutput(ByteBuffer.wrap(serializedOutput))
+ .build())
+ .build();
+ break;
+ case TaskletFailure:
+ final TaskletFailureReport taskletFailureReport = (TaskletFailureReport) workerReport;
+ final byte[] serializedException = SerializationUtils.serialize(taskletFailureReport.getException());
+ avroWorkerReport = AvroWorkerReport.newBuilder()
+ .setReportType(AvroReportType.TaskletFailure)
+ .setTaskletFailure(
+ AvroTaskletFailureReport.newBuilder()
+ .setTaskletId(taskletFailureReport.getTaskletId())
+ .setSerializedException(ByteBuffer.wrap(serializedException))
+ .build())
+ .build();
+ break;
+ default:
+ throw new RuntimeException("Undefined message type");
+ }
+
+ // Serialize the Avro message to byte array.
+ return toBytes(avroWorkerReport, AvroWorkerReport.class);
+ }
+
+ /**
+ * Deserialize byte array to VortexRequest.
+ * @param bytes Byte array to deserialize.
+ * @return De-serialized VortexRequest.
+ */
+ public static VortexRequest toVortexRequest(final byte[] bytes) {
+ final AvroVortexRequest avroVortexRequest = toAvroObject(bytes, AvroVortexRequest.class);
+
+ final VortexRequest vortexRequest;
+ switch (avroVortexRequest.getRequestType()) {
+ case ExecuteTasklet:
+ final AvroTaskletExecutionRequest taskletExecutionRequest = avroVortexRequest.getTaskletExecutionRequest();
+ // TODO[REEF-1003]: Use reflection instead of serialization when launching VortexFunction
+ final VortexFunction function =
+ (VortexFunction) SerializationUtils.deserialize(
+ taskletExecutionRequest.getSerializedUserFunction().array());
+ // TODO[REEF-1005]: Allow custom codecs for input/output data in Vortex.
+ final Serializable input =
+ (Serializable) SerializationUtils.deserialize(
+ taskletExecutionRequest.getSerializedInput().array());
+ vortexRequest = new TaskletExecutionRequest(taskletExecutionRequest.getTaskletId(), function, input);
+ break;
+ default:
+ throw new RuntimeException("Undefined VortexRequest type");
+ }
+ return vortexRequest;
+ }
+
+ /**
+ * Deserialize byte array to WorkerReport.
+ * @param bytes Byte array to deserialize.
+ * @return De-serialized WorkerReport.
+ */
+ public static WorkerReport toWorkerReport(final byte[] bytes) {
+ final WorkerReport workerReport;
+ final AvroWorkerReport avroWorkerReport = toAvroObject(bytes, AvroWorkerReport.class);
+ switch (avroWorkerReport.getReportType()) {
+ case TaskletResult:
+ final AvroTaskletResultReport taskletResultReport = avroWorkerReport.getTaskletResult();
+ // TODO[REEF-1005]: Allow custom codecs for input/output data in Vortex.
+ final Serializable output =
+ (Serializable) SerializationUtils.deserialize(taskletResultReport.getSerializedOutput().array());
+ workerReport = new TaskletResultReport<>(taskletResultReport.getTaskletId(), output);
+ break;
+ case TaskletFailure:
+ final AvroTaskletFailureReport taskletFailureReport = avroWorkerReport.getTaskletFailure();
+ final Exception exception =
+ (Exception) SerializationUtils.deserialize(taskletFailureReport.getSerializedException().array());
+ workerReport = new TaskletFailureReport(taskletFailureReport.getTaskletId(), exception);
+ break;
+ default:
+ throw new RuntimeException("Undefined WorkerReport type");
+ }
+ return workerReport;
+ }
+
+ /**
+ * Serialize Avro object to byte array.
+ * @param avroObject Avro object to serialize.
+ * @param theClass Class of the Avro object.
+ * @param <T> Type of the Avro object.
+ * @return Serialized byte array.
+ */
+ private static <T> byte[] toBytes(final T avroObject, final Class<T> theClass) {
+ final DatumWriter<T> reportWriter = new SpecificDatumWriter<>(theClass);
+ final byte[] theBytes;
+ try (final ByteArrayOutputStream out = new ByteArrayOutputStream()) {
+ final BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
+ reportWriter.write(avroObject, encoder);
+ encoder.flush();
+ out.flush();
+ theBytes = out.toByteArray();
+ return theBytes;
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+ * Deserialize byte array to Avro object.
+ * @param bytes Byte array to deserialize.
+ * @param theClass Class of the Avro object.
+ * @param <T> Type of the Avro object.
+ * @return Avro object de-serialized from byte array.
+ */
+ private static <T> T toAvroObject(final byte[] bytes, final Class<T> theClass) {
+ final BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(bytes, null);
+ final SpecificDatumReader<T> reader = new SpecificDatumReader<>(theClass);
+ try {
+ return reader.read(null, decoder);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+ * Empty private constructor to prohibit instantiation of utility class.
+ */
+ private VortexAvroUtils() {
+ }
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/71951320/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexDriver.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexDriver.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexDriver.java
index 977302f..352cdbd 100644
--- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexDriver.java
+++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexDriver.java
@@ -18,7 +18,6 @@
*/
package org.apache.reef.vortex.driver;
-import org.apache.commons.lang.SerializationUtils;
import org.apache.reef.annotations.audience.DriverSide;
import org.apache.reef.driver.evaluator.*;
import org.apache.reef.driver.task.RunningTask;
@@ -31,6 +30,7 @@ import org.apache.reef.tang.annotations.Unit;
import org.apache.reef.vortex.api.VortexStart;
import org.apache.reef.vortex.common.TaskletFailureReport;
import org.apache.reef.vortex.common.TaskletResultReport;
+import org.apache.reef.vortex.common.VortexAvroUtils;
import org.apache.reef.vortex.common.WorkerReport;
import org.apache.reef.vortex.evaluator.VortexWorker;
import org.apache.reef.wake.EStage;
@@ -154,15 +154,16 @@ final class VortexDriver {
@Override
public void onNext(final TaskMessage taskMessage) {
final String workerId = taskMessage.getId();
- final WorkerReport workerReport= (WorkerReport)SerializationUtils.deserialize(taskMessage.get());
+ final WorkerReport workerReport = VortexAvroUtils.toWorkerReport(taskMessage.get());
switch (workerReport.getType()) {
case TaskletResult:
- final TaskletResultReport taskletResultReport = (TaskletResultReport)workerReport;
+ final TaskletResultReport taskletResultReport = (TaskletResultReport) workerReport;
vortexMaster.taskletCompleted(workerId, taskletResultReport.getTaskletId(), taskletResultReport.getResult());
break;
case TaskletFailure:
- final TaskletFailureReport taskletFailureReport = (TaskletFailureReport)workerReport;
- vortexMaster.taskletErrored(workerId, taskletFailureReport.getTaskletId(), taskletFailureReport.getException());
+ final TaskletFailureReport taskletFailureReport = (TaskletFailureReport) workerReport;
+ vortexMaster.taskletErrored(workerId, taskletFailureReport.getTaskletId(),
+ taskletFailureReport.getException());
break;
default:
throw new RuntimeException("Unknown Report");
http://git-wip-us.apache.org/repos/asf/reef/blob/71951320/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexRequestor.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexRequestor.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexRequestor.java
index 2c02d6e..b94b5b0 100644
--- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexRequestor.java
+++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexRequestor.java
@@ -18,9 +18,9 @@
*/
package org.apache.reef.vortex.driver;
-import org.apache.commons.lang.SerializationUtils;
import org.apache.reef.annotations.audience.DriverSide;
import org.apache.reef.driver.task.RunningTask;
+import org.apache.reef.vortex.common.VortexAvroUtils;
import org.apache.reef.vortex.common.VortexRequest;
import javax.inject.Inject;
@@ -43,7 +43,7 @@ class VortexRequestor {
@Override
public void run() {
// Possible race condition with VortexWorkerManager#terminate is addressed by the global lock in VortexMaster
- reefTask.send(SerializationUtils.serialize(vortexRequest));
+ reefTask.send(VortexAvroUtils.toBytes(vortexRequest));
}
});
}
http://git-wip-us.apache.org/repos/asf/reef/blob/71951320/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/evaluator/VortexWorker.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/evaluator/VortexWorker.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/evaluator/VortexWorker.java
index 6a2821e..e65830c 100644
--- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/evaluator/VortexWorker.java
+++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/evaluator/VortexWorker.java
@@ -18,7 +18,6 @@
*/
package org.apache.reef.vortex.evaluator;
-import org.apache.commons.lang.SerializationUtils;
import org.apache.reef.annotations.Unstable;
import org.apache.reef.annotations.audience.TaskSide;
import org.apache.reef.tang.annotations.Parameter;
@@ -57,7 +56,7 @@ public final class VortexWorker implements Task, TaskMessageSource {
@Inject
private VortexWorker(final HeartBeatTriggerManager heartBeatTriggerManager,
- @Parameter(VortexWorkerConf.NumOfThreads.class) final int numOfThreads) {
+ @Parameter(VortexWorkerConf.NumOfThreads.class) final int numOfThreads) {
this.heartBeatTriggerManager = heartBeatTriggerManager;
this.numOfThreads = numOfThreads;
}
@@ -88,33 +87,32 @@ public final class VortexWorker implements Task, TaskMessageSource {
@Override
public void run() {
// Command Executor: Deserialize the command
- final VortexRequest vortexRequest = (VortexRequest) SerializationUtils.deserialize(message);
+ final VortexRequest vortexRequest = VortexAvroUtils.toVortexRequest(message);
switch (vortexRequest.getType()) {
- case ExecuteTasklet:
- final TaskletExecutionRequest taskletExecutionRequest = (TaskletExecutionRequest) vortexRequest;
- try {
- // Command Executor: Execute the command
- final Serializable result = taskletExecutionRequest.execute();
-
- // Command Executor: Tasklet successfully returns result
- final WorkerReport report =
- new TaskletResultReport<>(taskletExecutionRequest.getTaskletId(), result);
- workerReports.addLast(SerializationUtils.serialize(report));
- } catch (Exception e) {
- // Command Executor: Tasklet throws an exception
- final WorkerReport report =
- new TaskletFailureReport(taskletExecutionRequest.getTaskletId(), e);
- workerReports.addLast(SerializationUtils.serialize(report));
- }
-
- heartBeatTriggerManager.triggerHeartBeat();
- break;
- default:
- throw new RuntimeException("Unknown Command");
+ case ExecuteTasklet:
+ final TaskletExecutionRequest taskletExecutionRequest = (TaskletExecutionRequest) vortexRequest;
+ try {
+ // Command Executor: Execute the command
+ final Serializable result = taskletExecutionRequest.execute();
+
+ // Command Executor: Tasklet successfully returns result
+ final WorkerReport report =
+ new TaskletResultReport<>(taskletExecutionRequest.getTaskletId(), result);
+ workerReports.addLast(VortexAvroUtils.toBytes(report));
+ } catch (Exception e) {
+ // Command Executor: Tasklet throws an exception
+ final WorkerReport report =
+ new TaskletFailureReport(taskletExecutionRequest.getTaskletId(), e);
+ workerReports.addLast(VortexAvroUtils.toBytes(report));
+ }
+
+ heartBeatTriggerManager.triggerHeartBeat();
+ break;
+ default:
+ throw new RuntimeException("Unknown Command");
}
}
});
-
}
}
});