You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by je...@apache.org on 2014/09/18 21:49:51 UTC
[06/25] git commit: TEZ-853. Support counters recovery. (Jeff Zhang via hitesh)
TEZ-853. Support counters recovery. (Jeff Zhang via hitesh)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/16a0f579
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/16a0f579
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/16a0f579
Branch: refs/heads/TEZ-8
Commit: 16a0f57952d554e95ae1346ac3f360998ac7faac
Parents: fcc7426
Author: Hitesh Shah <hi...@apache.org>
Authored: Thu Sep 11 22:03:26 2014 -0700
Committer: Hitesh Shah <hi...@apache.org>
Committed: Thu Sep 11 22:03:26 2014 -0700
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../apache/tez/common/counters/DAGCounter.java | 1 +
.../apache/tez/dag/api/client/DAGStatus.java | 2 +-
.../tez/dag/app/dag/impl/TaskAttemptImpl.java | 53 +++++------
.../dag/history/events/TaskFinishedEvent.java | 10 +--
.../dag/history/events/VertexFinishedEvent.java | 8 --
tez-dag/src/main/proto/HistoryEvents.proto | 4 +-
.../tez/dag/app/dag/impl/TestTaskAttempt.java | 16 ++++
.../app/dag/impl/TestTaskAttemptRecovery.java | 60 +++++++++++--
.../TestHistoryEventsProtoConversion.java | 6 --
.../org/apache/tez/test/TestAMRecovery.java | 94 +++++++++++++++++---
11 files changed, 178 insertions(+), 77 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/16a0f579/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 4fc7e83..519aaa6 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -10,6 +10,7 @@ ALL CHANGES:
TEZ-1544. Link to release artifacts for 0.5.0 does not point to a specific link for 0.5.0.
TEZ-1559. Add system tests for AM recovery.
TEZ-850. Recovery unit tests.
+ TEZ-853. Support counters recovery.
Release 0.5.1: Unreleased
http://git-wip-us.apache.org/repos/asf/tez/blob/16a0f579/tez-api/src/main/java/org/apache/tez/common/counters/DAGCounter.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/common/counters/DAGCounter.java b/tez-api/src/main/java/org/apache/tez/common/counters/DAGCounter.java
index 3598572..b5bdffb 100644
--- a/tez-api/src/main/java/org/apache/tez/common/counters/DAGCounter.java
+++ b/tez-api/src/main/java/org/apache/tez/common/counters/DAGCounter.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.classification.InterfaceStability;
public enum DAGCounter {
NUM_FAILED_TASKS,
NUM_KILLED_TASKS,
+ NUM_SUCCEEDED_TASKS,
TOTAL_LAUNCHED_TASKS,
OTHER_LOCAL_TASKS,
DATA_LOCAL_TASKS,
http://git-wip-us.apache.org/repos/asf/tez/blob/16a0f579/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGStatus.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGStatus.java b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGStatus.java
index 297c6f2..7c51c42 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGStatus.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGStatus.java
@@ -170,7 +170,7 @@ public class DAGStatus {
+ ", diagnostics="
+ StringUtils.join(getDiagnostics(), LINE_SEPARATOR)
+ ", counters="
- + (dagCounters == null ? "null" : dagCounters.toString()));
+ + (getDAGCounters() == null ? "null" : getDAGCounters().toString()));
return sb.toString();
}
http://git-wip-us.apache.org/repos/asf/tez/blob/16a0f579/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
index 7ba90b5..b8430cc 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
@@ -791,6 +791,7 @@ public class TaskAttemptImpl implements TaskAttempt,
this.launchTime = tEvent.getStartTime();
recoveryStartEventSeen = true;
recoveredState = TaskAttemptState.RUNNING;
+ sendEvent(createDAGCounterUpdateEventTALaunched(this));
return recoveredState;
}
case TASK_ATTEMPT_FINISHED:
@@ -806,6 +807,7 @@ public class TaskAttemptImpl implements TaskAttempt,
this.reportedStatus.state = tEvent.getState();
this.diagnostics.add(tEvent.getDiagnostics());
this.recoveredState = tEvent.getState();
+ sendEvent(createDAGCounterUpdateEventTAFinished(this, tEvent.getState()));
return recoveredState;
}
default:
@@ -829,47 +831,29 @@ public class TaskAttemptImpl implements TaskAttempt,
}
// TOOD Merge some of these JobCounter events.
- private static DAGEventCounterUpdate createJobCounterUpdateEventTALaunched(
+ private static DAGEventCounterUpdate createDAGCounterUpdateEventTALaunched(
TaskAttemptImpl ta) {
- DAGEventCounterUpdate jce =
+ DAGEventCounterUpdate dagCounterEvent =
new DAGEventCounterUpdate(
ta.getDAGID()
);
- jce.addCounterUpdate(DAGCounter.TOTAL_LAUNCHED_TASKS, 1);
- return jce;
+ dagCounterEvent.addCounterUpdate(DAGCounter.TOTAL_LAUNCHED_TASKS, 1);
+ return dagCounterEvent;
}
- private static DAGEventCounterUpdate createJobCounterUpdateEventSlotMillis(
- TaskAttemptImpl ta) {
+ private static DAGEventCounterUpdate createDAGCounterUpdateEventTAFinished(
+ TaskAttemptImpl taskAttempt, TaskAttemptState taState) {
DAGEventCounterUpdate jce =
- new DAGEventCounterUpdate(
- ta.getDAGID()
- );
+ new DAGEventCounterUpdate(taskAttempt.getDAGID());
-// long slotMillis = computeSlotMillis(ta);
-// jce.addCounterUpdate(DAGCounter.SLOTS_MILLIS_TASKS, slotMillis);
- return jce;
- }
-
- private static DAGEventCounterUpdate createJobCounterUpdateEventTATerminated(
- TaskAttemptImpl taskAttempt, boolean taskAlreadyCompleted,
- TaskAttemptStateInternal taState) {
- DAGEventCounterUpdate jce =
- new DAGEventCounterUpdate(
- taskAttempt.getDAGID());
-
- if (taState == TaskAttemptStateInternal.FAILED) {
+ if (taState == TaskAttemptState.FAILED) {
jce.addCounterUpdate(DAGCounter.NUM_FAILED_TASKS, 1);
- } else if (taState == TaskAttemptStateInternal.KILLED) {
+ } else if (taState == TaskAttemptState.KILLED) {
jce.addCounterUpdate(DAGCounter.NUM_KILLED_TASKS, 1);
+ } else if (taState == TaskAttemptState.SUCCEEDED ) {
+ jce.addCounterUpdate(DAGCounter.NUM_SUCCEEDED_TASKS, 1);
}
-// long slotMillisIncrement = computeSlotMillis(taskAttempt);
-// if (!taskAlreadyCompleted) {
-// // dont double count the elapsed time
-// jce.addCounterUpdate(DAGCounter.SLOTS_MILLIS_TASKS, slotMillisIncrement);
-// }
-
return jce;
}
@@ -1142,8 +1126,8 @@ public class TaskAttemptImpl implements TaskAttempt,
ta.addDiagnosticInfo(((DiagnosableEvent) event).getDiagnosticInfo());
}
- ta.sendEvent(createJobCounterUpdateEventTATerminated(ta, false,
- helper.getTaskAttemptStateInternal()));
+ ta.sendEvent(createDAGCounterUpdateEventTAFinished(ta,
+ helper.getTaskAttemptState()));
if (ta.getLaunchTime() != 0) {
// TODO For cases like this, recovery goes for a toss, since the the
// attempt will not exist in the history file.
@@ -1184,7 +1168,7 @@ public class TaskAttemptImpl implements TaskAttempt,
.createSocketAddr(ta.nodeHttpAddress); // TODO: Costly?
ta.trackerName = StringInterner.weakIntern(nodeHttpInetAddr.getHostName());
ta.httpPort = nodeHttpInetAddr.getPort();
- ta.sendEvent(createJobCounterUpdateEventTALaunched(ta));
+ ta.sendEvent(createDAGCounterUpdateEventTALaunched(ta));
LOG.info("TaskAttempt: [" + ta.attemptId + "] started."
+ " Is using containerId: [" + ta.containerId + "]" + " on NM: ["
@@ -1316,7 +1300,8 @@ public class TaskAttemptImpl implements TaskAttempt,
ta.setFinishTime();
// Send out history event.
ta.logJobHistoryAttemptFinishedEvent(TaskAttemptStateInternal.SUCCEEDED);
- ta.sendEvent(createJobCounterUpdateEventSlotMillis(ta));
+ ta.sendEvent(createDAGCounterUpdateEventTAFinished(ta,
+ TaskAttemptState.SUCCEEDED));
// Inform the Scheduler.
ta.sendEvent(new AMSchedulerEventTAEnded(ta, ta.containerId,
@@ -1415,6 +1400,8 @@ public class TaskAttemptImpl implements TaskAttempt,
// TODO abort taskattempt
taskAttempt.sendEvent(new TaskEventTAUpdate(taskAttempt.attemptId,
TaskEventType.T_ATTEMPT_KILLED));
+ taskAttempt.sendEvent(createDAGCounterUpdateEventTAFinished(taskAttempt,
+ getExternalState(TaskAttemptStateInternal.KILLED)));
endState = TaskAttemptStateInternal.KILLED;
break;
case SUCCEEDED:
http://git-wip-us.apache.org/repos/asf/tez/blob/16a0f579/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskFinishedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskFinishedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskFinishedEvent.java
index c60ef58..9323270 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskFinishedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskFinishedEvent.java
@@ -25,7 +25,6 @@ import java.io.OutputStream;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.tez.common.counters.TezCounters;
-import org.apache.tez.dag.api.DagTypeConverters;
import org.apache.tez.dag.api.oldrecords.TaskState;
import org.apache.tez.dag.history.HistoryEvent;
import org.apache.tez.dag.history.HistoryEventType;
@@ -45,7 +44,7 @@ public class TaskFinishedEvent implements HistoryEvent {
private TezCounters tezCounters;
private TezTaskAttemptID successfulAttemptID;
private String diagnostics;
-
+
public TaskFinishedEvent(TezTaskID taskID,
String vertexName, long startTime, long finishTime,
TezTaskAttemptID successfulAttemptID,
@@ -85,9 +84,6 @@ public class TaskFinishedEvent implements HistoryEvent {
if (diagnostics != null) {
builder.setDiagnostics(diagnostics);
}
- if (tezCounters != null) {
- builder.setCounters(DagTypeConverters.convertTezCountersToProto(tezCounters));
- }
if (successfulAttemptID != null) {
builder.setSuccessfulTaskAttemptId(successfulAttemptID.toString());
}
@@ -101,10 +97,6 @@ public class TaskFinishedEvent implements HistoryEvent {
if (proto.hasDiagnostics()) {
this.diagnostics = proto.getDiagnostics();
}
- if (proto.hasCounters()) {
- this.tezCounters = DagTypeConverters.convertTezCountersFromProto(
- proto.getCounters());
- }
if (proto.hasSuccessfulTaskAttemptId()) {
this.successfulAttemptID =
TezTaskAttemptID.fromString(proto.getSuccessfulTaskAttemptId());
http://git-wip-us.apache.org/repos/asf/tez/blob/16a0f579/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexFinishedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexFinishedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexFinishedEvent.java
index df3551a..8057714 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexFinishedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexFinishedEvent.java
@@ -25,7 +25,6 @@ import java.io.OutputStream;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.tez.common.counters.TezCounters;
-import org.apache.tez.dag.api.DagTypeConverters;
import org.apache.tez.dag.app.dag.VertexState;
import org.apache.tez.dag.app.dag.impl.VertexStats;
import org.apache.tez.dag.history.HistoryEvent;
@@ -99,9 +98,6 @@ public class VertexFinishedEvent implements HistoryEvent, SummaryEvent {
if (diagnostics != null) {
builder.setDiagnostics(diagnostics);
}
- if (tezCounters != null) {
- builder.setCounters(DagTypeConverters.convertTezCountersToProto(tezCounters));
- }
return builder.build();
}
@@ -113,10 +109,6 @@ public class VertexFinishedEvent implements HistoryEvent, SummaryEvent {
if (proto.hasDiagnostics()) {
this.diagnostics = proto.getDiagnostics();
}
- if (proto.hasCounters()) {
- this.tezCounters = DagTypeConverters.convertTezCountersFromProto(
- proto.getCounters());
- }
}
@Override
http://git-wip-us.apache.org/repos/asf/tez/blob/16a0f579/tez-dag/src/main/proto/HistoryEvents.proto
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/proto/HistoryEvents.proto b/tez-dag/src/main/proto/HistoryEvents.proto
index 5cbe540..821612a 100644
--- a/tez-dag/src/main/proto/HistoryEvents.proto
+++ b/tez-dag/src/main/proto/HistoryEvents.proto
@@ -136,7 +136,6 @@ message VertexFinishedProto {
optional int64 finish_time = 3;
optional int32 state = 4;
optional string diagnostics = 5;
- optional TezCountersProto counters = 6;
}
message TaskStartedProto {
@@ -150,8 +149,7 @@ message TaskFinishedProto {
optional int64 finish_time = 2;
optional int32 state = 3;
optional string diagnostics = 4;
- optional TezCountersProto counters = 5;
- optional string successful_task_attempt_id = 6;
+ optional string successful_task_attempt_id = 5;
}
message TaskAttemptStartedProto {
http://git-wip-us.apache.org/repos/asf/tez/blob/16a0f579/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
index 3633a85..8e134f2 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
@@ -67,6 +67,7 @@ import org.apache.tez.dag.app.TaskHeartbeatHandler;
import org.apache.tez.dag.app.dag.TaskAttemptStateInternal;
import org.apache.tez.dag.app.dag.Vertex;
import org.apache.tez.dag.app.dag.event.DAGEvent;
+import org.apache.tez.dag.app.dag.event.DAGEventCounterUpdate;
import org.apache.tez.dag.app.dag.event.DAGEventType;
import org.apache.tez.dag.app.dag.event.TaskAttemptEvent;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventContainerTerminated;
@@ -423,6 +424,9 @@ public class TestTaskAttempt {
verifyEventType(
arg.getAllValues().subList(expectedEventsAtRunning,
expectedEvenstAfterTerminating), AMSchedulerEventTAEnded.class, 1);
+ verifyEventType(
+ arg.getAllValues().subList(expectedEventsAtRunning,
+ expectedEvenstAfterTerminating), DAGEventCounterUpdate.class, 1);
taImpl.handle(new TaskAttemptEventContainerTerminated(taskAttemptID,
"Terminated"));
@@ -571,6 +575,9 @@ public class TestTaskAttempt {
verifyEventType(
arg.getAllValues().subList(expectedEventsAtRunning,
expectedEvenstAfterTerminating), AMSchedulerEventTAEnded.class, 1);
+ verifyEventType(
+ arg.getAllValues().subList(expectedEventsAtRunning,
+ expectedEvenstAfterTerminating), DAGEventCounterUpdate.class, 1);
taImpl.handle(new TaskAttemptEventContainerTerminated(taskAttemptID,
"Terminated"));
@@ -658,6 +665,9 @@ public class TestTaskAttempt {
verifyEventType(
arg.getAllValues().subList(expectedEventsAtRunning,
expectedEventsAfterTerminating), AMSchedulerEventTAEnded.class, 1);
+ verifyEventType(
+ arg.getAllValues().subList(expectedEventsAtRunning,
+ expectedEventsAfterTerminating), DAGEventCounterUpdate.class, 1);
taImpl.handle(new TaskAttemptEvent(taskAttemptID,
TaskAttemptEventType.TA_CONTAINER_TERMINATED_BY_SYSTEM));
@@ -744,6 +754,9 @@ public class TestTaskAttempt {
verifyEventType(
arg.getAllValues().subList(expectedEventsAtRunning,
expectedEvenstAfterTerminating), AMSchedulerEventTAEnded.class, 1);
+ verifyEventType(
+ arg.getAllValues().subList(expectedEventsAtRunning,
+ expectedEvenstAfterTerminating), DAGEventCounterUpdate.class, 1);
// Send out a Node Failure.
taImpl.handle(new TaskAttemptEventNodeFailed(taskAttemptID, "NodeDecomissioned"));
@@ -838,6 +851,9 @@ public class TestTaskAttempt {
verifyEventType(
arg.getAllValues().subList(expectedEventsAtRunning,
expectedEvenstAfterTerminating), AMSchedulerEventTAEnded.class, 1);
+ verifyEventType(
+ arg.getAllValues().subList(expectedEventsAtRunning,
+ expectedEvenstAfterTerminating), DAGEventCounterUpdate.class, 1);
// Send out a Node Failure.
taImpl.handle(new TaskAttemptEventNodeFailed(taskAttemptID, "NodeDecomissioned"));
http://git-wip-us.apache.org/repos/asf/tez/blob/16a0f579/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java
index 3b04cf6..a443a35 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java
@@ -20,16 +20,17 @@ package org.apache.tez.dag.app.dag.impl;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
+import java.util.List;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.util.SystemClock;
import org.apache.tez.common.counters.TezCounters;
@@ -39,6 +40,7 @@ import org.apache.tez.dag.app.ContainerContext;
import org.apache.tez.dag.app.TaskAttemptListener;
import org.apache.tez.dag.app.TaskHeartbeatHandler;
import org.apache.tez.dag.app.dag.TaskAttemptStateInternal;
+import org.apache.tez.dag.app.dag.event.DAGEventCounterUpdate;
import org.apache.tez.dag.app.dag.event.TaskAttemptEvent;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
import org.apache.tez.dag.app.dag.event.TaskEventTAUpdate;
@@ -48,6 +50,7 @@ import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
import org.junit.Before;
import org.junit.Test;
+import org.mockito.ArgumentCaptor;
@SuppressWarnings({ "unchecked", "rawtypes" })
public class TestTaskAttemptRecovery {
@@ -98,6 +101,17 @@ public class TestTaskAttemptRecovery {
assertEquals(state, recoveredState);
}
+ private void verifyEvents(List<Event> events, Class<? extends Event> eventClass,
+ int expectedTimes) {
+ int actualTimes = 0;
+ for (Event event : events) {
+ if (eventClass.isInstance(event)) {
+ actualTimes ++;
+ }
+ }
+ assertEquals(expectedTimes, actualTimes);
+ }
+
/**
* No any event to restore -> RecoverTransition
*/
@@ -105,7 +119,14 @@ public class TestTaskAttemptRecovery {
public void testTARecovery_NEW() {
ta.handle(new TaskAttemptEvent(taId, TaskAttemptEventType.TA_RECOVER));
assertEquals(TaskAttemptStateInternal.KILLED, ta.getInternalState());
- verify(mockEventHandler, times(1)).handle(any(TaskEventTAUpdate.class));
+
+ ArgumentCaptor<Event> eventCaptor = ArgumentCaptor.forClass(Event.class);
+ verify(mockEventHandler, times(2)).handle(eventCaptor.capture());
+ List<Event> events = eventCaptor.getAllValues();
+ assertEquals(2, events.size());
+ verifyEvents(events, TaskEventTAUpdate.class, 1);
+ // one for task killed
+ verifyEvents(events, DAGEventCounterUpdate.class, 1);
}
/**
@@ -117,7 +138,14 @@ public class TestTaskAttemptRecovery {
ta.handle(new TaskAttemptEvent(taId, TaskAttemptEventType.TA_RECOVER));
assertEquals(TaskAttemptStateInternal.KILLED, ta.getInternalState());
- verify(mockEventHandler, times(1)).handle(any(TaskEventTAUpdate.class));
+
+ ArgumentCaptor<Event> eventCaptor = ArgumentCaptor.forClass(Event.class);
+ verify(mockEventHandler, times(3)).handle(eventCaptor.capture());
+ List<Event> events = eventCaptor.getAllValues();
+ assertEquals(3, events.size());
+ verifyEvents(events, TaskEventTAUpdate.class, 1);
+ // one for task launch, one for task killed
+ verifyEvents(events, DAGEventCounterUpdate.class, 2);
}
/**
@@ -131,7 +159,13 @@ public class TestTaskAttemptRecovery {
ta.handle(new TaskAttemptEvent(taId, TaskAttemptEventType.TA_RECOVER));
assertEquals(TaskAttemptStateInternal.SUCCEEDED, ta.getInternalState());
- verify(mockEventHandler, never()).handle(any(TaskEventTAUpdate.class));
+
+ ArgumentCaptor<Event> eventCaptor = ArgumentCaptor.forClass(Event.class);
+ verify(mockEventHandler, times(2)).handle(eventCaptor.capture());
+ List<Event> events = eventCaptor.getAllValues();
+ assertEquals(2, events.size());
+ // one for task launch, one for task succeeded
+ verifyEvents(events, DAGEventCounterUpdate.class, 2);
}
/**
@@ -145,7 +179,13 @@ public class TestTaskAttemptRecovery {
ta.handle(new TaskAttemptEvent(taId, TaskAttemptEventType.TA_RECOVER));
assertEquals(TaskAttemptStateInternal.KILLED, ta.getInternalState());
- verify(mockEventHandler, never()).handle(any(TaskEventTAUpdate.class));
+
+ ArgumentCaptor<Event> eventCaptor = ArgumentCaptor.forClass(Event.class);
+ verify(mockEventHandler, times(2)).handle(eventCaptor.capture());
+ List<Event> events = eventCaptor.getAllValues();
+ assertEquals(2, events.size());
+ // one for task launch, one for task killed
+ verifyEvents(events, DAGEventCounterUpdate.class, 2);
}
/**
@@ -159,7 +199,13 @@ public class TestTaskAttemptRecovery {
ta.handle(new TaskAttemptEvent(taId, TaskAttemptEventType.TA_RECOVER));
assertEquals(TaskAttemptStateInternal.FAILED, ta.getInternalState());
- verify(mockEventHandler, never()).handle(any(TaskEventTAUpdate.class));
+
+ ArgumentCaptor<Event> eventCaptor = ArgumentCaptor.forClass(Event.class);
+ verify(mockEventHandler, times(2)).handle(eventCaptor.capture());
+ List<Event> events = eventCaptor.getAllValues();
+ assertEquals(2, events.size());
+ // one for task launch, one for task killed
+ verifyEvents(events, DAGEventCounterUpdate.class, 2);
}
/**
http://git-wip-us.apache.org/repos/asf/tez/blob/16a0f579/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java b/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
index 72d0c25..5bb7d35 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
@@ -354,7 +354,6 @@ public class TestHistoryEventsProtoConversion {
deserializedEvent.getFinishTime());
Assert.assertEquals(event.getState(), deserializedEvent.getState());
Assert.assertEquals(event.getDiagnostics(), deserializedEvent.getDiagnostics());
- Assert.assertEquals(event.getTezCounters(), deserializedEvent.getTezCounters());
logEvents(event, deserializedEvent);
}
{
@@ -370,7 +369,6 @@ public class TestHistoryEventsProtoConversion {
deserializedEvent.getFinishTime());
Assert.assertEquals(event.getState(), deserializedEvent.getState());
Assert.assertEquals(event.getDiagnostics(), deserializedEvent.getDiagnostics());
- Assert.assertEquals(event.getTezCounters(), deserializedEvent.getTezCounters());
logEvents(event, deserializedEvent);
}
}
@@ -403,8 +401,6 @@ public class TestHistoryEventsProtoConversion {
deserializedEvent.getFinishTime());
Assert.assertEquals(event.getState(),
deserializedEvent.getState());
- Assert.assertEquals(event.getTezCounters(),
- deserializedEvent.getTezCounters());
Assert.assertEquals(event.getSuccessfulAttemptID(),
deserializedEvent.getSuccessfulAttemptID());
Assert.assertEquals(event.getDiagnostics(), deserializedEvent.getDiagnostics());
@@ -425,8 +421,6 @@ public class TestHistoryEventsProtoConversion {
deserializedEvent.getFinishTime());
Assert.assertEquals(event.getState(),
deserializedEvent.getState());
- Assert.assertEquals(event.getTezCounters(),
- deserializedEvent.getTezCounters());
Assert.assertEquals(event.getSuccessfulAttemptID(),
deserializedEvent.getSuccessfulAttemptID());
Assert.assertEquals(event.getDiagnostics(), deserializedEvent.getDiagnostics());
http://git-wip-us.apache.org/repos/asf/tez/blob/16a0f579/tez-tests/src/test/java/org/apache/tez/test/TestAMRecovery.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestAMRecovery.java b/tez-tests/src/test/java/org/apache/tez/test/TestAMRecovery.java
index eae38f0..bd2fe99 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestAMRecovery.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestAMRecovery.java
@@ -24,6 +24,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
+import java.util.Map;
import java.util.Random;
import org.apache.commons.logging.Log;
@@ -38,6 +39,8 @@ import org.apache.tez.client.TezClient;
import org.apache.tez.client.TezClientUtils;
import org.apache.tez.common.TezCommonUtils;
import org.apache.tez.common.TezUtils;
+import org.apache.tez.common.counters.DAGCounter;
+import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.Edge;
import org.apache.tez.dag.api.EdgeProperty;
@@ -193,7 +196,12 @@ public class TestAMRecovery {
DAG dag =
createDAG(ControlledInputReadyVertexManager.class,
DataMovementType.BROADCAST, true);
- runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED);
+ TezCounters counters = runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED);
+ assertEquals(5, counters.findCounter(DAGCounter.TOTAL_LAUNCHED_TASKS).getValue());
+ assertEquals(1, counters.findCounter(DAGCounter.NUM_KILLED_TASKS).getValue());
+ assertEquals(4, counters.findCounter(DAGCounter.NUM_SUCCEEDED_TASKS).getValue());
+ assertEquals(2, counters.findCounter(TestCounter.Counter_1).getValue());
+
List<HistoryEvent> historyEvents1 = readRecoveryLog(1);
List<HistoryEvent> historyEvents2 = readRecoveryLog(2);
@@ -220,7 +228,11 @@ public class TestAMRecovery {
DAG dag =
createDAG(ControlledInputReadyVertexManager.class,
DataMovementType.BROADCAST, false);
- runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED);
+ TezCounters counters = runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED);
+ assertEquals(4, counters.findCounter(DAGCounter.TOTAL_LAUNCHED_TASKS).getValue());
+ assertEquals(0, counters.findCounter(DAGCounter.NUM_KILLED_TASKS).getValue());
+ assertEquals(4, counters.findCounter(DAGCounter.NUM_SUCCEEDED_TASKS).getValue());
+ assertEquals(2, counters.findCounter(TestCounter.Counter_1).getValue());
List<HistoryEvent> historyEvents1 = readRecoveryLog(1);
List<HistoryEvent> historyEvents2 = readRecoveryLog(2);
@@ -244,11 +256,15 @@ public class TestAMRecovery {
* @throws Exception
*/
@Test(timeout = 120000)
- public void testVertexPartialComplete_One2One() throws Exception {
+ public void testVertexPartialFinished_One2One() throws Exception {
DAG dag =
createDAG(ControlledInputReadyVertexManager.class,
DataMovementType.ONE_TO_ONE, true);
- runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED);
+ TezCounters counters = runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED);
+ assertEquals(5, counters.findCounter(DAGCounter.TOTAL_LAUNCHED_TASKS).getValue());
+ assertEquals(1, counters.findCounter(DAGCounter.NUM_KILLED_TASKS).getValue());
+ assertEquals(4, counters.findCounter(DAGCounter.NUM_SUCCEEDED_TASKS).getValue());
+ assertEquals(2, counters.findCounter(TestCounter.Counter_1).getValue());
List<HistoryEvent> historyEvents1 = readRecoveryLog(1);
List<HistoryEvent> historyEvents2 = readRecoveryLog(2);
@@ -273,11 +289,15 @@ public class TestAMRecovery {
* @throws Exception
*/
@Test(timeout = 120000)
- public void testVertexCompletelyComplete_One2One() throws Exception {
+ public void testVertexCompletelyFinished_One2One() throws Exception {
DAG dag =
createDAG(ControlledInputReadyVertexManager.class,
DataMovementType.ONE_TO_ONE, false);
- runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED);
+ TezCounters counters = runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED);
+ assertEquals(4, counters.findCounter(DAGCounter.TOTAL_LAUNCHED_TASKS).getValue());
+ assertEquals(0, counters.findCounter(DAGCounter.NUM_KILLED_TASKS).getValue());
+ assertEquals(4, counters.findCounter(DAGCounter.NUM_SUCCEEDED_TASKS).getValue());
+ assertEquals(2, counters.findCounter(TestCounter.Counter_1).getValue());
List<HistoryEvent> historyEvents1 = readRecoveryLog(1);
List<HistoryEvent> historyEvents2 = readRecoveryLog(2);
@@ -306,7 +326,11 @@ public class TestAMRecovery {
DAG dag =
createDAG(ControlledShuffleVertexManager.class,
DataMovementType.SCATTER_GATHER, true);
- runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED);
+ TezCounters counters = runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED);
+ assertEquals(5, counters.findCounter(DAGCounter.TOTAL_LAUNCHED_TASKS).getValue());
+ assertEquals(1, counters.findCounter(DAGCounter.NUM_KILLED_TASKS).getValue());
+ assertEquals(4, counters.findCounter(DAGCounter.NUM_SUCCEEDED_TASKS).getValue());
+ assertEquals(2, counters.findCounter(TestCounter.Counter_1).getValue());
List<HistoryEvent> historyEvents1 = readRecoveryLog(1);
List<HistoryEvent> historyEvents2 = readRecoveryLog(2);
@@ -335,7 +359,11 @@ public class TestAMRecovery {
DAG dag =
createDAG(ControlledShuffleVertexManager.class,
DataMovementType.SCATTER_GATHER, false);
- runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED);
+ TezCounters counters = runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED);
+ assertEquals(4, counters.findCounter(DAGCounter.TOTAL_LAUNCHED_TASKS).getValue());
+ assertEquals(0, counters.findCounter(DAGCounter.NUM_KILLED_TASKS).getValue());
+ assertEquals(4, counters.findCounter(DAGCounter.NUM_SUCCEEDED_TASKS).getValue());
+ assertEquals(2, counters.findCounter(TestCounter.Counter_1).getValue());
List<HistoryEvent> historyEvents1 = readRecoveryLog(1);
List<HistoryEvent> historyEvents2 = readRecoveryLog(2);
@@ -369,13 +397,14 @@ public class TestAMRecovery {
}
- void runDAGAndVerify(DAG dag, DAGStatus.State finalState) throws Exception {
+ TezCounters runDAGAndVerify(DAG dag, DAGStatus.State finalState) throws Exception {
tezSession.waitTillReady();
DAGClient dagClient = tezSession.submitDAG(dag);
DAGStatus dagStatus =
dagClient.waitForCompletionWithStatusUpdates(EnumSet
.of(StatusGetOpts.GET_COUNTERS));
Assert.assertEquals(finalState, dagStatus.getState());
+ return dagStatus.getDAGCounters();
}
/**
@@ -479,6 +508,19 @@ public class TestAMRecovery {
}
}
}
+
+ @Override
+ public void onVertexStarted(Map<String, List<Integer>> completions) {
+ // sleep for 1 seconds to delay the running of task in v2.
+ // this could keep the case that task of v1 is partial finished or completely
+ // finished, and at the same time the task of v2 is not started
+ try {
+ Thread.sleep(1*1000);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ super.onVertexStarted(completions);
+ }
}
public static class ControlledShuffleVertexManager extends
@@ -516,6 +558,19 @@ public class TestAMRecovery {
}
}
}
+
+ @Override
+ public void onVertexStarted(Map<String, List<Integer>> completions) {
+ // sleep for 1 seconds to delay the running of task in v2.
+ // this could keep the case that task of v1 is partial finished or completely
+ // finished, and at the same time the task of v2 is not started
+ try {
+ Thread.sleep(1*1000);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ super.onVertexStarted(completions);
+ }
}
public static class ControlledImmediateStartVertexManager extends
@@ -554,6 +609,19 @@ public class TestAMRecovery {
}
}
}
+
+ @Override
+ public void onVertexStarted(Map<String, List<Integer>> completions) {
+ // sleep for 1 seconds to delay the running of task in v2.
+ // this could keep the case that task of v1 is partial finished or completely
+ // finished, and at the same time the task of v2 is not started
+ try {
+ Thread.sleep(1*1000);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ super.onVertexStarted(completions);
+ }
}
/**
@@ -592,6 +660,10 @@ public class TestAMRecovery {
}
}
+ public static enum TestCounter {
+ Counter_1,
+ }
+
/**
* Do nothing if it is in task 0, sleep 3 seconds for other tasks. This enable
* us to kill AM in VM when some tasks are still running.
@@ -605,8 +677,10 @@ public class TestAMRecovery {
@Override
public void run() throws Exception {
+ getContext().getCounters().findCounter(TestCounter.Counter_1).increment(1);
if (getContext().getTaskIndex() == 0) {
- return;
+ // keep task_0 running for 1 seconds to wait for task_1 start running
+ Thread.sleep(1 * 1000);;
} else {
Thread.sleep(3 * 1000);
}