You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by dh...@apache.org on 2008/09/09 19:36:24 UTC

svn commit: r693524 - in /hadoop/core/trunk: ./ src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/ src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/metadata/ src/contrib/hive/serde/src/java/org/apache/hadoop/hive/serde/simple_meta/

Author: dhruba
Date: Tue Sep  9 10:36:23 2008
New Revision: 693524

URL: http://svn.apache.org/viewvc?rev=693524&view=rev
Log:
HADOOP-4097. Make hive work well with speculative execution turned on.
(Joydeep Sen Sarma via dhruba)


Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
    hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
    hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
    hadoop/core/trunk/src/contrib/hive/serde/src/java/org/apache/hadoop/hive/serde/simple_meta/MetadataTypedSerDeField.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=693524&r1=693523&r2=693524&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Tue Sep  9 10:36:23 2008
@@ -501,6 +501,9 @@
     implementations and moves it to the JobTracker. 
     (Amareshwari Sriramadasu via ddas)
 
+    HADOOP-4097. Make hive work well with speculative execution turned on.
+    (Joydeep Sen Sarma via dhruba)
+
 Release 0.18.1 - Unreleased
 
   BUG FIXES

Modified: hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java?rev=693524&r1=693523&r2=693524&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java (original)
+++ hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java Tue Sep  9 10:36:23 2008
@@ -81,7 +81,7 @@
     try {
       fs = FileSystem.get(hconf);
       finalPath = new Path(conf.getDirName(), Utilities.getTaskId(hconf));
-      outPath = new Path(conf.getDirName(), "tmp."+Utilities.getTaskId(hconf));
+      outPath = new Path(conf.getDirName(), "_tmp."+Utilities.getTaskId(hconf));
       OutputFormat outputFormat = conf.getTableInfo().getOutputFileFormatClass().newInstance();
 
       if(outputFormat instanceof IgnoreKeyTextOutputFormat) {

Modified: hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java?rev=693524&r1=693523&r2=693524&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java (original)
+++ hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java Tue Sep  9 10:36:23 2008
@@ -19,13 +19,17 @@
 package org.apache.hadoop.hive.ql.exec;
 
 import java.io.Serializable;
+import java.io.IOException;
+
 
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.ql.plan.loadFileDesc;
 import org.apache.hadoop.hive.ql.plan.loadTableDesc;
 import org.apache.hadoop.hive.ql.plan.moveWork;
+import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.util.StringUtils;
 
 /**
@@ -35,6 +39,20 @@
 
   private static final long serialVersionUID = 1L;
 
+  private void cleanseSource(FileSystem fs, Path sourcePath) throws IOException { // deletes each glob match of sourcePath that Hive.needsDeletion flags
+    if(sourcePath == null) // nothing to clean
+      return;
+
+    FileStatus [] srcs = fs.globStatus(sourcePath); // sourcePath is treated as a glob pattern
+    if(srcs != null) { // guard against a null result from globStatus before iterating
+      for(FileStatus one: srcs) {
+        if(Hive.needsDeletion(one)) {
+          fs.delete(one.getPath(), true); // recursive=true so flagged directories are removed too
+        }
+      }
+    }
+  }
+
   public int execute() {
 
     try {
@@ -46,6 +64,7 @@
       for(loadFileDesc lfd: work.getLoadFileWork()) {
         Path targetPath = new Path(lfd.getTargetDir());
         Path sourcePath = new Path(lfd.getSourceDir());
+        cleanseSource(fs, sourcePath);
         if (lfd.getIsDfsDir()) {
           // Just do a rename on the URIs
           String mesg = "Moving data to: " + lfd.getTargetDir();

Modified: hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java?rev=693524&r1=693523&r2=693524&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java (original)
+++ hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java Tue Sep  9 10:36:23 2008
@@ -502,17 +502,28 @@
     }
   }
 
+  public static boolean needsDeletion(FileStatus file) { // true for entries named "_task*" or "_tmp.*"
+    String name = file.getPath().getName();
+    // There is a race condition in Hadoop as a result of which
+    // _task files created in the output directory at map time
+    // are reported in the output directory even though they have
+    // actually been removed. The "_task" check works around that.
+    // NOTE: it's not clear that this still affects recent versions of Hadoop.
+
+    // The "_tmp." check catches uncommitted Hive task output (FileSinkOperator temp files),
+    // typically left behind by failed task attempts or by speculative execution.
+    return (name.startsWith("_task") || name.startsWith("_tmp."));
+  }
+
   private void checkPaths(FileSystem fs, FileStatus [] srcs, Path destf, boolean replace) throws HiveException {
     try {
         for(int i=0; i<srcs.length; i++) {
             FileStatus [] items = fs.listStatus(srcs[i].getPath());
             for(int j=0; j<items.length; j++) {
-                // There is a race condition in hadoop as a result of which
-                // the _task files created in the output directory at the time
-                // of the mapper is reported in the output directory even though
-                // it is actually removed. The following check works around that
-                if (items[j].getPath().getName().startsWith("_task")) {
-                    continue;
+
+                if (needsDeletion(items[j])) {
+                      fs.delete(items[j].getPath(), true);
+                      continue;
                 }
                 if(items[j].isDir()) {
                     throw new HiveException("checkPaths: "+srcs[i].toString()+" has nested directory"+
@@ -584,9 +595,6 @@
           for(int i=0; i<srcs.length; i++) {
               FileStatus[] items = fs.listStatus(srcs[i].getPath());
               for(int j=0; j<items.length; j++) {
-                  if (items[j].getPath().getName().startsWith("_task")) {
-                      continue;
-                  }
                   boolean b = fs.rename(items[j].getPath(), new Path(tmppath, items[j].getPath().getName()));
                   LOG.debug("Renaming:"+items[j]+",Status:"+b);
               }

Modified: hadoop/core/trunk/src/contrib/hive/serde/src/java/org/apache/hadoop/hive/serde/simple_meta/MetadataTypedSerDeField.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hive/serde/src/java/org/apache/hadoop/hive/serde/simple_meta/MetadataTypedSerDeField.java?rev=693524&r1=693523&r2=693524&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/hive/serde/src/java/org/apache/hadoop/hive/serde/simple_meta/MetadataTypedSerDeField.java (original)
+++ hadoop/core/trunk/src/contrib/hive/serde/src/java/org/apache/hadoop/hive/serde/simple_meta/MetadataTypedSerDeField.java Tue Sep  9 10:36:23 2008
@@ -65,7 +65,7 @@
 
     ColumnSet temp = (ColumnSet)obj;
     if(temp.col.size() <= _position) {
-      System.err.println("get " + temp.col.size() + "<=" + _position);
+      //System.err.println("get " + temp.col.size() + "<=" + _position);
       return null;
     }
     try {