You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by je...@apache.org on 2016/05/11 21:32:26 UTC
tez git commit: TEZ-2342. Reduce bytearray copy with TezEvent
Serialization and deserialization
Repository: tez
Updated Branches:
refs/heads/master 13dca15da -> 88bd5b9dc
TEZ-2342. Reduce bytearray copy with TezEvent Serialization and deserialization
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/88bd5b9d
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/88bd5b9d
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/88bd5b9d
Branch: refs/heads/master
Commit: 88bd5b9dc3f08b3c3b361e5167c219ad3e59c53f
Parents: 13dca15
Author: Jonathan Eagles <je...@yahoo-inc.com>
Authored: Wed May 11 16:32:01 2016 -0500
Committer: Jonathan Eagles <je...@yahoo-inc.com>
Committed: Wed May 11 16:32:01 2016 -0500
----------------------------------------------------------------------
CHANGES.txt | 8 +
.../tez/runtime/api/impl/EventMetaData.java | 48 ++++++
.../apache/tez/runtime/api/impl/TezEvent.java | 117 +++++++++-----
.../tez/runtime/api/impl/TestTezEvent.java | 152 +++++++++++++++++++
4 files changed, 283 insertions(+), 42 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/88bd5b9d/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index dc9f4fe..62833dd 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES
ALL CHANGES:
+ TEZ-2342. Reduce bytearray copy with TezEvent Serialization and deserialization
TEZ-3251. Allow ability to add custom counters to TaskRunner2Callable.
TEZ-3250. TezTaskRunner2 should accept ExecutorService.
TEZ-3245. Data race between addKnowInput and clearAndGetOnepartition of InputHost.
@@ -34,6 +35,7 @@ INCOMPATIBLE CHANGES
ALL CHANGES:
+ TEZ-2342. Reduce bytearray copy with TezEvent Serialization and deserialization
TEZ-3251. Allow ability to add custom counters to TaskRunner2Callable.
TEZ-3250. TezTaskRunner2 should accept ExecutorService.
TEZ-3193. Deadlock in AM during task commit request.
@@ -467,6 +469,12 @@ TEZ-2003: Support for External services CHANGES
TEZ-2735. rebase 08/21
TEZ-2736. Pre-merge: Update CHANGES.txt and version in branch.
+Release 0.7.2: Unreleased
+
+INCOMPATIBLE CHANGES
+
+ALL CHANGES:
+ TEZ-2342. Reduce bytearray copy with TezEvent Serialization and deserialization
Release 0.7.1: Unreleased
http://git-wip-us.apache.org/repos/asf/tez/blob/88bd5b9d/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/EventMetaData.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/EventMetaData.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/EventMetaData.java
index 88cad47..0ee96af 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/EventMetaData.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/EventMetaData.java
@@ -19,11 +19,13 @@
package org.apache.tez.runtime.api.impl;
import static com.google.common.base.Preconditions.checkNotNull;
+
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import javax.annotation.Nullable;
+
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.StringInterner;
import org.apache.tez.dag.records.TezTaskAttemptID;
@@ -137,4 +139,50 @@ public class EventMetaData implements Writable {
+ ", taskAttemptId=" + (taskAttemptID == null? "null" : taskAttemptID)
+ " }";
}
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result
+ + ((edgeVertexName == null) ? 0 : edgeVertexName.hashCode());
+ result = prime
+ * result
+ + ((producerConsumerType == null) ? 0 : producerConsumerType.hashCode());
+ result = prime * result
+ + ((taskAttemptID == null) ? 0 : taskAttemptID.hashCode());
+ result = prime * result
+ + ((taskVertexName == null) ? 0 : taskVertexName.hashCode());
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (getClass() != obj.getClass())
+ return false;
+ EventMetaData other = (EventMetaData) obj;
+ if (edgeVertexName == null) {
+ if (other.edgeVertexName != null)
+ return false;
+ } else if (!edgeVertexName.equals(other.edgeVertexName))
+ return false;
+ if (producerConsumerType != other.producerConsumerType)
+ return false;
+ if (taskAttemptID == null) {
+ if (other.taskAttemptID != null)
+ return false;
+ } else if (!taskAttemptID.equals(other.taskAttemptID))
+ return false;
+ if (taskVertexName == null) {
+ if (other.taskVertexName != null)
+ return false;
+ } else if (!taskVertexName.equals(other.taskVertexName))
+ return false;
+ return true;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/tez/blob/88bd5b9d/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezEvent.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezEvent.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezEvent.java
index b3ce8c4..e76bdbb 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezEvent.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezEvent.java
@@ -21,7 +21,9 @@ package org.apache.tez.runtime.api.impl;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
+import java.io.OutputStream;
+import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.Writable;
import org.apache.tez.common.ProtoConverters;
import org.apache.tez.common.TezConverterUtils;
@@ -36,10 +38,10 @@ import org.apache.tez.runtime.api.events.EventProtos.InputFailedEventProto;
import org.apache.tez.runtime.api.events.EventProtos.InputReadErrorEventProto;
import org.apache.tez.runtime.api.events.EventProtos.RootInputDataInformationEventProto;
import org.apache.tez.runtime.api.events.EventProtos.VertexManagerEventProto;
-import org.apache.tez.runtime.api.events.InputFailedEvent;
-import org.apache.tez.runtime.api.events.InputReadErrorEvent;
import org.apache.tez.runtime.api.events.InputDataInformationEvent;
+import org.apache.tez.runtime.api.events.InputFailedEvent;
import org.apache.tez.runtime.api.events.InputInitializerEvent;
+import org.apache.tez.runtime.api.events.InputReadErrorEvent;
import org.apache.tez.runtime.api.events.TaskAttemptCompletedEvent;
import org.apache.tez.runtime.api.events.TaskAttemptFailedEvent;
import org.apache.tez.runtime.api.events.TaskAttemptKilledEvent;
@@ -49,6 +51,10 @@ import org.apache.tez.runtime.internals.api.events.SystemEventProtos.TaskAttempt
import org.apache.tez.runtime.internals.api.events.SystemEventProtos.TaskAttemptFailedEventProto;
import org.apache.tez.runtime.internals.api.events.SystemEventProtos.TaskAttemptKilledEventProto;
+import com.google.protobuf.AbstractMessage;
+import com.google.protobuf.CodedInputStream;
+import com.google.protobuf.CodedOutputStream;
+
public class TezEvent implements Writable {
private EventType eventType;
@@ -58,7 +64,7 @@ public class TezEvent implements Writable {
private EventMetaData sourceInfo;
private EventMetaData destinationInfo;
-
+
private long eventReceivedTime;
public TezEvent() {
@@ -67,7 +73,7 @@ public class TezEvent implements Writable {
public TezEvent(Event event, EventMetaData sourceInfo) {
this(event, sourceInfo, System.currentTimeMillis());
}
-
+
public TezEvent(Event event, EventMetaData sourceInfo, long time) {
this.event = event;
this.eventReceivedTime = time;
@@ -103,11 +109,11 @@ public class TezEvent implements Writable {
public Event getEvent() {
return event;
}
-
+
public void setEventReceivedTime(long eventReceivedTime) { // TODO save
this.eventReceivedTime = eventReceivedTime;
}
-
+
public long getEventReceivedTime() {
return eventReceivedTime;
}
@@ -145,67 +151,78 @@ public class TezEvent implements Writable {
TaskStatusUpdateEvent sEvt = (TaskStatusUpdateEvent) event;
sEvt.write(out);
} else {
- byte[] eventBytes = null;
+ AbstractMessage message;
switch (eventType) {
case DATA_MOVEMENT_EVENT:
- eventBytes =
+ message =
ProtoConverters.convertDataMovementEventToProto(
- (DataMovementEvent) event).toByteArray();
+ (DataMovementEvent) event);
break;
case COMPOSITE_DATA_MOVEMENT_EVENT:
- eventBytes =
+ message =
ProtoConverters.convertCompositeDataMovementEventToProto(
- (CompositeDataMovementEvent) event).toByteArray();
+ (CompositeDataMovementEvent) event);
break;
case VERTEX_MANAGER_EVENT:
- eventBytes = ProtoConverters.convertVertexManagerEventToProto((VertexManagerEvent) event)
- .toByteArray();
+ message = ProtoConverters.convertVertexManagerEventToProto((VertexManagerEvent) event);
break;
case INPUT_READ_ERROR_EVENT:
InputReadErrorEvent ideEvt = (InputReadErrorEvent) event;
- eventBytes = InputReadErrorEventProto.newBuilder()
+ message = InputReadErrorEventProto.newBuilder()
.setIndex(ideEvt.getIndex())
.setDiagnostics(ideEvt.getDiagnostics())
.setVersion(ideEvt.getVersion())
- .build().toByteArray();
+ .build();
break;
case TASK_ATTEMPT_FAILED_EVENT:
TaskAttemptFailedEvent tfEvt = (TaskAttemptFailedEvent) event;
- eventBytes = TaskAttemptFailedEventProto.newBuilder()
+ message = TaskAttemptFailedEventProto.newBuilder()
.setDiagnostics(tfEvt.getDiagnostics())
.setTaskFailureType(TezConverterUtils.failureTypeToProto(tfEvt.getTaskFailureType()))
- .build().toByteArray();
+ .build();
break;
case TASK_ATTEMPT_KILLED_EVENT:
TaskAttemptKilledEvent tkEvent = (TaskAttemptKilledEvent) event;
- eventBytes = TaskAttemptKilledEventProto.newBuilder()
- .setDiagnostics(tkEvent.getDiagnostics()).build().toByteArray();
+ message = TaskAttemptKilledEventProto.newBuilder()
+ .setDiagnostics(tkEvent.getDiagnostics()).build();
break;
case TASK_ATTEMPT_COMPLETED_EVENT:
- eventBytes = TaskAttemptCompletedEventProto.newBuilder()
- .build().toByteArray();
+ message = TaskAttemptCompletedEventProto.newBuilder()
+ .build();
break;
case INPUT_FAILED_EVENT:
InputFailedEvent ifEvt = (InputFailedEvent) event;
- eventBytes = InputFailedEventProto.newBuilder()
+ message = InputFailedEventProto.newBuilder()
.setTargetIndex(ifEvt.getTargetIndex())
- .setVersion(ifEvt.getVersion()).build().toByteArray();
+ .setVersion(ifEvt.getVersion()).build();
break;
case ROOT_INPUT_DATA_INFORMATION_EVENT:
- eventBytes = ProtoConverters.convertRootInputDataInformationEventToProto(
- (InputDataInformationEvent) event).toByteArray();
+ message = ProtoConverters.convertRootInputDataInformationEventToProto(
+ (InputDataInformationEvent) event);
break;
case ROOT_INPUT_INITIALIZER_EVENT:
- eventBytes = ProtoConverters
- .convertRootInputInitializerEventToProto((InputInitializerEvent) event)
- .toByteArray();
+ message = ProtoConverters
+ .convertRootInputInitializerEventToProto((InputInitializerEvent) event);
break;
default:
throw new TezUncheckedException("Unknown TezEvent"
+ ", type=" + eventType);
}
- out.writeInt(eventBytes.length);
- out.write(eventBytes);
+ if (out instanceof OutputStream) { //DataOutputBuffer extends DataOutputStream
+ int serializedSize = message.getSerializedSize();
+ out.writeInt(serializedSize);
+ int buffersize = serializedSize < CodedOutputStream.DEFAULT_BUFFER_SIZE ? serializedSize
+ : CodedOutputStream.DEFAULT_BUFFER_SIZE;
+ CodedOutputStream codedOut = CodedOutputStream.newInstance(
+ (OutputStream) out, buffersize);
+ message.writeTo(codedOut);
+ codedOut.flush();
+ } else {
+ byte[] eventBytes = message.toByteArray();
+ out.writeInt(eventBytes.length);
+ out.write(eventBytes);
+ }
+
}
}
@@ -222,36 +239,45 @@ public class TezEvent implements Writable {
((TaskStatusUpdateEvent)event).readFields(in);
} else {
int eventBytesLen = in.readInt();
- byte[] eventBytes = new byte[eventBytesLen];
- in.readFully(eventBytes);
+ byte[] eventBytes;
+ CodedInputStream input;
+ int startOffset = 0;
+ if (in instanceof DataInputBuffer) {
+ eventBytes = ((DataInputBuffer)in).getData();
+ startOffset = ((DataInputBuffer) in).getPosition();
+ } else {
+ eventBytes = new byte[eventBytesLen];
+ in.readFully(eventBytes);
+ }
+ input = CodedInputStream.newInstance(eventBytes, startOffset, eventBytesLen);
switch (eventType) {
case DATA_MOVEMENT_EVENT:
DataMovementEventProto dmProto =
- DataMovementEventProto.parseFrom(eventBytes);
+ DataMovementEventProto.parseFrom(input);
event = ProtoConverters.convertDataMovementEventFromProto(dmProto);
break;
case COMPOSITE_DATA_MOVEMENT_EVENT:
- CompositeEventProto cProto = CompositeEventProto.parseFrom(eventBytes);
+ CompositeEventProto cProto = CompositeEventProto.parseFrom(input);
event = ProtoConverters.convertCompositeDataMovementEventFromProto(cProto);
break;
case VERTEX_MANAGER_EVENT:
- VertexManagerEventProto vmProto = VertexManagerEventProto.parseFrom(eventBytes);
+ VertexManagerEventProto vmProto = VertexManagerEventProto.parseFrom(input);
event = ProtoConverters.convertVertexManagerEventFromProto(vmProto);
break;
case INPUT_READ_ERROR_EVENT:
InputReadErrorEventProto ideProto =
- InputReadErrorEventProto.parseFrom(eventBytes);
+ InputReadErrorEventProto.parseFrom(input);
event = InputReadErrorEvent.create(ideProto.getDiagnostics(),
ideProto.getIndex(), ideProto.getVersion());
break;
case TASK_ATTEMPT_FAILED_EVENT:
TaskAttemptFailedEventProto tfProto =
- TaskAttemptFailedEventProto.parseFrom(eventBytes);
+ TaskAttemptFailedEventProto.parseFrom(input);
event = new TaskAttemptFailedEvent(tfProto.getDiagnostics(),
TezConverterUtils.failureTypeFromProto(tfProto.getTaskFailureType()));
break;
case TASK_ATTEMPT_KILLED_EVENT:
- TaskAttemptKilledEventProto tkProto = TaskAttemptKilledEventProto.parseFrom(eventBytes);
+ TaskAttemptKilledEventProto tkProto = TaskAttemptKilledEventProto.parseFrom(input);
event = new TaskAttemptKilledEvent(tkProto.getDiagnostics());
break;
case TASK_ATTEMPT_COMPLETED_EVENT:
@@ -259,16 +285,16 @@ public class TezEvent implements Writable {
break;
case INPUT_FAILED_EVENT:
InputFailedEventProto ifProto =
- InputFailedEventProto.parseFrom(eventBytes);
+ InputFailedEventProto.parseFrom(input);
event = InputFailedEvent.create(ifProto.getTargetIndex(), ifProto.getVersion());
break;
case ROOT_INPUT_DATA_INFORMATION_EVENT:
RootInputDataInformationEventProto difProto = RootInputDataInformationEventProto
- .parseFrom(eventBytes);
+ .parseFrom(input);
event = ProtoConverters.convertRootInputDataInformationEventFromProto(difProto);
break;
case ROOT_INPUT_INITIALIZER_EVENT:
- EventProtos.RootInputInitializerEventProto riiProto = EventProtos.RootInputInitializerEventProto.parseFrom(eventBytes);
+ EventProtos.RootInputInitializerEventProto riiProto = EventProtos.RootInputInitializerEventProto.parseFrom(input);
event = ProtoConverters.convertRootInputInitializerEventFromProto(riiProto);
break;
default:
@@ -276,6 +302,13 @@ public class TezEvent implements Writable {
throw new TezUncheckedException("Unexpected TezEvent"
+ ", type=" + eventType);
}
+ if (in instanceof DataInputBuffer) {
+ // Skip so that position is updated
+ int skipped = in.skipBytes(eventBytesLen);
+ if (skipped != eventBytesLen) {
+ throw new TezUncheckedException("Expected to skip " + eventBytesLen + " bytes. Actually skipped = " + skipped);
+ }
+ }
}
}
@@ -294,7 +327,7 @@ public class TezEvent implements Writable {
} else {
out.writeBoolean(false);
}
- }
+ }
@Override
public void readFields(DataInput in) throws IOException {
http://git-wip-us.apache.org/repos/asf/tez/blob/88bd5b9d/tez-runtime-internals/src/test/java/org/apache/tez/runtime/api/impl/TestTezEvent.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/api/impl/TestTezEvent.java b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/api/impl/TestTezEvent.java
new file mode 100644
index 0000000..b39c4ed
--- /dev/null
+++ b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/api/impl/TestTezEvent.java
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.runtime.api.impl;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.dag.api.UserPayload;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.dag.records.TezTaskID;
+import org.apache.tez.runtime.api.events.DataMovementEvent;
+import org.apache.tez.runtime.api.events.TaskAttemptCompletedEvent;
+import org.apache.tez.runtime.api.events.TaskStatusUpdateEvent;
+import org.apache.tez.runtime.api.impl.EventMetaData.EventProducerConsumerType;
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.google.common.io.ByteArrayDataOutput;
+import com.google.common.io.ByteStreams;
+
+public class TestTezEvent {
+
+ @Test
+ public void testSerialization() throws IOException {
+
+ ArrayList<TezEvent> events = new ArrayList<TezEvent>();
+
+ Configuration conf = new Configuration(true);
+ String confVal = RandomStringUtils.random(10000, true, true);
+ conf.set("testKey", confVal);
+ UserPayload payload = TezUtils.createUserPayloadFromConf(conf);
+ TezTaskAttemptID srcTAID = TezTaskAttemptID.getInstance(
+ TezTaskID.fromString("task_1454468251169_866787_1_02_000000"), 1000);
+ TezTaskAttemptID destTAID = TezTaskAttemptID.getInstance(
+ TezTaskID.fromString("task_1454468251169_866787_1_02_000000"), 2000);
+ EventMetaData srcInfo = new EventMetaData(EventProducerConsumerType.OUTPUT,
+ "v1", "v2", srcTAID);
+ EventMetaData destInfo = new EventMetaData(EventProducerConsumerType.OUTPUT,
+ "v3", "v4", destTAID);
+
+ // Case of size less than 4K and parsing skipped during deserialization
+ events.add(new TezEvent(new TaskAttemptCompletedEvent(), new EventMetaData(
+ EventProducerConsumerType.PROCESSOR, "v1", "v2", srcTAID)));
+ TezEvent dmeEvent = new TezEvent(DataMovementEvent.create(1000, 3, 1,
+ payload.getPayload()), srcInfo, System.currentTimeMillis());
+ dmeEvent.setDestinationInfo(destInfo);
+ events.add(dmeEvent);
+ // Different code path
+ events.add(new TezEvent(new TaskStatusUpdateEvent(null, 0.1f, null, false),
+ new EventMetaData(EventProducerConsumerType.PROCESSOR, "v5", "v6",
+ srcTAID)));
+
+ // Serialize to different types of DataOutput
+ // One that implements OutputStream and one that does not
+ DataOutputBuffer dataout = new DataOutputBuffer();
+ ByteArrayDataOutput bout = ByteStreams.newDataOutput();
+ serializeEvents(events, dataout);
+ serializeEvents(events, bout);
+
+ // Deserialize from different types of DataInput
+ // One with DataInputBuffer and another different implementation
+ DataInputBuffer datain = new DataInputBuffer();
+ datain.reset(dataout.getData(), dataout.getLength());
+ DataInputStream dis = new DataInputStream(new ByteArrayInputStream(dataout.getData(), 0, dataout.getLength()));
+ ArrayList<TezEvent> actual1 = deserializeEvents(datain);
+ ArrayList<TezEvent> actual2 = deserializeEvents(dis);
+ assertEventEquals(events, actual1);
+ assertEventEquals(events, actual2);
+
+ byte[] serializedBytes = bout.toByteArray();
+ datain.reset(serializedBytes, serializedBytes.length);
+ dis = new DataInputStream(new ByteArrayInputStream(serializedBytes));
+ actual1 = deserializeEvents(datain);
+ actual2 = deserializeEvents(dis);
+ assertEventEquals(events, actual1);
+ assertEventEquals(events, actual2);
+
+ }
+
+ private void serializeEvents(ArrayList<TezEvent> events, DataOutput out) throws IOException {
+ out.writeInt(events.size());
+ for (TezEvent e : events) {
+ e.write(out);
+ }
+ }
+
+ private ArrayList<TezEvent> deserializeEvents(DataInput in) throws IOException {
+ int eventsCount = in.readInt();
+ ArrayList<TezEvent> events = new ArrayList<TezEvent>(eventsCount);
+ for (int i = 0; i < eventsCount; ++i) {
+ TezEvent e = new TezEvent();
+ e.readFields(in);
+ events.add(e);
+ }
+ return events;
+ }
+
+ private void assertEventEquals(ArrayList<TezEvent> expectedList, ArrayList<TezEvent> actualList) {
+ Assert.assertEquals(expectedList.size(), actualList.size());
+ for (int i = 0; i < expectedList.size(); i++) {
+ TezEvent expected = expectedList.get(i);
+ TezEvent actual = actualList.get(i);
+ Assert.assertEquals(expected.getEventReceivedTime(), actual.getEventReceivedTime());
+ Assert.assertEquals(expected.getSourceInfo(), actual.getSourceInfo());
+ Assert.assertEquals(expected.getDestinationInfo(), actual.getDestinationInfo());
+ Assert.assertEquals(expected.getEventType(), actual.getEventType());
+ // Doing this instead of implementing equals methods for events
+ if (i == 0) {
+ Assert.assertTrue(actual.getEvent() instanceof TaskAttemptCompletedEvent);
+ } else if (i == 1) {
+ DataMovementEvent dmeExpected = (DataMovementEvent) expected.getEvent();
+ DataMovementEvent dmeActual = (DataMovementEvent) actual.getEvent();
+ Assert.assertEquals(dmeExpected.getSourceIndex(), dmeActual.getSourceIndex());
+ Assert.assertEquals(dmeExpected.getTargetIndex(), dmeActual.getTargetIndex());
+ Assert.assertEquals(dmeExpected.getVersion(), dmeActual.getVersion());
+ Assert.assertEquals(dmeExpected.getUserPayload(), dmeActual.getUserPayload());
+ } else {
+ TaskStatusUpdateEvent tsuExpected = (TaskStatusUpdateEvent) expected.getEvent();
+ TaskStatusUpdateEvent tsuActual = (TaskStatusUpdateEvent) actual.getEvent();
+ Assert.assertEquals(tsuExpected.getCounters(), tsuActual.getCounters());
+ Assert.assertEquals(tsuExpected.getProgress(), tsuActual.getProgress(), 0);
+ Assert.assertEquals(tsuExpected.getProgressNotified(), tsuActual.getProgressNotified());
+ Assert.assertEquals(tsuExpected.getStatistics(), tsuActual.getStatistics());
+ }
+ }
+ }
+
+}