You are viewing a plain-text version of this content. The canonical version is linked from the original page (the "here" hyperlink was not preserved in this plain-text copy).
Posted to commits@hive.apache.org by nz...@apache.org on 2010/08/03 01:41:28 UTC
svn commit: r981707 - in /hadoop/hive/trunk: ./
ql/src/java/org/apache/hadoop/hive/ql/exec/
ql/src/test/queries/clientnegative/ ql/src/test/queries/clientpositive/
ql/src/test/results/clientnegative/ ql/src/test/results/clientpositive/
Author: nzhang
Date: Mon Aug 2 23:41:28 2010
New Revision: 981707
URL: http://svn.apache.org/viewvc?rev=981707&view=rev
Log:
HIVE-1422 (2nd trial). skip counter update when RunningJob.getCounters() returns null (Joydeep Sen Sarma via Ning Zhang)
Added:
hadoop/hive/trunk/ql/src/test/queries/clientnegative/fatal.q
hadoop/hive/trunk/ql/src/test/results/clientnegative/fatal.q.out
Removed:
hadoop/hive/trunk/ql/src/test/queries/clientpositive/fatal.q
hadoop/hive/trunk/ql/src/test/results/clientpositive/fatal.q.out
Modified:
hadoop/hive/trunk/CHANGES.txt
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapRedTask.java
Modified: hadoop/hive/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/CHANGES.txt?rev=981707&r1=981706&r2=981707&view=diff
==============================================================================
--- hadoop/hive/trunk/CHANGES.txt (original)
+++ hadoop/hive/trunk/CHANGES.txt Mon Aug 2 23:41:28 2010
@@ -108,6 +108,11 @@ Trunk - Unreleased
collect_set udaf
(He Yongqiang via jvs)
+ HIVE-1422 (2nd trial). skip counter update when RunningJob.getCounters()
+ returns null
+ (Joydeep Sen Sarma via Ning Zhang)
+
+
TESTS
HIVE-1464. improve test query performance
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java?rev=981707&r1=981706&r2=981707&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java Mon Aug 2 23:41:28 2010
@@ -97,7 +97,6 @@ public class ExecDriver extends Task<Map
protected transient JobConf job;
protected transient int mapProgress = 0;
protected transient int reduceProgress = 0;
- protected transient boolean success = false; // if job execution is successful
/**
* Constructor when invoked from QL.
@@ -289,10 +288,12 @@ public class ExecDriver extends Task<Map
RunningJob rj = th.getRunningJob();
try {
Counters ctrs = th.getCounters();
- // HIVE-1422
if (ctrs == null) {
+ // hadoop might return null if it cannot locate the job.
+ // we may still be able to retrieve the job status - so ignore
return false;
}
+
for (Operator<? extends Serializable> op : work.getAliasToWork().values()) {
if (op.checkFatalErrors(ctrs, errMsg)) {
return true;
@@ -311,7 +312,7 @@ public class ExecDriver extends Task<Map
}
}
- private void progress(ExecDriverTaskHandle th) throws IOException {
+ private boolean progress(ExecDriverTaskHandle th) throws IOException {
JobClient jc = th.getJobClient();
RunningJob rj = th.getRunningJob();
String lastReport = "";
@@ -339,7 +340,17 @@ public class ExecDriver extends Task<Map
// rj.getJobState() again and we do not want to do an extra RPC call
initializing = false;
}
- th.setRunningJob(jc.getJob(rj.getJobID()));
+
+ RunningJob newRj = jc.getJob(rj.getJobID());
+ if (newRj == null) {
+ // under exceptional load, hadoop may not be able to look up status
+ // of finished jobs (because it has purged them from memory). From
+ // hive's perspective - it's equivalent to the job having failed.
+ // So raise a meaningful exception
+ throw new IOException("Could not find status of job: + rj.getJobID()");
+ } else {
+ th.setRunningJob(newRj);
+ }
// If fatal errors happen we should kill the job immediately rather than
// let the job retry several times, which eventually lead to failure.
@@ -347,7 +358,6 @@ public class ExecDriver extends Task<Map
continue; // wait until rj.isComplete
}
if (fatal = checkFatalErrors(th, errMsg)) {
- success = false;
console.printError("[Fatal Error] " + errMsg.toString()
+ ". Killing the job.");
rj.killJob();
@@ -382,23 +392,31 @@ public class ExecDriver extends Task<Map
reportTime = System.currentTimeMillis();
}
}
- // check for fatal error again in case it occurred after the last check
- // before the job is completed
- if (!fatal && (fatal = checkFatalErrors(th, errMsg))) {
- console.printError("[Fatal Error] " + errMsg.toString());
+
+ boolean success;
+ if (fatal) {
success = false;
} else {
- success = rj.isSuccessful();
+ // check for fatal error again in case it occurred after
+ // the last check before the job is completed
+ if (checkFatalErrors(th, errMsg)) {
+ console.printError("[Fatal Error] " + errMsg.toString());
+ success = false;
+ } else {
+ success = rj.isSuccessful();
+ }
}
setDone();
- th.setRunningJob(jc.getJob(rj.getJobID()));
+ // update based on the final value of the counters
updateCounters(th);
+
SessionState ss = SessionState.get();
if (ss != null) {
ss.getHiveHistory().logPlanProgress(queryPlan);
}
// LOG.info(queryPlan);
+ return (success);
}
/**
@@ -413,8 +431,9 @@ public class ExecDriver extends Task<Map
taskCounters.put("CNTR_NAME_" + getId() + "_REDUCE_PROGRESS", Long
.valueOf(reduceProgress));
Counters ctrs = th.getCounters();
- // HIVE-1422
if (ctrs == null) {
+ // hadoop might return null if it cannot locate the job.
+ // we may still be able to retrieve the job status - so ignore
return;
}
for (Operator<? extends Serializable> op : work.getAliasToWork().values()) {
@@ -446,8 +465,7 @@ public class ExecDriver extends Task<Map
*/
@Override
public int execute(DriverContext driverContext) {
-
- success = true;
+ boolean success = true;
String invalidReason = work.isInvalid();
if (invalidReason != null) {
@@ -554,7 +572,7 @@ public class ExecDriver extends Task<Map
}
int returnVal = 0;
- RunningJob rj = null, orig_rj = null;
+ RunningJob rj = null;
boolean noName = StringUtils.isEmpty(HiveConf.getVar(job,
HiveConf.ConfVars.HADOOPJOBNAME));
@@ -581,7 +599,7 @@ public class ExecDriver extends Task<Map
// make this client wait if job trcker is not behaving well.
Throttle.checkJobTracker(job, LOG);
- orig_rj = rj = jc.submitJob(job);
+ rj = jc.submitJob(job);
// replace it back
if (pwd != null) {
HiveConf.setVar(job, HiveConf.ConfVars.METASTOREPWD, pwd);
@@ -593,14 +611,7 @@ public class ExecDriver extends Task<Map
ExecDriverTaskHandle th = new ExecDriverTaskHandle(jc, rj);
jobInfo(rj);
- progress(th); // success status will be setup inside progress
-
- if (rj == null) {
- // in the corner case where the running job has disappeared from JT
- // memory remember that we did actually submit the job.
- rj = orig_rj;
- success = false;
- }
+ success = progress(th);
String statusMesg = getJobEndMsg(rj.getJobID());
if (!success) {
@@ -632,10 +643,12 @@ public class ExecDriver extends Task<Map
if(ctxCreated)
ctx.clear();
- if (returnVal != 0 && rj != null) {
- rj.killJob();
+ if (rj != null) {
+ if (returnVal != 0) {
+ rj.killJob();
+ }
+ runningJobKillURIs.remove(rj.getJobID());
}
- runningJobKillURIs.remove(rj.getJobID());
} catch (Exception e) {
}
}
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapRedTask.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapRedTask.java?rev=981707&r1=981706&r2=981707&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapRedTask.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapRedTask.java Mon Aug 2 23:41:28 2010
@@ -348,30 +348,30 @@ public class MapRedTask extends ExecDriv
* @return String null if job is eligible for local mode, reason otherwise
*/
public static String isEligibleForLocalMode(HiveConf conf,
- ContentSummary inputSummary,
- int numReducers) {
+ ContentSummary inputSummary,
+ int numReducers) {
long maxBytes = conf.getLongVar(HiveConf.ConfVars.LOCALMODEMAXBYTES);
long maxTasks = conf.getIntVar(HiveConf.ConfVars.LOCALMODEMAXTASKS);
// check for max input size
if (inputSummary.getLength() > maxBytes)
- return "Input Size (= " + maxBytes + ") is larger than " +
- HiveConf.ConfVars.LOCALMODEMAXBYTES.varname + " (= " + maxBytes + ")";
+ return "Input Size (= " + inputSummary.getLength() + ") is larger than " +
+ HiveConf.ConfVars.LOCALMODEMAXBYTES.varname + " (= " + maxBytes + ")";
// ideally we would like to do this check based on the number of splits
// in the absence of an easy way to get the number of splits - do this
// based on the total number of files (pessimistically assumming that
// splits are equal to number of files in worst case)
if (inputSummary.getFileCount() > maxTasks)
- return "Number of Input Files (= " + inputSummary.getFileCount() +
- ") is larger than " +
- HiveConf.ConfVars.LOCALMODEMAXTASKS.varname + "(= " + maxTasks + ")";
+ return "Number of Input Files (= " + inputSummary.getFileCount() +
+ ") is larger than " +
+ HiveConf.ConfVars.LOCALMODEMAXTASKS.varname + "(= " + maxTasks + ")";
// since local mode only runs with 1 reducers - make sure that the
// the number of reducers (set by user or inferred) is <=1
if (numReducers > 1)
- return "Number of reducers (= " + numReducers + ") is more than 1";
+ return "Number of reducers (= " + numReducers + ") is more than 1";
return null;
}
Added: hadoop/hive/trunk/ql/src/test/queries/clientnegative/fatal.q
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/test/queries/clientnegative/fatal.q?rev=981707&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/test/queries/clientnegative/fatal.q (added)
+++ hadoop/hive/trunk/ql/src/test/queries/clientnegative/fatal.q Mon Aug 2 23:41:28 2010
@@ -0,0 +1,4 @@
+set hive.mapjoin.maxsize=1;
+set hive.task.progress=true;
+
+select /*+ mapjoin(b) */ * from src a join src b on (a.key=b.key);
Added: hadoop/hive/trunk/ql/src/test/results/clientnegative/fatal.q.out
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/test/results/clientnegative/fatal.q.out?rev=981707&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/test/results/clientnegative/fatal.q.out (added)
+++ hadoop/hive/trunk/ql/src/test/results/clientnegative/fatal.q.out Mon Aug 2 23:41:28 2010
@@ -0,0 +1,5 @@
+PREHOOK: query: select /*+ mapjoin(b) */ * from src a join src b on (a.key=b.key)
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: file:/tmp/nzhang/hive_2010-08-02_13-41-52_752_1156521578782717030/-mr-10000
+FAILED: Execution Error, return code 2 from org.apache.hadoop.hive.ql.exec.MapRedTask