You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by cd...@apache.org on 2010/06/06 06:19:14 UTC

svn commit: r951812 - in /hadoop/mapreduce/trunk: CHANGES.txt src/java/org/apache/hadoop/mapred/JobInProgress.java src/test/mapred/org/apache/hadoop/mapred/TestJobInProgress.java

Author: cdouglas
Date: Sun Jun  6 04:19:13 2010
New Revision: 951812

URL: http://svn.apache.org/viewvc?rev=951812&view=rev
Log:
MAPREDUCE-1545. Add timestamps for first task type launched in job summary. Contributed by Luke Lu

Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobInProgress.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=951812&r1=951811&r2=951812&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Sun Jun  6 04:19:13 2010
@@ -39,6 +39,9 @@ Trunk (unreleased changes)
     then streaming does not throw error. (Amareshwari Sriramadasu via
     vinodkv)
 
+    MAPREDUCE-1545. Add timestamps for first task type launched in job summary.
+    (Luke Lu via cdouglas)
+
   OPTIMIZATIONS
 
     MAPREDUCE-1354. Enhancements to JobTracker for better performance and

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java?rev=951812&r1=951811&r2=951812&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java Sun Jun  6 04:19:13 2010
@@ -25,6 +25,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.EnumMap;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.IdentityHashMap;
@@ -225,6 +226,10 @@ public class JobInProgress {
   long startTime;
   long launchTime;
   long finishTime;
+
+  // First *task launch times
+  final Map<TaskType, Long> firstTaskLaunchTimes =
+      new EnumMap<TaskType, Long>(TaskType.class);
   
   // Indicates how many times the job got restarted
   private final int restartCount;
@@ -842,6 +847,9 @@ public class JobInProgress {
   public synchronized long getLaunchTime() {
     return launchTime;
   }
+  Map<TaskType, Long> getFirstTaskLaunchTimes() { // live map, not a copy
+    return firstTaskLaunchTimes; // entries are added under synchronized(map)
+  }
   public long getStartTime() {
     return startTime;
   }
@@ -1649,7 +1657,7 @@ public class JobInProgress {
           name, splits);
       
       jobHistory.logEvent(tse, tip.getJob().jobId);
-      
+      setFirstTaskLaunchTime(tip);
     }
     if (!tip.isJobSetupTask() && !tip.isJobCleanupTask()) {
       jobCounters.incrCounter(counter, 1);
@@ -1688,6 +1696,17 @@ public class JobInProgress {
       }
     }
   }
+
+  void setFirstTaskLaunchTime(TaskInProgress tip) {
+    TaskType key = getTaskType(tip);
+
+    synchronized(firstTaskLaunchTimes) {
+      // Could be optimized to do only one lookup with a little more code
+      if (!firstTaskLaunchTimes.containsKey(key)) {
+        firstTaskLaunchTimes.put(key, tip.getExecStartTime());
+      }
+    }
+  }
     
   public static String convertTrackerNameToHostName(String trackerName) {
     // Ugly!
@@ -3495,7 +3514,50 @@ public class JobInProgress {
     static final char EQUALS = '=';
     static final char[] charsToEscape = 
       {StringUtils.COMMA, EQUALS, StringUtils.ESCAPE_CHAR};
-    
+
+    /** Builds the comma-separated key=value text of a job summary line. */
+    static class SummaryBuilder {
+      final StringBuilder buffer = new StringBuilder();
+
+      // Fast path for long values: formatted directly, no escapeString call
+      SummaryBuilder add(String key, long value) {
+        return _add(key, Long.toString(value));
+      }
+
+      <T> SummaryBuilder add(String key, T value) {
+        return _add(key, StringUtils.escapeString(String.valueOf(value),
+                    StringUtils.ESCAPE_CHAR, charsToEscape));
+      }
+
+      SummaryBuilder add(SummaryBuilder summary) {
+        if (summary.buffer.length() == 0) return this; // avoid a stray ",,"
+        if (buffer.length() > 0) buffer.append(StringUtils.COMMA);
+        buffer.append(summary.buffer);
+        return this;
+      }
+
+      SummaryBuilder _add(String key, String value) {
+        if (buffer.length() > 0) buffer.append(StringUtils.COMMA);
+        buffer.append(key).append(EQUALS).append(value);
+        return this;
+      }
+
+      @Override public String toString() { return buffer.toString(); }
+    }
+
+    static SummaryBuilder getTaskLaunchTimesSummary(JobInProgress job) {
+      SummaryBuilder summary = new SummaryBuilder();
+      Map<TaskType, Long> timeMap = job.getFirstTaskLaunchTimes();
+
+      synchronized(timeMap) {
+        for (Map.Entry<TaskType, Long> e : timeMap.entrySet()) {
+          summary.add("first"+ StringUtils.camelize(e.getKey().name()) +
+                      "TaskLaunchTime", e.getValue().longValue());
+        }
+      }
+      return summary;
+    }
+
     /**
      * Log a summary of the job's runtime.
      * 
@@ -3507,12 +3569,6 @@ public class JobInProgress {
     public static void logJobSummary(JobInProgress job, ClusterStatus cluster) {
       JobStatus status = job.getStatus();
       JobProfile profile = job.getProfile();
-      String user = StringUtils.escapeString(profile.getUser(), 
-                                             StringUtils.ESCAPE_CHAR, 
-                                             charsToEscape);
-      String queue = StringUtils.escapeString(profile.getQueueName(), 
-                                              StringUtils.ESCAPE_CHAR, 
-                                              charsToEscape);
       Counters jobCounters = job.getJobCounters();
       long mapSlotSeconds = 
         (jobCounters.getCounter(JobCounter.SLOTS_MILLIS_MAPS) +
@@ -3521,30 +3577,25 @@ public class JobInProgress {
         (jobCounters.getCounter(JobCounter.SLOTS_MILLIS_REDUCES) +
          jobCounters.getCounter(JobCounter.FALLOW_SLOTS_MILLIS_REDUCES)) / 1000;
 
-      LOG.info("jobId=" + job.getJobID() + StringUtils.COMMA +
-               "submitTime" + EQUALS + job.getStartTime() + StringUtils.COMMA +
-               "launchTime" + EQUALS + job.getLaunchTime() + StringUtils.COMMA +
-               "finishTime" + EQUALS + job.getFinishTime() + StringUtils.COMMA +
-               "numMaps" + EQUALS + job.getTasks(TaskType.MAP).length + 
-                           StringUtils.COMMA +
-               "numSlotsPerMap" + EQUALS + job.getNumSlotsPerMap() + 
-                                  StringUtils.COMMA +
-               "numReduces" + EQUALS + job.getTasks(TaskType.REDUCE).length + 
-                              StringUtils.COMMA +
-               "numSlotsPerReduce" + EQUALS + job.getNumSlotsPerReduce() + 
-                                     StringUtils.COMMA +
-               "user" + EQUALS + user + StringUtils.COMMA +
-               "queue" + EQUALS + queue + StringUtils.COMMA +
-               "status" + EQUALS + 
-                          JobStatus.getJobRunState(status.getRunState()) + 
-                          StringUtils.COMMA + 
-               "mapSlotSeconds" + EQUALS + mapSlotSeconds + StringUtils.COMMA +
-               "reduceSlotsSeconds" + EQUALS + reduceSlotSeconds  + 
-                                      StringUtils.COMMA +
-               "clusterMapCapacity" + EQUALS + cluster.getMaxMapTasks() + 
-                                      StringUtils.COMMA +
-               "clusterReduceCapacity" + EQUALS + cluster.getMaxReduceTasks()
-      );
+      SummaryBuilder summary = new SummaryBuilder()
+          .add("jobId", job.getJobID())
+          .add("submitTime", job.getStartTime())
+          .add("launchTime", job.getLaunchTime())
+          .add(getTaskLaunchTimesSummary(job))
+          .add("finishTime", job.getFinishTime())
+          .add("numMaps", job.getTasks(TaskType.MAP).length)
+          .add("numSlotsPerMap", job.getNumSlotsPerMap())
+          .add("numReduces", job.getTasks(TaskType.REDUCE).length)
+          .add("numSlotsPerReduce", job.getNumSlotsPerReduce())
+          .add("user", profile.getUser())
+          .add("queue", profile.getQueueName())
+          .add("status", JobStatus.getJobRunState(status.getRunState()))
+          .add("mapSlotSeconds", mapSlotSeconds)
+          .add("reduceSlotsSeconds", reduceSlotSeconds)
+          .add("clusterMapCapacity", cluster.getMaxMapTasks())
+          .add("clusterReduceCapacity", cluster.getMaxReduceTasks());
+
+      LOG.info(summary);
     }
   }
   

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobInProgress.java?rev=951812&r1=951811&r2=951812&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobInProgress.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobInProgress.java Sun Jun  6 04:19:13 2010
@@ -33,6 +33,7 @@ import junit.extensions.TestSetup;
 import junit.framework.Test;
 import junit.framework.TestCase;
 import junit.framework.TestSuite;
+import static org.mockito.Mockito.*;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -41,6 +42,7 @@ import org.apache.hadoop.mapred.FakeObje
 import org.apache.hadoop.mapred.TaskStatus.Phase;
 import org.apache.hadoop.mapred.UtilsForTests.FakeClock;
 import org.apache.hadoop.mapreduce.JobCounter;
+import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
 import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
 import org.apache.hadoop.net.DNSToSwitchMapping;
@@ -112,7 +114,7 @@ public class TestJobInProgress extends T
       return splits;
     }
 
-    private void makeRunning(TaskAttemptID taskId, TaskInProgress tip, 
+    private void makeRunning(TaskAttemptID taskId, TaskInProgress tip,
         String taskTracker) {
       TaskStatus status = TaskStatus.createTaskStatus(tip.isMapTask(), taskId, 
           0.0f, 1, TaskStatus.State.RUNNING, "", "", taskTracker,
@@ -284,4 +286,58 @@ public class TestJobInProgress extends T
     assertEquals(pendingReduces, jip.pendingReduces());
   }
 
+  /** Launches and finishes each task, then verifies logJobSummary's reads. */
+  public void testJobSummary() throws Exception {
+    int numMaps = 2;
+    int numReds = 2;
+    JobConf conf = new JobConf();
+    conf.setNumMapTasks(numMaps);
+    conf.setNumReduceTasks(numReds);
+    // Spying a fake is easier than mocking here
+    MyFakeJobInProgress jspy = spy(new MyFakeJobInProgress(conf, jobTracker));
+    jspy.initTasks();
+
+    // Launch the map tasks with exec start times 1..numMaps
+    for (int i = 0; i < numMaps; i++) {
+      jspy.maps[i].setExecStartTime(i + 1);
+      TaskAttemptID tid = jspy.findAndRunNewTask(true, trackers[i], hosts[i],
+          clusterSize, numUniqueHosts);
+      jspy.finishTask(tid);
+    }
+
+    // Launch the reduce tasks with exec start times numMaps+1 onwards
+    for (int i = 0; i < numReds; i++) {
+      jspy.reduces[i].setExecStartTime(i + numMaps + 1);
+      TaskAttemptID tid = jspy.findAndRunNewTask(false, trackers[i], hosts[i],
+          clusterSize, numUniqueHosts);
+      jspy.finishTask(tid);
+    }
+
+    // Should be invoked numMaps + numReds times by different TIP objects
+    verify(jspy, times(numMaps + numReds))
+        .setFirstTaskLaunchTime(any(TaskInProgress.class));
+
+    ClusterStatus cspy = spy(new ClusterStatus(4, 0, 0, 0, 0, 4, 4,
+                                               JobTracker.State.RUNNING, 0));
+    JobInProgress.JobSummary.logJobSummary(jspy, cspy);
+
+    verify(jspy).getStatus();
+    verify(jspy).getProfile();
+    verify(jspy).getJobCounters();
+    verify(jspy, atLeastOnce()).getJobID();
+    verify(jspy).getStartTime();
+    verify(jspy).getFirstTaskLaunchTimes();
+    verify(jspy).getFinishTime();
+    verify(jspy).getTasks(TaskType.MAP);
+    verify(jspy).getTasks(TaskType.REDUCE);
+    verify(jspy).getNumSlotsPerMap();
+    verify(jspy).getNumSlotsPerReduce();
+    verify(cspy).getMaxMapTasks();
+    verify(cspy).getMaxReduceTasks();
+    // Read the map only now: verify() above expects exactly one earlier call
+    assertEquals("firstMapTaskLaunchTime", 1,
+        jspy.getFirstTaskLaunchTimes().get(TaskType.MAP).longValue());
+    assertEquals("firstReduceTaskLaunchTime", numMaps + 1,
+        jspy.getFirstTaskLaunchTimes().get(TaskType.REDUCE).longValue());
+  }
 }