You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2013/10/09 04:58:00 UTC

git commit: TEZ-538. Change TaskImpl.getTaskAttemptTezEvents to return a copy of the sublist() - to fix a ConcurrentModificationException. (sseth)

Updated Branches:
  refs/heads/master 90ebb1629 -> 1d78f2300


TEZ-538. Change TaskImpl.getTaskAttemptTezEvents to return a copy of the
sublist() - to fix a ConcurrentModificationException. (sseth)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/1d78f230
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/1d78f230
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/1d78f230

Branch: refs/heads/master
Commit: 1d78f2300ac623947a860554887df36e45cd336f
Parents: 90ebb16
Author: Siddharth Seth <ss...@apache.org>
Authored: Tue Oct 8 19:55:17 2013 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Tue Oct 8 19:55:17 2013 -0700

----------------------------------------------------------------------
 .../apache/tez/dag/app/dag/impl/TaskImpl.java   |  4 +--
 .../tez/dag/app/dag/impl/TestTaskImpl.java      | 37 ++++++++++++++++++++
 2 files changed, 39 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1d78f230/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
index 5b59b51..4e7dd66 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
@@ -457,8 +457,8 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
         int actualMax = Math.min(maxEvents,
             (tezEventsForTaskAttempts.size() - fromEventId));
         int toEventId = actualMax + fromEventId;
-        events = Collections.unmodifiableList(tezEventsForTaskAttempts.subList(
-            fromEventId, toEventId));
+        events = Collections.unmodifiableList(new ArrayList<TezEvent>(
+            tezEventsForTaskAttempts.subList(fromEventId, toEventId)));
         LOG.info("TaskAttempt:" + attemptID + " sent events: (" + fromEventId
             + "-" + toEventId + ")"); 
         // currently not modifying the events so that we dont have to create 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1d78f230/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
index 3527cbf..2a5e664 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
@@ -52,6 +52,7 @@ import org.apache.tez.dag.app.dag.TaskStateInternal;
 import org.apache.tez.dag.app.dag.TaskTerminationCause;
 import org.apache.tez.dag.app.dag.Vertex;
 import org.apache.tez.dag.app.dag.event.TaskEvent;
+import org.apache.tez.dag.app.dag.event.TaskEventAddTezEvent;
 import org.apache.tez.dag.app.dag.event.TaskEventTAUpdate;
 import org.apache.tez.dag.app.dag.event.TaskEventTermination;
 import org.apache.tez.dag.app.dag.event.TaskEventType;
@@ -60,6 +61,9 @@ import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
 import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.runtime.api.events.DataMovementEvent;
+import org.apache.tez.runtime.api.impl.EventMetaData;
+import org.apache.tez.runtime.api.impl.TezEvent;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -139,6 +143,17 @@ public class TestTaskImpl {
     assertTaskScheduledState();
   }
 
+  private void sendTezEventsToTask(TezTaskID taskId, int numTezEvents) {
+    TaskEventAddTezEvent event = null;
+    EventMetaData eventMetaData = new EventMetaData();
+    DataMovementEvent dmEvent = new DataMovementEvent(null);
+    TezEvent tezEvent = new TezEvent(dmEvent, eventMetaData);
+    for (int i = 0; i < numTezEvents; i++) {
+      event = new TaskEventAddTezEvent(taskId, tezEvent);
+      mockTask.handle(event);
+    }
+  }
+
   private void killTask(TezTaskID taskId) {
     mockTask.handle(new TaskEventTermination(taskId, TaskTerminationCause.DAG_KILL));
     assertTaskKillWaitState();
@@ -278,6 +293,28 @@ public class TestTaskImpl {
   }
 
   @Test
+  public void testFetchedEventsModifyUnderlyingList() {
+    // Tests to ensure that adding an event to a task, does not affect the
+    // result of past getTaskAttemptTezEvents calls.
+    List<TezEvent> fetchedList;
+    TezTaskID taskId = getNewTaskID();
+    scheduleTaskAttempt(taskId);
+    sendTezEventsToTask(taskId, 2);
+    TezTaskAttemptID attemptID = mockTask.getAttemptList().iterator().next()
+        .getID();
+    fetchedList = mockTask.getTaskAttemptTezEvents(attemptID, 0, 100);
+    assertEquals(2, fetchedList.size());
+
+    // Add events, make sure underlying list is the same, and no exceptions are
+    // thrown while accessing the previous list
+    sendTezEventsToTask(taskId, 4);
+    assertEquals(2, fetchedList.size());
+
+    fetchedList = mockTask.getTaskAttemptTezEvents(attemptID, 0, 100);
+    assertEquals(6, fetchedList.size());
+  }
+
+  @Test
   public void testTaskProgress() {
     LOG.info("--- START: testTaskProgress ---");