You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by tu...@apache.org on 2013/07/27 05:44:19 UTC
svn commit: r1507569 - in /hadoop/common/branches/branch-1: ./
src/mapred/org/apache/hadoop/mapred/ src/test/org/apache/hadoop/mapred/
Author: tucu
Date: Sat Jul 27 03:44:18 2013
New Revision: 1507569
URL: http://svn.apache.org/r1507569
Log:
MAPREDUCE-4366. mapred metrics shows negative count of waiting maps and reduces. (sandyr via tucu)
Modified:
hadoop/common/branches/branch-1/CHANGES.txt
hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobTrackerInstrumentation.java
hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobTrackerMetricsSource.java
hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java
hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/QueueMetrics.java
hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/TestQueueMetrics.java
Modified: hadoop/common/branches/branch-1/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/CHANGES.txt?rev=1507569&r1=1507568&r2=1507569&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/CHANGES.txt (original)
+++ hadoop/common/branches/branch-1/CHANGES.txt Sat Jul 27 03:44:18 2013
@@ -105,6 +105,9 @@ Release 1.3.0 - unreleased
HADOOP-9507. LocalFileSystem rename() is broken in some cases when
destination exists. (cnauroth)
+ MAPREDUCE-4366. mapred metrics shows negative count of waiting maps and
+ reduces. (sandyr via tucu)
+
Release 1.2.1 - 2013.07.06
INCOMPATIBLE CHANGES
Modified: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobInProgress.java?rev=1507569&r1=1507568&r2=1507569&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobInProgress.java Sat Jul 27 03:44:18 2013
@@ -115,6 +115,9 @@ public class JobInProgress {
// runningMapTasks include speculative tasks, so we need to capture
// speculative tasks separately
+ // For an incomplete task, every running attempt beyond the first one is
+ // counted in these variables. For a complete task, all of its running
+ // attempts are counted.
int speculativeMapTasks = 0;
int speculativeReduceTasks = 0;
@@ -1738,6 +1741,7 @@ public class JobInProgress {
String name;
String splits = "";
Enum<Counter> counter = null;
+ boolean speculative = tip.getActiveTasks().size() > 1;
if (tip.isJobSetupTask()) {
launchedSetup = true;
name = Values.SETUP.name();
@@ -1749,18 +1753,18 @@ public class JobInProgress {
name = Values.MAP.name();
counter = Counter.TOTAL_LAUNCHED_MAPS;
splits = tip.getSplitNodes();
- if (tip.getActiveTasks().size() > 1)
+ if (speculative)
speculativeMapTasks++;
- metrics.launchMap(id);
- this.queueMetrics.launchMap(id);
+ metrics.launchMap(id, speculative);
+ this.queueMetrics.launchMap(id, speculative);
} else {
++runningReduceTasks;
name = Values.REDUCE.name();
counter = Counter.TOTAL_LAUNCHED_REDUCES;
- if (tip.getActiveTasks().size() > 1)
+ if (speculative)
speculativeReduceTasks++;
- metrics.launchReduce(id);
- this.queueMetrics.launchReduce(id);
+ metrics.launchReduce(id, speculative);
+ this.queueMetrics.launchReduce(id, speculative);
}
// Note that the logs are for the scheduled tasks only. Tasks that join on
// restart already have their logs in place.
@@ -2692,10 +2696,6 @@ public class JobInProgress {
jobtracker.markCompletedTaskAttempt(status.getTaskTracker(), taskid);
} else if (tip.isMapTask()) {
runningMapTasks -= 1;
- // check if this was a speculative task
- if (oldNumAttempts > 1) {
- speculativeMapTasks -= (oldNumAttempts - newNumAttempts);
- }
finishedMapTasks += 1;
metrics.completeMap(taskid);
this.queueMetrics.completeMap(taskid);
@@ -2709,9 +2709,6 @@ public class JobInProgress {
}
} else {
runningReduceTasks -= 1;
- if (oldNumAttempts > 1) {
- speculativeReduceTasks -= (oldNumAttempts - newNumAttempts);
- }
finishedReduceTasks += 1;
metrics.completeReduce(taskid);
this.queueMetrics.completeReduce(taskid);
@@ -3003,7 +3000,7 @@ public class JobInProgress {
tip.incompleteSubTask(taskid, this.status);
boolean isRunning = tip.isRunning();
- boolean isComplete = tip.isComplete();
+ boolean tipIsComplete = tip.isComplete();
boolean metricsDone = isComplete(); // job metrics garbage collected
if (wasAttemptRunning) {
@@ -3018,14 +3015,24 @@ public class JobInProgress {
// hence we are decrementing the same set.
// Except after garbageCollect in a different thread.
if (!tip.isJobCleanupTask() && !tip.isJobSetupTask()) {
+ boolean incWaiting = !tipIsComplete && !metricsDone &&
+ tip.getActiveTasks().isEmpty();
+ boolean wasSpeculative = wasComplete || !tip.getActiveTasks().isEmpty();
+
if (tip.isMapTask() && !metricsDone) {
runningMapTasks -= 1;
- metrics.failedMap(taskid);
- this.queueMetrics.failedMap(taskid);
+ metrics.failedMap(taskid, incWaiting);
+ this.queueMetrics.failedMap(taskid, incWaiting);
+ if (wasSpeculative) {
+ speculativeMapTasks--;
+ }
} else if (!metricsDone) {
runningReduceTasks -= 1;
- metrics.failedReduce(taskid);
- this.queueMetrics.failedReduce(taskid);
+ metrics.failedReduce(taskid, incWaiting);
+ this.queueMetrics.failedReduce(taskid, incWaiting);
+ if (wasSpeculative) {
+ speculativeReduceTasks--;
+ }
}
}
@@ -3042,14 +3049,14 @@ public class JobInProgress {
} else if (tip.isMapTask()) {
// remove from the running queue and put it in the non-running cache
// if the tip is not complete i.e if the tip still needs to be run
- if (!isComplete) {
+ if (!tipIsComplete) {
retireMap(tip);
failMap(tip);
}
} else {
// remove from the running queue and put in the failed queue if the tip
// is not complete
- if (!isComplete) {
+ if (!tipIsComplete) {
retireReduce(tip);
failReduce(tip);
}
@@ -3058,7 +3065,7 @@ public class JobInProgress {
// The case when the map was complete but the task tracker went down.
// However, we don't need to do any metering here...
- if (wasComplete && !isComplete) {
+ if (wasComplete && !tipIsComplete) {
if (tip.isMapTask()) {
// Put the task back in the cache. This will help locality for cases
// where we have a different TaskTracker from the same rack/switch
Modified: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobTrackerInstrumentation.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobTrackerInstrumentation.java?rev=1507569&r1=1507568&r2=1507569&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobTrackerInstrumentation.java (original)
+++ hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobTrackerInstrumentation.java Sat Jul 27 03:44:18 2013
@@ -32,22 +32,22 @@ class JobTrackerInstrumentation {
tracker = jt;
}
- public void launchMap(TaskAttemptID taskAttemptID)
+ public void launchMap(TaskAttemptID taskAttemptID, boolean speculative)
{ }
public void completeMap(TaskAttemptID taskAttemptID)
{ }
- public void failedMap(TaskAttemptID taskAttemptID)
+ public void failedMap(TaskAttemptID taskAttemptID, boolean taskNowWaiting)
{ }
- public void launchReduce(TaskAttemptID taskAttemptID)
+ public void launchReduce(TaskAttemptID taskAttemptID, boolean speculative)
{ }
public void completeReduce(TaskAttemptID taskAttemptID)
{ }
- public void failedReduce(TaskAttemptID taskAttemptID)
+ public void failedReduce(TaskAttemptID taskAttemptID, boolean taskNowWaiting)
{ }
public void submitJob(JobConf conf, JobID id)
Modified: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobTrackerMetricsSource.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobTrackerMetricsSource.java?rev=1507569&r1=1507568&r2=1507569&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobTrackerMetricsSource.java (original)
+++ hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobTrackerMetricsSource.java Sat Jul 27 03:44:18 2013
@@ -111,9 +111,11 @@ class JobTrackerMetricsSource extends Jo
}
@Override
- public void launchMap(TaskAttemptID taskAttemptID) {
+ public void launchMap(TaskAttemptID taskAttemptID, boolean speculative) {
mapsLaunched.incr();
- decWaitingMaps(taskAttemptID.getJobID(), 1);
+ if (!speculative) {
+ decWaitingMaps(taskAttemptID.getJobID(), 1);
+ }
}
@Override
@@ -122,15 +124,19 @@ class JobTrackerMetricsSource extends Jo
}
@Override
- public void failedMap(TaskAttemptID taskAttemptID) {
+ public void failedMap(TaskAttemptID taskAttemptID, boolean incWaiting) {
mapsFailed.incr();
- addWaitingMaps(taskAttemptID.getJobID(), 1);
+ if (incWaiting) {
+ addWaitingMaps(taskAttemptID.getJobID(), 1);
+ }
}
@Override
- public void launchReduce(TaskAttemptID taskAttemptID) {
+ public void launchReduce(TaskAttemptID taskAttemptID, boolean speculative) {
redsLaunched.incr();
- decWaitingReduces(taskAttemptID.getJobID(), 1);
+ if (!speculative) {
+ decWaitingReduces(taskAttemptID.getJobID(), 1);
+ }
}
@Override
@@ -139,9 +145,11 @@ class JobTrackerMetricsSource extends Jo
}
@Override
- public void failedReduce(TaskAttemptID taskAttemptID) {
+ public void failedReduce(TaskAttemptID taskAttemptID, boolean incWaiting) {
redsFailed.incr();
- addWaitingReduces(taskAttemptID.getJobID(), 1);
+ if (incWaiting) {
+ addWaitingReduces(taskAttemptID.getJobID(), 1);
+ }
}
@Override
Modified: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java?rev=1507569&r1=1507568&r2=1507569&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java (original)
+++ hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java Sat Jul 27 03:44:18 2013
@@ -219,7 +219,7 @@ public class LocalJobRunner implements J
map.setConf(localConf);
try {
map_tasks.getAndIncrement();
- myMetrics.launchMap(mapId);
+ myMetrics.launchMap(mapId, false);
map.run(localConf, Job.this);
myMetrics.completeMap(mapId);
} finally {
@@ -393,8 +393,8 @@ public class LocalJobRunner implements J
reduce.localizeConfiguration(localConf);
reduce.setConf(localConf);
reduce_tasks += 1;
- myMetrics.launchReduce(reduce.getTaskID());
- queueMetrics.launchReduce(reduce.getTaskID());
+ myMetrics.launchReduce(reduce.getTaskID(), false);
+ queueMetrics.launchReduce(reduce.getTaskID(), false);
reduce.run(localConf, this);
myMetrics.completeReduce(reduce.getTaskID());
queueMetrics.completeReduce(reduce.getTaskID());
Modified: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/QueueMetrics.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/QueueMetrics.java?rev=1507569&r1=1507568&r2=1507569&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/QueueMetrics.java (original)
+++ hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/QueueMetrics.java Sat Jul 27 03:44:18 2013
@@ -136,32 +136,40 @@ class QueueMetrics implements MetricsSou
registry.snapshot(builder.addRecord(registry.name()), all);
}
- public void launchMap(TaskAttemptID taskAttemptID) {
+ public void launchMap(TaskAttemptID taskAttemptID, boolean speculative) {
mapsLaunched.incr();
- decWaitingMaps(taskAttemptID.getJobID(), 1);
+ if (!speculative) {
+ decWaitingMaps(taskAttemptID.getJobID(), 1);
+ }
}
public void completeMap(TaskAttemptID taskAttemptID) {
mapsCompleted.incr();
}
- public void failedMap(TaskAttemptID taskAttemptID) {
+ public void failedMap(TaskAttemptID taskAttemptID, boolean incWaiting) {
mapsFailed.incr();
- addWaitingMaps(taskAttemptID.getJobID(), 1);
+ if (incWaiting) {
+ addWaitingMaps(taskAttemptID.getJobID(), 1);
+ }
}
- public void launchReduce(TaskAttemptID taskAttemptID) {
+ public void launchReduce(TaskAttemptID taskAttemptID, boolean speculative) {
redsLaunched.incr();
- decWaitingReduces(taskAttemptID.getJobID(), 1);
+ if (!speculative) {
+ decWaitingReduces(taskAttemptID.getJobID(), 1);
+ }
}
public void completeReduce(TaskAttemptID taskAttemptID) {
redsCompleted.incr();
}
- public void failedReduce(TaskAttemptID taskAttemptID) {
+ public void failedReduce(TaskAttemptID taskAttemptID, boolean incWaiting) {
redsFailed.incr();
- addWaitingReduces(taskAttemptID.getJobID(), 1);
+ if (incWaiting) {
+ addWaitingReduces(taskAttemptID.getJobID(), 1);
+ }
}
public void submitJob(JobConf conf, JobID id) {
Modified: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/TestQueueMetrics.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/TestQueueMetrics.java?rev=1507569&r1=1507568&r2=1507569&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/TestQueueMetrics.java (original)
+++ hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/TestQueueMetrics.java Sat Jul 27 03:44:18 2013
@@ -54,21 +54,21 @@ public class TestQueueMetrics extends Te
QueueMetrics metrics = QueueMetrics.create(queueName, new Configuration());
assertEquals(metrics.getQueueName(), "single");
- metrics.launchMap(taskAttemptID);
+ metrics.launchMap(taskAttemptID, false);
checkMaps(metrics, 1, 0, 0, 0, -1, 0);
metrics.addWaitingMaps(taskAttemptID.getJobID(), 5);
- metrics.launchMap(taskAttemptID);
+ metrics.launchMap(taskAttemptID, false);
checkMaps(metrics, 2, 0, 0, 0, 3, 0);
checkReduces(metrics, 0, 0, 0, 0, 0, 0);
metrics.completeMap(taskAttemptID);
- metrics.failedMap(taskAttemptID);
+ metrics.failedMap(taskAttemptID, true);
checkMaps(metrics, 2, 1, 1, 0, 4, 0);
checkReduces(metrics, 0, 0, 0, 0, 0, 0);
- metrics.launchReduce(taskAttemptID);
+ metrics.launchReduce(taskAttemptID, false);
metrics.completeReduce(taskAttemptID);
- metrics.failedReduce(taskAttemptID);
+ metrics.failedReduce(taskAttemptID, true);
checkMaps(metrics, 2, 1, 1, 0, 4, 0);
checkReduces(metrics, 1, 1, 1, 0, 0, 0);