You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by nz...@apache.org on 2010/07/14 06:24:43 UTC

svn commit: r963943 - in /hadoop/hive/trunk: CHANGES.txt ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java

Author: nzhang
Date: Wed Jul 14 04:24:42 2010
New Revision: 963943

URL: http://svn.apache.org/viewvc?rev=963943&view=rev
Log:
HIVE-1462. Report progress in FileSinkOperator for multiple directory case (Siying Dong via Ning Zhang)

Modified:
    hadoop/hive/trunk/CHANGES.txt
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java

Modified: hadoop/hive/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/CHANGES.txt?rev=963943&r1=963942&r2=963943&view=diff
==============================================================================
--- hadoop/hive/trunk/CHANGES.txt (original)
+++ hadoop/hive/trunk/CHANGES.txt Wed Jul 14 04:24:42 2010
@@ -27,6 +27,10 @@ Trunk -  Unreleased
 
     HIVE-1447. Speed up reflection method calls (Zheng via He Yongqiang)
 
+    HIVE-1462. Report progress in FileSinkOperator in the multiple
+    directory case
+    (Siying Dong via Ning Zhang)
+
   OPTIMIZATIONS
 
   BUG FIXES

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java?rev=963943&r1=963942&r2=963943&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java Wed Jul 14 04:24:42 2010
@@ -49,10 +49,10 @@ import org.apache.hadoop.hive.serde2.Ser
 import org.apache.hadoop.hive.serde2.Serializer;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
 import org.apache.hadoop.hive.serde2.objectinspector.StructField;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.SubStructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
 import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.LongWritable;
@@ -100,7 +100,6 @@ public class FileSinkOperator extends Te
     Path[] outPaths;
     Path[] finalPaths;
     RecordWriter[] outWriters;
-    int timeOut; // JT timeout in msec.
 
     public FSPaths() {
     }
@@ -110,10 +109,6 @@ public class FileSinkOperator extends Te
       outPaths   = new Path[numFiles];
       finalPaths = new Path[numFiles];
       outWriters = new RecordWriter[numFiles];
-      // Timeout is chosen to make sure that even if one iteration takes more than
-      // half of the script.timeout but less than script.timeout, we will still
-      // be able to report progress.
-      timeOut = hconf.getInt("mapred.healthChecker.script.timeout", 600000)/2;
     }
 
     /**
@@ -164,31 +159,12 @@ public class FileSinkOperator extends Te
       return outWriters;
     }
 
-    /**
-     * Report status to JT so that JT won't kill this task if closing takes too long
-     * due to too many files to close and the NN is overloaded.
-     * @param lastUpdateTime the time (msec) that progress update happened.
-     * @return true if a new progress update is reported, false otherwise.
-     */
-    private boolean updateProgress(long lastUpdateTime) {
-      if (reporter != null &&
-          (System.currentTimeMillis() - lastUpdateTime) > timeOut) {
-        reporter.progress();
-        return true;
-      } else {
-        return false;
-      }
-    }
-
     public void closeWriters(boolean abort) throws HiveException {
-      long lastProgressReport = System.currentTimeMillis();
       for (int idx = 0; idx < outWriters.length; idx++) {
         if (outWriters[idx] != null) {
           try {
             outWriters[idx].close(abort);
-            if (updateProgress(lastProgressReport)) {
-              lastProgressReport = System.currentTimeMillis();
-            }
+            updateProgress();
           } catch (IOException e) {
             throw new HiveException(e);
           }
@@ -197,16 +173,13 @@ public class FileSinkOperator extends Te
     }
 
     private void commit(FileSystem fs) throws HiveException {
-      long lastProgressReport = System.currentTimeMillis();
       for (int idx = 0; idx < outPaths.length; ++idx) {
         try {
           if (!fs.rename(outPaths[idx], finalPaths[idx])) {
             throw new HiveException("Unable to rename output to: "
                 + finalPaths[idx]);
           }
-          if (updateProgress(lastProgressReport)) {
-            lastProgressReport = System.currentTimeMillis();
-          }
+          updateProgress();
         } catch (IOException e) {
           throw new HiveException(e + "Unable to rename output to: "
               + finalPaths[idx]);
@@ -215,7 +188,6 @@ public class FileSinkOperator extends Te
     }
 
     public void abortWriters(FileSystem fs, boolean abort, boolean delete) throws HiveException {
-      long lastProgressReport = System.currentTimeMillis();
       for (int idx = 0; idx < outWriters.length; idx++) {
         if (outWriters[idx] != null) {
           try {
@@ -223,9 +195,7 @@ public class FileSinkOperator extends Te
           	if (delete) {
           	  fs.delete(outPaths[idx], true);
           	}
-          	if (updateProgress(lastProgressReport)) {
-          	  lastProgressReport = System.currentTimeMillis();
-          	}
+          	updateProgress();
           } catch (IOException e) {
             throw new HiveException(e);
           }
@@ -260,6 +230,8 @@ public class FileSinkOperator extends Te
   private transient FSPaths fsp;
   private transient boolean bDynParts;
   private transient SubStructObjectInspector subSetOI;
+  private transient int timeOut; // JT timeout in msec.
+  private transient long lastProgressReport = System.currentTimeMillis();
 
   /**
    * TableIdEnum.
@@ -314,6 +286,11 @@ public class FileSinkOperator extends Te
       serializer.initialize(null, conf.getTableInfo().getProperties());
       outputClass = serializer.getSerializedClass();
 
+      // Timeout is chosen to make sure that even if one iteration takes more than
+      // half of the script.timeout but less than script.timeout, we will still
+      // be able to report progress.
+      timeOut = hconf.getInt("mapred.healthChecker.script.timeout", 600000)/2;
+
       if (hconf instanceof JobConf) {
         jc = (JobConf) hconf;
       } else {
@@ -504,6 +481,23 @@ public class FileSinkOperator extends Te
     filesCreated = true;
   }
 
+  /**
+   * Report status to JT so that JT won't kill this task if closing takes too long
+   * due to too many files to close and the NN is overloaded. Progress is reported
+   * at most once per timeOut interval; the last report time is tracked in the
+   * lastProgressReport field.
+   * @return true if a new progress update is reported, false otherwise.
+   */
+  private boolean updateProgress() {
+    if (reporter != null &&
+        (System.currentTimeMillis() - lastProgressReport) > timeOut) {
+      reporter.progress();
+      lastProgressReport = System.currentTimeMillis();
+      return true;
+    } else {
+      return false;
+    }
+  }
+
   Writable recordValue;
 
   @Override
@@ -632,6 +626,7 @@ public class FileSinkOperator extends Te
       createBucketFiles(fsp);
     }
 
+    lastProgressReport = System.currentTimeMillis();
     if (!abort) {
       for (FSPaths fsp: valToPaths.values()) {
         fsp.closeWriters(abort);