You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by jl...@apache.org on 2017/10/02 22:13:53 UTC

tez git commit: TEZ-3847. AM web controller task counters are empty sometimes. Contributed by Jonathan Eagles

Repository: tez
Updated Branches:
  refs/heads/master 4d6690ae1 -> 3b2933f01


TEZ-3847. AM web controller task counters are empty sometimes. Contributed by Jonathan Eagles


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/3b2933f0
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/3b2933f0
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/3b2933f0

Branch: refs/heads/master
Commit: 3b2933f01d9cf3a431f8230d4a3a7f83e7099788
Parents: 4d6690a
Author: Jason Lowe <jl...@apache.org>
Authored: Mon Oct 2 17:12:52 2017 -0500
Committer: Jason Lowe <jl...@apache.org>
Committed: Mon Oct 2 17:12:52 2017 -0500

----------------------------------------------------------------------
 .../tez/dag/app/dag/impl/TaskAttemptImpl.java   |  8 ++-
 .../tez/dag/app/dag/impl/TestTaskAttempt.java   | 66 ++++++++++++++++++++
 2 files changed, 72 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/3b2933f0/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
index 1fe65a9..1218543 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
@@ -1575,8 +1575,12 @@ public class TaskAttemptImpl implements TaskAttempt,
       TaskStatusUpdateEvent statusEvent = sEvent.getStatusEvent();
       ta.reportedStatus.state = ta.getState();
       ta.reportedStatus.progress = statusEvent.getProgress();
-      ta.reportedStatus.counters = statusEvent.getCounters();
-      ta.statistics = statusEvent.getStatistics();
+      if (statusEvent.getCounters() != null) {
+        ta.reportedStatus.counters = statusEvent.getCounters();
+      }
+      if (statusEvent.getStatistics() != null) {
+        ta.statistics = statusEvent.getStatistics();
+      }
       if (statusEvent.getProgressNotified()) {
         ta.lastNotifyProgressTimestamp = ta.clock.getTime();
       } else {

http://git-wip-us.apache.org/repos/asf/tez/blob/3b2933f0/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
index 7709bc0..2bad2ef 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
@@ -62,6 +62,7 @@ import org.apache.hadoop.yarn.util.SystemClock;
 import org.apache.log4j.Level;
 import org.apache.log4j.LogManager;
 import org.apache.tez.common.MockDNSToSwitchMapping;
+import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.TezConstants;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventSubmitted;
 import org.apache.tez.dag.app.dag.event.TaskEventTAFailed;
@@ -1072,6 +1073,71 @@ public class TestTaskAttempt {
     Assert.assertTrue("This should have been an attempt failed event!", arg.getValue() instanceof TaskAttemptEventAttemptFailed);
   }
 
+  @Test
+  public void testStatusUpdateWithNullCounters() throws Exception {
+    ApplicationId appId = ApplicationId.newInstance(1, 2);
+    ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(
+        appId, 0);
+    TezDAGID dagID = TezDAGID.getInstance(appId, 1);
+    TezVertexID vertexID = TezVertexID.getInstance(dagID, 1);
+    TezTaskID taskID = TezTaskID.getInstance(vertexID, 1);
+
+    MockEventHandler eventHandler = spy(new MockEventHandler());
+    TaskCommunicatorManagerInterface taListener = createMockTaskAttemptListener();
+
+    Configuration taskConf = new Configuration();
+    taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
+    taskConf.setBoolean("fs.file.impl.disable.cache", true);
+
+    locationHint = TaskLocationHint.createTaskLocationHint(
+        new HashSet<String>(Arrays.asList(new String[]{"127.0.0.1"})), null);
+    Resource resource = Resource.newInstance(1024, 1);
+
+    NodeId nid = NodeId.newInstance("127.0.0.1", 0);
+    ContainerId contId = ContainerId.newContainerId(appAttemptId, 3);
+    Container container = mock(Container.class);
+    when(container.getId()).thenReturn(contId);
+    when(container.getNodeId()).thenReturn(nid);
+    when(container.getNodeHttpAddress()).thenReturn("localhost:0");
+
+    AMContainerMap containers = new AMContainerMap(
+        mock(ContainerHeartbeatHandler.class), mock(TaskCommunicatorManagerInterface.class),
+        new ContainerContextMatcher(), appCtx);
+    containers.addContainerIfNew(container, 0, 0, 0);
+
+    doReturn(new ClusterInfo()).when(appCtx).getClusterInfo();
+    doReturn(containers).when(appCtx).getAllContainers();
+
+    TaskHeartbeatHandler mockHeartbeatHandler = mock(TaskHeartbeatHandler.class);
+    TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler,
+        taListener, taskConf, new SystemClock(),
+        mockHeartbeatHandler, appCtx, false,
+        resource, createFakeContainerContext(), false);
+    TezTaskAttemptID taskAttemptID = taImpl.getID();
+
+    taImpl.handle(new TaskAttemptEventSchedule(taskAttemptID, 0, 0));
+    taImpl.handle(new TaskAttemptEventSubmitted(taskAttemptID, contId));
+    taImpl.handle(new TaskAttemptEventStartedRemotely(taskAttemptID));
+    assertEquals("Task attempt is not in the RUNNING state", taImpl.getState(), TaskAttemptState.RUNNING);
+    verify(mockHeartbeatHandler).register(taskAttemptID);
+
+    TezCounters counters = new TezCounters();
+    counters.findCounter("group", "counter").increment(1);
+    taImpl.handle(new TaskAttemptEventStatusUpdate(
+        taskAttemptID, new TaskStatusUpdateEvent(counters, 0.1f, null, false)));
+    assertEquals(1, taImpl.getCounters().findCounter("group", "counter").getValue());
+    taImpl.handle(new TaskAttemptEventStatusUpdate(
+        taskAttemptID, new TaskStatusUpdateEvent(null, 0.1f, null, false)));
+    assertEquals(1, taImpl.getCounters().findCounter("group", "counter").getValue());
+    counters.findCounter("group", "counter").increment(1);
+    taImpl.handle(new TaskAttemptEventStatusUpdate(
+        taskAttemptID, new TaskStatusUpdateEvent(counters, 0.1f, null, false)));
+    assertEquals(2, taImpl.getCounters().findCounter("group", "counter").getValue());
+    taImpl.handle(new TaskAttemptEventStatusUpdate(
+        taskAttemptID, new TaskStatusUpdateEvent(null, 0.1f, null, false)));
+    assertEquals(2, taImpl.getCounters().findCounter("group", "counter").getValue());
+  }
+
   @Test (timeout = 5000)
   public void testNoProgressFail() throws Exception {
     ApplicationId appId = ApplicationId.newInstance(1, 2);