You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by bi...@apache.org on 2015/09/03 01:34:18 UTC
tez git commit: TEZ-2752. logUnsuccessful completion in Attempt should write original finish time to ATS (bikas)
Repository: tez
Updated Branches:
refs/heads/master eeac7a92c -> 996906d27
TEZ-2752. logUnsuccessful completion in Attempt should write original finish time to ATS (bikas)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/996906d2
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/996906d2
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/996906d2
Branch: refs/heads/master
Commit: 996906d27cdb0a1c4301dc449aa5dc638b5b4363
Parents: eeac7a9
Author: Bikas Saha <bi...@apache.org>
Authored: Wed Sep 2 16:34:05 2015 -0700
Committer: Bikas Saha <bi...@apache.org>
Committed: Wed Sep 2 16:34:05 2015 -0700
----------------------------------------------------------------------
CHANGES.txt | 8 ++++
.../tez/dag/app/dag/impl/TaskAttemptImpl.java | 6 ++-
.../tez/dag/app/dag/impl/TestTaskAttempt.java | 42 +++++++++++++-------
3 files changed, 40 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/996906d2/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index a6d3b54..687db4d 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,8 @@ Release 0.8.1: Unreleased
INCOMPATIBLE CHANGES
ALL CHANGES:
+ TEZ-2752. logUnsuccessful completion in Attempt should write original finish
+ time to ATS
TEZ-2755. Fix findbugs warning in TezClient
TEZ-2767. Make TezMxBeanResourceCalculator the default resource calculator.
TEZ-2765. Change Xmlwriter to use defaultValue instead of value tag.
@@ -160,6 +162,8 @@ Release 0.7.1: Unreleased
INCOMPATIBLE CHANGES
ALL CHANGES:
+ TEZ-2752. logUnsuccessful completion in Attempt should write original finish
+ time to ATS
TEZ-2755. Fix findbugs warning in TezClient
TEZ-2767. Make TezMxBeanResourceCalculator the default resource calculator.
TEZ-2602. Throwing EOFException when launching MR job
@@ -395,6 +399,8 @@ Release 0.6.3: Unreleased
INCOMPATIBLE CHANGES
ALL CHANGES:
+ TEZ-2752. logUnsuccessful completion in Attempt should write original finish
+ time to ATS
TEZ-2742. VertexImpl.finished() terminationCause hides member var of the
same name
TEZ-2732. DefaultSorter throws ArrayIndex exceptions on 2047 Mb size sort buffers
@@ -609,6 +615,8 @@ INCOMPATIBLE CHANGES
TEZ-2552. CRC errors can cause job to run for very long time in large jobs.
ALL CHANGES:
+ TEZ-2752. logUnsuccessful completion in Attempt should write original finish
+ time to ATS
TEZ-2742. VertexImpl.finished() terminationCause hides member var of the
same name
TEZ-2732. DefaultSorter throws ArrayIndex exceptions on 2047 Mb size sort buffers
http://git-wip-us.apache.org/repos/asf/tez/blob/996906d2/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
index 3f2e3a4..e57c827 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
@@ -1048,9 +1048,13 @@ public class TaskAttemptImpl implements TaskAttempt,
protected void logJobHistoryAttemptUnsuccesfulCompletion(
TaskAttemptState state) {
+ long finishTime = getFinishTime();
+ if (finishTime <= 0) {
+ finishTime = clock.getTime(); // comes here in case it was terminated before launch
+ }
TaskAttemptFinishedEvent finishEvt = new TaskAttemptFinishedEvent(
attemptId, getVertex().getName(), getLaunchTime(),
- clock.getTime(), state,
+ finishTime, state,
terminationCause,
StringUtils.join(
getDiagnostics(), LINE_SEPARATOR), getCounters(), lastDataEventTime,
http://git-wip-us.apache.org/repos/asf/tez/blob/996906d2/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
index 13c9202..101b22f 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
@@ -91,6 +91,9 @@ import org.apache.tez.dag.app.rm.AMSchedulerEventTAEnded;
import org.apache.tez.dag.app.rm.AMSchedulerEventTALaunchRequest;
import org.apache.tez.dag.app.rm.container.AMContainerMap;
import org.apache.tez.dag.app.rm.container.ContainerContextMatcher;
+import org.apache.tez.dag.history.DAGHistoryEvent;
+import org.apache.tez.dag.history.HistoryEventHandler;
+import org.apache.tez.dag.history.events.TaskAttemptFinishedEvent;
import org.apache.tez.dag.records.TaskAttemptTerminationCause;
import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.records.TezTaskAttemptID;
@@ -117,6 +120,7 @@ public class TestTaskAttempt {
}
}
+ AppContext appCtx;
Task mockTask;
TaskLocationHint locationHint;
@@ -127,7 +131,10 @@ public class TestTaskAttempt {
@Before
public void setupTest() {
+ appCtx = mock(AppContext.class);
mockTask = mock(Task.class);
+ HistoryEventHandler mockHistHandler = mock(HistoryEventHandler.class);
+ doReturn(mockHistHandler).when(appCtx).getHistoryHandler();
}
@Test(timeout = 5000)
@@ -146,7 +153,7 @@ public class TestTaskAttempt {
TezVertexID.getInstance(TezDAGID.getInstance("1", 1, 1), 1), 1);
TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler,
mock(TaskCommunicatorManagerInterface.class), new Configuration(), new SystemClock(),
- mock(TaskHeartbeatHandler.class), mock(AppContext.class),
+ mock(TaskHeartbeatHandler.class), appCtx,
false, Resource.newInstance(1024, 1), createFakeContainerContext(), false);
TaskAttemptEventSchedule sEvent = mock(TaskAttemptEventSchedule.class);
@@ -181,12 +188,12 @@ public class TestTaskAttempt {
TezVertexID.getInstance(TezDAGID.getInstance("1", 1, 1), 1), 1);
TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler,
mock(TaskCommunicatorManagerInterface.class), new Configuration(), new SystemClock(),
- mock(TaskHeartbeatHandler.class), mock(AppContext.class),
+ mock(TaskHeartbeatHandler.class), appCtx,
false, Resource.newInstance(1024, 1), createFakeContainerContext(), false);
TaskAttemptImpl taImplReScheduled = new MockTaskAttemptImpl(taskID, 1, eventHandler,
mock(TaskCommunicatorManagerInterface.class), new Configuration(), new SystemClock(),
- mock(TaskHeartbeatHandler.class), mock(AppContext.class),
+ mock(TaskHeartbeatHandler.class), appCtx,
true, Resource.newInstance(1024, 1), createFakeContainerContext(), false);
ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class);
@@ -245,7 +252,7 @@ public class TestTaskAttempt {
TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler,
mock(TaskCommunicatorManagerInterface.class), new Configuration(),
new SystemClock(), mock(TaskHeartbeatHandler.class),
- mock(AppContext.class), false, Resource.newInstance(1024,
+ appCtx, false, Resource.newInstance(1024,
1), createFakeContainerContext(), false);
TaskAttemptImpl spyTa = spy(taImpl);
@@ -295,7 +302,7 @@ public class TestTaskAttempt {
new HashSet<String>(Arrays.asList(new String[]{"127.0.0.1"})), null);
Resource resource = Resource.newInstance(1024, 1);
- AppContext mockAppContext = mock(AppContext.class);
+ AppContext mockAppContext = appCtx;
doReturn(new ClusterInfo()).when(mockAppContext).getClusterInfo();
TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler,
@@ -352,7 +359,6 @@ public class TestTaskAttempt {
when(container.getNodeId()).thenReturn(nid);
when(container.getNodeHttpAddress()).thenReturn("localhost:0");
- AppContext appCtx = mock(AppContext.class);
AMContainerMap containers = new AMContainerMap(
mock(ContainerHeartbeatHandler.class), mock(TaskCommunicatorManagerInterface.class),
new ContainerContextMatcher(), appCtx);
@@ -452,7 +458,6 @@ public class TestTaskAttempt {
when(container.getNodeId()).thenReturn(nid);
when(container.getNodeHttpAddress()).thenReturn("localhost:0");
- AppContext appCtx = mock(AppContext.class);
AMContainerMap containers = new AMContainerMap(
mock(ContainerHeartbeatHandler.class), mock(TaskCommunicatorManagerInterface.class),
new ContainerContextMatcher(), appCtx);
@@ -516,7 +521,6 @@ public class TestTaskAttempt {
when(container.getNodeId()).thenReturn(nid);
when(container.getNodeHttpAddress()).thenReturn("localhost:0");
- AppContext appCtx = mock(AppContext.class);
AMContainerMap containers = new AMContainerMap(
mock(ContainerHeartbeatHandler.class), mock(TaskCommunicatorManagerInterface.class),
new ContainerContextMatcher(), appCtx);
@@ -608,7 +612,6 @@ public class TestTaskAttempt {
when(container.getNodeId()).thenReturn(nid);
when(container.getNodeHttpAddress()).thenReturn("localhost:0");
- AppContext appCtx = mock(AppContext.class);
AMContainerMap containers = new AMContainerMap(
mock(ContainerHeartbeatHandler.class), mock(TaskCommunicatorManagerInterface.class),
new ContainerContextMatcher(), appCtx);
@@ -738,7 +741,6 @@ public class TestTaskAttempt {
when(container.getNodeId()).thenReturn(nid);
when(container.getNodeHttpAddress()).thenReturn("localhost:0");
- AppContext appCtx = mock(AppContext.class);
AMContainerMap containers = new AMContainerMap(
mock(ContainerHeartbeatHandler.class), mock(TaskCommunicatorManagerInterface.class),
new ContainerContextMatcher(), appCtx);
@@ -828,7 +830,6 @@ public class TestTaskAttempt {
when(container.getNodeId()).thenReturn(nid);
when(container.getNodeHttpAddress()).thenReturn("localhost:0");
- AppContext appCtx = mock(AppContext.class);
AMContainerMap containers = new AMContainerMap(
mock(ContainerHeartbeatHandler.class), mock(TaskCommunicatorManagerInterface.class),
new ContainerContextMatcher(), appCtx);
@@ -922,7 +923,6 @@ public class TestTaskAttempt {
when(container.getNodeId()).thenReturn(nid);
when(container.getNodeHttpAddress()).thenReturn("localhost:0");
- AppContext appCtx = mock(AppContext.class);
AMContainerMap containers = new AMContainerMap(
mock(ContainerHeartbeatHandler.class), mock(TaskCommunicatorManagerInterface.class),
new ContainerContextMatcher(), appCtx);
@@ -1024,7 +1024,6 @@ public class TestTaskAttempt {
when(container.getNodeId()).thenReturn(nid);
when(container.getNodeHttpAddress()).thenReturn("localhost:0");
- AppContext appCtx = mock(AppContext.class);
AMContainerMap containers = new AMContainerMap(
mock(ContainerHeartbeatHandler.class), mock(TaskCommunicatorManagerInterface.class),
new ContainerContextMatcher(), appCtx);
@@ -1123,7 +1122,6 @@ public class TestTaskAttempt {
when(container.getNodeId()).thenReturn(nid);
when(container.getNodeHttpAddress()).thenReturn("localhost:0");
- AppContext appCtx = mock(AppContext.class);
AMContainerMap containers = new AMContainerMap(
mock(ContainerHeartbeatHandler.class), mock(TaskCommunicatorManagerInterface.class),
new ContainerContextMatcher(), appCtx);
@@ -1131,6 +1129,8 @@ public class TestTaskAttempt {
doReturn(new ClusterInfo()).when(appCtx).getClusterInfo();
doReturn(containers).when(appCtx).getAllContainers();
+ HistoryEventHandler mockHistHandler = mock(HistoryEventHandler.class);
+ doReturn(mockHistHandler).when(appCtx).getHistoryHandler();
TaskHeartbeatHandler mockHeartbeatHandler = mock(TaskHeartbeatHandler.class);
MockTaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler,
@@ -1152,7 +1152,12 @@ public class TestTaskAttempt {
int expectedEventsTillSucceeded = 6;
ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class);
+ ArgumentCaptor<DAGHistoryEvent> histArg = ArgumentCaptor.forClass(DAGHistoryEvent.class);
verify(eventHandler, times(expectedEventsTillSucceeded)).handle(arg.capture());
+ verify(mockHistHandler, times(2)).handle(histArg.capture()); // start and finish
+ DAGHistoryEvent histEvent = histArg.getValue();
+ TaskAttemptFinishedEvent finishEvent = (TaskAttemptFinishedEvent)histEvent.getHistoryEvent();
+ long finishTime = finishEvent.getFinishTime();
verifyEventType(arg.getAllValues(), TaskEventTAUpdate.class, 2);
InputReadErrorEvent mockReEvent = InputReadErrorEvent.create("", 0, 1);
@@ -1183,6 +1188,11 @@ public class TestTaskAttempt {
assertEquals(TaskAttemptTerminationCause.OUTPUT_LOST, taImpl.getTerminationCause());
// verify unregister is not invoked again
verify(mockHeartbeatHandler, times(1)).unregister(taskAttemptID);
+ verify(mockHistHandler, times(3)).handle(histArg.capture());
+ histEvent = histArg.getValue();
+ finishEvent = (TaskAttemptFinishedEvent)histEvent.getHistoryEvent();
+ long newFinishTime = finishEvent.getFinishTime();
+ Assert.assertEquals(finishTime, newFinishTime);
assertEquals(true, taImpl.inputFailedReported);
int expectedEventsAfterFetchFailure = expectedEventsTillSucceeded + 2;
@@ -1266,7 +1276,6 @@ public class TestTaskAttempt {
when(container.getNodeId()).thenReturn(nid);
when(container.getNodeHttpAddress()).thenReturn("localhost:0");
- AppContext appCtx = mock(AppContext.class);
AMContainerMap containers = new AMContainerMap(
mock(ContainerHeartbeatHandler.class), mock(TaskCommunicatorManagerInterface.class),
new ContainerContextMatcher(), appCtx);
@@ -1353,18 +1362,21 @@ public class TestTaskAttempt {
@Override
protected void logJobHistoryAttemptStarted() {
taskAttemptStartedEventLogged++;
+ super.logJobHistoryAttemptStarted();
}
@Override
protected void logJobHistoryAttemptFinishedEvent(
TaskAttemptStateInternal state) {
taskAttemptFinishedEventLogged++;
+ super.logJobHistoryAttemptFinishedEvent(state);
}
@Override
protected void logJobHistoryAttemptUnsuccesfulCompletion(
TaskAttemptState state) {
taskAttemptFinishedEventLogged++;
+ super.logJobHistoryAttemptUnsuccesfulCompletion(state);
}
@Override