You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by jl...@apache.org on 2014/01/17 22:38:37 UTC
svn commit: r1559256 - in /hadoop/common/trunk/hadoop-mapreduce-project: ./
hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/
hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/...
Author: jlowe
Date: Fri Jan 17 21:38:37 2014
New Revision: 1559256
URL: http://svn.apache.org/r1559256
Log:
MAPREDUCE-5717. Task pings are interpreted as task progress. Contributed by Jason Lowe
Modified:
hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java
Modified: hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt?rev=1559256&r1=1559255&r2=1559256&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt Fri Jan 17 21:38:37 2014
@@ -146,6 +146,8 @@ Trunk (Unreleased)
MAPREDUCE-5191. TestQueue#testQueue fails with timeout on Windows. (Ivan
Mitic via hitesh)
+ MAPREDUCE-5717. Task pings are interpreted as task progress (jlowe)
+
Release 2.4.0 - UNRELEASED
INCOMPATIBLE CHANGES
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java?rev=1559256&r1=1559255&r2=1559256&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java Fri Jan 17 21:38:37 2014
@@ -361,7 +361,6 @@ public class TaskAttemptListenerImpl ext
if (taskStatus == null) {
//We are using statusUpdate only as a simple ping
LOG.info("Ping from " + taskAttemptID.toString());
- taskHeartbeatHandler.progressing(yarnAttemptID);
return feedback;
}
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java?rev=1559256&r1=1559255&r2=1559256&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java Fri Jan 17 21:38:37 2014
@@ -381,4 +381,50 @@ public class TestTaskAttemptListenerImpl
}
+ @SuppressWarnings("rawtypes")
+ @Test
+ public void testStatusUpdateProgress()
+ throws IOException, InterruptedException {
+ AppContext appCtx = mock(AppContext.class);
+ JobTokenSecretManager secret = mock(JobTokenSecretManager.class);
+ RMHeartbeatHandler rmHeartbeatHandler =
+ mock(RMHeartbeatHandler.class);
+ TaskHeartbeatHandler hbHandler = mock(TaskHeartbeatHandler.class);
+ Dispatcher dispatcher = mock(Dispatcher.class);
+ EventHandler ea = mock(EventHandler.class);
+ when(dispatcher.getEventHandler()).thenReturn(ea);
+
+ when(appCtx.getEventHandler()).thenReturn(ea);
+ CheckpointAMPreemptionPolicy policy = new CheckpointAMPreemptionPolicy();
+ policy.init(appCtx);
+ MockTaskAttemptListenerImpl listener =
+ new MockTaskAttemptListenerImpl(appCtx, secret,
+ rmHeartbeatHandler, hbHandler, policy);
+ Configuration conf = new Configuration();
+ listener.init(conf);
+ listener.start();
+ JVMId id = new JVMId("foo",1, true, 1);
+ WrappedJvmID wid = new WrappedJvmID(id.getJobId(), id.isMap, id.getId());
+
+ TaskAttemptID attemptID = new TaskAttemptID("1", 1, TaskType.MAP, 1, 1);
+ TaskAttemptId attemptId = TypeConverter.toYarn(attemptID);
+ Task task = mock(Task.class);
+ listener.registerPendingTask(task, wid);
+ listener.registerLaunchedTask(attemptId, wid);
+ verify(hbHandler).register(attemptId);
+
+ // make sure a ping doesn't report progress
+ AMFeedback feedback = listener.statusUpdate(attemptID, null);
+ assertTrue(feedback.getTaskFound());
+ verify(hbHandler, never()).progressing(eq(attemptId));
+
+ // make sure a status update does report progress
+ MapTaskStatus mockStatus = new MapTaskStatus(attemptID, 0.0f, 1,
+ TaskStatus.State.RUNNING, "", "RUNNING", "", TaskStatus.Phase.MAP,
+ new Counters());
+ feedback = listener.statusUpdate(attemptID, mockStatus);
+ assertTrue(feedback.getTaskFound());
+ verify(hbHandler).progressing(eq(attemptId));
+ listener.close();
+ }
}