You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by na...@apache.org on 2009/06/29 22:43:04 UTC

svn commit: r789440 - in /hadoop/hive/trunk: CHANGES.txt ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java

Author: namit
Date: Mon Jun 29 20:43:04 2009
New Revision: 789440

URL: http://svn.apache.org/viewvc?rev=789440&view=rev
Log:
HIVE-587. Duplicate results from multiple TIP
(Zheng Shao via namit)


Modified:
    hadoop/hive/trunk/CHANGES.txt
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java

Modified: hadoop/hive/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/CHANGES.txt?rev=789440&r1=789439&r2=789440&view=diff
==============================================================================
--- hadoop/hive/trunk/CHANGES.txt (original)
+++ hadoop/hive/trunk/CHANGES.txt Mon Jun 29 20:43:04 2009
@@ -279,6 +279,9 @@
     HIVE-583. Fix spurious 'default database not found'.
     (Prasad Chakka via zshao)
 
+    HIVE-587. Duplicate results from multiple TIP
+    (Zheng Shao via namit)
+
 Release 0.3.1 - Unreleased
 
   INCOMPATIBLE CHANGES

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java?rev=789440&r1=789439&r2=789440&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java Mon Jun 29 20:43:04 2009
@@ -206,16 +206,20 @@
         String specPath = conf.getDirName();
         fs = (new Path(specPath)).getFileSystem(hconf);
         Path tmpPath = Utilities.toTempPath(specPath);
+        Path intermediatePath = new Path(tmpPath.getParent(), tmpPath.getName() + ".intermediate");
         Path finalPath = new Path(specPath);
         if(success) {
           if(fs.exists(tmpPath)) {
-            // Step1: rename tmp output folder to final path. After this point, 
-            // updates from speculative tasks still writing to tmpPath will not 
-            // appear in finalPath
-            LOG.info("Moving tmp dir: " + tmpPath + " to: " + finalPath);
-            renameOrMoveFiles(fs, tmpPath, finalPath);
-            // Step2: Clean any temp files from finalPath
-            Utilities.removeTempFiles(fs, finalPath);
+            // Step1: rename tmp output folder to intermediate path. After this
+            // point, updates from speculative tasks still writing to tmpPath 
+            // will not appear in finalPath.
+            LOG.info("Moving tmp dir: " + tmpPath + " to: " + intermediatePath);
+            Utilities.rename(fs, tmpPath, intermediatePath);
+            // Step2: remove any tmp file or double-committed output files
+            Utilities.removeTempOrDuplicateFiles(fs, intermediatePath);
+            // Step3: move to the file destination
+            LOG.info("Moving tmp dir: " + intermediatePath + " to: " + finalPath);
+            Utilities.renameOrMoveFiles(fs, intermediatePath, finalPath);
           }
         } else {
           fs.delete(tmpPath);
@@ -227,39 +231,4 @@
     super.jobClose(hconf, success);
   }
   
-  /**
-   * Rename src to dst, or in the case dst already exists, move files in src 
-   * to dst.  If there is an existing file with the same name, the new file's 
-   * name will be appended with "_1", "_2", etc.
-   * @param fs the FileSystem where src and dst are on.  
-   * @param src the src directory
-   * @param dst the target directory
-   * @throws IOException 
-   */
-  static public void renameOrMoveFiles(FileSystem fs, Path src, Path dst)
-    throws IOException, HiveException {
-    if (!fs.exists(dst)) {
-      if (!fs.rename(src, dst)) {
-        throw new HiveException ("Unable to move: " + src + " to: " + dst);
-      }
-    } else {
-      // move file by file
-      FileStatus[] files = fs.listStatus(src);
-      for (int i=0; i<files.length; i++) {
-        Path srcFilePath = files[i].getPath();
-        String fileName = srcFilePath.getName();
-        Path dstFilePath = new Path(dst, fileName);
-        if (fs.exists(dstFilePath)) {
-          int suffix = 0;
-          do {
-            suffix++;
-            dstFilePath = new Path(dst, fileName + "_" + suffix);
-          } while (fs.exists(dstFilePath));
-        }
-        if (!fs.rename(srcFilePath, dstFilePath)) {
-          throw new HiveException ("Unable to move: " + src + " to: " + dst);
-        }
-      }
-    }
-  }
 }

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java?rev=789440&r1=789439&r2=789440&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java Mon Jun 29 20:43:04 2009
@@ -267,6 +267,9 @@
 
     boolean new_abort = abort;
     if(!abort) {
+      if(scriptError != null) {
+        throw new HiveException(scriptError);
+      }
       // everything ok. try normal shutdown
       try {
         scriptOut.flush();

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java?rev=789440&r1=789439&r2=789440&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java Mon Jun 29 20:43:04 2009
@@ -24,6 +24,8 @@
 import java.net.URL;
 import java.net.URLClassLoader;
 import java.util.*;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 import java.beans.*;
 
 import org.apache.commons.lang.StringUtils;
@@ -50,22 +52,9 @@
 import org.apache.hadoop.hive.ql.plan.*;
 import org.apache.hadoop.hive.ql.plan.PlanUtils.ExpressionTypes;
 import org.apache.hadoop.hive.ql.io.RCFile;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.metadata.Partition;
-import org.apache.hadoop.hive.serde2.io.ByteWritable;
-import org.apache.hadoop.hive.serde2.io.DoubleWritable;
-import org.apache.hadoop.hive.serde2.io.ShortWritable;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.BooleanObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.ByteObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.DoubleObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.FloatObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.LongObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.ShortObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.mapred.FileOutputFormat;
 import org.apache.hadoop.mapred.SequenceFileOutputFormat;
@@ -82,7 +71,7 @@
 
   public static enum ReduceField { KEY, VALUE, ALIAS };
   private static volatile mapredWork gWork = null;
-  static final private Log LOG = LogFactory.getLog("hive.ql.exec.Utilities");
+  static final private Log LOG = LogFactory.getLog(Utilities.class.getName());
 
   public static void clearMapRedWork (Configuration job) {
     try {
@@ -619,9 +608,81 @@
   }
 
   /**
-   * Remove all temporary files from a given directory
+   * Rename src to dst as a single filesystem rename.  Unlike
+   * renameOrMoveFiles, this does not merge into an existing dst: it
+   * throws a HiveException if the underlying rename call fails.
+   * @param fs the FileSystem where src and dst are on.  
+   * @param src the src directory
+   * @param dst the target directory
+   * @throws IOException 
+   */
+  static public void rename(FileSystem fs, Path src, Path dst)
+    throws IOException, HiveException {
+    if (!fs.rename(src, dst)) {
+      throw new HiveException ("Unable to move: " + src + " to: " + dst);
+    }
+  }  
+  /**
+   * Rename src to dst, or in the case dst already exists, move files in src 
+   * to dst.  If there is an existing file with the same name, the new file's 
+   * name will be appended with "_1", "_2", etc.
+   * @param fs the FileSystem where src and dst are on.  
+   * @param src the src directory
+   * @param dst the target directory
+   * @throws IOException 
+   */
+  static public void renameOrMoveFiles(FileSystem fs, Path src, Path dst)
+    throws IOException, HiveException {
+    if (!fs.exists(dst)) {
+      if (!fs.rename(src, dst)) {
+        throw new HiveException ("Unable to move: " + src + " to: " + dst);
+      }
+    } else {
+      // move file by file
+      FileStatus[] files = fs.listStatus(src);
+      for (int i=0; i<files.length; i++) {
+        Path srcFilePath = files[i].getPath();
+        String fileName = srcFilePath.getName();
+        Path dstFilePath = new Path(dst, fileName);
+        if (fs.exists(dstFilePath)) {
+          int suffix = 0;
+          do {
+            suffix++;
+            dstFilePath = new Path(dst, fileName + "_" + suffix);
+          } while (fs.exists(dstFilePath));
+        }
+        if (!fs.rename(srcFilePath, dstFilePath)) {
+          throw new HiveException ("Unable to move: " + src + " to: " + dst);
+        }
+      }
+    }
+  }
+  
+  /** The first group will contain the task id.
+   *  The second group is the optional extension.
+   *  The file name looks like: "24931_r_000000_0" or "24931_r_000000_0.gz"
+   */
+  static Pattern fileNameTaskIdRegex = Pattern.compile("^.*_([0-9]*)_[0-9](\\..*)?$");
+  
+  /**
+   * Get the task id from the filename.
+   * E.g., get "000000" out of "24931_r_000000_0" or "24931_r_000000_0.gz"
+   */
+  public static String getTaskIdFromFilename(String filename) {
+    Matcher m = fileNameTaskIdRegex.matcher(filename);
+    if (!m.matches()) {
+      LOG.warn("Unable to get task id from file name: " + filename + ". Using full filename as task id.");
+      return filename;
+    } else {
+      String taskId = m.group(1);
+      LOG.debug("TaskId for " + filename + " = " + taskId);
+      return taskId;
+    }
+  }
+  /**
+   * Remove all temporary files and duplicate (double-committed) files from a given directory.
    */
-  public static void removeTempFiles(FileSystem fs, Path path) throws IOException {
+  public static void removeTempOrDuplicateFiles(FileSystem fs, Path path) throws IOException {
     if(path == null)
       return;
 
@@ -629,12 +690,26 @@
     if(items == null)
       return;
 
+    HashMap<String, FileStatus> taskIdToFile = new HashMap<String, FileStatus>();
     for(FileStatus one: items) {
       if(isTempPath(one)) {
         if(!fs.delete(one.getPath(), true)) {
           throw new IOException ("Unable to delete tmp file: " + one.getPath());
         }
       }
+      String taskId = getTaskIdFromFilename(one.getPath().getName());
+      FileStatus otherFile = taskIdToFile.get(taskId);
+      if (otherFile == null) {
+        taskIdToFile.put(taskId, one);
+      } else {
+        if(!fs.delete(one.getPath(), true)) {
+          throw new IOException ("Unable to delete duplicate file: "
+              + one.getPath() + ". Existing file: " + otherFile.getPath());
+        } else {
+          LOG.warn("Duplicate taskid file removed: " + one.getPath() 
+              + ". Existing file: " + otherFile.getPath());
+        }
+      }
     }
   }