You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by jl...@apache.org on 2017/10/02 22:13:53 UTC
tez git commit: TEZ-3847. AM web controller task counters are empty sometimes. Contributed by Jonathan Eagles
Repository: tez
Updated Branches:
refs/heads/master 4d6690ae1 -> 3b2933f01
TEZ-3847. AM web controller task counters are empty sometimes. Contributed by Jonathan Eagles
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/3b2933f0
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/3b2933f0
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/3b2933f0
Branch: refs/heads/master
Commit: 3b2933f01d9cf3a431f8230d4a3a7f83e7099788
Parents: 4d6690a
Author: Jason Lowe <jl...@apache.org>
Authored: Mon Oct 2 17:12:52 2017 -0500
Committer: Jason Lowe <jl...@apache.org>
Committed: Mon Oct 2 17:12:52 2017 -0500
----------------------------------------------------------------------
.../tez/dag/app/dag/impl/TaskAttemptImpl.java | 8 ++-
.../tez/dag/app/dag/impl/TestTaskAttempt.java | 66 ++++++++++++++++++++
2 files changed, 72 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/3b2933f0/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
index 1fe65a9..1218543 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
@@ -1575,8 +1575,12 @@ public class TaskAttemptImpl implements TaskAttempt,
TaskStatusUpdateEvent statusEvent = sEvent.getStatusEvent();
ta.reportedStatus.state = ta.getState();
ta.reportedStatus.progress = statusEvent.getProgress();
- ta.reportedStatus.counters = statusEvent.getCounters();
- ta.statistics = statusEvent.getStatistics();
+ if (statusEvent.getCounters() != null) {
+ ta.reportedStatus.counters = statusEvent.getCounters();
+ }
+ if (statusEvent.getStatistics() != null) {
+ ta.statistics = statusEvent.getStatistics();
+ }
if (statusEvent.getProgressNotified()) {
ta.lastNotifyProgressTimestamp = ta.clock.getTime();
} else {
http://git-wip-us.apache.org/repos/asf/tez/blob/3b2933f0/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
index 7709bc0..2bad2ef 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
@@ -62,6 +62,7 @@ import org.apache.hadoop.yarn.util.SystemClock;
import org.apache.log4j.Level;
import org.apache.log4j.LogManager;
import org.apache.tez.common.MockDNSToSwitchMapping;
+import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.TezConstants;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventSubmitted;
import org.apache.tez.dag.app.dag.event.TaskEventTAFailed;
@@ -1072,6 +1073,71 @@ public class TestTaskAttempt {
Assert.assertTrue("This should have been an attempt failed event!", arg.getValue() instanceof TaskAttemptEventAttemptFailed);
}
+  /** A status update with null counters must keep the last reported counters (TEZ-3847). */
+  @Test
+  public void testStatusUpdateWithNullCounters() throws Exception {
+    ApplicationId appId = ApplicationId.newInstance(1, 2);
+    ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId, 0);
+    TezDAGID dagID = TezDAGID.getInstance(appId, 1);
+    TezVertexID vertexID = TezVertexID.getInstance(dagID, 1);
+    TezTaskID taskID = TezTaskID.getInstance(vertexID, 1);
+
+    MockEventHandler eventHandler = spy(new MockEventHandler());
+    TaskCommunicatorManagerInterface taListener = createMockTaskAttemptListener();
+
+    Configuration taskConf = new Configuration();
+    taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
+    taskConf.setBoolean("fs.file.impl.disable.cache", true);
+
+    locationHint = TaskLocationHint.createTaskLocationHint(
+        new HashSet<String>(Arrays.asList("127.0.0.1")), null);
+    Resource resource = Resource.newInstance(1024, 1);
+
+    NodeId nid = NodeId.newInstance("127.0.0.1", 0);
+    ContainerId contId = ContainerId.newContainerId(appAttemptId, 3);
+    Container container = mock(Container.class);
+    when(container.getId()).thenReturn(contId);
+    when(container.getNodeId()).thenReturn(nid);
+    when(container.getNodeHttpAddress()).thenReturn("localhost:0");
+
+    AMContainerMap containers = new AMContainerMap(
+        mock(ContainerHeartbeatHandler.class), mock(TaskCommunicatorManagerInterface.class),
+        new ContainerContextMatcher(), appCtx);
+    containers.addContainerIfNew(container, 0, 0, 0);
+
+    doReturn(new ClusterInfo()).when(appCtx).getClusterInfo();
+    doReturn(containers).when(appCtx).getAllContainers();
+
+    TaskHeartbeatHandler mockHeartbeatHandler = mock(TaskHeartbeatHandler.class);
+    TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler, taListener,
+        taskConf, new SystemClock(), mockHeartbeatHandler, appCtx, false, resource,
+        createFakeContainerContext(), false);
+    TezTaskAttemptID taskAttemptID = taImpl.getID();
+
+    taImpl.handle(new TaskAttemptEventSchedule(taskAttemptID, 0, 0));
+    taImpl.handle(new TaskAttemptEventSubmitted(taskAttemptID, contId));
+    taImpl.handle(new TaskAttemptEventStartedRemotely(taskAttemptID));
+    assertEquals("Task attempt is not in the RUNNING state", TaskAttemptState.RUNNING, taImpl.getState());
+    verify(mockHeartbeatHandler).register(taskAttemptID);
+
+    // Alternate real and null counters; a null update must not reset the reported value.
+    TezCounters counters = new TezCounters();
+    counters.findCounter("group", "counter").increment(1);
+    taImpl.handle(new TaskAttemptEventStatusUpdate(
+        taskAttemptID, new TaskStatusUpdateEvent(counters, 0.1f, null, false)));
+    assertEquals(1, taImpl.getCounters().findCounter("group", "counter").getValue());
+    taImpl.handle(new TaskAttemptEventStatusUpdate(
+        taskAttemptID, new TaskStatusUpdateEvent(null, 0.1f, null, false)));
+    assertEquals(1, taImpl.getCounters().findCounter("group", "counter").getValue());
+    counters.findCounter("group", "counter").increment(1);
+    taImpl.handle(new TaskAttemptEventStatusUpdate(
+        taskAttemptID, new TaskStatusUpdateEvent(counters, 0.1f, null, false)));
+    assertEquals(2, taImpl.getCounters().findCounter("group", "counter").getValue());
+    taImpl.handle(new TaskAttemptEventStatusUpdate(
+        taskAttemptID, new TaskStatusUpdateEvent(null, 0.1f, null, false)));
+    assertEquals(2, taImpl.getCounters().findCounter("group", "counter").getValue());
+  }
+
@Test (timeout = 5000)
public void testNoProgressFail() throws Exception {
ApplicationId appId = ApplicationId.newInstance(1, 2);