You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by nz...@apache.org on 2010/08/03 01:41:28 UTC

svn commit: r981707 - in /hadoop/hive/trunk: ./ ql/src/java/org/apache/hadoop/hive/ql/exec/ ql/src/test/queries/clientnegative/ ql/src/test/queries/clientpositive/ ql/src/test/results/clientnegative/ ql/src/test/results/clientpositive/

Author: nzhang
Date: Mon Aug  2 23:41:28 2010
New Revision: 981707

URL: http://svn.apache.org/viewvc?rev=981707&view=rev
Log:
HIVE-1422 (2nd trial). skip counter update when RunningJob.getCounters() returns null (Joydeep Sen Sarma via Ning Zhang)

Added:
    hadoop/hive/trunk/ql/src/test/queries/clientnegative/fatal.q
    hadoop/hive/trunk/ql/src/test/results/clientnegative/fatal.q.out
Removed:
    hadoop/hive/trunk/ql/src/test/queries/clientpositive/fatal.q
    hadoop/hive/trunk/ql/src/test/results/clientpositive/fatal.q.out
Modified:
    hadoop/hive/trunk/CHANGES.txt
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapRedTask.java

Modified: hadoop/hive/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/CHANGES.txt?rev=981707&r1=981706&r2=981707&view=diff
==============================================================================
--- hadoop/hive/trunk/CHANGES.txt (original)
+++ hadoop/hive/trunk/CHANGES.txt Mon Aug  2 23:41:28 2010
@@ -108,6 +108,11 @@ Trunk -  Unreleased
     collect_set udaf
     (He Yongqiang via jvs)
 
+    HIVE-1422 (2nd trial). skip counter update when RunningJob.getCounters() 
+    returns null 
+    (Joydeep Sen Sarma via Ning Zhang)
+
+
   TESTS
 
     HIVE-1464. improve  test query performance

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java?rev=981707&r1=981706&r2=981707&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java Mon Aug  2 23:41:28 2010
@@ -97,7 +97,6 @@ public class ExecDriver extends Task<Map
   protected transient JobConf job;
   protected transient int mapProgress = 0;
   protected transient int reduceProgress = 0;
-  protected transient boolean success = false; // if job execution is successful
 
   /**
    * Constructor when invoked from QL.
@@ -289,10 +288,12 @@ public class ExecDriver extends Task<Map
     RunningJob rj = th.getRunningJob();
     try {
       Counters ctrs = th.getCounters();
-      // HIVE-1422
       if (ctrs == null) {
+        // hadoop might return null if it cannot locate the job.
+        // we may still be able to retrieve the job status - so ignore
         return false;
       }
+
       for (Operator<? extends Serializable> op : work.getAliasToWork().values()) {
         if (op.checkFatalErrors(ctrs, errMsg)) {
           return true;
@@ -311,7 +312,7 @@ public class ExecDriver extends Task<Map
     }
   }
 
-  private void progress(ExecDriverTaskHandle th) throws IOException {
+  private boolean progress(ExecDriverTaskHandle th) throws IOException {
     JobClient jc = th.getJobClient();
     RunningJob rj = th.getRunningJob();
     String lastReport = "";
@@ -339,7 +340,17 @@ public class ExecDriver extends Task<Map
         // rj.getJobState() again and we do not want to do an extra RPC call
         initializing = false;
       }
-      th.setRunningJob(jc.getJob(rj.getJobID()));
+
+      RunningJob newRj = jc.getJob(rj.getJobID());
+      if (newRj == null) {
+        // under exceptional load, hadoop may not be able to look up status
+        // of finished jobs (because it has purged them from memory). From
+        // hive's perspective - it's equivalent to the job having failed.
+        // So raise a meaningful exception
+        throw new IOException("Could not find status of job: + rj.getJobID()");
+      } else {
+        th.setRunningJob(newRj);
+      }
 
       // If fatal errors happen we should kill the job immediately rather than
       // let the job retry several times, which eventually lead to failure.
@@ -347,7 +358,6 @@ public class ExecDriver extends Task<Map
         continue; // wait until rj.isComplete
       }
       if (fatal = checkFatalErrors(th, errMsg)) {
-        success = false;
         console.printError("[Fatal Error] " + errMsg.toString()
             + ". Killing the job.");
         rj.killJob();
@@ -382,23 +392,31 @@ public class ExecDriver extends Task<Map
         reportTime = System.currentTimeMillis();
       }
     }
-    // check for fatal error again in case it occurred after the last check
-    // before the job is completed
-    if (!fatal && (fatal = checkFatalErrors(th, errMsg))) {
-      console.printError("[Fatal Error] " + errMsg.toString());
+
+    boolean success;
+    if (fatal) {
       success = false;
     } else {
-      success = rj.isSuccessful();
+      // check for fatal error again in case it occurred after
+      // the last check before the job is completed
+      if (checkFatalErrors(th, errMsg)) {
+        console.printError("[Fatal Error] " + errMsg.toString());
+        success = false;
+      } else  {
+        success = rj.isSuccessful();
+      }
     }
 
     setDone();
-    th.setRunningJob(jc.getJob(rj.getJobID()));
+    // update based on the final value of the counters
     updateCounters(th);
+
     SessionState ss = SessionState.get();
     if (ss != null) {
       ss.getHiveHistory().logPlanProgress(queryPlan);
     }
     // LOG.info(queryPlan);
+    return (success);
   }
 
   /**
@@ -413,8 +431,9 @@ public class ExecDriver extends Task<Map
     taskCounters.put("CNTR_NAME_" + getId() + "_REDUCE_PROGRESS", Long
         .valueOf(reduceProgress));
     Counters ctrs = th.getCounters();
-    // HIVE-1422
     if (ctrs == null) {
+      // hadoop might return null if it cannot locate the job.
+      // we may still be able to retrieve the job status - so ignore
       return;
     }
     for (Operator<? extends Serializable> op : work.getAliasToWork().values()) {
@@ -446,8 +465,7 @@ public class ExecDriver extends Task<Map
    */
   @Override
   public int execute(DriverContext driverContext) {
-
-    success = true;
+    boolean success = true;
 
     String invalidReason = work.isInvalid();
     if (invalidReason != null) {
@@ -554,7 +572,7 @@ public class ExecDriver extends Task<Map
     }
 
     int returnVal = 0;
-    RunningJob rj = null, orig_rj = null;
+    RunningJob rj = null;
 
     boolean noName = StringUtils.isEmpty(HiveConf.getVar(job,
         HiveConf.ConfVars.HADOOPJOBNAME));
@@ -581,7 +599,7 @@ public class ExecDriver extends Task<Map
       // make this client wait if job trcker is not behaving well.
       Throttle.checkJobTracker(job, LOG);
 
-      orig_rj = rj = jc.submitJob(job);
+      rj = jc.submitJob(job);
       // replace it back
       if (pwd != null) {
         HiveConf.setVar(job, HiveConf.ConfVars.METASTOREPWD, pwd);
@@ -593,14 +611,7 @@ public class ExecDriver extends Task<Map
 
       ExecDriverTaskHandle th = new ExecDriverTaskHandle(jc, rj);
       jobInfo(rj);
-      progress(th); // success status will be setup inside progress
-
-      if (rj == null) {
-        // in the corner case where the running job has disappeared from JT
-        // memory remember that we did actually submit the job.
-        rj = orig_rj;
-        success = false;
-      }
+      success = progress(th);
 
       String statusMesg = getJobEndMsg(rj.getJobID());
       if (!success) {
@@ -632,10 +643,12 @@ public class ExecDriver extends Task<Map
         if(ctxCreated)
           ctx.clear();
 
-        if (returnVal != 0 && rj != null) {
-          rj.killJob();
+        if (rj != null) {
+          if (returnVal != 0) {
+            rj.killJob();
+          }
+          runningJobKillURIs.remove(rj.getJobID());
         }
-        runningJobKillURIs.remove(rj.getJobID());
       } catch (Exception e) {
       }
     }

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapRedTask.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapRedTask.java?rev=981707&r1=981706&r2=981707&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapRedTask.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapRedTask.java Mon Aug  2 23:41:28 2010
@@ -348,30 +348,30 @@ public class MapRedTask extends ExecDriv
    * @return String null if job is eligible for local mode, reason otherwise
    */
   public static String isEligibleForLocalMode(HiveConf conf,
-                                               ContentSummary inputSummary,
-                                               int numReducers) {
+                                              ContentSummary inputSummary,
+                                              int numReducers) {
 
     long maxBytes = conf.getLongVar(HiveConf.ConfVars.LOCALMODEMAXBYTES);
     long maxTasks = conf.getIntVar(HiveConf.ConfVars.LOCALMODEMAXTASKS);
 
     // check for max input size
     if (inputSummary.getLength() > maxBytes)
-	return "Input Size (= " + maxBytes + ") is larger than " +
-	    HiveConf.ConfVars.LOCALMODEMAXBYTES.varname + " (= " + maxBytes + ")";
+      return "Input Size (= " + inputSummary.getLength() + ") is larger than " +
+        HiveConf.ConfVars.LOCALMODEMAXBYTES.varname + " (= " + maxBytes + ")";
 
     // ideally we would like to do this check based on the number of splits
     // in the absence of an easy way to get the number of splits - do this
     // based on the total number of files (pessimistically assumming that
     // splits are equal to number of files in worst case)
     if (inputSummary.getFileCount() > maxTasks)
-	return "Number of Input Files (= " + inputSummary.getFileCount() +
-	    ") is larger than " + 
-	    HiveConf.ConfVars.LOCALMODEMAXTASKS.varname + "(= " + maxTasks + ")";
+      return "Number of Input Files (= " + inputSummary.getFileCount() +
+        ") is larger than " + 
+        HiveConf.ConfVars.LOCALMODEMAXTASKS.varname + "(= " + maxTasks + ")";
 
     // since local mode only runs with 1 reducers - make sure that the
     // the number of reducers (set by user or inferred) is <=1
     if (numReducers > 1) 
-	return "Number of reducers (= " + numReducers + ") is more than 1";
+      return "Number of reducers (= " + numReducers + ") is more than 1";
 
     return null;
   }

Added: hadoop/hive/trunk/ql/src/test/queries/clientnegative/fatal.q
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/test/queries/clientnegative/fatal.q?rev=981707&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/test/queries/clientnegative/fatal.q (added)
+++ hadoop/hive/trunk/ql/src/test/queries/clientnegative/fatal.q Mon Aug  2 23:41:28 2010
@@ -0,0 +1,4 @@
+set hive.mapjoin.maxsize=1;
+set hive.task.progress=true;
+
+select /*+ mapjoin(b) */ * from src a join src b on (a.key=b.key);

Added: hadoop/hive/trunk/ql/src/test/results/clientnegative/fatal.q.out
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/test/results/clientnegative/fatal.q.out?rev=981707&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/test/results/clientnegative/fatal.q.out (added)
+++ hadoop/hive/trunk/ql/src/test/results/clientnegative/fatal.q.out Mon Aug  2 23:41:28 2010
@@ -0,0 +1,5 @@
+PREHOOK: query: select /*+ mapjoin(b) */ * from src a join src b on (a.key=b.key)
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: file:/tmp/nzhang/hive_2010-08-02_13-41-52_752_1156521578782717030/-mr-10000
+FAILED: Execution Error, return code 2 from org.apache.hadoop.hive.ql.exec.MapRedTask