Posted to commits@hive.apache.org by ha...@apache.org on 2016/05/12 02:30:43 UTC

hive git commit: HIVE-13716 : Improve dynamic partition loading V (Ashutosh Chauhan via Rui Li)

Repository: hive
Updated Branches:
  refs/heads/master 66a021164 -> 107204a78


HIVE-13716 : Improve dynamic partition loading V (Ashutosh Chauhan via Rui Li)

Signed-off-by: Ashutosh Chauhan <ha...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/107204a7
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/107204a7
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/107204a7

Branch: refs/heads/master
Commit: 107204a78de0edceaeb4070c2df22214fb56b858
Parents: 66a0211
Author: Ashutosh Chauhan <ha...@apache.org>
Authored: Sun May 8 17:12:53 2016 -0700
Committer: Ashutosh Chauhan <ha...@apache.org>
Committed: Wed May 11 19:25:49 2016 -0700

----------------------------------------------------------------------
 .../apache/hadoop/hive/common/FileUtils.java    |   6 +-
 .../org/apache/hadoop/hive/ql/exec/DDLTask.java |   2 +-
 .../apache/hadoop/hive/ql/exec/MoveTask.java    |   2 +-
 .../apache/hadoop/hive/ql/metadata/Hive.java    | 110 ++++++++++---------
 .../org/apache/hadoop/hive/io/HdfsUtils.java    |  71 +++++++-----
 5 files changed, 109 insertions(+), 82 deletions(-)
----------------------------------------------------------------------
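
In outline: HdfsUtils.setFullFileStatus(...) gains a boolean "recursion"
parameter, so call sites that touch a single freshly created path can skip
the FsShell-based recursive chgrp/chmod/setfacl in favor of direct
FileSystem calls, and Hive.moveFile(...) is restructured to apply inherited
permissions as each copy or rename completes instead of in one pass at the
end. A minimal sketch of the two call shapes after this commit (conf, fs,
and target are placeholders, not names from the diff):

    HdfsUtils.HadoopFileStatus parentStatus =
        new HdfsUtils.HadoopFileStatus(conf, fs, target.getParent());

    // recursion = true: FsShell walks the whole subtree
    // (-chgrp -R / -setfacl -R / -chmod -R), as before this patch.
    HdfsUtils.setFullFileStatus(conf, parentStatus, fs, target, true);

    // recursion = false: the new cheap path, one round of
    // FileSystem.setOwner/setAcl/setPermission on target alone.
    HdfsUtils.setFullFileStatus(conf, parentStatus, fs, target, false);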


http://git-wip-us.apache.org/repos/asf/hive/blob/107204a7/common/src/java/org/apache/hadoop/hive/common/FileUtils.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/FileUtils.java b/common/src/java/org/apache/hadoop/hive/common/FileUtils.java
index 71c9188..b65c35b 100644
--- a/common/src/java/org/apache/hadoop/hive/common/FileUtils.java
+++ b/common/src/java/org/apache/hadoop/hive/common/FileUtils.java
@@ -529,7 +529,7 @@ public final class FileUtils {
       } else {
         try {
           //set on the entire subtree
-          HdfsUtils.setFullFileStatus(conf, new HdfsUtils.HadoopFileStatus(conf, fs, lastExistingParent), fs, firstNonExistentParent);
+          HdfsUtils.setFullFileStatus(conf, new HdfsUtils.HadoopFileStatus(conf, fs, lastExistingParent), fs, firstNonExistentParent, true);
         } catch (Exception e) {
           LOG.warn("Error setting permissions of " + firstNonExistentParent, e);
         }
@@ -566,7 +566,7 @@ public final class FileUtils {
     boolean inheritPerms = conf.getBoolVar(HiveConf.ConfVars.HIVE_WAREHOUSE_SUBDIR_INHERIT_PERMS);
     if (copied && inheritPerms) {
       try {
-        HdfsUtils.setFullFileStatus(conf, new HdfsUtils.HadoopFileStatus(conf, dstFS, dst.getParent()), dstFS, dst);
+        HdfsUtils.setFullFileStatus(conf, new HdfsUtils.HadoopFileStatus(conf, dstFS, dst.getParent()), dstFS, dst, true);
       } catch (Exception e) {
         LOG.warn("Error setting permissions or group of " + dst, e);
       }
@@ -685,7 +685,7 @@ public final class FileUtils {
       //rename the directory
       if (fs.rename(sourcePath, destPath)) {
         try {
-          HdfsUtils.setFullFileStatus(conf, new HdfsUtils.HadoopFileStatus(conf, fs, destPath.getParent()), fs, destPath);
+          HdfsUtils.setFullFileStatus(conf, new HdfsUtils.HadoopFileStatus(conf, fs, destPath.getParent()), fs, destPath, true);
         } catch (Exception e) {
           LOG.warn("Error setting permissions or group of " + destPath, e);
         }
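
All three FileUtils sites above deal with subtrees rather than single paths
(a just-created parent chain, a copied tree, a renamed directory), so each
passes true and keeps the recursive FsShell behavior it had before.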

http://git-wip-us.apache.org/repos/asf/hive/blob/107204a7/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
index 9392b6d..707de1f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
@@ -4216,7 +4216,7 @@ public class DDLTask extends Task<DDLWork> implements Serializable {
         fs.delete(location, true);
         fs.mkdirs(location);
         try {
-          HdfsUtils.setFullFileStatus(conf, status, fs, location);
+          HdfsUtils.setFullFileStatus(conf, status, fs, location, false);
         } catch (Exception e) {
           LOG.warn("Error setting permissions of " + location, e);
         }
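
This DDLTask site is the one that downgrades to false: the location was
just deleted and recreated empty by fs.delete(location, true) and
fs.mkdirs(location), so there is nothing below it yet and a single
non-recursive pass over the one path is sufficient.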

http://git-wip-us.apache.org/repos/asf/hive/blob/107204a7/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
index bdda89a..21aa315 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
@@ -180,7 +180,7 @@ public class MoveTask extends Task<MoveWork> implements Serializable {
       fs.mkdirs(mkDirPath);
       if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_WAREHOUSE_SUBDIR_INHERIT_PERMS)) {
         try {
-          HdfsUtils.setFullFileStatus(conf, new HdfsUtils.HadoopFileStatus(conf, fs, actualPath), fs, mkDirPath);
+          HdfsUtils.setFullFileStatus(conf, new HdfsUtils.HadoopFileStatus(conf, fs, actualPath), fs, mkDirPath, true);
         } catch (Exception e) {
           LOG.warn("Error setting permissions or group of " + actualPath, e);
         }
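
MoveTask, by contrast, keeps recursion enabled for the directories it
creates under mkDirPath, preserving this call site's pre-patch behavior.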

http://git-wip-us.apache.org/repos/asf/hive/blob/107204a7/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index 6af48ec..b5e660b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -2694,7 +2694,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
             }
 
             if (inheritPerms) {
-              HdfsUtils.setFullFileStatus(conf, fullDestStatus, destFs, destPath);
+              HdfsUtils.setFullFileStatus(conf, fullDestStatus, destFs, destPath, false);
             }
             if (null != newFiles) {
               newFiles.add(destPath);
@@ -2784,9 +2784,8 @@ private void constructOneLBLocationMap(FileStatus fSta,
   //method is called. when the replace value is true, this method works a little different
   //from mv command if the destf is a directory, it replaces the destf instead of moving under
   //the destf. in this case, the replaced destf still preserves the original destf's permission
-  public static boolean moveFile(HiveConf conf, Path srcf, final Path destf,
+  public static boolean moveFile(final HiveConf conf, Path srcf, final Path destf,
       boolean replace, boolean isSrcLocal) throws HiveException {
-    boolean success = false;
     final FileSystem srcFs, destFs;
     try {
       destFs = destf.getFileSystem(conf);
@@ -2802,7 +2801,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
     }
 
     //needed for perm inheritance.
-    boolean inheritPerms = HiveConf.getBoolVar(conf,
+    final boolean inheritPerms = HiveConf.getBoolVar(conf,
         HiveConf.ConfVars.HIVE_WAREHOUSE_SUBDIR_INHERIT_PERMS);
     HdfsUtils.HadoopFileStatus destStatus = null;
 
@@ -2823,8 +2822,8 @@ private void constructOneLBLocationMap(FileStatus fSta,
           //if destf is an existing file, rename is actually a replace, and do not need
           // to delete the file first
           if (replace && !destIsSubDir) {
-            LOG.debug("The path " + destf.toString() + " is deleted");
             destFs.delete(destf, true);
+            LOG.debug("The path " + destf.toString() + " is deleted");
           }
         } catch (FileNotFoundException ignore) {
           //if dest dir does not exist, any re
@@ -2833,75 +2832,84 @@ private void constructOneLBLocationMap(FileStatus fSta,
           }
         }
       }
+      final HdfsUtils.HadoopFileStatus desiredStatus = destStatus;
+      final SessionState parentSession = SessionState.get();
       if (isSrcLocal) {
         // For local src file, copy to hdfs
         destFs.copyFromLocalFile(srcf, destf);
-        success = true;
+        if (inheritPerms) {
+          try {
+            HdfsUtils.setFullFileStatus(conf, destStatus, destFs, destf, true);
+          } catch (IOException e) {
+            LOG.warn("Error setting permission of file " + destf + ": "+ e.getMessage(), e);
+          }
+        }
+        return true;
       } else {
         if (needToCopy(srcf, destf, srcFs, destFs)) {
           //copy if across file system or encryption zones.
-          LOG.info("Copying source " + srcf + " to " + destf + " because HDFS encryption zones are different.");
-          success = FileUtils.copy(srcf.getFileSystem(conf), srcf, destf.getFileSystem(conf), destf,
+          LOG.debug("Copying source " + srcf + " to " + destf + " because HDFS encryption zones are different.");
+          return FileUtils.copy(srcf.getFileSystem(conf), srcf, destf.getFileSystem(conf), destf,
               true,    // delete source
               replace, // overwrite destination
               conf);
         } else {
           if (destIsSubDir) {
             FileStatus[] srcs = destFs.listStatus(srcf, FileUtils.HIDDEN_FILES_PATH_FILTER);
-            if (srcs.length == 0) {
-              success = true; // Nothing to move.
-            } else {
-              List<Future<Boolean>> futures = new LinkedList<>();
-              final ExecutorService pool = Executors.newFixedThreadPool(
-                  conf.getIntVar(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT),
-                  new ThreadFactoryBuilder().setDaemon(true).setNameFormat("MoveDir-Thread-%d").build());
-              /* Move files one by one because source is a subdirectory of destination */
-              for (final FileStatus status : srcs) {
-                futures.add(pool.submit(new Callable<Boolean>() {
-                  @Override
-                  public Boolean call() throws Exception {
-                    return destFs.rename(status.getPath(), destf);
-                  }
-                }));
-              }
-              pool.shutdown();
-              boolean allFutures = true;
-              for (Future<Boolean> future : futures) {
-                try {
-                  Boolean result = future.get();
-                  allFutures &= result;
-                  if (!result) {
-                    LOG.debug("Failed to rename.");
-                    pool.shutdownNow();
+
+            List<Future<Void>> futures = new LinkedList<>();
+            final ExecutorService pool = Executors.newFixedThreadPool(
+                conf.getIntVar(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT),
+                new ThreadFactoryBuilder().setDaemon(true).setNameFormat("MoveDir-Thread-%d").build());
+            /* Move files one by one because source is a subdirectory of destination */
+            for (final FileStatus status : srcs) {
+              futures.add(pool.submit(new Callable<Void>() {
+                @Override
+                public Void call() throws Exception {
+                  SessionState.setCurrentSessionState(parentSession);
+                  Path destPath = new Path(destf, status.getPath().getName());
+                  try {
+                    if(destFs.rename(status.getPath(), destf)) {
+                      if (inheritPerms) {
+                        HdfsUtils.setFullFileStatus(conf, desiredStatus, destFs, destPath, false);
+                      }
+                    } else {
+                      throw new IOException("rename for src path: " + status.getPath() + " to dest path:"
+                          + destPath + " returned false");
+                    }
+                  } catch (IOException ioe) {
+                    LOG.error("Failed to rename/set permissions. Src path: {} Dest path: {}", status.getPath(), destPath);
+                    throw ioe;
                   }
-                } catch (Exception e) {
-                  LOG.debug("Failed to rename.", e.getMessage());
-                  pool.shutdownNow();
-                  throw new HiveException(e.getCause());
+                  return null;
                 }
+              }));
+            }
+            pool.shutdown();
+            for (Future<Void> future : futures) {
+              try {
+                future.get();
+              } catch (Exception e) {
+                LOG.debug(e.getMessage());
+                pool.shutdownNow();
+                throw new HiveException(e.getCause());
               }
-              success = allFutures;
             }
+            return true;
           } else {
-            success = destFs.rename(srcf, destf);
+            if (destFs.rename(srcf, destf)) {
+              if (inheritPerms) {
+                HdfsUtils.setFullFileStatus(conf, destStatus, destFs, destf, true);
+              }
+              return true;
+            }
+            return false;
           }
         }
       }
-
-      LOG.info((replace ? "Replacing src:" : "Renaming src: ") + srcf.toString()
-          + ", dest: " + destf.toString()  + ", Status:" + success);
     } catch (IOException ioe) {
       throw new HiveException("Unable to move source " + srcf + " to destination " + destf, ioe);
     }
-
-    if (success && inheritPerms) {
-      try {
-        HdfsUtils.setFullFileStatus(conf, destStatus, destFs, destf);
-      } catch (IOException e) {
-        LOG.warn("Error setting permission of file " + destf + ": "+ e.getMessage(), e);
-      }
-    }
-    return success;
   }
 
   /**
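
Two things worth noting in the Hive.java hunk beyond the inlined permission
handling: the worker type changes from Callable<Boolean> to Callable<Void>,
so a failed rename now surfaces as an exception through Future.get()
instead of a boolean that had to be and-ed together, and each worker calls
SessionState.setCurrentSessionState(parentSession) because SessionState is
thread-local and the pool threads would otherwise run without a session. A
condensed, hypothetical restatement of that fail-fast pattern (srcs,
destFs, destf, and parentSession stand for the variables in the diff;
per-file destPath handling and logging trimmed):

    List<Future<Void>> futures = new LinkedList<>();
    final ExecutorService pool = Executors.newFixedThreadPool(
        threadCount,   // HIVE_MOVE_FILES_THREAD_COUNT in the diff
        new ThreadFactoryBuilder().setDaemon(true)
            .setNameFormat("MoveDir-Thread-%d").build());
    for (final FileStatus status : srcs) {
      futures.add(pool.submit(new Callable<Void>() {
        @Override
        public Void call() throws Exception {
          // Pool threads do not inherit the caller's thread-local session.
          SessionState.setCurrentSessionState(parentSession);
          if (!destFs.rename(status.getPath(), destf)) {
            // Promote the false return to an exception so Future.get()
            // below rethrows it and the whole move fails fast.
            throw new IOException("rename " + status.getPath()
                + " to " + destf + " returned false");
          }
          return null;
        }
      }));
    }
    pool.shutdown();
    for (Future<Void> future : futures) {
      future.get();   // rethrows the first worker failure
    }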

http://git-wip-us.apache.org/repos/asf/hive/blob/107204a7/shims/common/src/main/java/org/apache/hadoop/hive/io/HdfsUtils.java
----------------------------------------------------------------------
diff --git a/shims/common/src/main/java/org/apache/hadoop/hive/io/HdfsUtils.java b/shims/common/src/main/java/org/apache/hadoop/hive/io/HdfsUtils.java
index e931156..c2060fc 100644
--- a/shims/common/src/main/java/org/apache/hadoop/hive/io/HdfsUtils.java
+++ b/shims/common/src/main/java/org/apache/hadoop/hive/io/HdfsUtils.java
@@ -57,10 +57,28 @@ public class HdfsUtils {
   }
 
   public static void setFullFileStatus(Configuration conf, HdfsUtils.HadoopFileStatus sourceStatus,
-      FileSystem fs, Path target) throws IOException {
-      FileStatus fStatus= sourceStatus.getFileStatus();
-      String group = fStatus.getGroup();
-      LOG.trace(sourceStatus.getFileStatus().toString());
+      FileSystem fs, Path target, boolean recursion) throws IOException {
+    FileStatus fStatus= sourceStatus.getFileStatus();
+    String group = fStatus.getGroup();
+    boolean aclEnabled = Objects.equal(conf.get("dfs.namenode.acls.enabled"), "true");
+    FsPermission sourcePerm = fStatus.getPermission();
+    List<AclEntry> aclEntries = null;
+    AclStatus aclStatus;
+    if (aclEnabled) {
+      aclStatus =  sourceStatus.getAclStatus();
+      if (aclStatus != null) {
+        LOG.trace(aclStatus.toString());
+        aclEntries = aclStatus.getEntries();
+        removeBaseAclEntries(aclEntries);
+
+        //the ACL APIs also expect the traditional user/group/other permission in the form of ACL
+        aclEntries.add(newAclEntry(AclEntryScope.ACCESS, AclEntryType.USER, sourcePerm.getUserAction()));
+        aclEntries.add(newAclEntry(AclEntryScope.ACCESS, AclEntryType.GROUP, sourcePerm.getGroupAction()));
+        aclEntries.add(newAclEntry(AclEntryScope.ACCESS, AclEntryType.OTHER, sourcePerm.getOtherAction()));
+      }
+    }
+
+    if (recursion) {
       //use FsShell to change group, permissions, and extended ACL's recursively
       FsShell fsShell = new FsShell();
       fsShell.setConf(conf);
@@ -70,39 +88,40 @@ public class HdfsUtils {
         if (group != null && !group.isEmpty()) {
           run(fsShell, new String[]{"-chgrp", "-R", group, target.toString()});
         }
-
-        if (Objects.equal(conf.get("dfs.namenode.acls.enabled"), "true")) {
-          //Attempt extended ACL operations only if it's enabled, but don't fail the operation regardless.
-          try {
-            AclStatus aclStatus =  sourceStatus.getAclStatus();
-            if (aclStatus != null) {
-              LOG.trace(aclStatus.toString());
-              List<AclEntry> aclEntries = aclStatus.getEntries();
-              removeBaseAclEntries(aclEntries);
-
-              //the ACL APIs also expect the traditional user/group/other permission in the form of ACL
-              FsPermission sourcePerm = fStatus.getPermission();
-              aclEntries.add(newAclEntry(AclEntryScope.ACCESS, AclEntryType.USER, sourcePerm.getUserAction()));
-              aclEntries.add(newAclEntry(AclEntryScope.ACCESS, AclEntryType.GROUP, sourcePerm.getGroupAction()));
-              aclEntries.add(newAclEntry(AclEntryScope.ACCESS, AclEntryType.OTHER, sourcePerm.getOtherAction()));
-
+        if (aclEnabled) {
+          if (null != aclEntries) {
+            //Attempt extended ACL operations only if it's enabled, but don't fail the operation regardless.
+            try {
               //construct the -setfacl command
-              String aclEntry = Joiner.on(",").join(aclStatus.getEntries());
+              String aclEntry = Joiner.on(",").join(aclEntries);
               run(fsShell, new String[]{"-setfacl", "-R", "--set", aclEntry, target.toString()});
+
+            } catch (Exception e) {
+              LOG.info("Skipping ACL inheritance: File system for path " + target + " " +
+                  "does not support ACLs but dfs.namenode.acls.enabled is set to true. ");
+              LOG.debug("The details are: " + e, e);
             }
-          } catch (Exception e) {
-            LOG.info("Skipping ACL inheritance: File system for path " + target + " " +
-                    "does not support ACLs but dfs.namenode.acls.enabled is set to true. ");
-            LOG.debug("The details are: " + e, e);
           }
         } else {
-          String permission = Integer.toString(fStatus.getPermission().toShort(), 8);
+          String permission = Integer.toString(sourcePerm.toShort(), 8);
           run(fsShell, new String[]{"-chmod", "-R", permission, target.toString()});
         }
       } catch (Exception e) {
         throw new IOException("Unable to set permissions of " + target, e);
       }
+    } else {
+      if (group != null && !group.isEmpty()) {
+        fs.setOwner(target, null, group);
+      }
+      if (aclEnabled) {
+        if (null != aclEntries) {
+          fs.setAcl(target, aclEntries);
+        }
+      } else {
+        fs.setPermission(target, sourcePerm);
+      }
     }
+  }
 
   /**
    * Create a new AclEntry with scope, type and permission (no name).
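
For reference, the no-name entries that newAclEntry produces follow
Hadoop's AclEntry.Builder pattern; a sketch of the USER entry, assuming
the sourcePerm computed earlier in setFullFileStatus:

    // Access-scope USER entry carrying the source's user bits (e.g. rwx);
    // this is the shape the three aclEntries.add(...) calls above expect.
    AclEntry userEntry = new AclEntry.Builder()
        .setScope(AclEntryScope.ACCESS)
        .setType(AclEntryType.USER)
        .setPermission(sourcePerm.getUserAction())
        .build();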