Posted to commits@hive.apache.org by he...@apache.org on 2011/07/26 00:44:44 UTC

svn commit: r1150945 - in /hive/trunk/ql/src/java/org/apache/hadoop/hive/ql: Driver.java MapRedStats.java exec/HadoopJobExecHelper.java exec/Utilities.java session/SessionState.java

Author: heyongqiang
Date: Mon Jul 25 22:44:42 2011
New Revision: 1150945

URL: http://svn.apache.org/viewvc?rev=1150945&view=rev
Log:
HIVE-2236: Print Hadoop's CPU milliseconds in Cli. (Siying Dong via He Yongqiang)

Added:
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/MapRedStats.java
Modified:
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/HadoopJobExecHelper.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Driver.java?rev=1150945&r1=1150944&r2=1150945&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Driver.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Driver.java Mon Jul 25 22:44:42 2011
@@ -1036,6 +1036,8 @@ public class Driver implements CommandPr
 
       DriverContext driverCxt = new DriverContext(runnable, ctx);
 
+      SessionState.get().setLastMapRedStatsList(new ArrayList<MapRedStats>());
+
       // Add root Tasks to runnable
 
       for (Task<? extends Serializable> tsk : plan.getRootTasks()) {
@@ -1180,6 +1182,17 @@ public class Driver implements CommandPr
         conf.setVar(HiveConf.ConfVars.HADOOPJOBNAME, "");
       }
       Utilities.PerfLogEnd(LOG, "Driver.execute");
+
+      if (SessionState.get().getLastMapRedStatsList() != null
+          && SessionState.get().getLastMapRedStatsList().size() > 0) {
+        long totalCpu = 0;
+        console.printInfo("MapReduce Jobs Launched: ");
+        for (int i = 0; i < SessionState.get().getLastMapRedStatsList().size(); i++) {
+          console.printInfo("Job " + i + ": " + SessionState.get().getLastMapRedStatsList().get(i));
+          totalCpu += SessionState.get().getLastMapRedStatsList().get(i).getCpuMSec();
+        }
+        console.printInfo("Total MapReduce CPU Time Spent: " + Utilities.formatMsecToStr(totalCpu));
+      }
     }
     plan.setDone();
 
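With this change, a successful multi-job query now ends with a per-job summary on the console. With illustrative numbers (the exact fields depend on which counters the cluster exposes), the output of the loop above looks like:

    MapReduce Jobs Launched:
    Job 0: Map: 4  Reduce: 1   Accumulative CPU: 12.35 sec   HDFS Read: 1024000 HDFS Write: 51200 SUCCESS
    Job 1: Map: 1  Reduce: 1   Accumulative CPU: 2.1 sec   HDFS Read: 51200 HDFS Write: 2048 SUCCESS
    Total MapReduce CPU Time Spent: 14 seconds 450 msec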

Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/MapRedStats.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/MapRedStats.java?rev=1150945&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/MapRedStats.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/MapRedStats.java Mon Jul 25 22:44:42 2011
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql;
+
+/**
+ * MapRedStats.
+ *
+ * A data structure that keeps one MapReduce job's stats:
+ * number of mappers, number of reducers, cumulative CPU time, and whether
+ * the job succeeded.
+ *
+ */
+public class MapRedStats {
+  int numMap;
+  int numReduce;
+  long cpuMSec;
+  long hdfsRead = -1;
+  long hdfsWrite = -1;
+  long mapInputRecords = -1;
+  long mapOutputRecords = -1;
+  long reduceInputRecords = -1;
+  long reduceOutputRecords = -1;
+  long reduceShuffleBytes = -1;
+  boolean success;
+
+  public MapRedStats(int numMap, int numReduce, long cpuMSec, boolean ifSuccess) {
+    this.numMap = numMap;
+    this.numReduce = numReduce;
+    this.cpuMSec = cpuMSec;
+    this.success = ifSuccess;
+  }
+
+  public boolean isSuccess() {
+    return success;
+  }
+
+  public long getCpuMSec() {
+    return cpuMSec;
+  }
+
+  public int getNumMap() {
+    return numMap;
+  }
+
+  public void setNumMap(int numMap) {
+    this.numMap = numMap;
+  }
+
+  public int getNumReduce() {
+    return numReduce;
+  }
+
+  public void setNumReduce(int numReduce) {
+    this.numReduce = numReduce;
+  }
+
+  public long getHdfsRead() {
+    return hdfsRead;
+  }
+
+  public void setHdfsRead(long hdfsRead) {
+    this.hdfsRead = hdfsRead;
+  }
+
+  public long getHdfsWrite() {
+    return hdfsWrite;
+  }
+
+  public void setHdfsWrite(long hdfsWrite) {
+    this.hdfsWrite = hdfsWrite;
+  }
+
+  public long getMapInputRecords() {
+    return mapInputRecords;
+  }
+
+  public void setMapInputRecords(long mapInputRecords) {
+    this.mapInputRecords = mapInputRecords;
+  }
+
+  public long getMapOutputRecords() {
+    return mapOutputRecords;
+  }
+
+  public void setMapOutputRecords(long mapOutputRecords) {
+    this.mapOutputRecords = mapOutputRecords;
+  }
+
+  public long getReduceInputRecords() {
+    return reduceInputRecords;
+  }
+
+  public void setReduceInputRecords(long reduceInputRecords) {
+    this.reduceInputRecords = reduceInputRecords;
+  }
+
+  public long getReduceOutputRecords() {
+    return reduceOutputRecords;
+  }
+
+  public void setReduceOutputRecords(long reduceOutputRecords) {
+    this.reduceOutputRecords = reduceOutputRecords;
+  }
+
+  public long getReduceShuffleBytes() {
+    return reduceShuffleBytes;
+  }
+
+  public void setReduceShuffleBytes(long reduceShuffleBytes) {
+    this.reduceShuffleBytes = reduceShuffleBytes;
+  }
+
+  public void setCpuMSec(long cpuMSec) {
+    this.cpuMSec = cpuMSec;
+  }
+
+  public void setSuccess(boolean success) {
+    this.success = success;
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    if (numMap > 0) {
+      sb.append("Map: " + numMap + "  ");
+    }
+
+    if (numReduce > 0) {
+      sb.append("Reduce: " + numReduce + "  ");
+    }
+
+    if (cpuMSec > 0) {
+      sb.append(" Accumulative CPU: " + (cpuMSec / 1000D) + " sec  ");
+    }
+
+    if (hdfsRead >= 0) {
+      sb.append(" HDFS Read: " + hdfsRead);
+    }
+
+    if (hdfsWrite >= 0) {
+      sb.append(" HDFS Write: " + hdfsWrite);
+    }
+
+    sb.append(" " + (success ? "SUCESS" : "FAIL"));
+
+    return sb.toString();
+  }
+}
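
As a quick illustration of the new class (a minimal sketch, not part of the commit; the numbers are made up), a caller can populate one job's stats and rely on toString() to print only the fields that were actually set:

    import org.apache.hadoop.hive.ql.MapRedStats;

    public class MapRedStatsExample {
      public static void main(String[] args) {
        // 4 mappers, 1 reducer, 12350 ms of cumulative CPU, job succeeded
        MapRedStats stats = new MapRedStats(4, 1, 12350L, true);
        stats.setHdfsRead(1024000L);  // from the HDFS_BYTES_READ counter
        stats.setHdfsWrite(51200L);   // from the HDFS_BYTES_WRITTEN counter
        // toString() omits fields that were never populated
        System.out.println(stats);
      }
    }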

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/HadoopJobExecHelper.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/HadoopJobExecHelper.java?rev=1150945&r1=1150944&r2=1150945&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/HadoopJobExecHelper.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/HadoopJobExecHelper.java Mon Jul 25 22:44:42 2011
@@ -30,6 +30,7 @@ import java.util.Map;
 import java.util.Set;
 
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.MapRedStats;
 import org.apache.hadoop.hive.ql.exec.Operator.ProgressCounter;
 import org.apache.hadoop.hive.ql.exec.errors.ErrorAndSolution;
 import org.apache.hadoop.hive.ql.exec.errors.TaskLogProcessor;
@@ -43,12 +44,13 @@ import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RunningJob;
 import org.apache.hadoop.mapred.TaskCompletionEvent;
 import org.apache.hadoop.mapred.TaskReport;
+import org.apache.hadoop.mapred.Counters.Counter;
 
 public class HadoopJobExecHelper {
-  
+
   protected transient JobConf job;
   protected Task<? extends Serializable> task;
-  
+
   protected transient int mapProgress = 0;
   protected transient int reduceProgress = 0;
   public transient String jobId;
@@ -69,10 +71,10 @@ public class HadoopJobExecHelper {
       return;
     }
     if(callBackObj != null) {
-      callBackObj.updateCounters(ctrs, rj);      
+      callBackObj.updateCounters(ctrs, rj);
     }
   }
-  
+
   /**
    * This msg pattern is used to track when a job is started.
    *
@@ -113,7 +115,7 @@ public class HadoopJobExecHelper {
     return reduceProgress == 100;
   }
 
-  
+
   public String getJobId() {
     return jobId;
   }
@@ -122,10 +124,10 @@ public class HadoopJobExecHelper {
     this.jobId = jobId;
   }
 
-  
+
   public HadoopJobExecHelper() {
   }
-  
+
   public HadoopJobExecHelper(JobConf job, LogHelper console,
       Task<? extends Serializable> task, HadoopJobExecHook hookCallBack) {
     this.job = job;
@@ -134,7 +136,7 @@ public class HadoopJobExecHelper {
     this.callBackObj = hookCallBack;
   }
 
-  
+
   /**
    * A list of the currently running jobs spawned in this Hive instance that is used to kill all
    * running jobs in the event of an unexpected shutdown - i.e., the JVM shuts down while there are
@@ -143,7 +145,7 @@ public class HadoopJobExecHelper {
   public static Map<String, String> runningJobKillURIs = Collections
       .synchronizedMap(new HashMap<String, String>());
 
-  
+
   /**
    * In Hive, when the user control-c's the command line, any running jobs spawned from that command
    * line are best-effort killed.
@@ -200,12 +202,13 @@ public class HadoopJobExecHelper {
     }
     return this.callBackObj.checkFatalErrors(ctrs, errMsg);
   }
-  
-  private boolean progress(ExecDriverTaskHandle th) throws IOException {
+
+  private MapRedStats progress(ExecDriverTaskHandle th) throws IOException {
     JobClient jc = th.getJobClient();
     RunningJob rj = th.getRunningJob();
     String lastReport = "";
     SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS");
+    //DecimalFormat longFormatter = new DecimalFormat("###,###");
     long reportTime = System.currentTimeMillis();
     long maxReportInterval = 60 * 1000; // One minute
     boolean fatal = false;
@@ -213,6 +216,10 @@ public class HadoopJobExecHelper {
     long pullInterval = HiveConf.getLongVar(job, HiveConf.ConfVars.HIVECOUNTERSPULLINTERVAL);
     boolean initializing = true;
     boolean initOutputPrinted = false;
+    long cpuMsec = -1;
+    int numMap = -1;
+    int numReduce = -1;
+
     while (!rj.isComplete()) {
       try {
         Thread.sleep(pullInterval);
@@ -233,24 +240,24 @@ public class HadoopJobExecHelper {
 
         String logMapper;
         String logReducer;
-	
+
         TaskReport[] mappers = jc.getMapTaskReports(rj.getJobID());
         if (mappers == null) {
           logMapper = "no information for number of mappers; ";
         } else {
-          int numMap = mappers.length;
+          numMap = mappers.length;
           if (ss != null) {
             ss.getHiveHistory().setTaskProperty(SessionState.get().getQueryId(), getId(),
               Keys.TASK_NUM_MAPPERS, Integer.toString(numMap));
           }
           logMapper = "number of mappers: " + numMap + "; ";
         }
-	
+
         TaskReport[] reducers = jc.getReduceTaskReports(rj.getJobID());
         if (reducers == null) {
           logReducer = "no information for number of reducers. ";
         } else {
-          int numReduce = reducers.length;
+          numReduce = reducers.length;
           if (ss != null) {
             ss.getHiveHistory().setTaskProperty(SessionState.get().getQueryId(), getId(),
               Keys.TASK_NUM_REDUCERS, Integer.toString(numReduce));
@@ -295,8 +302,22 @@ public class HadoopJobExecHelper {
       String report = " " + getId() + " map = " + mapProgress + "%,  reduce = " + reduceProgress
           + "%";
 
+
       if (!report.equals(lastReport)
           || System.currentTimeMillis() >= reportTime + maxReportInterval) {
+        // Find out the CPU msecs consumed so far.
+        // If we cannot find this number, we simply skip the step
+        // that prints it out.
+        Counter counterCpuMsec = ctrs.findCounter("org.apache.hadoop.mapred.Task$Counter",
+            "CPU_MILLISECONDS");
+        if (counterCpuMsec != null) {
+          long newCpuMSec = counterCpuMsec.getValue();
+          if (newCpuMSec > 0) {
+            cpuMsec = newCpuMSec;
+            report += ", Cumulative CPU "
+              + (cpuMsec / 1000D) + " sec";
+          }
+        }
 
         // write out serialized plan with counters to log file
         // LOG.info(queryPlan);
@@ -315,9 +336,14 @@ public class HadoopJobExecHelper {
       }
     }
 
+    if (cpuMsec > 0) {
+      console.printInfo("MapReduce Total cumulative CPU time: "
+          + Utilities.formatMsecToStr(cpuMsec));
+    }
+
     boolean success;
-    Counters ctrs = th.getCounters();
 
+    Counters ctrs = th.getCounters();
     if (fatal) {
       success = false;
     } else {
@@ -331,6 +357,61 @@ public class HadoopJobExecHelper {
       }
     }
 
+    Counter counterCpuMsec = ctrs.findCounter("org.apache.hadoop.mapred.Task$Counter",
+        "CPU_MILLISECONDS");
+    if (counterCpuMsec != null) {
+      long newCpuMSec = counterCpuMsec.getValue();
+      if (newCpuMSec > cpuMsec) {
+        cpuMsec = newCpuMSec;
+      }
+    }
+
+    MapRedStats mapRedStats = new MapRedStats(numMap, numReduce, cpuMsec, success);
+
+    Counter ctr;
+
+    ctr = ctrs.findCounter("org.apache.hadoop.mapred.Task$Counter",
+        "REDUCE_SHUFFLE_BYTES");
+    if (ctr != null) {
+      mapRedStats.setReduceShuffleBytes(ctr.getValue());
+    }
+
+    ctr = ctrs.findCounter("org.apache.hadoop.mapred.Task$Counter",
+        "MAP_INPUT_RECORDS");
+    if (ctr != null) {
+      mapRedStats.setMapInputRecords(ctr.getValue());
+    }
+
+    ctr = ctrs.findCounter("org.apache.hadoop.mapred.Task$Counter",
+        "MAP_OUTPUT_RECORDS");
+    if (ctr != null) {
+      mapRedStats.setMapOutputRecords(ctr.getValue());
+    }
+
+    ctr = ctrs.findCounter("org.apache.hadoop.mapred.Task$Counter",
+        "REDUCE_INPUT_RECORDS");
+    if (ctr != null) {
+      mapRedStats.setReduceInputRecords(ctr.getValue());
+    }
+
+    ctr = ctrs.findCounter("org.apache.hadoop.mapred.Task$Counter",
+        "REDUCE_OUTPUT_RECORDS");
+    if (ctr != null) {
+      mapRedStats.setReduceOutputRecords(ctr.getValue());
+    }
+
+    ctr = ctrs.findCounter("FileSystemCounters",
+        "HDFS_BYTES_READ");
+    if (ctr != null) {
+      mapRedStats.setHdfsRead(ctr.getValue());
+    }
+
+    ctr = ctrs.findCounter("FileSystemCounters",
+        "HDFS_BYTES_WRITTEN");
+    if (ctr != null) {
+      mapRedStats.setHdfsWrite(ctr.getValue());
+    }
+
     this.task.setDone();
     // update based on the final value of the counters
     updateCounters(ctrs, rj);
@@ -340,9 +421,9 @@ public class HadoopJobExecHelper {
       this.callBackObj.logPlanProgress(ss);
     }
     // LOG.info(queryPlan);
-    return (success);
+    return mapRedStats;
   }
-  
+
   private String getId() {
     return this.task.getId();
   }
@@ -396,7 +477,7 @@ public class HadoopJobExecHelper {
       return rj.getCounters();
     }
   }
-  
+
   // Used for showJobFailDebugInfo
   private static class TaskInfo {
     String jobId;
@@ -419,7 +500,7 @@ public class HadoopJobExecHelper {
       return jobId;
     }
   }
-  
+
   @SuppressWarnings("deprecation")
   private void showJobFailDebugInfo(JobConf conf, RunningJob rj) throws IOException {
     // Mapping from task ID to the number of failures
@@ -549,9 +630,9 @@ public class HadoopJobExecHelper {
 
   public int progress(RunningJob rj, JobClient jc) throws IOException {
     jobId = rj.getJobID();
-    
+
     int returnVal = 0;
-    
+
     // remove the pwd from conf file so that job tracker doesn't show this
     // logs
     String pwd = HiveConf.getVar(job, HiveConf.ConfVars.METASTOREPWD);
@@ -570,7 +651,14 @@ public class HadoopJobExecHelper {
 
     ExecDriverTaskHandle th = new ExecDriverTaskHandle(jc, rj);
     jobInfo(rj);
-    boolean success = progress(th);
+    MapRedStats mapRedStats = progress(th);
+
+    // There is not always a SessionState. Sometimes ExecDriver is invoked directly
+    // for special modes, in which case SessionState.get() returns null.
+    if (SessionState.get() != null) {
+      SessionState.get().getLastMapRedStatsList().add(mapRedStats);
+    }
+    boolean success = mapRedStats.isSuccess();
 
     String statusMesg = getJobEndMsg(rj.getJobID());
     if (!success) {
@@ -583,7 +671,7 @@ public class HadoopJobExecHelper {
     } else {
       console.printInfo(statusMesg);
     }
-    
+
     return returnVal;
   }
 }
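
All of the counter lookups added above follow one pattern: resolve a counter by group and name strings against the old org.apache.hadoop.mapred API, and treat a missing counter as "no information" rather than an error (string lookups also keep the code compiling against Hadoop versions that predate some of these counters). A minimal standalone sketch of that pattern, not part of the commit:

    import java.io.IOException;

    import org.apache.hadoop.mapred.Counters;
    import org.apache.hadoop.mapred.Counters.Counter;
    import org.apache.hadoop.mapred.RunningJob;

    public class CounterLookupExample {
      // Returns the job's cumulative CPU milliseconds, or -1 if the
      // counter is not exposed by this Hadoop version or cluster.
      static long getCpuMSec(RunningJob rj) throws IOException {
        Counters ctrs = rj.getCounters();
        if (ctrs == null) {
          return -1;
        }
        Counter counterCpuMsec = ctrs.findCounter(
            "org.apache.hadoop.mapred.Task$Counter", "CPU_MILLISECONDS");
        return (counterCpuMsec == null) ? -1 : counterCpuMsec.getValue();
      }
    }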

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java?rev=1150945&r1=1150944&r2=1150945&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java Mon Jul 25 22:44:42 2011
@@ -2257,4 +2257,44 @@ public final class Utilities {
     }
     return sb.toString();
   }
+
+  /**
+   * Format a number of milliseconds as a human-readable string.
+   *
+   * @param msec milliseconds
+   * @return a formatted string like "x days y hours z minutes a seconds b msec"
+   */
+  public static String formatMsecToStr(long msec) {
+    long day = -1, hour = -1, minute = -1, second = -1;
+    long ms = msec % 1000;
+    long timeLeft = msec / 1000;
+    if (timeLeft > 0) {
+      second = timeLeft % 60;
+      timeLeft /= 60;
+      if (timeLeft > 0) {
+        minute = timeLeft % 60;
+        timeLeft /= 60;
+        if (timeLeft > 0) {
+          hour = timeLeft % 24;
+          day = timeLeft / 24;
+        }
+      }
+    }
+    StringBuilder sb = new StringBuilder();
+    if (day != -1) {
+      sb.append(day + " days ");
+    }
+    if (hour != -1) {
+      sb.append(hour + " hours ");
+    }
+    if (minute != -1) {
+      sb.append(minute + " minutes ");
+    }
+    if (second != -1) {
+      sb.append(second + " seconds ");
+    }
+    sb.append(ms + " msec");
+
+    return sb.toString();
+  }
 }
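
Tracing formatMsecToStr above: higher units are only emitted once the duration is large enough to reach them, so short durations stay short. A few sample values (derived by hand from the code, not from a test run):

    Utilities.formatMsecToStr(850L);     // "850 msec"
    Utilities.formatMsecToStr(14450L);   // "14 seconds 450 msec"
    Utilities.formatMsecToStr(125042L);  // "2 minutes 5 seconds 42 msec"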

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java?rev=1150945&r1=1150944&r2=1150945&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java Mon Jul 25 22:44:42 2011
@@ -37,6 +37,7 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.MapRedStats;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.history.HiveHistory;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -107,6 +108,8 @@ public class SessionState {
 
   private CreateTableAutomaticGrant createTableGrants;
 
+  private List<MapRedStats> lastMapRedStatsList;
+
   /**
    * Lineage state.
    */
@@ -641,4 +644,12 @@ public class SessionState {
   public void setCreateTableGrants(CreateTableAutomaticGrant createTableGrants) {
     this.createTableGrants = createTableGrants;
   }
+
+  public List<MapRedStats> getLastMapRedStatsList() {
+    return lastMapRedStatsList;
+  }
+
+  public void setLastMapRedStatsList(List<MapRedStats> lastMapRedStatsList) {
+    this.lastMapRedStatsList = lastMapRedStatsList;
+  }
 }
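
With the new accessor in place, anything running after Driver.execute() can inspect the per-job stats of the last query, which is exactly what the Driver.java hunk above does. A minimal sketch of such a caller (not part of the commit; it assumes a SessionState has been started on the current thread):

    import java.util.List;

    import org.apache.hadoop.hive.ql.MapRedStats;
    import org.apache.hadoop.hive.ql.session.SessionState;

    public class LastQueryStats {
      // Sums the cumulative CPU milliseconds over all MapReduce jobs
      // launched by the last query, or 0 if no stats were recorded.
      static long totalCpuMSec() {
        List<MapRedStats> statsList = SessionState.get().getLastMapRedStatsList();
        long total = 0;
        if (statsList != null) {
          for (MapRedStats s : statsList) {
            total += s.getCpuMSec();
          }
        }
        return total;
      }
    }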