You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by zs...@apache.org on 2009/02/19 03:36:26 UTC

svn commit: r745709 - in /hadoop/hive/trunk: ./ ql/src/java/org/apache/hadoop/hive/ql/exec/ ql/src/java/org/apache/hadoop/hive/ql/metadata/

Author: zshao
Date: Thu Feb 19 02:36:25 2009
New Revision: 745709

URL: http://svn.apache.org/viewvc?rev=745709&view=rev
Log:
HIVE-131. Remove uncommitted files from failed tasks. (Joydeep Sen Sarma via zshao)

Modified:
    hadoop/hive/trunk/CHANGES.txt
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java

Modified: hadoop/hive/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/CHANGES.txt?rev=745709&r1=745708&r2=745709&view=diff
==============================================================================
--- hadoop/hive/trunk/CHANGES.txt (original)
+++ hadoop/hive/trunk/CHANGES.txt Thu Feb 19 02:36:25 2009
@@ -127,6 +127,9 @@
 
   BUG FIXES
 
+    HIVE-131. Remove uncommitted files from failed tasks.
+    (Joydeep Sen Sarma via zshao)
+
     HIVE-264. TBinarySortable Protocol supports null characters. (zshao)
 
     HIVE-255. Propagate user name to Hive metastore. (Prasad Chakka via zshao)

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java?rev=745709&r1=745708&r2=745709&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java Thu Feb 19 02:36:25 2009
@@ -303,7 +303,6 @@
     }
     return r;
   }
-  
 
   /**
    * Execute a query plan using Hadoop
@@ -313,8 +312,10 @@
     try {
       setNumberOfReducers();
     } catch(IOException e) {
-      String statusMesg = "IOException occurred while acceesing HDFS to estimate the number of reducers.";
-      console.printError(statusMesg);
+      String statusMesg = "IOException while accessing HDFS to estimate the number of reducers: "
+        + e.getMessage();
+      console.printError(statusMesg, "\n"
+                         + org.apache.hadoop.util.StringUtils.stringifyException(e));
       return 1;
     }
 
@@ -355,7 +356,8 @@
 
     int returnVal = 0;
     FileSystem fs = null;
-    RunningJob rj = null;
+    RunningJob rj = null, orig_rj = null;
+    boolean success = false;
 
     try {
       fs = FileSystem.get(job);
@@ -386,7 +388,7 @@
       // make this client wait if job trcker is not behaving well.
       Throttle.checkJobTracker(job, LOG);
 
-      rj = jc.submitJob(job);
+      orig_rj = rj = jc.submitJob(job);
 
       // add to list of running jobs so in case of abnormal shutdown can kill
       // it.
@@ -396,8 +398,17 @@
       jobInfo(rj);
       rj = jobProgress(jc, rj);
 
+      if(rj == null) {
+        // in the corner case where the running job has disappeared from JT memory
+        // remember that we did actually submit the job.
+        rj = orig_rj;
+        success = false;
+      } else {
+        success = rj.isSuccessful();
+      }
+
       String statusMesg = "Ended Job = " + rj.getJobID();
-      if (!rj.isSuccessful()) {
+      if (!success) {
         statusMesg += " with errors";
         returnVal = 2;
         console.printError(statusMesg);
@@ -405,7 +416,7 @@
         console.printInfo(statusMesg);
       }
     } catch (Exception e) {
-      String mesg = " with exception '" + e.getMessage() + "'";
+      String mesg = " with exception '" + Utilities.getNameMessage(e) + "'";
       if (rj != null) {
         mesg = "Ended Job = " + rj.getJobID() + mesg;
       } else {
@@ -416,6 +427,7 @@
       console.printError(mesg, "\n"
           + org.apache.hadoop.util.StringUtils.stringifyException(e));
 
+      success = false;
       returnVal = 1;
     } finally {
       Utilities.clearMapRedWork(job);
@@ -428,6 +440,30 @@
       } catch (Exception e) {
       }
     }
+
+    try {
+      if (rj != null) {
+        if(work.getAliasToWork() != null) {
+          for(Operator<? extends Serializable> op:
+                work.getAliasToWork().values()) {
+            op.jobClose(job, success);
+          }
+        }
+        if(work.getReducer() != null) {
+          work.getReducer().jobClose(job, success);
+        }
+      }
+    } catch (Exception e) {
+      // jobClose needs to execute successfully otherwise fail task
+      if(success) {
+        success = false;
+        returnVal = 3;
+        String mesg = "Job Commit failed with exception '" + Utilities.getNameMessage(e) + "'";
+        console.printError(mesg, "\n"
+                           + org.apache.hadoop.util.StringUtils.stringifyException(e));
+      }
+    }
+
     return (returnVal);
   }
 

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java?rev=745709&r1=745708&r2=745709&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java Thu Feb 19 02:36:25 2009
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hive.ql.exec;
 
 import java.io.*;
+import java.lang.reflect.Method;
 
 import org.apache.hadoop.io.*;
 import org.apache.hadoop.mapred.*;
@@ -50,9 +51,13 @@
   transient protected Path finalPath;
   transient protected Serializer serializer;
   transient protected BytesWritable commonKey = new BytesWritable();
-  
+  transient protected boolean autoDelete = false;
+
   private void commit() throws IOException {
-    fs.rename(outPath, finalPath);
+    if(!fs.rename(outPath, finalPath)) {
+      throw new IOException ("Unable to rename output to: " + finalPath);
+    }
+    LOG.info("Committed to output file: " + finalPath);
   }
 
   public void close(boolean abort) throws HiveException {
@@ -68,7 +73,8 @@
     } else {
       try {
         outWriter.close(abort);
-        fs.delete(outPath, true);
+        if(!autoDelete)
+          fs.delete(outPath, true);
       } catch (Exception e) {
         e.printStackTrace();
       }
@@ -91,8 +97,11 @@
       }
 
       fs = FileSystem.get(hconf);
-      finalPath = new Path(conf.getDirName(), Utilities.getTaskId(hconf));
-      outPath = new Path(conf.getDirName(), "_tmp."+Utilities.getTaskId(hconf));
+      finalPath = new Path(Utilities.toTempPath(conf.getDirName()), Utilities.getTaskId(hconf));
+      outPath = new Path(Utilities.toTempPath(conf.getDirName()), Utilities.toTempPath(Utilities.getTaskId(hconf)));
+
+      LOG.info("Writing to temp file: " + outPath);
+
       OutputFormat<?, ?> outputFormat = conf.getTableInfo().getOutputFileFormatClass().newInstance();
       final Class<? extends Writable> outputClass = serializer.getSerializedClass();
       boolean isCompressed = conf.getCompressed();
@@ -100,7 +109,7 @@
       // The reason to keep these instead of using OutputFormat.getRecordWriter() is that
       // getRecordWriter does not give us enough control over the file name that we create.
       if(outputFormat instanceof IgnoreKeyTextOutputFormat) {
-        finalPath = new Path(conf.getDirName(), Utilities.getTaskId(hconf) +
+        finalPath = new Path(Utilities.toTempPath(conf.getDirName()), Utilities.getTaskId(hconf) +
                              Utilities.getFileExtension(jc, isCompressed));
 
         String rowSeparatorString = conf.getTableInfo().getProperties().getProperty(Constants.LINE_DELIM, "\n");
@@ -145,6 +154,15 @@
         // should never come here - we should be catching this in ddl command
         throw new HiveException ("Illegal outputformat: " + outputFormat.getClass().getName());
       }
+
+      // in recent hadoop versions, use deleteOnExit to clean tmp files.
+      try {
+        Method deleteOnExit = FileSystem.class.getDeclaredMethod("deleteOnExit", new Class [] {Path.class});
+        deleteOnExit.setAccessible(true);
+        deleteOnExit.invoke(fs, outPath);
+        autoDelete = true;
+      } catch (Exception e) {}
+
     } catch (HiveException e) {
       throw e;
     } catch (Exception e) {
@@ -173,4 +191,33 @@
   public String getName() {
     return new String("FS");
   }
+
+  @Override
+  public void jobClose(Configuration hconf, boolean success) throws HiveException { 
+    try {
+      if(conf != null) {
+        fs = FileSystem.get(hconf);
+        Path tmpPath = Utilities.toTempPath(conf.getDirName());
+        Path finalPath = new Path(conf.getDirName());
+        if(success) {
+          if(fs.exists(tmpPath)) {
+            // Step1: rename tmp output folder to final path. After this point, 
+            // updates from speculative tasks still writing to tmpPath will not 
+            // appear in finalPath
+            LOG.info("Renaming tmp dir: " + tmpPath + " to: " + finalPath);
+            if(!fs.rename(tmpPath, finalPath)) {
+              throw new HiveException("Unable to commit result directory: " + finalPath);
+            }
+            // Step2: Clean any temp files from finalPath
+            Utilities.removeTempFiles(fs, finalPath);
+          }
+        } else {
+          fs.delete(tmpPath);
+        }
+      }
+    } catch (IOException e) {
+      throw new HiveException (e);
+    }
+    super.jobClose(hconf, success);
+  }
 }

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java?rev=745709&r1=745708&r2=745709&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java Thu Feb 19 02:36:25 2009
@@ -45,20 +45,6 @@
 
   private static final long serialVersionUID = 1L;
 
-  private void cleanseSource(FileSystem fs, Path sourcePath) throws IOException {
-    if(sourcePath == null)
-      return;
-
-    FileStatus [] srcs = fs.globStatus(sourcePath);
-    if(srcs != null) {
-      for(FileStatus one: srcs) {
-        if(Hive.needsDeletion(one)) {
-          fs.delete(one.getPath(), true);
-        }
-      }
-    }
-  }
-
   public int execute() {
 
     try {
@@ -70,7 +56,6 @@
       for(loadFileDesc lfd: work.getLoadFileWork()) {
         Path targetPath = new Path(lfd.getTargetDir());
         Path sourcePath = new Path(lfd.getSourceDir());
-        cleanseSource(fs, sourcePath);
         if (lfd.getIsDfsDir()) {
           // Just do a rename on the URIs
           String mesg = "Moving data to: " + lfd.getTargetDir();
@@ -95,11 +80,11 @@
 
           if(dstFs.delete(targetPath, true) || !dstFs.exists(targetPath)) {
             console.printInfo(mesg, mesg_detail);
-          // if source exists, rename. Otherwise, create a empty directory
-          if (fs.exists(sourcePath))
-            fs.copyToLocalFile(sourcePath, targetPath);
-          else
-            dstFs.mkdirs(targetPath);
+            // if source exists, rename. Otherwise, create a empty directory
+            if (fs.exists(sourcePath))
+              fs.copyToLocalFile(sourcePath, targetPath);
+            else
+              dstFs.mkdirs(targetPath);
           } else {
             console.printInfo("Unable to delete the existing destination directory: " + targetPath);
           }

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java?rev=745709&r1=745708&r2=745709&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java Thu Feb 19 02:36:25 2009
@@ -266,6 +266,22 @@
     }
   }
 
+  /**
+   * Unlike other operator interfaces which are called from map or reduce task,
+   * jobClose is called from the jobclient side once the job has completed
+   *
+   * @param conf Configuration with with which job was submitted
+   * @param succes whether the job was completed successfully or not
+   */
+  public void jobClose(Configuration conf, boolean success) throws HiveException {
+    if(childOperators == null)
+      return;
+    
+    for(Operator<? extends Serializable> op: childOperators) {
+      op.jobClose(conf, success);
+    }
+  }
+
   protected void forward(Object row, ObjectInspector rowInspector) throws HiveException {
     
     if((childOperators == null) || (getDone())) {

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java?rev=745709&r1=745708&r2=745709&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java Thu Feb 19 02:36:25 2009
@@ -30,6 +30,7 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.filecache.DistributedCache;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.io.compress.CompressionCodec;
@@ -540,4 +541,54 @@
 
     return src;
   }
+
+  private static final String tmpPrefix = "_tmp.";
+
+  public static Path toTempPath(Path orig) {
+    if(orig.getName().indexOf(tmpPrefix) == 0)
+      return orig;
+    return new Path(orig.getParent(), tmpPrefix + orig.getName());
+  }
+
+  /**
+   * Given a path, convert to a temporary path
+   */
+  public static Path toTempPath(String orig) {
+    return toTempPath(new Path(orig));
+  }
+  
+  /**
+   * Detect if the supplied file is a temporary path
+   */
+  public static boolean isTempPath(FileStatus file) {
+    String name = file.getPath().getName();
+    // in addition to detecting hive temporary files, we also check hadoop
+    // temporary folders that used to show up in older releases
+    return (name.startsWith("_task") || name.startsWith(tmpPrefix));
+  }
+
+  /**
+   * Remove all temporary files from a given directory
+   */
+  public static void removeTempFiles(FileSystem fs, Path path) throws IOException {
+    if(path == null)
+      return;
+
+    FileStatus items[] = fs.listStatus(path);
+    if(items == null)
+      return;
+
+    for(FileStatus one: items) {
+      if(isTempPath(one)) {
+        if(!fs.delete(one.getPath(), true)) {
+          throw new IOException ("Unable to delete tmp file: " + one.getPath());
+        }
+      }
+    }
+  }
+
+  public static String getNameMessage(Exception e) {
+    return e.getClass().getName() + "(" +  e.getMessage() + ")";
+  }
+
 }

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java?rev=745709&r1=745708&r2=745709&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java Thu Feb 19 02:36:25 2009
@@ -53,6 +53,7 @@
 import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.OutputFormat;
 import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.hive.ql.exec.Utilities;
 
 import com.facebook.thrift.TException;
 import com.facebook.thrift.protocol.TBinaryProtocol;
@@ -660,26 +661,15 @@
     }
   }
 
-  public static boolean needsDeletion(FileStatus file) {
-    String name = file.getPath().getName();
-    // There is a race condition in hadoop as a result of which
-    // the _task files created in the output directory at the time
-    // of the mapper is reported in the output directory even though
-    // it is actually removed. The first check works around that
-    // NOTE: it's not clear that this still affects recent versions of hadoop
-
-    // the second check deals with uncommitted output files produced by hive tasks
-    // this would typically happen on task failures or due to speculation
-    return (name.startsWith("_task") || name.startsWith("_tmp."));
-  }
-
   private void checkPaths(FileSystem fs, FileStatus [] srcs, Path destf, boolean replace) throws HiveException {
     try {
         for(int i=0; i<srcs.length; i++) {
             FileStatus [] items = fs.listStatus(srcs[i].getPath());
             for(int j=0; j<items.length; j++) {
 
-                if (needsDeletion(items[j])) {
+                if (Utilities.isTempPath(items[j])) {
+                      // This check is redundant because temp files are removed by execution layer before
+                      // calling loadTable/Partition. But leaving it in just in case.
                       fs.delete(items[j].getPath(), true);
                       continue;
                 }
@@ -809,6 +799,4 @@
       throw new HiveException("Error in getting fields from serde." + e.getMessage(), e);
     }
   }
-
-  
 };