You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by na...@apache.org on 2010/10/16 07:02:08 UTC

svn commit: r1023178 - in /hadoop/hive/trunk: CHANGES.txt ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java

Author: namit
Date: Sat Oct 16 05:02:08 2010
New Revision: 1023178

URL: http://svn.apache.org/viewvc?rev=1023178&view=rev
Log:
HIVE-1707. Bug when different partitions are present in different dfs
(Yongqiang He via namit)


Modified:
    hadoop/hive/trunk/CHANGES.txt
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java

Modified: hadoop/hive/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/CHANGES.txt?rev=1023178&r1=1023177&r2=1023178&view=diff
==============================================================================
--- hadoop/hive/trunk/CHANGES.txt (original)
+++ hadoop/hive/trunk/CHANGES.txt Sat Oct 16 05:02:08 2010
@@ -376,6 +376,9 @@ Trunk -  Unreleased
     HIVE-1717. ant clean should delete stats database
     (Ning Zhang via namit)
 
+    HIVE-1707. Bug when different partitions are present in different dfs
+    (Yongqiang He via namit)
+
   TESTS
 
     HIVE-1464. improve  test query performance

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java?rev=1023178&r1=1023177&r2=1023178&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java Sat Oct 16 05:02:08 2010
@@ -39,8 +39,10 @@ import java.util.Map.Entry;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FsShell;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.HiveMetaException;
@@ -905,35 +907,28 @@ public class Hive {
        * processes might move forward with partial data
        */
 
-      FileSystem fs;
-      Path partPath;
+      Partition oldPart = getPartition(tbl, partSpec, false, null);
+      Path oldPartPath = null;
+      if(oldPart != null) {
+        oldPartPath = oldPart.getPartitionPath();
+      }
+      
+      Path partPath = new Path(tbl.getDataLocation().getPath(),
+          Warehouse.makePartName(partSpec));
 
-      // check if partition exists without creating it
-      Partition part = getPartition(tbl, partSpec, false);
-      if (part == null) {
-        // Partition does not exist currently. The partition name is
-        // extrapolated from
-        // the table's location (even if the table is marked external)
-        fs = FileSystem.get(tbl.getDataLocation(), getConf());
-        partPath = new Path(tbl.getDataLocation().getPath(),
-            Warehouse.makePartName(partSpec));
-      } else {
-        // Partition exists already. Get the path from the partition. This will
-        // get the default path for Hive created partitions or the external path
-        // when directly created by user
-        partPath = part.getPath()[0];
-        fs = partPath.getFileSystem(getConf());
-      }
+      Path newPartPath = new Path(loadPath.toUri().getScheme(), loadPath
+          .toUri().getAuthority(), partPath.toUri().getPath());
 
       if (replace) {
-        Hive.replaceFiles(loadPath, partPath, fs, tmpDirPath);
+        Hive.replaceFiles(loadPath, newPartPath, oldPartPath, tmpDirPath, getConf());
       } else {
-        Hive.copyFiles(loadPath, partPath, fs);
+        FileSystem fs = FileSystem.get(tbl.getDataLocation(), getConf());
+        Hive.copyFiles(loadPath, newPartPath, fs);
       }
 
       // recreate the partition if it existed before
       if (!holdDDLTime) {
-        part = getPartition(tbl, partSpec, true);
+        getPartition(tbl, partSpec, true, newPartPath.toString());
       }
     } catch (IOException e) {
       LOG.error(StringUtils.stringifyException(e));
@@ -1090,6 +1085,11 @@ public class Hive {
 
     return new Partition(tbl, partition);
   }
+  
+  public Partition getPartition(Table tbl, Map<String, String> partSpec,
+      boolean forceCreate) throws HiveException {
+    return getPartition(tbl, partSpec, forceCreate, null);
+  }
 
   /**
    * Returns partition metadata
@@ -1105,7 +1105,7 @@ public class Hive {
    * @throws HiveException
    */
   public Partition getPartition(Table tbl, Map<String, String> partSpec,
-      boolean forceCreate) throws HiveException {
+      boolean forceCreate, String partPath) throws HiveException {
     if (!tbl.isValidSpec(partSpec)) {
       throw new HiveException("Invalid partition: " + partSpec);
     }
@@ -1147,6 +1147,10 @@ public class Hive {
           tpart.getSd().setOutputFormat(tbl.getTTable().getSd().getOutputFormat());
           tpart.getSd().setInputFormat(tbl.getTTable().getSd().getInputFormat());
           tpart.getSd().getSerdeInfo().setSerializationLib(tbl.getSerializationLib());
+          if (partPath == null || partPath.trim().equals("")) {
+            throw new HiveException("new partition path should not be null or empty.");
+          }
+          tpart.getSd().setLocation(partPath);
           alterPartition(tbl.getTableName(), new Partition(tbl, tpart));
         }
       }
@@ -1443,19 +1447,31 @@ public class Hive {
 
   /**
    * Replaces files in the partition with new data set specifed by srcf. Works
-   * by moving files
+   * by moving files.
+   * srcf, destf, and tmppath should reside in the same dfs, but oldPath may be in a
+   * different dfs.
    *
    * @param srcf
    *          Files to be moved. Leaf Directories or Globbed File Paths
    * @param destf
    *          The directory where the final data needs to go
-   * @param fs
-   *          The filesystem handle
+   * @param oldPath
+   *          The old data location that needs to be cleaned up; may be null.
    * @param tmppath
    *          Temporary directory
    */
-  static protected void replaceFiles(Path srcf, Path destf, FileSystem fs,
-      Path tmppath) throws HiveException {
+  static protected void replaceFiles(Path srcf, Path destf, Path oldPath,
+      Path tmppath, Configuration conf) throws HiveException {
+    
+    FileSystem fs = null;
+    FsShell fshell = new FsShell();
+    fshell.setConf(conf);
+    try {
+      fs = FileSystem.get(srcf.toUri(), conf);
+    } catch (IOException e1) {
+      throw new HiveException(e1.getMessage(), e1);
+    }
+
     FileStatus[] srcs;
     try {
       srcs = fs.listStatus(srcf);
@@ -1483,8 +1499,17 @@ public class Hive {
       }
 
       // point of no return
-      boolean b = fs.delete(destf, true);
-      LOG.debug("Deleting:" + destf.toString() + ",Status:" + b);
+      if (oldPath != null) {
+        try {
+          fshell.run(new String[]{"-rmr", oldPath.toUri().toString()});
+        } catch (Exception e) {
+          //swallow the exception
+        }
+      }
+      try {
+        fshell.run(new String[]{"-rmr", destf.toUri().toString()});
+      } catch (Exception e) {
+      }
 
       // create the parent directory otherwise rename can fail if the parent
       // doesn't exist
@@ -1493,13 +1518,12 @@ public class Hive {
             + destf.getParent().toString());
       }
 
-      b = fs.rename(tmppath, destf);
+      boolean b = fs.rename(tmppath, destf);
       if (!b) {
         throw new HiveException("Unable to move results from " + tmppath
             + " to destination directory: " + destf.getParent().toString());
       }
       LOG.debug("Renaming:" + tmppath.toString() + ",Status:" + b);
-
     } catch (IOException e) {
       throw new HiveException("replaceFiles: error while moving files from "
           + tmppath + " to " + destf + "!!!", e);

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java?rev=1023178&r1=1023177&r2=1023178&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java Sat Oct 16 05:02:08 2010
@@ -524,13 +524,8 @@ public class Table implements Serializab
    *          Temporary directory
    */
   protected void replaceFiles(Path srcf, Path tmpd) throws HiveException {
-    FileSystem fs;
-    try {
-      fs = FileSystem.get(getDataLocation(), Hive.get().getConf());
-      Hive.replaceFiles(srcf, new Path(getDataLocation().getPath()), fs, tmpd);
-    } catch (IOException e) {
-      throw new HiveException("addFiles: filesystem error in check phase", e);
-    }
+    Hive.replaceFiles(srcf, new Path(getDataLocation().getPath()), null, tmpd,
+        Hive.get().getConf());
   }
 
   /**

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java?rev=1023178&r1=1023177&r2=1023178&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java Sat Oct 16 05:02:08 2010
@@ -3435,8 +3435,14 @@ public class SemanticAnalyzer extends Ba
 
       dest_part = qbm.getDestPartitionForAlias(dest);
       dest_tab = dest_part.getTable();
+      Path tabPath = dest_tab.getPath();
+      Path partPath = dest_part.getPartitionPath(); 
+      
+        // if the table is in a different dfs than the partition,
+        // replace the partition's dfs with the table's dfs.
+      dest_path = new Path(tabPath.toUri().getScheme(), tabPath.toUri()
+          .getAuthority(), partPath.toUri().getPath());
 
-      dest_path = dest_part.getPath()[0];
       if ("har".equalsIgnoreCase(dest_path.toUri().getScheme())) {
         throw new SemanticException(ErrorMsg.OVERWRITE_ARCHIVED_PART
             .getMsg());