Posted to commits@hive.apache.org by na...@apache.org on 2010/09/03 19:30:38 UTC

svn commit: r992387 - in /hadoop/hive/trunk: CHANGES.txt ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistory.java

Author: namit
Date: Fri Sep  3 17:30:38 2010
New Revision: 992387

URL: http://svn.apache.org/viewvc?rev=992387&view=rev
Log:
HIVE-1580. Cleanup ExecDriver.progress
(Joydeep Sen Sarma via namit)
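
In outline, the cleanup threads a single Counters snapshot through the
progress loop: the caller fetches the counters once per polling iteration
and passes them to checkFatalErrors() and updateCounters(), instead of each
helper re-fetching them through the task handle. A minimal sketch of that
pattern follows (MyCounter and MAX_CREATED_FILES are hypothetical stand-ins
for ProgressCounter.CREATED_FILES and the MAXCREATEDFILES setting; this is
not the committed Hive code):

    import java.io.IOException;

    import org.apache.hadoop.mapred.Counters;
    import org.apache.hadoop.mapred.RunningJob;

    class ProgressLoopSketch {

      // Hypothetical counter enum standing in for ProgressCounter.
      enum MyCounter { CREATED_FILES }

      // Hypothetical stand-in for the MAXCREATEDFILES limit.
      private static final long MAX_CREATED_FILES = 100000L;

      // Takes the Counters snapshot directly; the IOException handling
      // moves to the caller, which is where getCounters() is now invoked.
      boolean checkFatalErrors(Counters ctrs, StringBuilder errMsg) {
        if (ctrs == null) {
          // hadoop might return null if it cannot locate the job;
          // the job status may still be retrievable, so not fatal
          return false;
        }
        long numFiles = ctrs.getCounter(MyCounter.CREATED_FILES);
        if (numFiles > MAX_CREATED_FILES) {
          errMsg.append("total number of created files exceeds ")
              .append(MAX_CREATED_FILES);
          return true;
        }
        return false;
      }

      void poll(RunningJob rj) throws IOException, InterruptedException {
        StringBuilder errMsg = new StringBuilder();
        while (!rj.isComplete()) {
          Counters ctrs = rj.getCounters(); // fetched once per iteration
          if (checkFatalErrors(ctrs, errMsg)) {
            rj.killJob();
            break;
          }
          // updateCounters(ctrs, rj) and progress reporting would go here
          Thread.sleep(1000);
        }
      }
    }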


Modified:
    hadoop/hive/trunk/CHANGES.txt
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistory.java

Modified: hadoop/hive/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/CHANGES.txt?rev=992387&r1=992386&r2=992387&view=diff
==============================================================================
--- hadoop/hive/trunk/CHANGES.txt (original)
+++ hadoop/hive/trunk/CHANGES.txt Fri Sep  3 17:30:38 2010
@@ -120,6 +120,9 @@ Trunk -  Unreleased
     HIVE-1536. Add support for JDBC PreparedStatements
     (Sean Flatley via jvs)
 
+    HIVE-1580. Cleanup ExecDriver.progress
+    (Joydeep Sen Sarma via namit)
+
   OPTIMIZATIONS
 
   BUG FIXES

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java?rev=992387&r1=992386&r2=992387&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java Fri Sep  3 17:30:38 2010
@@ -281,39 +281,31 @@ public class ExecDriver extends Task<Map
    * @return true if fatal errors happened during job execution, false
    *         otherwise.
    */
-  private boolean checkFatalErrors(ExecDriverTaskHandle th, StringBuilder errMsg) {
-    RunningJob rj = th.getRunningJob();
-    try {
-      Counters ctrs = th.getCounters();
-      if (ctrs == null) {
-        // hadoop might return null if it cannot locate the job.
-        // we may still be able to retrieve the job status - so ignore
-        return false;
-      }
-      // check for number of created files
-      long numFiles = ctrs.getCounter(ProgressCounter.CREATED_FILES);
-      long upperLimit =  HiveConf.getLongVar(job, HiveConf.ConfVars.MAXCREATEDFILES);
-      if (numFiles > upperLimit) {
-        errMsg.append("total number of created files exceeds ").append(upperLimit);
-        return true;
-      }
+  private boolean checkFatalErrors(Counters ctrs, StringBuilder errMsg) {
+    if (ctrs == null) {
+      // hadoop might return null if it cannot locate the job.
+      // we may still be able to retrieve the job status - so ignore
+      return false;
+    }
+    // check for number of created files
+    long numFiles = ctrs.getCounter(ProgressCounter.CREATED_FILES);
+    long upperLimit =  HiveConf.getLongVar(job, HiveConf.ConfVars.MAXCREATEDFILES);
+    if (numFiles > upperLimit) {
+      errMsg.append("total number of created files exceeds ").append(upperLimit);
+      return true;
+    }
 
-      for (Operator<? extends Serializable> op : work.getAliasToWork().values()) {
-        if (op.checkFatalErrors(ctrs, errMsg)) {
-          return true;
-        }
+    for (Operator<? extends Serializable> op : work.getAliasToWork().values()) {
+      if (op.checkFatalErrors(ctrs, errMsg)) {
+        return true;
       }
-      if (work.getReducer() != null) {
-        if (work.getReducer().checkFatalErrors(ctrs, errMsg)) {
-          return true;
-        }
+    }
+    if (work.getReducer() != null) {
+      if (work.getReducer().checkFatalErrors(ctrs, errMsg)) {
+        return true;
       }
-      return false;
-    } catch (IOException e) {
-      // this exception can be tolerated
-      e.printStackTrace();
-      return false;
     }
+    return false;
   }
 
   private boolean progress(ExecDriverTaskHandle th) throws IOException {
@@ -354,6 +346,7 @@ public class ExecDriver extends Task<Map
        throw new IOException("Could not find status of job: " + rj.getJobID());
       } else {
         th.setRunningJob(newRj);
+        rj = newRj;
       }
 
       // If fatal errors happen we should kill the job immediately rather than
@@ -361,7 +354,10 @@ public class ExecDriver extends Task<Map
       if (fatal) {
         continue; // wait until rj.isComplete
       }
-      if (fatal = checkFatalErrors(th, errMsg)) {
+
+      Counters ctrs = th.getCounters();
+
+      if (fatal = checkFatalErrors(ctrs, errMsg)) {
         console.printError("[Fatal Error] " + errMsg.toString()
             + ". Killing the job.");
         rj.killJob();
@@ -369,7 +365,7 @@ public class ExecDriver extends Task<Map
       }
       errMsg.setLength(0);
 
-      updateCounters(th);
+      updateCounters(ctrs, rj);
 
       String report = " " + getId() + " map = " + mapProgress + "%,  reduce = "
           + reduceProgress + "%";
@@ -384,7 +380,7 @@ public class ExecDriver extends Task<Map
         SessionState ss = SessionState.get();
         if (ss != null) {
           ss.getHiveHistory().setTaskCounters(SessionState.get().getQueryId(),
-              getId(), rj);
+              getId(), ctrs);
           ss.getHiveHistory().setTaskProperty(SessionState.get().getQueryId(),
               getId(), Keys.TASK_HADOOP_PROGRESS, output);
           ss.getHiveHistory().progressTask(SessionState.get().getQueryId(),
@@ -398,12 +394,14 @@ public class ExecDriver extends Task<Map
     }
 
     boolean success;
+    Counters ctrs = th.getCounters();
+
     if (fatal) {
       success = false;
     } else {
       // check for fatal error again in case it occurred after
       // the last check before the job is completed
-      if (checkFatalErrors(th, errMsg)) {
+      if (checkFatalErrors(ctrs, errMsg)) {
         console.printError("[Fatal Error] " + errMsg.toString());
         success = false;
       } else  {
@@ -413,7 +411,7 @@ public class ExecDriver extends Task<Map
 
     setDone();
     // update based on the final value of the counters
-    updateCounters(th);
+    updateCounters(ctrs, rj);
 
     SessionState ss = SessionState.get();
     if (ss != null) {
@@ -426,15 +424,13 @@ public class ExecDriver extends Task<Map
   /**
    * Update counters relevant to this task.
    */
-  private void updateCounters(ExecDriverTaskHandle th) throws IOException {
-    RunningJob rj = th.getRunningJob();
+  private void updateCounters(Counters ctrs, RunningJob rj) throws IOException {
     mapProgress = Math.round(rj.mapProgress() * 100);
     reduceProgress = Math.round(rj.reduceProgress() * 100);
     taskCounters.put("CNTR_NAME_" + getId() + "_MAP_PROGRESS", Long
         .valueOf(mapProgress));
     taskCounters.put("CNTR_NAME_" + getId() + "_REDUCE_PROGRESS", Long
         .valueOf(reduceProgress));
-    Counters ctrs = th.getCounters();
     if (ctrs == null) {
       // hadoop might return null if it cannot locate the job.
       // we may still be able to retrieve the job status - so ignore

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistory.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistory.java?rev=992387&r1=992386&r2=992387&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistory.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistory.java Fri Sep  3 17:30:38 2010
@@ -42,6 +42,7 @@ import org.apache.hadoop.hive.ql.session
 import org.apache.hadoop.mapred.RunningJob;
 import org.apache.hadoop.mapred.Counters.Counter;
 import org.apache.hadoop.mapred.Counters.Group;
+import org.apache.hadoop.mapred.Counters;
 
 /**
  * HiveHistory.
@@ -352,19 +353,19 @@ public class HiveHistory {
    * @param taskId
    * @param rj
    */
-  public void setTaskCounters(String queryId, String taskId, RunningJob rj) {
+  public void setTaskCounters(String queryId, String taskId, Counters ctrs) {
     String id = queryId + ":" + taskId;
     QueryInfo ji = queryInfoMap.get(queryId);
     StringBuilder sb1 = new StringBuilder("");
     TaskInfo ti = taskInfoMap.get(id);
-    if (ti == null) {
+    if ((ti == null) || (ctrs == null)) {
       return;
     }
     StringBuilder sb = new StringBuilder("");
     try {
 
       boolean first = true;
-      for (Group group : rj.getCounters()) {
+      for (Group group : ctrs) {
         for (Counter counter : group) {
           if (first) {
             first = false;
@@ -391,7 +392,7 @@ public class HiveHistory {
       }
 
     } catch (Exception e) {
-      e.printStackTrace();
+      LOG.warn(org.apache.hadoop.util.StringUtils.stringifyException(e));
     }
     if (sb1.length() > 0) {
       taskInfoMap.get(id).hm.put(Keys.ROWS_INSERTED.name(), sb1.toString());
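
The HiveHistory change works because, in the old mapred API, Counters is
Iterable over its Groups and each Group is Iterable over its Counters, so
setTaskCounters() can walk the snapshot directly without the
rj.getCounters() round trip; the added null check covers the case where the
caller could not fetch counters at all, and the stack-trace printing is
replaced with a log warning. A minimal sketch of that iteration, with a
hypothetical formatCounters() helper:

    import org.apache.hadoop.mapred.Counters;
    import org.apache.hadoop.mapred.Counters.Counter;
    import org.apache.hadoop.mapred.Counters.Group;

    class CounterFormatSketch {
      // Builds a "group.counter:value" list from the snapshot alone;
      // no RunningJob RPC is needed once the Counters object is in hand.
      static String formatCounters(Counters ctrs) {
        if (ctrs == null) {
          return "";
        }
        StringBuilder sb = new StringBuilder();
        boolean first = true;
        for (Group group : ctrs) {
          for (Counter counter : group) {
            if (first) {
              first = false;
            } else {
              sb.append(',');
            }
            sb.append(group.getDisplayName()).append('.')
                .append(counter.getDisplayName()).append(':')
                .append(counter.getCounter());
          }
        }
        return sb.toString();
      }
    }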