You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2013/10/09 04:58:00 UTC
git commit: TEZ-538. Change TaskImpl.getTaskAttemptTezEvents to
return a copy of the sublist() - to fix a ConcurrentModificationException.
(sseth)
Updated Branches:
refs/heads/master 90ebb1629 -> 1d78f2300
TEZ-538. Change TaskImpl.getTaskAttemptTezEvents to return a copy of the
sublist() - to fix a ConcurrentModificationException. (sseth)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/1d78f230
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/1d78f230
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/1d78f230
Branch: refs/heads/master
Commit: 1d78f2300ac623947a860554887df36e45cd336f
Parents: 90ebb16
Author: Siddharth Seth <ss...@apache.org>
Authored: Tue Oct 8 19:55:17 2013 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Tue Oct 8 19:55:17 2013 -0700
----------------------------------------------------------------------
.../apache/tez/dag/app/dag/impl/TaskImpl.java | 4 +--
.../tez/dag/app/dag/impl/TestTaskImpl.java | 37 ++++++++++++++++++++
2 files changed, 39 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1d78f230/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
index 5b59b51..4e7dd66 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
@@ -457,8 +457,8 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
int actualMax = Math.min(maxEvents,
(tezEventsForTaskAttempts.size() - fromEventId));
int toEventId = actualMax + fromEventId;
- events = Collections.unmodifiableList(tezEventsForTaskAttempts.subList(
- fromEventId, toEventId));
+ events = Collections.unmodifiableList(new ArrayList<TezEvent>(
+ tezEventsForTaskAttempts.subList(fromEventId, toEventId)));
LOG.info("TaskAttempt:" + attemptID + " sent events: (" + fromEventId
+ "-" + toEventId + ")");
// currently not modifying the events so that we dont have to create
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1d78f230/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
index 3527cbf..2a5e664 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
@@ -52,6 +52,7 @@ import org.apache.tez.dag.app.dag.TaskStateInternal;
import org.apache.tez.dag.app.dag.TaskTerminationCause;
import org.apache.tez.dag.app.dag.Vertex;
import org.apache.tez.dag.app.dag.event.TaskEvent;
+import org.apache.tez.dag.app.dag.event.TaskEventAddTezEvent;
import org.apache.tez.dag.app.dag.event.TaskEventTAUpdate;
import org.apache.tez.dag.app.dag.event.TaskEventTermination;
import org.apache.tez.dag.app.dag.event.TaskEventType;
@@ -60,6 +61,9 @@ import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.runtime.api.events.DataMovementEvent;
+import org.apache.tez.runtime.api.impl.EventMetaData;
+import org.apache.tez.runtime.api.impl.TezEvent;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -139,6 +143,17 @@ public class TestTaskImpl {
assertTaskScheduledState();
}
+ private void sendTezEventsToTask(TezTaskID taskId, int numTezEvents) {
+ TaskEventAddTezEvent event = null;
+ EventMetaData eventMetaData = new EventMetaData();
+ DataMovementEvent dmEvent = new DataMovementEvent(null);
+ TezEvent tezEvent = new TezEvent(dmEvent, eventMetaData);
+ for (int i = 0; i < numTezEvents; i++) {
+ event = new TaskEventAddTezEvent(taskId, tezEvent);
+ mockTask.handle(event);
+ }
+ }
+
private void killTask(TezTaskID taskId) {
mockTask.handle(new TaskEventTermination(taskId, TaskTerminationCause.DAG_KILL));
assertTaskKillWaitState();
@@ -278,6 +293,28 @@ public class TestTaskImpl {
}
@Test
+ public void testFetchedEventsModifyUnderlyingList() {
+ // Tests to ensure that adding an event to a task, does not affect the
+ // result of past getTaskAttemptTezEvents calls.
+ List<TezEvent> fetchedList;
+ TezTaskID taskId = getNewTaskID();
+ scheduleTaskAttempt(taskId);
+ sendTezEventsToTask(taskId, 2);
+ TezTaskAttemptID attemptID = mockTask.getAttemptList().iterator().next()
+ .getID();
+ fetchedList = mockTask.getTaskAttemptTezEvents(attemptID, 0, 100);
+ assertEquals(2, fetchedList.size());
+
+ // Add events, make sure underlying list is the same, and no exceptions are
+ // thrown while accessing the previous list
+ sendTezEventsToTask(taskId, 4);
+ assertEquals(2, fetchedList.size());
+
+ fetchedList = mockTask.getTaskAttemptTezEvents(attemptID, 0, 100);
+ assertEquals(6, fetchedList.size());
+ }
+
+ @Test
public void testTaskProgress() {
LOG.info("--- START: testTaskProgress ---");