You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by dd...@apache.org on 2008/05/06 13:35:37 UTC

svn commit: r653749 - in /hadoop/core/trunk: ./ src/java/org/apache/hadoop/mapred/ src/webapps/job/

Author: ddas
Date: Tue May  6 04:35:36 2008
New Revision: 653749

URL: http://svn.apache.org/viewvc?rev=653749&view=rev
Log:
HADOOP-2181. This issue adds logging for input splits in Jobtracker log and jobHistory log. Also adds web UI for viewing input splits in job UI and history UI. Contributed by Amareshwari Sriramadasu.

Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/java/org/apache/hadoop/mapred/HistoryViewer.java
    hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobHistory.java
    hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
    hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
    hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java
    hadoop/core/trunk/src/webapps/job/taskdetails.jsp
    hadoop/core/trunk/src/webapps/job/taskdetailshistory.jsp

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=653749&r1=653748&r2=653749&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Tue May  6 04:35:36 2008
@@ -37,6 +37,10 @@
     The property ipc.client.timeout is removed from the default hadoop
     configuration. It also removes metrics RpcOpsDiscardedOPsNum. (hairong)
 
+    HADOOP-2181. This issue adds logging for input splits in Jobtracker log 
+    and jobHistory log. Also adds web UI for viewing input splits in job UI 
+    and history UI. (Amareshwari Sriramadasu via ddas)
+
   NEW FEATURES
 
     HADOOP-3074. Provides a UrlStreamHandler for DFS and other FS,

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/HistoryViewer.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/HistoryViewer.java?rev=653749&r1=653748&r2=653749&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/HistoryViewer.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/HistoryViewer.java Tue May  6 04:35:36 2008
@@ -97,6 +97,7 @@
     printJobDetails();
     printTaskSummary();
     printJobAnalysis();
+    printSplits();
     printTasks("MAP", "FAILED");
     printTasks("MAP", "KILLED");
     printTasks("REDUCE", "FAILED");
@@ -140,6 +141,24 @@
     System.out.println(jobDetails.toString());
   }
   
+  private void printSplits() {
+    StringBuffer splits = new StringBuffer();
+    Map<String, JobHistory.Task> tasks = job.getAllTasks();
+    splits.append("\nInput split Locations");
+    splits.append("\nTaskId\tSplits");
+    splits.append("\n====================================================");
+
+    for (JobHistory.Task task : tasks.values()) {
+      if (Values.MAP.name().equals(task.get(Keys.TASK_TYPE))) {
+        splits.append("\n");
+        splits.append(task.get(Keys.TASKID));
+        splits.append("\t"); 
+        splits.append(task.get(Keys.SPLITS));
+      }
+    }
+    System.out.println(splits.toString());
+  }
+
   private void printTasks(String taskType, String taskStatus) {
     Map<String, JobHistory.Task> tasks = job.getAllTasks();
     StringBuffer taskList = new StringBuffer();

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobHistory.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobHistory.java?rev=653749&r1=653748&r2=653749&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobHistory.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobHistory.java Tue May  6 04:35:36 2008
@@ -94,7 +94,7 @@
     LAUNCH_TIME, TOTAL_MAPS, TOTAL_REDUCES, FAILED_MAPS, FAILED_REDUCES, 
     FINISHED_MAPS, FINISHED_REDUCES, JOB_STATUS, TASKID, HOSTNAME, TASK_TYPE, 
     ERROR, TASK_ATTEMPT_ID, TASK_STATUS, COPY_PHASE, SORT_PHASE, REDUCE_PHASE, 
-    SHUFFLE_FINISHED, SORT_FINISHED, COUNTERS
+    SHUFFLE_FINISHED, SORT_FINISHED, COUNTERS, SPLITS
   }
 
   /**
@@ -677,15 +677,18 @@
      * @param startTime startTime of tip. 
      */
     public static void logStarted(TaskID taskId, String taskType, 
-                                  long startTime){
+                                  long startTime, String splitLocations) {
       if (!disableHistory){
         ArrayList<PrintWriter> writer = openJobs.get(JOBTRACKER_UNIQUE_STRING 
                                                      + taskId.getJobID()); 
 
         if (null != writer){
           JobHistory.log(writer, RecordTypes.Task, 
-                         new Keys[]{Keys.TASKID, Keys.TASK_TYPE , Keys.START_TIME}, 
-                         new String[]{taskId.toString(), taskType, String.valueOf(startTime)});
+                         new Keys[]{Keys.TASKID, Keys.TASK_TYPE ,
+                                    Keys.START_TIME, Keys.SPLITS}, 
+                         new String[]{taskId.toString(), taskType,
+                                      String.valueOf(startTime),
+                                      splitLocations});
         }
       }
     }

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java?rev=653749&r1=653748&r2=653749&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java Tue May  6 04:35:36 2008
@@ -265,6 +265,7 @@
 
       for(String host: splitLocations) {
         Node node = jobtracker.resolveAndAddToTopology(host);
+        LOG.info("tip:" + maps[i].getTIPId() + " has split on node:" + node);
         for (int j = 0; j < maxLevel; j++) {
           node = JobTracker.getParentNode(node, j);
           List<TaskInProgress> hostMaps = cache.get(node);
@@ -317,6 +318,7 @@
                                    jobtracker, conf, this, i);
     }
     if (numMapTasks > 0) { 
+      LOG.info("Split info for job:" + jobId);
       nonRunningMapCache = createCache(splits, maxLevel);
     }
         
@@ -663,10 +665,10 @@
     Task result = maps[target].getTaskToRun(tts.getTrackerName());
     if (result != null) {
       runningMapTasks += 1;
-      boolean wasRunning = maps[target].isRunning();
-      if (!wasRunning) {
+      if (maps[target].isFirstAttempt(result.getTaskID())) {
         JobHistory.Task.logStarted(maps[target].getTIPId(), Values.MAP.name(),
-                                   System.currentTimeMillis());
+                                   System.currentTimeMillis(),
+                                   maps[target].getSplitNodes());
       }
 
       jobCounters.incrCounter(Counter.TOTAL_LAUNCHED_MAPS, 1);
@@ -696,10 +698,9 @@
     Task result = reduces[target].getTaskToRun(tts.getTrackerName());
     if (result != null) {
       runningReduceTasks += 1;
-      boolean wasRunning = reduces[target].isRunning();
-      if (!wasRunning) {
+      if (reduces[target].isFirstAttempt(result.getTaskID())) {
         JobHistory.Task.logStarted(reduces[target].getTIPId(), Values.REDUCE.name(),
-                                   System.currentTimeMillis());
+                                   System.currentTimeMillis(), "");
       }
 
       jobCounters.incrCounter(Counter.TOTAL_LAUNCHED_REDUCES, 1);
@@ -1307,7 +1308,8 @@
     tip.completed(taskid);
 
     // Update jobhistory 
-    String taskTrackerName = status.getTaskTracker();
+    String taskTrackerName = jobtracker.getNode(jobtracker.getTaskTracker(
+                               status.getTaskTracker()).getHost()).toString();
     if (status.getIsMap()){
       JobHistory.MapAttempt.logStarted(status.getTaskID(), status.getStartTime(), 
                                        taskTrackerName); 
@@ -1484,7 +1486,8 @@
     }
         
     // update job history
-    String taskTrackerName = status.getTaskTracker();
+    String taskTrackerName = jobtracker.getNode(jobtracker.getTaskTracker(
+                               status.getTaskTracker()).getHost()).toString();
     if (status.getIsMap()) {
       JobHistory.MapAttempt.logStarted(status.getTaskID(), status.getStartTime(), 
                 taskTrackerName);

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java?rev=653749&r1=653748&r2=653749&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java Tue May  6 04:35:36 2008
@@ -1932,7 +1932,7 @@
   /**
    * Returns specified TaskInProgress, or null.
    */
-  private TaskInProgress getTip(TaskID tipid) {
+  public TaskInProgress getTip(TaskID tipid) {
     JobInProgress job = jobs.get(tipid.getJobID());
     return (job == null ? null : job.getTaskInProgress(tipid));
   }

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java?rev=653749&r1=653748&r2=653749&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java Tue May  6 04:35:36 2008
@@ -20,6 +20,8 @@
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -29,6 +31,7 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.mapred.JobClient.RawSplit;
+import org.apache.hadoop.net.Node;
 
 
 /*************************************************************
@@ -207,6 +210,15 @@
   public Task getTask(TaskAttemptID taskId) {
     return tasks.get(taskId);
   }
+
+  /**
+   * Is the Task associated with taskid is the first attempt of the tip? 
+   * @param taskId
+   * @return Returns true if the Task is the first attempt of the tip
+   */  
+  public boolean isFirstAttempt(TaskAttemptID taskId) {
+    return (taskId.getId() == 0); 
+  }
   
   /**
    * Is this tip currently running any tasks?
@@ -755,4 +767,37 @@
   public int getSuccessEventNumber() {
     return successEventNumber;
   }
+  
+  /** 
+   * Gets the Node list of input split locations sorted in rack order.
+   */ 
+  public String getSplitNodes() {
+    String[] splits = rawSplit.getLocations();
+    Node[] nodes = new Node[splits.length];
+    for (int i = 0; i < splits.length; i++) {
+      nodes[i] = jobtracker.getNode(splits[i]);
+    }
+    // sort nodes on rack location
+    Arrays.sort(nodes, new Comparator<Node>() {
+      public int compare(Node a, Node b) {
+        String left = a.getNetworkLocation();
+        String right = b.getNetworkLocation();
+        return left.compareTo(right);
+      }
+    }); 
+    return nodeToString(nodes);
+  }
+
+  private static String nodeToString(Node[] nodes) {
+    if (nodes == null || nodes.length == 0) {
+      return "";
+    }
+    StringBuffer ret = new StringBuffer(nodes[0].toString());
+    for(int i = 1; i < nodes.length;i++) {
+      ret.append(",");
+      ret.append(nodes[i].toString());
+    }
+    return ret.toString();
+  }
+
 }

Modified: hadoop/core/trunk/src/webapps/job/taskdetails.jsp
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/webapps/job/taskdetails.jsp?rev=653749&r1=653748&r2=653749&view=diff
==============================================================================
--- hadoop/core/trunk/src/webapps/job/taskdetails.jsp (original)
+++ hadoop/core/trunk/src/webapps/job/taskdetails.jsp Tue May  6 04:35:36 2008
@@ -109,7 +109,7 @@
         taskAttemptTracker = "http://" + taskTracker.getHost() + ":"
           + taskTracker.getHttpPort();
         out.print("<td><a href=\"" + taskAttemptTracker + "\">"
-          + taskTracker.getHost() + "</a></td>");
+          + tracker.getNode(taskTracker.getHost()) + "</a></td>");
         }
         out.print("<td>" + status.getRunState() + "</td>");
         out.print("<td>" + StringUtils.formatPercent(status.getProgress(), 2)
@@ -173,11 +173,27 @@
           out.print("<pre>&nbsp;</pre>");
         out.println("</td></tr>");
       }
-    }
   %>
 </table>
 </center>
 
+<%
+      if (ts[0].getIsMap()) {
+%>
+<h3>Input Split Locations</h3>
+<table border=2 cellpadding="5" cellspacing="2">
+<%
+        for (String split: StringUtils.split(tracker.getTip(
+                                         tipidObj).getSplitNodes())) {
+          out.println("<tr><td>" + split + "</td></tr>");
+        }
+%>
+</table>
+<%    
+      }
+    }
+%>
+
 <hr>
 <a href="jobdetails.jsp?jobid=<%=jobid%>">Go back to the job</a><br>
 <a href="jobtracker.jsp">Go back to JobTracker</a><br>

Modified: hadoop/core/trunk/src/webapps/job/taskdetailshistory.jsp
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/webapps/job/taskdetailshistory.jsp?rev=653749&r1=653748&r2=653749&view=diff
==============================================================================
--- hadoop/core/trunk/src/webapps/job/taskdetailshistory.jsp (original)
+++ hadoop/core/trunk/src/webapps/job/taskdetailshistory.jsp Tue May  6 04:35:36 2008
@@ -22,6 +22,7 @@
   JobHistory.JobInfo job = (JobHistory.JobInfo)
                               request.getSession().getAttribute("job");
   JobHistory.Task task = job.getAllTasks().get(taskid); 
+  String type = task.get(Keys.TASK_TYPE);
 %>
 <html>
 <body>
@@ -30,7 +31,7 @@
 <table border="2" cellpadding="5" cellspacing="2">
 <tr><td>Task Id</td><td>Start Time</td>
 <%	
-  if (Values.REDUCE.name().equals(task.get(Keys.TASK_TYPE))) {
+  if (Values.REDUCE.name().equals(type)) {
 %>
     <td>Shuffle Finished</td><td>Sort Finished</td>
 <%
@@ -39,10 +40,26 @@
 <td>Finish Time</td><td>Host</td><td>Error</td></tr>
 <%
   for (JobHistory.TaskAttempt attempt : task.getTaskAttempts().values()) {
-    printTaskAttempt(attempt, task.get(Keys.TASK_TYPE), out); 
+    printTaskAttempt(attempt, type, out);
   }
 %>
 </table>
+</center>
+<%	
+  if (Values.MAP.name().equals(type)) {
+%>
+<h3>Input Split Locations</h3>
+<table border="2" cellpadding="5" cellspacing="2">
+<%
+    for (String split : StringUtils.split(task.get(Keys.SPLITS)))
+    {
+      out.println("<tr><td>" + split + "</td></tr>");
+    }
+%>
+</table>    
+<%
+  }
+%>
 <%!
   private void printTaskAttempt(JobHistory.TaskAttempt taskAttempt,
                                 String type, JspWriter out) 
@@ -70,6 +87,5 @@
     out.print("</tr>"); 
   }
 %>
-</center>
 </body>
 </html>