You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by je...@apache.org on 2016/05/11 21:32:26 UTC

tez git commit: TEZ-2342. Reduce bytearray copy with TezEvent Serialization and deserialization

Repository: tez
Updated Branches:
  refs/heads/master 13dca15da -> 88bd5b9dc


TEZ-2342. Reduce bytearray copy with TezEvent Serialization and deserialization


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/88bd5b9d
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/88bd5b9d
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/88bd5b9d

Branch: refs/heads/master
Commit: 88bd5b9dc3f08b3c3b361e5167c219ad3e59c53f
Parents: 13dca15
Author: Jonathan Eagles <je...@yahoo-inc.com>
Authored: Wed May 11 16:32:01 2016 -0500
Committer: Jonathan Eagles <je...@yahoo-inc.com>
Committed: Wed May 11 16:32:01 2016 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |   8 +
 .../tez/runtime/api/impl/EventMetaData.java     |  48 ++++++
 .../apache/tez/runtime/api/impl/TezEvent.java   | 117 +++++++++-----
 .../tez/runtime/api/impl/TestTezEvent.java      | 152 +++++++++++++++++++
 4 files changed, 283 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/88bd5b9d/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index dc9f4fe..62833dd 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES
 
 ALL CHANGES:
 
+  TEZ-2342. Reduce bytearray copy with TezEvent Serialization and deserialization
   TEZ-3251. Allow ability to add custom counters to TaskRunner2Callable.
   TEZ-3250. TezTaskRunner2 should accept ExecutorService.
   TEZ-3245. Data race between addKnowInput and clearAndGetOnepartition of InputHost.
@@ -34,6 +35,7 @@ INCOMPATIBLE CHANGES
 
 ALL CHANGES:
 
+  TEZ-2342. Reduce bytearray copy with TezEvent Serialization and deserialization
   TEZ-3251. Allow ability to add custom counters to TaskRunner2Callable.
   TEZ-3250. TezTaskRunner2 should accept ExecutorService.
   TEZ-3193. Deadlock in AM during task commit request.
@@ -467,6 +469,12 @@ TEZ-2003: Support for External services CHANGES
   TEZ-2735. rebase 08/21
   TEZ-2736. Pre-merge: Update CHANGES.txt and version in branch.
 
+Release 0.7.2: Unreleased
+
+INCOMPATIBLE CHANGES
+
+ALL CHANGES:
+  TEZ-2342. Reduce bytearray copy with TezEvent Serialization and deserialization
 
 Release 0.7.1: Unreleased
 

http://git-wip-us.apache.org/repos/asf/tez/blob/88bd5b9d/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/EventMetaData.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/EventMetaData.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/EventMetaData.java
index 88cad47..0ee96af 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/EventMetaData.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/EventMetaData.java
@@ -19,11 +19,13 @@
 package org.apache.tez.runtime.api.impl;
 
 import static com.google.common.base.Preconditions.checkNotNull;
+
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 
 import javax.annotation.Nullable;
+
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.util.StringInterner;
 import org.apache.tez.dag.records.TezTaskAttemptID;
@@ -137,4 +139,50 @@ public class EventMetaData implements Writable {
         + ", taskAttemptId=" + (taskAttemptID == null? "null" : taskAttemptID)
         + " }";
   }
+
+  @Override
+  public int hashCode() {
+    final int prime = 31;
+    int result = 1;
+    result = prime * result
+        + ((edgeVertexName == null) ? 0 : edgeVertexName.hashCode());
+    result = prime
+        * result
+        + ((producerConsumerType == null) ? 0 : producerConsumerType.hashCode());
+    result = prime * result
+        + ((taskAttemptID == null) ? 0 : taskAttemptID.hashCode());
+    result = prime * result
+        + ((taskVertexName == null) ? 0 : taskVertexName.hashCode());
+    return result;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj)
+      return true;
+    if (obj == null)
+      return false;
+    if (getClass() != obj.getClass())
+      return false;
+    EventMetaData other = (EventMetaData) obj;
+    if (edgeVertexName == null) {
+      if (other.edgeVertexName != null)
+        return false;
+    } else if (!edgeVertexName.equals(other.edgeVertexName))
+      return false;
+    if (producerConsumerType != other.producerConsumerType)
+      return false;
+    if (taskAttemptID == null) {
+      if (other.taskAttemptID != null)
+        return false;
+    } else if (!taskAttemptID.equals(other.taskAttemptID))
+      return false;
+    if (taskVertexName == null) {
+      if (other.taskVertexName != null)
+        return false;
+    } else if (!taskVertexName.equals(other.taskVertexName))
+      return false;
+    return true;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/88bd5b9d/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezEvent.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezEvent.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezEvent.java
index b3ce8c4..e76bdbb 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezEvent.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezEvent.java
@@ -21,7 +21,9 @@ package org.apache.tez.runtime.api.impl;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.io.OutputStream;
 
+import org.apache.hadoop.io.DataInputBuffer;
 import org.apache.hadoop.io.Writable;
 import org.apache.tez.common.ProtoConverters;
 import org.apache.tez.common.TezConverterUtils;
@@ -36,10 +38,10 @@ import org.apache.tez.runtime.api.events.EventProtos.InputFailedEventProto;
 import org.apache.tez.runtime.api.events.EventProtos.InputReadErrorEventProto;
 import org.apache.tez.runtime.api.events.EventProtos.RootInputDataInformationEventProto;
 import org.apache.tez.runtime.api.events.EventProtos.VertexManagerEventProto;
-import org.apache.tez.runtime.api.events.InputFailedEvent;
-import org.apache.tez.runtime.api.events.InputReadErrorEvent;
 import org.apache.tez.runtime.api.events.InputDataInformationEvent;
+import org.apache.tez.runtime.api.events.InputFailedEvent;
 import org.apache.tez.runtime.api.events.InputInitializerEvent;
+import org.apache.tez.runtime.api.events.InputReadErrorEvent;
 import org.apache.tez.runtime.api.events.TaskAttemptCompletedEvent;
 import org.apache.tez.runtime.api.events.TaskAttemptFailedEvent;
 import org.apache.tez.runtime.api.events.TaskAttemptKilledEvent;
@@ -49,6 +51,10 @@ import org.apache.tez.runtime.internals.api.events.SystemEventProtos.TaskAttempt
 import org.apache.tez.runtime.internals.api.events.SystemEventProtos.TaskAttemptFailedEventProto;
 import org.apache.tez.runtime.internals.api.events.SystemEventProtos.TaskAttemptKilledEventProto;
 
+import com.google.protobuf.AbstractMessage;
+import com.google.protobuf.CodedInputStream;
+import com.google.protobuf.CodedOutputStream;
+
 public class TezEvent implements Writable {
 
   private EventType eventType;
@@ -58,7 +64,7 @@ public class TezEvent implements Writable {
   private EventMetaData sourceInfo;
 
   private EventMetaData destinationInfo;
-  
+
   private long eventReceivedTime;
 
   public TezEvent() {
@@ -67,7 +73,7 @@ public class TezEvent implements Writable {
   public TezEvent(Event event, EventMetaData sourceInfo) {
     this(event, sourceInfo, System.currentTimeMillis());
   }
-  
+
   public TezEvent(Event event, EventMetaData sourceInfo, long time) {
     this.event = event;
     this.eventReceivedTime = time;
@@ -103,11 +109,11 @@ public class TezEvent implements Writable {
   public Event getEvent() {
     return event;
   }
-  
+
   public void setEventReceivedTime(long eventReceivedTime) { // TODO save
     this.eventReceivedTime = eventReceivedTime;
   }
-  
+
   public long getEventReceivedTime() {
     return eventReceivedTime;
   }
@@ -145,67 +151,78 @@ public class TezEvent implements Writable {
       TaskStatusUpdateEvent sEvt = (TaskStatusUpdateEvent) event;
       sEvt.write(out);
     } else {
-      byte[] eventBytes = null;
+      AbstractMessage message;
       switch (eventType) {
       case DATA_MOVEMENT_EVENT:
-        eventBytes =
+        message =
             ProtoConverters.convertDataMovementEventToProto(
-                (DataMovementEvent) event).toByteArray();
+                (DataMovementEvent) event);
         break;
       case COMPOSITE_DATA_MOVEMENT_EVENT:
-        eventBytes =
+        message =
             ProtoConverters.convertCompositeDataMovementEventToProto(
-                (CompositeDataMovementEvent) event).toByteArray();
+                (CompositeDataMovementEvent) event);
         break;
       case VERTEX_MANAGER_EVENT:
-        eventBytes = ProtoConverters.convertVertexManagerEventToProto((VertexManagerEvent) event)
-            .toByteArray();
+        message = ProtoConverters.convertVertexManagerEventToProto((VertexManagerEvent) event);
         break;
       case INPUT_READ_ERROR_EVENT:
         InputReadErrorEvent ideEvt = (InputReadErrorEvent) event;
-        eventBytes = InputReadErrorEventProto.newBuilder()
+        message = InputReadErrorEventProto.newBuilder()
             .setIndex(ideEvt.getIndex())
             .setDiagnostics(ideEvt.getDiagnostics())
             .setVersion(ideEvt.getVersion())
-            .build().toByteArray();
+            .build();
         break;
       case TASK_ATTEMPT_FAILED_EVENT:
         TaskAttemptFailedEvent tfEvt = (TaskAttemptFailedEvent) event;
-        eventBytes = TaskAttemptFailedEventProto.newBuilder()
+        message = TaskAttemptFailedEventProto.newBuilder()
             .setDiagnostics(tfEvt.getDiagnostics())
             .setTaskFailureType(TezConverterUtils.failureTypeToProto(tfEvt.getTaskFailureType()))
-            .build().toByteArray();
+            .build();
         break;
         case TASK_ATTEMPT_KILLED_EVENT:
           TaskAttemptKilledEvent tkEvent = (TaskAttemptKilledEvent) event;
-          eventBytes = TaskAttemptKilledEventProto.newBuilder()
-              .setDiagnostics(tkEvent.getDiagnostics()).build().toByteArray();
+          message = TaskAttemptKilledEventProto.newBuilder()
+              .setDiagnostics(tkEvent.getDiagnostics()).build();
           break;
       case TASK_ATTEMPT_COMPLETED_EVENT:
-        eventBytes = TaskAttemptCompletedEventProto.newBuilder()
-            .build().toByteArray();
+        message = TaskAttemptCompletedEventProto.newBuilder()
+            .build();
         break;
       case INPUT_FAILED_EVENT:
         InputFailedEvent ifEvt = (InputFailedEvent) event;
-        eventBytes = InputFailedEventProto.newBuilder()
+        message = InputFailedEventProto.newBuilder()
             .setTargetIndex(ifEvt.getTargetIndex())
-            .setVersion(ifEvt.getVersion()).build().toByteArray();
+            .setVersion(ifEvt.getVersion()).build();
         break;
       case ROOT_INPUT_DATA_INFORMATION_EVENT:
-        eventBytes = ProtoConverters.convertRootInputDataInformationEventToProto(
-            (InputDataInformationEvent) event).toByteArray();
+        message = ProtoConverters.convertRootInputDataInformationEventToProto(
+            (InputDataInformationEvent) event);
         break;
       case ROOT_INPUT_INITIALIZER_EVENT:
-        eventBytes = ProtoConverters
-            .convertRootInputInitializerEventToProto((InputInitializerEvent) event)
-            .toByteArray();
+        message = ProtoConverters
+            .convertRootInputInitializerEventToProto((InputInitializerEvent) event);
         break;
       default:
         throw new TezUncheckedException("Unknown TezEvent"
            + ", type=" + eventType);
       }
-      out.writeInt(eventBytes.length);
-      out.write(eventBytes);
+      if (out instanceof OutputStream) { //DataOutputBuffer extends DataOutputStream
+        int serializedSize = message.getSerializedSize();
+        out.writeInt(serializedSize);
+        int buffersize = serializedSize < CodedOutputStream.DEFAULT_BUFFER_SIZE ? serializedSize
+            : CodedOutputStream.DEFAULT_BUFFER_SIZE;
+        CodedOutputStream codedOut = CodedOutputStream.newInstance(
+            (OutputStream) out, buffersize);
+        message.writeTo(codedOut);
+        codedOut.flush();
+      } else {
+        byte[] eventBytes = message.toByteArray();
+        out.writeInt(eventBytes.length);
+        out.write(eventBytes);
+      }
+
     }
   }
 
@@ -222,36 +239,45 @@ public class TezEvent implements Writable {
       ((TaskStatusUpdateEvent)event).readFields(in);
     } else {
       int eventBytesLen = in.readInt();
-      byte[] eventBytes = new byte[eventBytesLen];
-      in.readFully(eventBytes);
+      byte[] eventBytes;
+      CodedInputStream input;
+      int startOffset = 0;
+      if (in instanceof DataInputBuffer) {
+        eventBytes = ((DataInputBuffer)in).getData();
+        startOffset = ((DataInputBuffer) in).getPosition();
+      } else {
+        eventBytes = new byte[eventBytesLen];
+        in.readFully(eventBytes);
+      }
+      input = CodedInputStream.newInstance(eventBytes, startOffset, eventBytesLen);
       switch (eventType) {
       case DATA_MOVEMENT_EVENT:
         DataMovementEventProto dmProto =
-            DataMovementEventProto.parseFrom(eventBytes);
+            DataMovementEventProto.parseFrom(input);
         event = ProtoConverters.convertDataMovementEventFromProto(dmProto);
         break;
       case COMPOSITE_DATA_MOVEMENT_EVENT:
-        CompositeEventProto cProto = CompositeEventProto.parseFrom(eventBytes);
+        CompositeEventProto cProto = CompositeEventProto.parseFrom(input);
         event = ProtoConverters.convertCompositeDataMovementEventFromProto(cProto);
         break;
       case VERTEX_MANAGER_EVENT:
-        VertexManagerEventProto vmProto = VertexManagerEventProto.parseFrom(eventBytes);
+        VertexManagerEventProto vmProto = VertexManagerEventProto.parseFrom(input);
         event = ProtoConverters.convertVertexManagerEventFromProto(vmProto);
         break;
       case INPUT_READ_ERROR_EVENT:
         InputReadErrorEventProto ideProto =
-            InputReadErrorEventProto.parseFrom(eventBytes);
+            InputReadErrorEventProto.parseFrom(input);
         event = InputReadErrorEvent.create(ideProto.getDiagnostics(),
             ideProto.getIndex(), ideProto.getVersion());
         break;
       case TASK_ATTEMPT_FAILED_EVENT:
         TaskAttemptFailedEventProto tfProto =
-            TaskAttemptFailedEventProto.parseFrom(eventBytes);
+            TaskAttemptFailedEventProto.parseFrom(input);
         event = new TaskAttemptFailedEvent(tfProto.getDiagnostics(),
             TezConverterUtils.failureTypeFromProto(tfProto.getTaskFailureType()));
         break;
       case TASK_ATTEMPT_KILLED_EVENT:
-        TaskAttemptKilledEventProto tkProto = TaskAttemptKilledEventProto.parseFrom(eventBytes);
+        TaskAttemptKilledEventProto tkProto = TaskAttemptKilledEventProto.parseFrom(input);
         event = new TaskAttemptKilledEvent(tkProto.getDiagnostics());
         break;
       case TASK_ATTEMPT_COMPLETED_EVENT:
@@ -259,16 +285,16 @@ public class TezEvent implements Writable {
         break;
       case INPUT_FAILED_EVENT:
         InputFailedEventProto ifProto =
-            InputFailedEventProto.parseFrom(eventBytes);
+            InputFailedEventProto.parseFrom(input);
         event = InputFailedEvent.create(ifProto.getTargetIndex(), ifProto.getVersion());
         break;
       case ROOT_INPUT_DATA_INFORMATION_EVENT:
         RootInputDataInformationEventProto difProto = RootInputDataInformationEventProto
-            .parseFrom(eventBytes);
+            .parseFrom(input);
         event = ProtoConverters.convertRootInputDataInformationEventFromProto(difProto);
         break;
       case ROOT_INPUT_INITIALIZER_EVENT:
-        EventProtos.RootInputInitializerEventProto riiProto = EventProtos.RootInputInitializerEventProto.parseFrom(eventBytes);
+        EventProtos.RootInputInitializerEventProto riiProto = EventProtos.RootInputInitializerEventProto.parseFrom(input);
         event = ProtoConverters.convertRootInputInitializerEventFromProto(riiProto);
         break;
       default:
@@ -276,6 +302,13 @@ public class TezEvent implements Writable {
         throw new TezUncheckedException("Unexpected TezEvent"
            + ", type=" + eventType);
       }
+      if (in instanceof DataInputBuffer) {
+        // Skip so that position is updated
+        int skipped = in.skipBytes(eventBytesLen);
+        if (skipped != eventBytesLen) {
+          throw new TezUncheckedException("Expected to skip " + eventBytesLen + " bytes. Actually skipped = " + skipped);
+        }
+      }
     }
   }
 
@@ -294,7 +327,7 @@ public class TezEvent implements Writable {
     } else {
       out.writeBoolean(false);
     }
-  } 
+  }
 
   @Override
   public void readFields(DataInput in) throws IOException {

http://git-wip-us.apache.org/repos/asf/tez/blob/88bd5b9d/tez-runtime-internals/src/test/java/org/apache/tez/runtime/api/impl/TestTezEvent.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/api/impl/TestTezEvent.java b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/api/impl/TestTezEvent.java
new file mode 100644
index 0000000..b39c4ed
--- /dev/null
+++ b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/api/impl/TestTezEvent.java
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.runtime.api.impl;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.dag.api.UserPayload;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.dag.records.TezTaskID;
+import org.apache.tez.runtime.api.events.DataMovementEvent;
+import org.apache.tez.runtime.api.events.TaskAttemptCompletedEvent;
+import org.apache.tez.runtime.api.events.TaskStatusUpdateEvent;
+import org.apache.tez.runtime.api.impl.EventMetaData.EventProducerConsumerType;
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.google.common.io.ByteArrayDataOutput;
+import com.google.common.io.ByteStreams;
+
+public class TestTezEvent {
+
+  @Test
+  public void testSerialization() throws IOException {
+
+    ArrayList<TezEvent> events = new ArrayList<TezEvent>();
+
+    Configuration conf = new Configuration(true);
+    String confVal = RandomStringUtils.random(10000, true, true);
+    conf.set("testKey", confVal);
+    UserPayload payload = TezUtils.createUserPayloadFromConf(conf);
+    TezTaskAttemptID srcTAID = TezTaskAttemptID.getInstance(
+        TezTaskID.fromString("task_1454468251169_866787_1_02_000000"), 1000);
+    TezTaskAttemptID destTAID = TezTaskAttemptID.getInstance(
+        TezTaskID.fromString("task_1454468251169_866787_1_02_000000"), 2000);
+    EventMetaData srcInfo = new EventMetaData(EventProducerConsumerType.OUTPUT,
+        "v1", "v2", srcTAID);
+    EventMetaData destInfo = new EventMetaData(EventProducerConsumerType.OUTPUT,
+        "v3", "v4", destTAID);
+
+    // Case of size less than 4K and parsing skipped during deserialization
+    events.add(new TezEvent(new TaskAttemptCompletedEvent(), new EventMetaData(
+        EventProducerConsumerType.PROCESSOR, "v1", "v2", srcTAID)));
+    TezEvent dmeEvent = new TezEvent(DataMovementEvent.create(1000, 3, 1,
+        payload.getPayload()), srcInfo, System.currentTimeMillis());
+    dmeEvent.setDestinationInfo(destInfo);
+    events.add(dmeEvent);
+    // Different code path
+    events.add(new TezEvent(new TaskStatusUpdateEvent(null, 0.1f, null, false),
+        new EventMetaData(EventProducerConsumerType.PROCESSOR, "v5", "v6",
+            srcTAID)));
+
+    // Serialize to different types of DataOutput
+    // One that implements OutputStream and one that does not
+    DataOutputBuffer dataout = new DataOutputBuffer();
+    ByteArrayDataOutput bout = ByteStreams.newDataOutput();
+    serializeEvents(events, dataout);
+    serializeEvents(events, bout);
+
+    // Deserialize from different types of DataInput
+    // One with DataInputBuffer and another different implementation
+    DataInputBuffer datain = new DataInputBuffer();
+    datain.reset(dataout.getData(), dataout.getLength());
+    DataInputStream dis = new DataInputStream(new ByteArrayInputStream(dataout.getData(), 0, dataout.getLength()));
+    ArrayList<TezEvent> actual1 = deserializeEvents(datain);
+    ArrayList<TezEvent> actual2 = deserializeEvents(dis);
+    assertEventEquals(events, actual1);
+    assertEventEquals(events, actual2);
+
+    byte[] serializedBytes = bout.toByteArray();
+    datain.reset(serializedBytes, serializedBytes.length);
+    dis = new DataInputStream(new ByteArrayInputStream(serializedBytes));
+    actual1 = deserializeEvents(datain);
+    actual2 = deserializeEvents(dis);
+    assertEventEquals(events, actual1);
+    assertEventEquals(events, actual2);
+
+  }
+
+  private void serializeEvents(ArrayList<TezEvent> events, DataOutput out) throws IOException {
+    out.writeInt(events.size());
+    for (TezEvent e : events) {
+      e.write(out);
+    }
+  }
+
+  private ArrayList<TezEvent> deserializeEvents(DataInput in) throws IOException {
+    int eventsCount = in.readInt();
+    ArrayList<TezEvent> events = new ArrayList<TezEvent>(eventsCount);
+    for (int i = 0; i < eventsCount; ++i) {
+      TezEvent e = new TezEvent();
+      e.readFields(in);
+      events.add(e);
+    }
+    return events;
+  }
+
+  private void assertEventEquals(ArrayList<TezEvent> expectedList, ArrayList<TezEvent> actualList) {
+    Assert.assertEquals(expectedList.size(), actualList.size());
+    for (int i = 0; i < expectedList.size(); i++) {
+      TezEvent expected = expectedList.get(i);
+      TezEvent actual = actualList.get(i);
+      Assert.assertEquals(expected.getEventReceivedTime(), actual.getEventReceivedTime());
+      Assert.assertEquals(expected.getSourceInfo(), actual.getSourceInfo());
+      Assert.assertEquals(expected.getDestinationInfo(), actual.getDestinationInfo());
+      Assert.assertEquals(expected.getEventType(), actual.getEventType());
+      // Doing this instead of implementing equals methods for events
+      if (i == 0) {
+        Assert.assertTrue(actual.getEvent() instanceof TaskAttemptCompletedEvent);
+      } else if (i == 1) {
+        DataMovementEvent dmeExpected = (DataMovementEvent) expected.getEvent();
+        DataMovementEvent dmeActual = (DataMovementEvent) actual.getEvent();
+        Assert.assertEquals(dmeExpected.getSourceIndex(), dmeActual.getSourceIndex());
+        Assert.assertEquals(dmeExpected.getTargetIndex(), dmeActual.getTargetIndex());
+        Assert.assertEquals(dmeExpected.getVersion(), dmeActual.getVersion());
+        Assert.assertEquals(dmeExpected.getUserPayload(), dmeActual.getUserPayload());
+      } else {
+        TaskStatusUpdateEvent tsuExpected = (TaskStatusUpdateEvent) expected.getEvent();
+        TaskStatusUpdateEvent tsuActual = (TaskStatusUpdateEvent) actual.getEvent();
+        Assert.assertEquals(tsuExpected.getCounters(), tsuActual.getCounters());
+        Assert.assertEquals(tsuExpected.getProgress(), tsuActual.getProgress(), 0);
+        Assert.assertEquals(tsuExpected.getProgressNotified(), tsuActual.getProgressNotified());
+        Assert.assertEquals(tsuExpected.getStatistics(), tsuActual.getStatistics());
+      }
+    }
+  }
+
+}