You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by he...@apache.org on 2011/07/26 00:44:44 UTC
svn commit: r1150945 - in /hive/trunk/ql/src/java/org/apache/hadoop/hive/ql:
Driver.java MapRedStats.java exec/HadoopJobExecHelper.java
exec/Utilities.java session/SessionState.java
Author: heyongqiang
Date: Mon Jul 25 22:44:42 2011
New Revision: 1150945
URL: http://svn.apache.org/viewvc?rev=1150945&view=rev
Log:
HIVE-2236: Print Hadoop's CPU milliseconds in Cli. (Siying Dong via He Yongqiang)
Added:
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/MapRedStats.java
Modified:
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/HadoopJobExecHelper.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Driver.java?rev=1150945&r1=1150944&r2=1150945&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Driver.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Driver.java Mon Jul 25 22:44:42 2011
@@ -1036,6 +1036,8 @@ public class Driver implements CommandPr
DriverContext driverCxt = new DriverContext(runnable, ctx);
+ SessionState.get().setLastMapRedStatsList(new ArrayList<MapRedStats>());
+
// Add root Tasks to runnable
for (Task<? extends Serializable> tsk : plan.getRootTasks()) {
@@ -1180,6 +1182,17 @@ public class Driver implements CommandPr
conf.setVar(HiveConf.ConfVars.HADOOPJOBNAME, "");
}
Utilities.PerfLogEnd(LOG, "Driver.execute");
+
+ if (SessionState.get().getLastMapRedStatsList() != null
+ && SessionState.get().getLastMapRedStatsList().size() > 0) {
+ long totalCpu = 0;
+ console.printInfo("MapReduce Jobs Launched: ");
+ for (int i = 0; i < SessionState.get().getLastMapRedStatsList().size(); i++) {
+ console.printInfo("Job " + i + ": " + SessionState.get().getLastMapRedStatsList().get(i));
+ totalCpu += SessionState.get().getLastMapRedStatsList().get(i).getCpuMSec();
+ }
+ console.printInfo("Total MapReduce CPU Time Spent: " + Utilities.formatMsecToStr(totalCpu));
+ }
}
plan.setDone();
Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/MapRedStats.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/MapRedStats.java?rev=1150945&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/MapRedStats.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/MapRedStats.java Mon Jul 25 22:44:42 2011
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql;
+
+/**
+ * MapRedStats.
+ *
+ * A data structure to keep one MapReduce job's stats: number of mappers,
+ * number of reducers, cumulative CPU time, whether the job succeeded, and a
+ * few optional Hadoop counters (HDFS bytes read/written, map/reduce input and
+ * output records, reduce shuffle bytes).
+ *
+ * The optional counters start at -1, which means "never set / not available";
+ * {@link #toString()} only prints HDFS read/write when they are >= 0.
+ */
+public class MapRedStats {
+  int numMap;
+  int numReduce;
+  long cpuMSec;
+  // Optional counters; -1 means the value was never set.
+  long hdfsRead = -1;
+  long hdfsWrite = -1;
+  long mapInputRecords = -1;
+  long mapOutputRecords = -1;
+  long reduceInputRecords = -1;
+  long reduceOutputRecords = -1;
+  long reduceShuffleBytes = -1;
+  boolean success;
+
+  /**
+   * Creates the stats for one job.
+   *
+   * @param numMap number of map tasks; values &lt;= 0 are omitted from {@link #toString()}
+   * @param numReduce number of reduce tasks; values &lt;= 0 are omitted from {@link #toString()}
+   * @param cpuMSec cumulative CPU time in milliseconds; values &lt;= 0 are omitted from
+   *        {@link #toString()}
+   * @param ifSuccess whether the job succeeded
+   */
+  public MapRedStats(int numMap, int numReduce, long cpuMSec, boolean ifSuccess) {
+    this.numMap = numMap;
+    this.numReduce = numReduce;
+    this.cpuMSec = cpuMSec;
+    this.success = ifSuccess;
+  }
+
+  /** @return whether the job succeeded */
+  public boolean isSuccess() {
+    return success;
+  }
+
+  /** @param success whether the job succeeded */
+  public void setSuccess(boolean success) {
+    this.success = success;
+  }
+
+  /** @return cumulative CPU time in milliseconds */
+  public long getCpuMSec() {
+    return cpuMSec;
+  }
+
+  /** @param cpuMSec cumulative CPU time in milliseconds */
+  public void setCpuMSec(long cpuMSec) {
+    this.cpuMSec = cpuMSec;
+  }
+
+  /** @return number of map tasks */
+  public int getNumMap() {
+    return numMap;
+  }
+
+  /** @param numMap number of map tasks */
+  public void setNumMap(int numMap) {
+    this.numMap = numMap;
+  }
+
+  /** @return number of reduce tasks */
+  public int getNumReduce() {
+    return numReduce;
+  }
+
+  /** @param numReduce number of reduce tasks */
+  public void setNumReduce(int numReduce) {
+    this.numReduce = numReduce;
+  }
+
+  /** @return HDFS bytes read, or -1 if not set */
+  public long getHdfsRead() {
+    return hdfsRead;
+  }
+
+  /** @param hdfsRead HDFS bytes read */
+  public void setHdfsRead(long hdfsRead) {
+    this.hdfsRead = hdfsRead;
+  }
+
+  /** @return HDFS bytes written, or -1 if not set */
+  public long getHdfsWrite() {
+    return hdfsWrite;
+  }
+
+  /** @param hdfsWrite HDFS bytes written */
+  public void setHdfsWrite(long hdfsWrite) {
+    this.hdfsWrite = hdfsWrite;
+  }
+
+  /** @return map input records, or -1 if not set */
+  public long getMapInputRecords() {
+    return mapInputRecords;
+  }
+
+  /** @param mapInputRecords map input records */
+  public void setMapInputRecords(long mapInputRecords) {
+    this.mapInputRecords = mapInputRecords;
+  }
+
+  /** @return map output records, or -1 if not set */
+  public long getMapOutputRecords() {
+    return mapOutputRecords;
+  }
+
+  /** @param mapOutputRecords map output records */
+  public void setMapOutputRecords(long mapOutputRecords) {
+    this.mapOutputRecords = mapOutputRecords;
+  }
+
+  /** @return reduce input records, or -1 if not set */
+  public long getReduceInputRecords() {
+    return reduceInputRecords;
+  }
+
+  /** @param reduceInputRecords reduce input records */
+  public void setReduceInputRecords(long reduceInputRecords) {
+    this.reduceInputRecords = reduceInputRecords;
+  }
+
+  /** @return reduce output records, or -1 if not set */
+  public long getReduceOutputRecords() {
+    return reduceOutputRecords;
+  }
+
+  /** @param reduceOutputRecords reduce output records */
+  public void setReduceOutputRecords(long reduceOutputRecords) {
+    this.reduceOutputRecords = reduceOutputRecords;
+  }
+
+  /** @return reduce shuffle bytes, or -1 if not set */
+  public long getReduceShuffleBytes() {
+    return reduceShuffleBytes;
+  }
+
+  /** @param reduceShuffleBytes reduce shuffle bytes */
+  public void setReduceShuffleBytes(long reduceShuffleBytes) {
+    this.reduceShuffleBytes = reduceShuffleBytes;
+  }
+
+  /**
+   * Renders a one-line, single-space-separated summary, e.g.
+   * {@code "Map: 2 Reduce: 1 Cumulative CPU: 1.5 sec HDFS Read: 100 HDFS Write: 50 SUCCESS"}.
+   * Parts whose values are unset are skipped; the SUCCESS/FAIL status is always present.
+   */
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    if (numMap > 0) {
+      appendPart(sb, "Map: " + numMap);
+    }
+    if (numReduce > 0) {
+      appendPart(sb, "Reduce: " + numReduce);
+    }
+    if (cpuMSec > 0) {
+      // Same "Cumulative CPU" wording as the job progress reports.
+      appendPart(sb, "Cumulative CPU: " + (cpuMSec / 1000D) + " sec");
+    }
+    if (hdfsRead >= 0) {
+      appendPart(sb, "HDFS Read: " + hdfsRead);
+    }
+    if (hdfsWrite >= 0) {
+      appendPart(sb, "HDFS Write: " + hdfsWrite);
+    }
+    appendPart(sb, success ? "SUCCESS" : "FAIL");
+    return sb.toString();
+  }
+
+  /** Appends {@code part}, separated from any previous content by exactly one space. */
+  private static void appendPart(StringBuilder sb, String part) {
+    if (sb.length() > 0) {
+      sb.append(' ');
+    }
+    sb.append(part);
+  }
+}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/HadoopJobExecHelper.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/HadoopJobExecHelper.java?rev=1150945&r1=1150944&r2=1150945&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/HadoopJobExecHelper.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/HadoopJobExecHelper.java Mon Jul 25 22:44:42 2011
@@ -30,6 +30,7 @@ import java.util.Map;
import java.util.Set;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.MapRedStats;
import org.apache.hadoop.hive.ql.exec.Operator.ProgressCounter;
import org.apache.hadoop.hive.ql.exec.errors.ErrorAndSolution;
import org.apache.hadoop.hive.ql.exec.errors.TaskLogProcessor;
@@ -43,12 +44,13 @@ import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.TaskCompletionEvent;
import org.apache.hadoop.mapred.TaskReport;
+import org.apache.hadoop.mapred.Counters.Counter;
public class HadoopJobExecHelper {
-
+
protected transient JobConf job;
protected Task<? extends Serializable> task;
-
+
protected transient int mapProgress = 0;
protected transient int reduceProgress = 0;
public transient String jobId;
@@ -69,10 +71,10 @@ public class HadoopJobExecHelper {
return;
}
if(callBackObj != null) {
- callBackObj.updateCounters(ctrs, rj);
+ callBackObj.updateCounters(ctrs, rj);
}
}
-
+
/**
* This msg pattern is used to track when a job is started.
*
@@ -113,7 +115,7 @@ public class HadoopJobExecHelper {
return reduceProgress == 100;
}
-
+
public String getJobId() {
return jobId;
}
@@ -122,10 +124,10 @@ public class HadoopJobExecHelper {
this.jobId = jobId;
}
-
+
public HadoopJobExecHelper() {
}
-
+
public HadoopJobExecHelper(JobConf job, LogHelper console,
Task<? extends Serializable> task, HadoopJobExecHook hookCallBack) {
this.job = job;
@@ -134,7 +136,7 @@ public class HadoopJobExecHelper {
this.callBackObj = hookCallBack;
}
-
+
/**
* A list of the currently running jobs spawned in this Hive instance that is used to kill all
* running jobs in the event of an unexpected shutdown - i.e., the JVM shuts down while there are
@@ -143,7 +145,7 @@ public class HadoopJobExecHelper {
public static Map<String, String> runningJobKillURIs = Collections
.synchronizedMap(new HashMap<String, String>());
-
+
/**
* In Hive, when the user control-c's the command line, any running jobs spawned from that command
* line are best-effort killed.
@@ -200,12 +202,13 @@ public class HadoopJobExecHelper {
}
return this.callBackObj.checkFatalErrors(ctrs, errMsg);
}
-
- private boolean progress(ExecDriverTaskHandle th) throws IOException {
+
+ private MapRedStats progress(ExecDriverTaskHandle th) throws IOException {
JobClient jc = th.getJobClient();
RunningJob rj = th.getRunningJob();
String lastReport = "";
SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS");
+ //DecimalFormat longFormatter = new DecimalFormat("###,###");
long reportTime = System.currentTimeMillis();
long maxReportInterval = 60 * 1000; // One minute
boolean fatal = false;
@@ -213,6 +216,10 @@ public class HadoopJobExecHelper {
long pullInterval = HiveConf.getLongVar(job, HiveConf.ConfVars.HIVECOUNTERSPULLINTERVAL);
boolean initializing = true;
boolean initOutputPrinted = false;
+ long cpuMsec = -1;
+ int numMap = -1;
+ int numReduce = -1;
+
while (!rj.isComplete()) {
try {
Thread.sleep(pullInterval);
@@ -233,24 +240,24 @@ public class HadoopJobExecHelper {
String logMapper;
String logReducer;
-
+
TaskReport[] mappers = jc.getMapTaskReports(rj.getJobID());
if (mappers == null) {
logMapper = "no information for number of mappers; ";
} else {
- int numMap = mappers.length;
+ numMap = mappers.length;
if (ss != null) {
ss.getHiveHistory().setTaskProperty(SessionState.get().getQueryId(), getId(),
Keys.TASK_NUM_MAPPERS, Integer.toString(numMap));
}
logMapper = "number of mappers: " + numMap + "; ";
}
-
+
TaskReport[] reducers = jc.getReduceTaskReports(rj.getJobID());
if (reducers == null) {
logReducer = "no information for number of reducers. ";
} else {
- int numReduce = reducers.length;
+ numReduce = reducers.length;
if (ss != null) {
ss.getHiveHistory().setTaskProperty(SessionState.get().getQueryId(), getId(),
Keys.TASK_NUM_REDUCERS, Integer.toString(numReduce));
@@ -295,8 +302,22 @@ public class HadoopJobExecHelper {
String report = " " + getId() + " map = " + mapProgress + "%, reduce = " + reduceProgress
+ "%";
+
if (!report.equals(lastReport)
|| System.currentTimeMillis() >= reportTime + maxReportInterval) {
+ // find out CPU msecs
+ // In the case that we can't find out this number, we just skip the step to print
+ // it out.
+ Counter counterCpuMsec = ctrs.findCounter("org.apache.hadoop.mapred.Task$Counter",
+ "CPU_MILLISECONDS");
+ if (counterCpuMsec != null) {
+ long newCpuMSec = counterCpuMsec.getValue();
+ if (newCpuMSec > 0) {
+ cpuMsec = newCpuMSec;
+ report += ", Cumulative CPU "
+ + (cpuMsec / 1000D) + " sec";
+ }
+ }
// write out serialized plan with counters to log file
// LOG.info(queryPlan);
@@ -315,9 +336,14 @@ public class HadoopJobExecHelper {
}
}
+ if (cpuMsec > 0) {
+ console.printInfo("MapReduce Total cumulative CPU time: "
+ + Utilities.formatMsecToStr(cpuMsec));
+ }
+
boolean success;
- Counters ctrs = th.getCounters();
+ Counters ctrs = th.getCounters();
if (fatal) {
success = false;
} else {
@@ -331,6 +357,61 @@ public class HadoopJobExecHelper {
}
}
+ Counter counterCpuMsec = ctrs.findCounter("org.apache.hadoop.mapred.Task$Counter",
+ "CPU_MILLISECONDS");
+ if (counterCpuMsec != null) {
+ long newCpuMSec = counterCpuMsec.getValue();
+ if (newCpuMSec > cpuMsec) {
+ cpuMsec = newCpuMSec;
+ }
+ }
+
+ MapRedStats mapRedStats = new MapRedStats(numMap, numReduce, cpuMsec, success);
+
+ Counter ctr;
+
+ ctr = ctrs.findCounter("org.apache.hadoop.mapred.Task$Counter",
+ "REDUCE_SHUFFLE_BYTES");
+ if (ctr != null) {
+ mapRedStats.setReduceShuffleBytes(ctr.getValue());
+ }
+
+ ctr = ctrs.findCounter("org.apache.hadoop.mapred.Task$Counter",
+ "MAP_INPUT_RECORDS");
+ if (ctr != null) {
+ mapRedStats.setMapInputRecords(ctr.getValue());
+ }
+
+ ctr = ctrs.findCounter("org.apache.hadoop.mapred.Task$Counter",
+ "MAP_OUTPUT_RECORDS");
+ if (ctr != null) {
+ mapRedStats.setMapOutputRecords(ctr.getValue());
+ }
+
+ ctr = ctrs.findCounter("org.apache.hadoop.mapred.Task$Counter",
+ "REDUCE_INPUT_RECORDS");
+ if (ctr != null) {
+ mapRedStats.setReduceInputRecords(ctr.getValue());
+ }
+
+ ctr = ctrs.findCounter("org.apache.hadoop.mapred.Task$Counter",
+ "REDUCE_OUTPUT_RECORDS");
+ if (ctr != null) {
+ mapRedStats.setReduceOutputRecords(ctr.getValue());
+ }
+
+ ctr = ctrs.findCounter("FileSystemCounters",
+ "HDFS_BYTES_READ");
+ if (ctr != null) {
+ mapRedStats.setHdfsRead(ctr.getValue());
+ }
+
+ ctr = ctrs.findCounter("FileSystemCounters",
+ "HDFS_BYTES_WRITTEN");
+ if (ctr != null) {
+ mapRedStats.setHdfsWrite(ctr.getValue());
+ }
+
this.task.setDone();
// update based on the final value of the counters
updateCounters(ctrs, rj);
@@ -340,9 +421,9 @@ public class HadoopJobExecHelper {
this.callBackObj.logPlanProgress(ss);
}
// LOG.info(queryPlan);
- return (success);
+ return mapRedStats;
}
-
+
private String getId() {
return this.task.getId();
}
@@ -396,7 +477,7 @@ public class HadoopJobExecHelper {
return rj.getCounters();
}
}
-
+
// Used for showJobFailDebugInfo
private static class TaskInfo {
String jobId;
@@ -419,7 +500,7 @@ public class HadoopJobExecHelper {
return jobId;
}
}
-
+
@SuppressWarnings("deprecation")
private void showJobFailDebugInfo(JobConf conf, RunningJob rj) throws IOException {
// Mapping from task ID to the number of failures
@@ -549,9 +630,9 @@ public class HadoopJobExecHelper {
public int progress(RunningJob rj, JobClient jc) throws IOException {
jobId = rj.getJobID();
-
+
int returnVal = 0;
-
+
// remove the pwd from conf file so that job tracker doesn't show this
// logs
String pwd = HiveConf.getVar(job, HiveConf.ConfVars.METASTOREPWD);
@@ -570,7 +651,14 @@ public class HadoopJobExecHelper {
ExecDriverTaskHandle th = new ExecDriverTaskHandle(jc, rj);
jobInfo(rj);
- boolean success = progress(th);
+ MapRedStats mapRedStats = progress(th);
+
+ // Not always there is a SessionState. Sometimes ExeDriver is directly invoked
+ // for special modes. In that case, SessionState.get() is empty.
+ if (SessionState.get() != null) {
+ SessionState.get().getLastMapRedStatsList().add(mapRedStats);
+ }
+ boolean success = mapRedStats.isSuccess();
String statusMesg = getJobEndMsg(rj.getJobID());
if (!success) {
@@ -583,7 +671,7 @@ public class HadoopJobExecHelper {
} else {
console.printInfo(statusMesg);
}
-
+
return returnVal;
}
}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java?rev=1150945&r1=1150944&r2=1150945&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java Mon Jul 25 22:44:42 2011
@@ -2257,4 +2257,44 @@ public final class Utilities {
}
return sb.toString();
}
+
+  /**
+   * Formats a duration given in milliseconds as a human-readable string of the form
+   * "x days y hours z minutes a seconds b msec".
+   *
+   * The trailing "msec" part is always present. Larger units appear only once the
+   * duration reaches them: seconds from 1 second, minutes from 1 minute, and days and
+   * hours (together) from 1 hour. For example, 61001 gives
+   * "1 minutes 1 seconds 1 msec" and 3600000 gives
+   * "0 days 1 hours 0 minutes 0 seconds 0 msec".
+   *
+   * @param msec milliseconds
+   * @return the formatted string
+   */
+  public static String formatMsecToStr(long msec) {
+    // Java division/remainder truncate toward zero, so for negative input every
+    // "total" below is <= 0 and only the (signed) msec part is printed.
+    final long millisPart = msec % 1000;
+    final long totalSeconds = msec / 1000;
+    final long totalMinutes = totalSeconds / 60;
+    final long totalHours = totalMinutes / 60;
+
+    StringBuilder out = new StringBuilder();
+    if (totalHours > 0) {
+      out.append(totalHours / 24).append(" days ");
+      out.append(totalHours % 24).append(" hours ");
+    }
+    if (totalMinutes > 0) {
+      out.append(totalMinutes % 60).append(" minutes ");
+    }
+    if (totalSeconds > 0) {
+      out.append(totalSeconds % 60).append(" seconds ");
+    }
+    out.append(millisPart).append(" msec");
+    return out.toString();
+  }
}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java?rev=1150945&r1=1150944&r2=1150945&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java Mon Jul 25 22:44:42 2011
@@ -37,6 +37,7 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.MapRedStats;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.history.HiveHistory;
import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -107,6 +108,8 @@ public class SessionState {
private CreateTableAutomaticGrant createTableGrants;
+ private List<MapRedStats> lastMapRedStatsList;
+
/**
* Lineage state.
*/
@@ -641,4 +644,12 @@ public class SessionState {
public void setCreateTableGrants(CreateTableAutomaticGrant createTableGrants) {
this.createTableGrants = createTableGrants;
}
+
+  /**
+   * Returns the list of per-job MapReduce statistics for this session.
+   *
+   * Driver installs a fresh list before running a query's tasks. If none has been
+   * installed (e.g. a job launched without going through Driver), an empty list is
+   * created on demand, so this never returns null and callers can append to it directly.
+   *
+   * @return the mutable list of recorded job statistics, never null
+   */
+  public List<MapRedStats> getLastMapRedStatsList() {
+    if (lastMapRedStatsList == null) {
+      lastMapRedStatsList = new java.util.ArrayList<MapRedStats>();
+    }
+    return lastMapRedStatsList;
+  }
+
+  /**
+   * Replaces the list into which per-job MapReduce statistics are recorded.
+   *
+   * @param lastMapRedStatsList the list to record into (Driver passes a new, empty
+   *        list before running a query's tasks)
+   */
+  public void setLastMapRedStatsList(final List<MapRedStats> lastMapRedStatsList) {
+    this.lastMapRedStatsList = lastMapRedStatsList;
+  }
}