You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by bg...@apache.org on 2015/12/07 02:22:16 UTC

reef git commit: [REEF-1004] Define Avro schema for Vortex messages

Repository: reef
Updated Branches:
  refs/heads/master 7d5a14169 -> 71951320e


[REEF-1004] Define Avro schema for Vortex messages

This addressed the issue by
  * Defining Avro schema for Vortex messages.
  * Adding VortexAvroUtils to serialize/deserialize messages.
  * Replacing SerializationUtils with Avro.
  * Adding TODO comments for user function, input, and output.

JIRA:
  [REEF-1004](https://issues.apache.org/jira/browse/REEF-1004)

Pull Request:
  Closes #677


Project: http://git-wip-us.apache.org/repos/asf/reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/71951320
Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/71951320
Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/71951320

Branch: refs/heads/master
Commit: 71951320e12c1341f0b8c9b774636f7054455312
Parents: 7d5a141
Author: Yunseong Lee <yu...@apache.org>
Authored: Mon Nov 16 12:13:08 2015 +0800
Committer: Byung-Gon Chun <bg...@apache.org>
Committed: Mon Dec 7 10:19:40 2015 +0900

----------------------------------------------------------------------
 lang/java/reef-applications/reef-vortex/pom.xml |  42 ++++
 .../src/main/avro/VortexRequest.avsc            |  39 ++++
 .../reef-vortex/src/main/avro/WorkerReport.avsc |  57 +++++
 .../vortex/common/TaskletExecutionRequest.java  |  14 ++
 .../reef/vortex/common/VortexAvroUtils.java     | 215 +++++++++++++++++++
 .../apache/reef/vortex/driver/VortexDriver.java |  11 +-
 .../reef/vortex/driver/VortexRequestor.java     |   4 +-
 .../reef/vortex/evaluator/VortexWorker.java     |  48 ++---
 8 files changed, 398 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/reef/blob/71951320/lang/java/reef-applications/reef-vortex/pom.xml
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/reef-vortex/pom.xml b/lang/java/reef-applications/reef-vortex/pom.xml
index c82a856..5c11242 100644
--- a/lang/java/reef-applications/reef-vortex/pom.xml
+++ b/lang/java/reef-applications/reef-vortex/pom.xml
@@ -50,11 +50,53 @@ under the License.
             <artifactId>reef-runtime-local</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.avro</groupId>
+            <artifactId>avro</artifactId>
+        </dependency>
     </dependencies>
 
     <build>
         <plugins>
             <plugin>
+                <groupId>org.apache.avro</groupId>
+                <artifactId>avro-maven-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <phase>generate-sources</phase>
+                        <goals>
+                            <goal>schema</goal>
+                        </goals>
+                        <configuration>
+                            <sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory>
+                            <outputDirectory>${project.basedir}/target/generated-sources/avro/</outputDirectory>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>build-helper-maven-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>add-source</id>
+                        <phase>generate-sources</phase>
+                        <goals>
+                            <goal>add-source</goal>
+                        </goals>
+                        <configuration>
+                            <sources>
+                                <source>target/generated-sources/avro</source>
+                            </sources>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.avro</groupId>
+                <artifactId>avro-maven-plugin</artifactId>
+            </plugin>
+            <plugin>
                 <groupId>org.apache.maven.plugins</groupId>
                 <artifactId>maven-shade-plugin</artifactId>
                 <executions>

http://git-wip-us.apache.org/repos/asf/reef/blob/71951320/lang/java/reef-applications/reef-vortex/src/main/avro/VortexRequest.avsc
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/reef-vortex/src/main/avro/VortexRequest.avsc b/lang/java/reef-applications/reef-vortex/src/main/avro/VortexRequest.avsc
new file mode 100644
index 0000000..2453761
--- /dev/null
+++ b/lang/java/reef-applications/reef-vortex/src/main/avro/VortexRequest.avsc
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+[
+  {
+    "namespace": "org.apache.reef.vortex.common.avro",
+    "type": "record",
+    "name": "AvroTaskletExecutionRequest",
+    "fields": [
+      {"name": "taskletId", "type": "int"},
+      {"name": "serializedUserFunction", "type": "bytes"},
+      {"name": "serializedInput", "type": "bytes"}
+    ]
+  },
+  {
+    "namespace": "org.apache.reef.vortex.common.avro",
+    "type": "record",
+    "name": "AvroVortexRequest",
+    "fields": [
+      {"name": "requestType", "type": {"type": "enum", "name": "AvroRequestType", "symbols": ["ExecuteTasklet"]}},
+      {"name": "taskletExecutionRequest", "type": ["null", "AvroTaskletExecutionRequest"], "default": null}
+    ]
+  }
+]

http://git-wip-us.apache.org/repos/asf/reef/blob/71951320/lang/java/reef-applications/reef-vortex/src/main/avro/WorkerReport.avsc
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/reef-vortex/src/main/avro/WorkerReport.avsc b/lang/java/reef-applications/reef-vortex/src/main/avro/WorkerReport.avsc
new file mode 100644
index 0000000..19a655d
--- /dev/null
+++ b/lang/java/reef-applications/reef-vortex/src/main/avro/WorkerReport.avsc
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+[
+  {
+    "namespace": "org.apache.reef.vortex.common.avro",
+    "type": "record",
+    "name": "AvroTaskletResultReport",
+    "fields": [
+      {"name": "taskletId", "type": "int"},
+      {"name": "serializedOutput", "type": "bytes"}
+    ]
+  },
+  {
+    "namespace": "org.apache.reef.vortex.common.avro",
+    "type": "record",
+    "name": "AvroTaskletFailureReport",
+    "fields": [
+      {"name": "taskletId", "type": "int"},
+      {"name": "serializedException", "type": "bytes"}
+    ]
+  },
+  {
+    "namespace": "org.apache.reef.vortex.common.avro",
+    "type": "record",
+    "name": "AvroWorkerReport",
+    "fields": [
+      {
+        "name": "reportType",
+        "type": {"type": "enum", "name": "AvroReportType", "symbols": ["TaskletResult", "TaskletFailure"]}
+      },
+      {
+        "name": "taskletResult",
+        "type": ["null", "AvroTaskletResultReport"], "default": null
+      },
+      {
+        "name": "taskletFailure",
+        "type": ["null", "AvroTaskletFailureReport"], "default": null
+      }
+    ]
+  }
+]

http://git-wip-us.apache.org/repos/asf/reef/blob/71951320/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletExecutionRequest.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletExecutionRequest.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletExecutionRequest.java
index c540572..961b574 100644
--- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletExecutionRequest.java
+++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletExecutionRequest.java
@@ -65,4 +65,18 @@ public final class TaskletExecutionRequest<TInput extends Serializable, TOutput
   public int getTaskletId() {
     return taskletId;
   }
+
+  /**
+   * Get function of the tasklet.
+   */
+  public VortexFunction getFunction() {
+    return userFunction;
+  }
+
+  /**
+   * Get input of the tasklet.
+   */
+  public TInput getInput() {
+    return input;
+  }
 }

http://git-wip-us.apache.org/repos/asf/reef/blob/71951320/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/VortexAvroUtils.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/VortexAvroUtils.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/VortexAvroUtils.java
new file mode 100644
index 0000000..c3d01de
--- /dev/null
+++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/VortexAvroUtils.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.vortex.common;
+
+import org.apache.avro.io.*;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.avro.specific.SpecificDatumWriter;
+import org.apache.commons.lang.SerializationUtils;
+import org.apache.reef.vortex.api.VortexFunction;
+import org.apache.reef.vortex.common.avro.*;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+
+/**
+ * Serialize and deserialize Vortex message to/from byte array.
+ */
+public final class VortexAvroUtils {
+  /**
+   * Serialize VortexRequest to byte array.
+   * @param vortexRequest Vortex request message to serialize.
+   * @return Serialized byte array.
+   */
+  public static byte[] toBytes(final VortexRequest vortexRequest) {
+    // Convert VortexRequest message to Avro message.
+    final AvroVortexRequest avroVortexRequest;
+    switch (vortexRequest.getType()) {
+    case ExecuteTasklet:
+      final TaskletExecutionRequest taskletExecutionRequest = (TaskletExecutionRequest) vortexRequest;
+      // The following TODOs are sub-issues of cleaning up Serializable in Vortex (REEF-504).
+      // The goal is to reduce the serialization cost, which causes a bottleneck in the Master.
+      // For now they are left as TODOs; they will be addressed in separate PRs.
+      // TODO[REEF-1005]: Allow custom codecs for input/output data in Vortex.
+      final byte[] serializedInput = SerializationUtils.serialize(taskletExecutionRequest.getInput());
+      // TODO[REEF-1003]: Use reflection instead of serialization when launching VortexFunction
+      final byte[] serializedFunction = SerializationUtils.serialize(taskletExecutionRequest.getFunction());
+      avroVortexRequest = AvroVortexRequest.newBuilder()
+          .setRequestType(AvroRequestType.ExecuteTasklet)
+          .setTaskletExecutionRequest(
+              AvroTaskletExecutionRequest.newBuilder()
+                  .setTaskletId(taskletExecutionRequest.getTaskletId())
+                  .setSerializedInput(ByteBuffer.wrap(serializedInput))
+                  .setSerializedUserFunction(ByteBuffer.wrap(serializedFunction))
+                  .build())
+          .build();
+      break;
+    default:
+      throw new RuntimeException("Undefined message type");
+    }
+
+    // Serialize the Avro message to byte array.
+    return toBytes(avroVortexRequest, AvroVortexRequest.class);
+  }
+
+  /**
+   * Serialize WorkerReport to byte array.
+   * @param workerReport Worker report message to serialize.
+   * @return Serialized byte array.
+   */
+  public static byte[] toBytes(final WorkerReport workerReport) {
+    // Convert WorkerReport message to Avro message.
+    final AvroWorkerReport avroWorkerReport;
+    switch (workerReport.getType()) {
+    case TaskletResult:
+      final TaskletResultReport taskletResultReport = (TaskletResultReport) workerReport;
+      // TODO[REEF-1005]: Allow custom codecs for input/output data in Vortex.
+      final byte[] serializedOutput = SerializationUtils.serialize(taskletResultReport.getResult());
+      avroWorkerReport = AvroWorkerReport.newBuilder()
+          .setReportType(AvroReportType.TaskletResult)
+          .setTaskletResult(
+              AvroTaskletResultReport.newBuilder()
+                  .setTaskletId(taskletResultReport.getTaskletId())
+                  .setSerializedOutput(ByteBuffer.wrap(serializedOutput))
+                  .build())
+          .build();
+      break;
+    case TaskletFailure:
+      final TaskletFailureReport taskletFailureReport = (TaskletFailureReport) workerReport;
+      final byte[] serializedException = SerializationUtils.serialize(taskletFailureReport.getException());
+      avroWorkerReport = AvroWorkerReport.newBuilder()
+          .setReportType(AvroReportType.TaskletFailure)
+          .setTaskletFailure(
+              AvroTaskletFailureReport.newBuilder()
+                  .setTaskletId(taskletFailureReport.getTaskletId())
+                  .setSerializedException(ByteBuffer.wrap(serializedException))
+                  .build())
+          .build();
+      break;
+    default:
+      throw new RuntimeException("Undefined message type");
+    }
+
+    // Serialize the Avro message to byte array.
+    return toBytes(avroWorkerReport, AvroWorkerReport.class);
+  }
+
+  /**
+   * Deserialize byte array to VortexRequest.
+   * @param bytes Byte array to deserialize.
+   * @return De-serialized VortexRequest.
+   */
+  public static VortexRequest toVortexRequest(final byte[] bytes) {
+    final AvroVortexRequest avroVortexRequest = toAvroObject(bytes, AvroVortexRequest.class);
+
+    final VortexRequest vortexRequest;
+    switch (avroVortexRequest.getRequestType()) {
+    case ExecuteTasklet:
+      final AvroTaskletExecutionRequest taskletExecutionRequest = avroVortexRequest.getTaskletExecutionRequest();
+      // TODO[REEF-1003]: Use reflection instead of serialization when launching VortexFunction
+      final VortexFunction function =
+          (VortexFunction) SerializationUtils.deserialize(
+              taskletExecutionRequest.getSerializedUserFunction().array());
+      // TODO[REEF-1005]: Allow custom codecs for input/output data in Vortex.
+      final Serializable input =
+          (Serializable) SerializationUtils.deserialize(
+              taskletExecutionRequest.getSerializedInput().array());
+      vortexRequest = new TaskletExecutionRequest(taskletExecutionRequest.getTaskletId(), function, input);
+      break;
+    default:
+      throw new RuntimeException("Undefined VortexRequest type");
+    }
+    return vortexRequest;
+  }
+
+  /**
+   * Deserialize byte array to WorkerReport.
+   * @param bytes Byte array to deserialize.
+   * @return De-serialized WorkerReport.
+   */
+  public static WorkerReport toWorkerReport(final byte[] bytes) {
+    final WorkerReport workerReport;
+    final AvroWorkerReport avroWorkerReport = toAvroObject(bytes, AvroWorkerReport.class);
+    switch (avroWorkerReport.getReportType()) {
+    case TaskletResult:
+      final AvroTaskletResultReport taskletResultReport = avroWorkerReport.getTaskletResult();
+      // TODO[REEF-1005]: Allow custom codecs for input/output data in Vortex.
+      final Serializable output =
+          (Serializable) SerializationUtils.deserialize(taskletResultReport.getSerializedOutput().array());
+      workerReport = new TaskletResultReport<>(taskletResultReport.getTaskletId(), output);
+      break;
+    case TaskletFailure:
+      final AvroTaskletFailureReport taskletFailureReport = avroWorkerReport.getTaskletFailure();
+      final Exception exception =
+          (Exception) SerializationUtils.deserialize(taskletFailureReport.getSerializedException().array());
+      workerReport = new TaskletFailureReport(taskletFailureReport.getTaskletId(), exception);
+      break;
+    default:
+      throw new RuntimeException("Undefined WorkerReport type");
+    }
+    return workerReport;
+  }
+
+  /**
+   * Serialize Avro object to byte array.
+   * @param avroObject Avro object to serialize.
+   * @param theClass Class of the Avro object.
+   * @param <T> Type of the Avro object.
+   * @return Serialized byte array.
+   */
+  private static <T> byte[] toBytes(final T avroObject, final Class<T> theClass) {
+    final DatumWriter<T> reportWriter = new SpecificDatumWriter<>(theClass);
+    final byte[] theBytes;
+    try (final ByteArrayOutputStream out = new ByteArrayOutputStream()) {
+      final BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
+      reportWriter.write(avroObject, encoder);
+      encoder.flush();
+      out.flush();
+      theBytes = out.toByteArray();
+      return theBytes;
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Deserialize byte array to Avro object.
+   * @param bytes Byte array to deserialize.
+   * @param theClass Class of the Avro object.
+   * @param <T> Type of the Avro object.
+   * @return Avro object de-serialized from byte array.
+   */
+  private static <T> T toAvroObject(final byte[] bytes, final Class<T> theClass) {
+    final BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(bytes, null);
+    final SpecificDatumReader<T> reader = new SpecificDatumReader<>(theClass);
+    try {
+      return reader.read(null, decoder);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Empty private constructor to prohibit instantiation of utility class.
+   */
+  private VortexAvroUtils() {
+  }
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/71951320/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexDriver.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexDriver.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexDriver.java
index 977302f..352cdbd 100644
--- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexDriver.java
+++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexDriver.java
@@ -18,7 +18,6 @@
  */
 package org.apache.reef.vortex.driver;
 
-import org.apache.commons.lang.SerializationUtils;
 import org.apache.reef.annotations.audience.DriverSide;
 import org.apache.reef.driver.evaluator.*;
 import org.apache.reef.driver.task.RunningTask;
@@ -31,6 +30,7 @@ import org.apache.reef.tang.annotations.Unit;
 import org.apache.reef.vortex.api.VortexStart;
 import org.apache.reef.vortex.common.TaskletFailureReport;
 import org.apache.reef.vortex.common.TaskletResultReport;
+import org.apache.reef.vortex.common.VortexAvroUtils;
 import org.apache.reef.vortex.common.WorkerReport;
 import org.apache.reef.vortex.evaluator.VortexWorker;
 import org.apache.reef.wake.EStage;
@@ -154,15 +154,16 @@ final class VortexDriver {
     @Override
     public void onNext(final TaskMessage taskMessage) {
       final String workerId = taskMessage.getId();
-      final WorkerReport workerReport= (WorkerReport)SerializationUtils.deserialize(taskMessage.get());
+      final WorkerReport workerReport = VortexAvroUtils.toWorkerReport(taskMessage.get());
       switch (workerReport.getType()) {
       case TaskletResult:
-        final TaskletResultReport taskletResultReport = (TaskletResultReport)workerReport;
+        final TaskletResultReport taskletResultReport = (TaskletResultReport) workerReport;
         vortexMaster.taskletCompleted(workerId, taskletResultReport.getTaskletId(), taskletResultReport.getResult());
         break;
       case TaskletFailure:
-        final TaskletFailureReport taskletFailureReport = (TaskletFailureReport)workerReport;
-        vortexMaster.taskletErrored(workerId, taskletFailureReport.getTaskletId(), taskletFailureReport.getException());
+        final TaskletFailureReport taskletFailureReport = (TaskletFailureReport) workerReport;
+        vortexMaster.taskletErrored(workerId, taskletFailureReport.getTaskletId(),
+            taskletFailureReport.getException());
         break;
       default:
         throw new RuntimeException("Unknown Report");

http://git-wip-us.apache.org/repos/asf/reef/blob/71951320/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexRequestor.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexRequestor.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexRequestor.java
index 2c02d6e..b94b5b0 100644
--- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexRequestor.java
+++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexRequestor.java
@@ -18,9 +18,9 @@
  */
 package org.apache.reef.vortex.driver;
 
-import org.apache.commons.lang.SerializationUtils;
 import org.apache.reef.annotations.audience.DriverSide;
 import org.apache.reef.driver.task.RunningTask;
+import org.apache.reef.vortex.common.VortexAvroUtils;
 import org.apache.reef.vortex.common.VortexRequest;
 
 import javax.inject.Inject;
@@ -43,7 +43,7 @@ class VortexRequestor {
       @Override
       public void run() {
         //  Possible race condition with VortexWorkerManager#terminate is addressed by the global lock in VortexMaster
-        reefTask.send(SerializationUtils.serialize(vortexRequest));
+        reefTask.send(VortexAvroUtils.toBytes(vortexRequest));
       }
     });
   }

http://git-wip-us.apache.org/repos/asf/reef/blob/71951320/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/evaluator/VortexWorker.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/evaluator/VortexWorker.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/evaluator/VortexWorker.java
index 6a2821e..e65830c 100644
--- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/evaluator/VortexWorker.java
+++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/evaluator/VortexWorker.java
@@ -18,7 +18,6 @@
  */
 package org.apache.reef.vortex.evaluator;
 
-import org.apache.commons.lang.SerializationUtils;
 import org.apache.reef.annotations.Unstable;
 import org.apache.reef.annotations.audience.TaskSide;
 import org.apache.reef.tang.annotations.Parameter;
@@ -57,7 +56,7 @@ public final class VortexWorker implements Task, TaskMessageSource {
 
   @Inject
   private VortexWorker(final HeartBeatTriggerManager heartBeatTriggerManager,
-                      @Parameter(VortexWorkerConf.NumOfThreads.class) final int numOfThreads) {
+                       @Parameter(VortexWorkerConf.NumOfThreads.class) final int numOfThreads) {
     this.heartBeatTriggerManager = heartBeatTriggerManager;
     this.numOfThreads = numOfThreads;
   }
@@ -88,33 +87,32 @@ public final class VortexWorker implements Task, TaskMessageSource {
             @Override
             public void run() {
               // Command Executor: Deserialize the command
-              final VortexRequest vortexRequest = (VortexRequest) SerializationUtils.deserialize(message);
+              final VortexRequest vortexRequest = VortexAvroUtils.toVortexRequest(message);
               switch (vortexRequest.getType()) {
-                case ExecuteTasklet:
-                  final TaskletExecutionRequest taskletExecutionRequest = (TaskletExecutionRequest) vortexRequest;
-                  try {
-                    // Command Executor: Execute the command
-                    final Serializable result = taskletExecutionRequest.execute();
-
-                    // Command Executor: Tasklet successfully returns result
-                    final WorkerReport report =
-                        new TaskletResultReport<>(taskletExecutionRequest.getTaskletId(), result);
-                    workerReports.addLast(SerializationUtils.serialize(report));
-                  } catch (Exception e) {
-                    // Command Executor: Tasklet throws an exception
-                    final WorkerReport report =
-                        new TaskletFailureReport(taskletExecutionRequest.getTaskletId(), e);
-                    workerReports.addLast(SerializationUtils.serialize(report));
-                  }
-
-                  heartBeatTriggerManager.triggerHeartBeat();
-                  break;
-                default:
-                  throw new RuntimeException("Unknown Command");
+              case ExecuteTasklet:
+                final TaskletExecutionRequest taskletExecutionRequest = (TaskletExecutionRequest) vortexRequest;
+                try {
+                  // Command Executor: Execute the command
+                  final Serializable result = taskletExecutionRequest.execute();
+
+                  // Command Executor: Tasklet successfully returns result
+                  final WorkerReport report =
+                      new TaskletResultReport<>(taskletExecutionRequest.getTaskletId(), result);
+                  workerReports.addLast(VortexAvroUtils.toBytes(report));
+                } catch (Exception e) {
+                  // Command Executor: Tasklet throws an exception
+                  final WorkerReport report =
+                      new TaskletFailureReport(taskletExecutionRequest.getTaskletId(), e);
+                  workerReports.addLast(VortexAvroUtils.toBytes(report));
+                }
+
+                heartBeatTriggerManager.triggerHeartBeat();
+                break;
+              default:
+                throw new RuntimeException("Unknown Command");
               }
             }
           });
-
         }
       }
     });