You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by jl...@apache.org on 2014/01/17 22:38:37 UTC

svn commit: r1559256 - in /hadoop/common/trunk/hadoop-mapreduce-project: ./ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/...

Author: jlowe
Date: Fri Jan 17 21:38:37 2014
New Revision: 1559256

URL: http://svn.apache.org/r1559256
Log:
MAPREDUCE-5717. Task pings are interpreted as task progress. Contributed by Jason Lowe

Modified:
    hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java

Modified: hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt?rev=1559256&r1=1559255&r2=1559256&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt Fri Jan 17 21:38:37 2014
@@ -146,6 +146,8 @@ Trunk (Unreleased)
     MAPREDUCE-5191. TestQueue#testQueue fails with timeout on Windows. (Ivan
     Mitic via hitesh)
 
+    MAPREDUCE-5717. Task pings are interpreted as task progress (jlowe)
+
 Release 2.4.0 - UNRELEASED
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java?rev=1559256&r1=1559255&r2=1559256&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java Fri Jan 17 21:38:37 2014
@@ -361,7 +361,6 @@ public class TaskAttemptListenerImpl ext
     if (taskStatus == null) {
       //We are using statusUpdate only as a simple ping
       LOG.info("Ping from " + taskAttemptID.toString());
-      taskHeartbeatHandler.progressing(yarnAttemptID);
       return feedback;
     }
 

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java?rev=1559256&r1=1559255&r2=1559256&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java Fri Jan 17 21:38:37 2014
@@ -381,4 +381,50 @@ public class TestTaskAttemptListenerImpl
 
   }
 
+  @SuppressWarnings("rawtypes")
+  @Test
+  public void testStatusUpdateProgress()
+      throws IOException, InterruptedException {
+    AppContext appCtx = mock(AppContext.class);
+    JobTokenSecretManager secret = mock(JobTokenSecretManager.class);
+    RMHeartbeatHandler rmHeartbeatHandler =
+        mock(RMHeartbeatHandler.class);
+    TaskHeartbeatHandler hbHandler = mock(TaskHeartbeatHandler.class);
+    Dispatcher dispatcher = mock(Dispatcher.class);
+    EventHandler ea = mock(EventHandler.class);
+    when(dispatcher.getEventHandler()).thenReturn(ea);
+
+    when(appCtx.getEventHandler()).thenReturn(ea);
+    CheckpointAMPreemptionPolicy policy = new CheckpointAMPreemptionPolicy();
+    policy.init(appCtx);
+    MockTaskAttemptListenerImpl listener =
+      new MockTaskAttemptListenerImpl(appCtx, secret,
+          rmHeartbeatHandler, hbHandler, policy);
+    Configuration conf = new Configuration();
+    listener.init(conf);
+    listener.start();
+    JVMId id = new JVMId("foo",1, true, 1);
+    WrappedJvmID wid = new WrappedJvmID(id.getJobId(), id.isMap, id.getId());
+
+    TaskAttemptID attemptID = new TaskAttemptID("1", 1, TaskType.MAP, 1, 1);
+    TaskAttemptId attemptId = TypeConverter.toYarn(attemptID);
+    Task task = mock(Task.class);
+    listener.registerPendingTask(task, wid);
+    listener.registerLaunchedTask(attemptId, wid);
+    verify(hbHandler).register(attemptId);
+
+    // make sure a ping doesn't report progress
+    AMFeedback feedback = listener.statusUpdate(attemptID, null);
+    assertTrue(feedback.getTaskFound());
+    verify(hbHandler, never()).progressing(eq(attemptId));
+
+    // make sure a status update does report progress
+    MapTaskStatus mockStatus = new MapTaskStatus(attemptID, 0.0f, 1,
+        TaskStatus.State.RUNNING, "", "RUNNING", "", TaskStatus.Phase.MAP,
+        new Counters());
+    feedback = listener.statusUpdate(attemptID, mockStatus);
+    assertTrue(feedback.getTaskFound());
+    verify(hbHandler).progressing(eq(attemptId));
+    listener.close();
+  }
 }