You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by cd...@apache.org on 2010/06/06 06:19:14 UTC
svn commit: r951812 - in /hadoop/mapreduce/trunk: CHANGES.txt src/java/org/apache/hadoop/mapred/JobInProgress.java src/test/mapred/org/apache/hadoop/mapred/TestJobInProgress.java
Author: cdouglas
Date: Sun Jun 6 04:19:13 2010
New Revision: 951812
URL: http://svn.apache.org/viewvc?rev=951812&view=rev
Log:
MAPREDUCE-1545. Add timestamps for first task type launched in job summary. Contributed by Luke Lu
Modified:
hadoop/mapreduce/trunk/CHANGES.txt
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobInProgress.java
Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=951812&r1=951811&r2=951812&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Sun Jun 6 04:19:13 2010
@@ -39,6 +39,9 @@ Trunk (unreleased changes)
then streaming does not throw error. (Amareshwari Sriramadasu via
vinodkv)
+ MAPREDUCE-1545. Add timestamps for first task type launched in job summary.
+ (Luke Lu via cdouglas)
+
OPTIMIZATIONS
MAPREDUCE-1354. Enhancements to JobTracker for better performance and
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java?rev=951812&r1=951811&r2=951812&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java Sun Jun 6 04:19:13 2010
@@ -25,6 +25,7 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
+import java.util.EnumMap;
import java.util.HashMap;
import java.util.HashSet;
import java.util.IdentityHashMap;
@@ -225,6 +226,10 @@ public class JobInProgress {
long startTime;
long launchTime;
long finishTime;
+
+ // First *task launch times
+ final Map<TaskType, Long> firstTaskLaunchTimes =
+ new EnumMap<TaskType, Long>(TaskType.class);
// Indicates how many times the job got restarted
private final int restartCount;
@@ -842,6 +847,9 @@ public class JobInProgress {
public synchronized long getLaunchTime() {
return launchTime;
}
  /**
   * Returns the live map of first-launch timestamp per task type.
   *
   * NOTE(review): this exposes the internal mutable map; it is updated under
   * its own monitor in setFirstTaskLaunchTime, so callers must synchronize on
   * the returned map while iterating (as getTaskLaunchTimesSummary does).
   *
   * @return map from task type to the exec start time of the first task of
   *         that type launched by this job
   */
  Map<TaskType, Long> getFirstTaskLaunchTimes() {
    return firstTaskLaunchTimes;
  }
public long getStartTime() {
return startTime;
}
@@ -1649,7 +1657,7 @@ public class JobInProgress {
name, splits);
jobHistory.logEvent(tse, tip.getJob().jobId);
-
+ setFirstTaskLaunchTime(tip);
}
if (!tip.isJobSetupTask() && !tip.isJobCleanupTask()) {
jobCounters.incrCounter(counter, 1);
@@ -1688,6 +1696,17 @@ public class JobInProgress {
}
}
}
+
+ void setFirstTaskLaunchTime(TaskInProgress tip) {
+ TaskType key = getTaskType(tip);
+
+ synchronized(firstTaskLaunchTimes) {
+ // Could be optimized to do only one lookup with a little more code
+ if (!firstTaskLaunchTimes.containsKey(key)) {
+ firstTaskLaunchTimes.put(key, tip.getExecStartTime());
+ }
+ }
+ }
public static String convertTrackerNameToHostName(String trackerName) {
// Ugly!
@@ -3495,7 +3514,50 @@ public class JobInProgress {
static final char EQUALS = '=';
static final char[] charsToEscape =
{StringUtils.COMMA, EQUALS, StringUtils.ESCAPE_CHAR};
-
+
+ static class SummaryBuilder {
+ final StringBuilder buffer = new StringBuilder();
+
+ // A little optimization for a very common case
+ SummaryBuilder add(String key, long value) {
+ return _add(key, Long.toString(value));
+ }
+
+ <T> SummaryBuilder add(String key, T value) {
+ return _add(key, StringUtils.escapeString(String.valueOf(value),
+ StringUtils.ESCAPE_CHAR, charsToEscape));
+ }
+
+ SummaryBuilder add(SummaryBuilder summary) {
+ if (buffer.length() > 0) buffer.append(StringUtils.COMMA);
+ buffer.append(summary.buffer);
+ return this;
+ }
+
+ SummaryBuilder _add(String key, String value) {
+ if (buffer.length() > 0) buffer.append(StringUtils.COMMA);
+ buffer.append(key).append(EQUALS).append(value);
+ return this;
+ }
+
+ @Override public String toString() {
+ return buffer.toString();
+ }
+ }
+
+ static SummaryBuilder getTaskLaunchTimesSummary(JobInProgress job) {
+ SummaryBuilder summary = new SummaryBuilder();
+ Map<TaskType, Long> timeMap = job.getFirstTaskLaunchTimes();
+
+ synchronized(timeMap) {
+ for (Map.Entry<TaskType, Long> e : timeMap.entrySet()) {
+ summary.add("first"+ StringUtils.camelize(e.getKey().name()) +
+ "TaskLaunchTime", e.getValue().longValue());
+ }
+ }
+ return summary;
+ }
+
/**
* Log a summary of the job's runtime.
*
@@ -3507,12 +3569,6 @@ public class JobInProgress {
public static void logJobSummary(JobInProgress job, ClusterStatus cluster) {
JobStatus status = job.getStatus();
JobProfile profile = job.getProfile();
- String user = StringUtils.escapeString(profile.getUser(),
- StringUtils.ESCAPE_CHAR,
- charsToEscape);
- String queue = StringUtils.escapeString(profile.getQueueName(),
- StringUtils.ESCAPE_CHAR,
- charsToEscape);
Counters jobCounters = job.getJobCounters();
long mapSlotSeconds =
(jobCounters.getCounter(JobCounter.SLOTS_MILLIS_MAPS) +
@@ -3521,30 +3577,25 @@ public class JobInProgress {
(jobCounters.getCounter(JobCounter.SLOTS_MILLIS_REDUCES) +
jobCounters.getCounter(JobCounter.FALLOW_SLOTS_MILLIS_REDUCES)) / 1000;
- LOG.info("jobId=" + job.getJobID() + StringUtils.COMMA +
- "submitTime" + EQUALS + job.getStartTime() + StringUtils.COMMA +
- "launchTime" + EQUALS + job.getLaunchTime() + StringUtils.COMMA +
- "finishTime" + EQUALS + job.getFinishTime() + StringUtils.COMMA +
- "numMaps" + EQUALS + job.getTasks(TaskType.MAP).length +
- StringUtils.COMMA +
- "numSlotsPerMap" + EQUALS + job.getNumSlotsPerMap() +
- StringUtils.COMMA +
- "numReduces" + EQUALS + job.getTasks(TaskType.REDUCE).length +
- StringUtils.COMMA +
- "numSlotsPerReduce" + EQUALS + job.getNumSlotsPerReduce() +
- StringUtils.COMMA +
- "user" + EQUALS + user + StringUtils.COMMA +
- "queue" + EQUALS + queue + StringUtils.COMMA +
- "status" + EQUALS +
- JobStatus.getJobRunState(status.getRunState()) +
- StringUtils.COMMA +
- "mapSlotSeconds" + EQUALS + mapSlotSeconds + StringUtils.COMMA +
- "reduceSlotsSeconds" + EQUALS + reduceSlotSeconds +
- StringUtils.COMMA +
- "clusterMapCapacity" + EQUALS + cluster.getMaxMapTasks() +
- StringUtils.COMMA +
- "clusterReduceCapacity" + EQUALS + cluster.getMaxReduceTasks()
- );
+ SummaryBuilder summary = new SummaryBuilder()
+ .add("jobId", job.getJobID())
+ .add("submitTime", job.getStartTime())
+ .add("launchTime", job.getLaunchTime())
+ .add(getTaskLaunchTimesSummary(job))
+ .add("finishTime", job.getFinishTime())
+ .add("numMaps", job.getTasks(TaskType.MAP).length)
+ .add("numSlotsPerMap", job.getNumSlotsPerMap())
+ .add("numReduces", job.getTasks(TaskType.REDUCE).length)
+ .add("numSlotsPerReduce", job.getNumSlotsPerReduce())
+ .add("user", profile.getUser())
+ .add("queue", profile.getQueueName())
+ .add("status", JobStatus.getJobRunState(status.getRunState()))
+ .add("mapSlotSeconds", mapSlotSeconds)
+ .add("reduceSlotsSeconds", reduceSlotSeconds)
+ .add("clusterMapCapacity", cluster.getMaxMapTasks())
+ .add("clusterReduceCapacity", cluster.getMaxReduceTasks());
+
+ LOG.info(summary);
}
}
Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobInProgress.java?rev=951812&r1=951811&r2=951812&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobInProgress.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobInProgress.java Sun Jun 6 04:19:13 2010
@@ -33,6 +33,7 @@ import junit.extensions.TestSetup;
import junit.framework.Test;
import junit.framework.TestCase;
import junit.framework.TestSuite;
+import static org.mockito.Mockito.*;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -41,6 +42,7 @@ import org.apache.hadoop.mapred.FakeObje
import org.apache.hadoop.mapred.TaskStatus.Phase;
import org.apache.hadoop.mapred.UtilsForTests.FakeClock;
import org.apache.hadoop.mapreduce.JobCounter;
+import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
import org.apache.hadoop.net.DNSToSwitchMapping;
@@ -112,7 +114,7 @@ public class TestJobInProgress extends T
return splits;
}
- private void makeRunning(TaskAttemptID taskId, TaskInProgress tip,
+ private void makeRunning(TaskAttemptID taskId, TaskInProgress tip,
String taskTracker) {
TaskStatus status = TaskStatus.createTaskStatus(tip.isMapTask(), taskId,
0.0f, 1, TaskStatus.State.RUNNING, "", "", taskTracker,
@@ -284,4 +286,58 @@ public class TestJobInProgress extends T
assertEquals(pendingReduces, jip.pendingReduces());
}
  /**
   * Verifies that first-task launch times are recorded once per task type,
   * and that logJobSummary() reads every expected field from the job and
   * cluster (checked with Mockito verification on a spied fake job).
   */
  public void testJobSummary() throws Exception {
    int numMaps = 2;
    int numReds = 2;
    JobConf conf = new JobConf();
    conf.setNumMapTasks(numMaps);
    conf.setNumReduceTasks(numReds);
    // Spying a fake is easier than mocking here
    MyFakeJobInProgress jspy = spy(new MyFakeJobInProgress(conf, jobTracker));
    jspy.initTasks();
    TaskAttemptID tid;

    // Launch some map tasks
    for (int i = 0; i < numMaps; i++) {
      jspy.maps[i].setExecStartTime(i + 1);     // maps start at times 1, 2, ...
      tid = jspy.findAndRunNewTask(true, trackers[i], hosts[i],
                                   clusterSize, numUniqueHosts);
      jspy.finishTask(tid);
    }

    // Launch some reduce tasks
    for (int i = 0; i < numReds; i++) {
      jspy.reduces[i].setExecStartTime(i + numMaps + 1);  // reduces follow maps
      tid = jspy.findAndRunNewTask(false, trackers[i], hosts[i],
                                   clusterSize, numUniqueHosts);
      jspy.finishTask(tid);
    }

    // Should be invoked numMaps + numReds times by different TIP objects
    verify(jspy, times(4)).setFirstTaskLaunchTime(any(TaskInProgress.class));

    ClusterStatus cspy = spy(new ClusterStatus(4, 0, 0, 0, 0, 4, 4,
        JobTracker.State.RUNNING, 0));

    JobInProgress.JobSummary.logJobSummary(jspy, cspy);

    // Each summary field is read exactly once (bare verify() == times(1));
    // only the job id may be fetched more than once.
    verify(jspy).getStatus();
    verify(jspy).getProfile();
    verify(jspy).getJobCounters();
    verify(jspy, atLeastOnce()).getJobID();
    verify(jspy).getStartTime();
    verify(jspy).getFirstTaskLaunchTimes();
    verify(jspy).getFinishTime();
    verify(jspy).getTasks(TaskType.MAP);
    verify(jspy).getTasks(TaskType.REDUCE);
    verify(jspy).getNumSlotsPerMap();
    verify(jspy).getNumSlotsPerReduce();
    verify(cspy).getMaxMapTasks();
    verify(cspy).getMaxReduceTasks();

    // Only the FIRST launch per type is kept: first map started at time 1,
    // first reduce at time numMaps + 1 == 3.
    assertEquals("firstMapTaskLaunchTime", 1,
        jspy.getFirstTaskLaunchTimes().get(TaskType.MAP).longValue());
    assertEquals("firstReduceTaskLaunchTime", 3,
        jspy.getFirstTaskLaunchTimes().get(TaskType.REDUCE).longValue());
  }
}