You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by na...@apache.org on 2010/09/03 19:30:38 UTC
svn commit: r992387 - in /hadoop/hive/trunk: CHANGES.txt
ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java
ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistory.java
Author: namit
Date: Fri Sep 3 17:30:38 2010
New Revision: 992387
URL: http://svn.apache.org/viewvc?rev=992387&view=rev
Log:
HIVE-1580. Cleanup ExecDriver.progress
(Joydeep Sen Sarma via namit)
Modified:
hadoop/hive/trunk/CHANGES.txt
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistory.java
Modified: hadoop/hive/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/CHANGES.txt?rev=992387&r1=992386&r2=992387&view=diff
==============================================================================
--- hadoop/hive/trunk/CHANGES.txt (original)
+++ hadoop/hive/trunk/CHANGES.txt Fri Sep 3 17:30:38 2010
@@ -120,6 +120,9 @@ Trunk - Unreleased
HIVE-1536. Add support for JDBC PreparedStatements
(Sean Flatley via jvs)
+ HIVE-1580. Cleanup ExecDriver.progress
+ (Joydeep Sen Sarma via namit)
+
OPTIMIZATIONS
BUG FIXES
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java?rev=992387&r1=992386&r2=992387&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java Fri Sep 3 17:30:38 2010
@@ -281,39 +281,31 @@ public class ExecDriver extends Task<Map
* @return true if fatal errors happened during job execution, false
* otherwise.
*/
- private boolean checkFatalErrors(ExecDriverTaskHandle th, StringBuilder errMsg) {
- RunningJob rj = th.getRunningJob();
- try {
- Counters ctrs = th.getCounters();
- if (ctrs == null) {
- // hadoop might return null if it cannot locate the job.
- // we may still be able to retrieve the job status - so ignore
- return false;
- }
- // check for number of created files
- long numFiles = ctrs.getCounter(ProgressCounter.CREATED_FILES);
- long upperLimit = HiveConf.getLongVar(job, HiveConf.ConfVars.MAXCREATEDFILES);
- if (numFiles > upperLimit) {
- errMsg.append("total number of created files exceeds ").append(upperLimit);
- return true;
- }
+ private boolean checkFatalErrors(Counters ctrs, StringBuilder errMsg) {
+ if (ctrs == null) {
+ // hadoop might return null if it cannot locate the job.
+ // we may still be able to retrieve the job status - so ignore
+ return false;
+ }
+ // check for number of created files
+ long numFiles = ctrs.getCounter(ProgressCounter.CREATED_FILES);
+ long upperLimit = HiveConf.getLongVar(job, HiveConf.ConfVars.MAXCREATEDFILES);
+ if (numFiles > upperLimit) {
+ errMsg.append("total number of created files exceeds ").append(upperLimit);
+ return true;
+ }
- for (Operator<? extends Serializable> op : work.getAliasToWork().values()) {
- if (op.checkFatalErrors(ctrs, errMsg)) {
- return true;
- }
+ for (Operator<? extends Serializable> op : work.getAliasToWork().values()) {
+ if (op.checkFatalErrors(ctrs, errMsg)) {
+ return true;
}
- if (work.getReducer() != null) {
- if (work.getReducer().checkFatalErrors(ctrs, errMsg)) {
- return true;
- }
+ }
+ if (work.getReducer() != null) {
+ if (work.getReducer().checkFatalErrors(ctrs, errMsg)) {
+ return true;
}
- return false;
- } catch (IOException e) {
- // this exception can be tolerated
- e.printStackTrace();
- return false;
}
+ return false;
}
private boolean progress(ExecDriverTaskHandle th) throws IOException {
@@ -354,6 +346,7 @@ public class ExecDriver extends Task<Map
throw new IOException("Could not find status of job: + rj.getJobID()");
} else {
th.setRunningJob(newRj);
+ rj = newRj;
}
// If fatal errors happen we should kill the job immediately rather than
@@ -361,7 +354,10 @@ public class ExecDriver extends Task<Map
if (fatal) {
continue; // wait until rj.isComplete
}
- if (fatal = checkFatalErrors(th, errMsg)) {
+
+ Counters ctrs = th.getCounters();
+
+ if (fatal = checkFatalErrors(ctrs, errMsg)) {
console.printError("[Fatal Error] " + errMsg.toString()
+ ". Killing the job.");
rj.killJob();
@@ -369,7 +365,7 @@ public class ExecDriver extends Task<Map
}
errMsg.setLength(0);
- updateCounters(th);
+ updateCounters(ctrs, rj);
String report = " " + getId() + " map = " + mapProgress + "%, reduce = "
+ reduceProgress + "%";
@@ -384,7 +380,7 @@ public class ExecDriver extends Task<Map
SessionState ss = SessionState.get();
if (ss != null) {
ss.getHiveHistory().setTaskCounters(SessionState.get().getQueryId(),
- getId(), rj);
+ getId(), ctrs);
ss.getHiveHistory().setTaskProperty(SessionState.get().getQueryId(),
getId(), Keys.TASK_HADOOP_PROGRESS, output);
ss.getHiveHistory().progressTask(SessionState.get().getQueryId(),
@@ -398,12 +394,14 @@ public class ExecDriver extends Task<Map
}
boolean success;
+ Counters ctrs = th.getCounters();
+
if (fatal) {
success = false;
} else {
// check for fatal error again in case it occurred after
// the last check before the job is completed
- if (checkFatalErrors(th, errMsg)) {
+ if (checkFatalErrors(ctrs, errMsg)) {
console.printError("[Fatal Error] " + errMsg.toString());
success = false;
} else {
@@ -413,7 +411,7 @@ public class ExecDriver extends Task<Map
setDone();
// update based on the final value of the counters
- updateCounters(th);
+ updateCounters(ctrs, rj);
SessionState ss = SessionState.get();
if (ss != null) {
@@ -426,15 +424,13 @@ public class ExecDriver extends Task<Map
/**
* Update counters relevant to this task.
*/
- private void updateCounters(ExecDriverTaskHandle th) throws IOException {
- RunningJob rj = th.getRunningJob();
+ private void updateCounters(Counters ctrs, RunningJob rj) throws IOException {
mapProgress = Math.round(rj.mapProgress() * 100);
reduceProgress = Math.round(rj.reduceProgress() * 100);
taskCounters.put("CNTR_NAME_" + getId() + "_MAP_PROGRESS", Long
.valueOf(mapProgress));
taskCounters.put("CNTR_NAME_" + getId() + "_REDUCE_PROGRESS", Long
.valueOf(reduceProgress));
- Counters ctrs = th.getCounters();
if (ctrs == null) {
// hadoop might return null if it cannot locate the job.
// we may still be able to retrieve the job status - so ignore
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistory.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistory.java?rev=992387&r1=992386&r2=992387&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistory.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistory.java Fri Sep 3 17:30:38 2010
@@ -42,6 +42,7 @@ import org.apache.hadoop.hive.ql.session
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.Counters.Counter;
import org.apache.hadoop.mapred.Counters.Group;
+import org.apache.hadoop.mapred.Counters;
/**
* HiveHistory.
@@ -352,19 +353,19 @@ public class HiveHistory {
* @param taskId
* @param rj
*/
- public void setTaskCounters(String queryId, String taskId, RunningJob rj) {
+ public void setTaskCounters(String queryId, String taskId, Counters ctrs) {
String id = queryId + ":" + taskId;
QueryInfo ji = queryInfoMap.get(queryId);
StringBuilder sb1 = new StringBuilder("");
TaskInfo ti = taskInfoMap.get(id);
- if (ti == null) {
+ if ((ti == null) || (ctrs == null)) {
return;
}
StringBuilder sb = new StringBuilder("");
try {
boolean first = true;
- for (Group group : rj.getCounters()) {
+ for (Group group : ctrs) {
for (Counter counter : group) {
if (first) {
first = false;
@@ -391,7 +392,7 @@ public class HiveHistory {
}
} catch (Exception e) {
- e.printStackTrace();
+ LOG.warn(org.apache.hadoop.util.StringUtils.stringifyException(e));
}
if (sb1.length() > 0) {
taskInfoMap.get(id).hm.put(Keys.ROWS_INSERTED.name(), sb1.toString());