Posted to commits@hive.apache.org by ha...@apache.org on 2016/05/12 02:30:43 UTC
hive git commit: HIVE-13716 : Improve dynamic partition loading V (Ashutosh Chauhan via Rui Li)
Repository: hive
Updated Branches:
refs/heads/master 66a021164 -> 107204a78
HIVE-13716 : Improve dynamic partition loading V (Ashutosh Chauhan via Rui Li)
Signed-off-by: Ashutosh Chauhan <ha...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/107204a7
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/107204a7
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/107204a7
Branch: refs/heads/master
Commit: 107204a78de0edceaeb4070c2df22214fb56b858
Parents: 66a0211
Author: Ashutosh Chauhan <ha...@apache.org>
Authored: Sun May 8 17:12:53 2016 -0700
Committer: Ashutosh Chauhan <ha...@apache.org>
Committed: Wed May 11 19:25:49 2016 -0700
----------------------------------------------------------------------
.../apache/hadoop/hive/common/FileUtils.java | 6 +-
.../org/apache/hadoop/hive/ql/exec/DDLTask.java | 2 +-
.../apache/hadoop/hive/ql/exec/MoveTask.java | 2 +-
.../apache/hadoop/hive/ql/metadata/Hive.java | 110 ++++++++++---------
.../org/apache/hadoop/hive/io/HdfsUtils.java | 71 +++++++-----
5 files changed, 109 insertions(+), 82 deletions(-)
----------------------------------------------------------------------
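The core of the patch is the new boolean parameter on HdfsUtils.setFullFileStatus: callers that populate whole subtrees keep the recursive FsShell-based path, while the per-file callers in Hive.java switch to direct FileSystem calls on a single path. Below is a minimal sketch of the two call shapes, assuming a default Configuration and made-up warehouse paths that are not taken from the patch:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.io.HdfsUtils;

public class SetFullFileStatusExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);

    // Made-up warehouse paths, used only to show the call shapes.
    Path tableDir = new Path("/warehouse/db/tbl");
    Path newPartDir = new Path("/warehouse/db/tbl/ds=2016-05-11");

    HdfsUtils.HadoopFileStatus source = new HdfsUtils.HadoopFileStatus(conf, fs, tableDir);

    // recursive = true: shells out to -chgrp/-chmod/-setfacl -R, as before,
    // suited to callers that just created a whole subtree (FileUtils, MoveTask).
    HdfsUtils.setFullFileStatus(conf, source, fs, newPartDir, true);

    // recursive = false: a single round of FileSystem calls on one path,
    // the cheaper option the per-file callers in Hive.java now use.
    HdfsUtils.setFullFileStatus(conf, source, fs, newPartDir, false);
  }
}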
http://git-wip-us.apache.org/repos/asf/hive/blob/107204a7/common/src/java/org/apache/hadoop/hive/common/FileUtils.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/FileUtils.java b/common/src/java/org/apache/hadoop/hive/common/FileUtils.java
index 71c9188..b65c35b 100644
--- a/common/src/java/org/apache/hadoop/hive/common/FileUtils.java
+++ b/common/src/java/org/apache/hadoop/hive/common/FileUtils.java
@@ -529,7 +529,7 @@ public final class FileUtils {
} else {
try {
//set on the entire subtree
- HdfsUtils.setFullFileStatus(conf, new HdfsUtils.HadoopFileStatus(conf, fs, lastExistingParent), fs, firstNonExistentParent);
+ HdfsUtils.setFullFileStatus(conf, new HdfsUtils.HadoopFileStatus(conf, fs, lastExistingParent), fs, firstNonExistentParent, true);
} catch (Exception e) {
LOG.warn("Error setting permissions of " + firstNonExistentParent, e);
}
@@ -566,7 +566,7 @@ public final class FileUtils {
boolean inheritPerms = conf.getBoolVar(HiveConf.ConfVars.HIVE_WAREHOUSE_SUBDIR_INHERIT_PERMS);
if (copied && inheritPerms) {
try {
- HdfsUtils.setFullFileStatus(conf, new HdfsUtils.HadoopFileStatus(conf, dstFS, dst.getParent()), dstFS, dst);
+ HdfsUtils.setFullFileStatus(conf, new HdfsUtils.HadoopFileStatus(conf, dstFS, dst.getParent()), dstFS, dst, true);
} catch (Exception e) {
LOG.warn("Error setting permissions or group of " + dst, e);
}
@@ -685,7 +685,7 @@ public final class FileUtils {
//rename the directory
if (fs.rename(sourcePath, destPath)) {
try {
- HdfsUtils.setFullFileStatus(conf, new HdfsUtils.HadoopFileStatus(conf, fs, destPath.getParent()), fs, destPath);
+ HdfsUtils.setFullFileStatus(conf, new HdfsUtils.HadoopFileStatus(conf, fs, destPath.getParent()), fs, destPath, true);
} catch (Exception e) {
LOG.warn("Error setting permissions or group of " + destPath, e);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/107204a7/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
index 9392b6d..707de1f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
@@ -4216,7 +4216,7 @@ public class DDLTask extends Task<DDLWork> implements Serializable {
fs.delete(location, true);
fs.mkdirs(location);
try {
- HdfsUtils.setFullFileStatus(conf, status, fs, location);
+ HdfsUtils.setFullFileStatus(conf, status, fs, location, false);
} catch (Exception e) {
LOG.warn("Error setting permissions of " + location, e);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/107204a7/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
index bdda89a..21aa315 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
@@ -180,7 +180,7 @@ public class MoveTask extends Task<MoveWork> implements Serializable {
fs.mkdirs(mkDirPath);
if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_WAREHOUSE_SUBDIR_INHERIT_PERMS)) {
try {
- HdfsUtils.setFullFileStatus(conf, new HdfsUtils.HadoopFileStatus(conf, fs, actualPath), fs, mkDirPath);
+ HdfsUtils.setFullFileStatus(conf, new HdfsUtils.HadoopFileStatus(conf, fs, actualPath), fs, mkDirPath, true);
} catch (Exception e) {
LOG.warn("Error setting permissions or group of " + actualPath, e);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/107204a7/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index 6af48ec..b5e660b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -2694,7 +2694,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
}
if (inheritPerms) {
- HdfsUtils.setFullFileStatus(conf, fullDestStatus, destFs, destPath);
+ HdfsUtils.setFullFileStatus(conf, fullDestStatus, destFs, destPath, false);
}
if (null != newFiles) {
newFiles.add(destPath);
@@ -2784,9 +2784,8 @@ private void constructOneLBLocationMap(FileStatus fSta,
//method is called. when the replace value is true, this method works a little different
//from mv command if the destf is a directory, it replaces the destf instead of moving under
//the destf. in this case, the replaced destf still preserves the original destf's permission
- public static boolean moveFile(HiveConf conf, Path srcf, final Path destf,
+ public static boolean moveFile(final HiveConf conf, Path srcf, final Path destf,
boolean replace, boolean isSrcLocal) throws HiveException {
- boolean success = false;
final FileSystem srcFs, destFs;
try {
destFs = destf.getFileSystem(conf);
@@ -2802,7 +2801,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
}
//needed for perm inheritance.
- boolean inheritPerms = HiveConf.getBoolVar(conf,
+ final boolean inheritPerms = HiveConf.getBoolVar(conf,
HiveConf.ConfVars.HIVE_WAREHOUSE_SUBDIR_INHERIT_PERMS);
HdfsUtils.HadoopFileStatus destStatus = null;
@@ -2823,8 +2822,8 @@ private void constructOneLBLocationMap(FileStatus fSta,
//if destf is an existing file, rename is actually a replace, and do not need
// to delete the file first
if (replace && !destIsSubDir) {
- LOG.debug("The path " + destf.toString() + " is deleted");
destFs.delete(destf, true);
+ LOG.debug("The path " + destf.toString() + " is deleted");
}
} catch (FileNotFoundException ignore) {
//if dest dir does not exist, any re
@@ -2833,75 +2832,84 @@ private void constructOneLBLocationMap(FileStatus fSta,
}
}
}
+ final HdfsUtils.HadoopFileStatus desiredStatus = destStatus;
+ final SessionState parentSession = SessionState.get();
if (isSrcLocal) {
// For local src file, copy to hdfs
destFs.copyFromLocalFile(srcf, destf);
- success = true;
+ if (inheritPerms) {
+ try {
+ HdfsUtils.setFullFileStatus(conf, destStatus, destFs, destf, true);
+ } catch (IOException e) {
+ LOG.warn("Error setting permission of file " + destf + ": "+ e.getMessage(), e);
+ }
+ }
+ return true;
} else {
if (needToCopy(srcf, destf, srcFs, destFs)) {
//copy if across file system or encryption zones.
- LOG.info("Copying source " + srcf + " to " + destf + " because HDFS encryption zones are different.");
- success = FileUtils.copy(srcf.getFileSystem(conf), srcf, destf.getFileSystem(conf), destf,
+ LOG.debug("Copying source " + srcf + " to " + destf + " because HDFS encryption zones are different.");
+ return FileUtils.copy(srcf.getFileSystem(conf), srcf, destf.getFileSystem(conf), destf,
true, // delete source
replace, // overwrite destination
conf);
} else {
if (destIsSubDir) {
FileStatus[] srcs = destFs.listStatus(srcf, FileUtils.HIDDEN_FILES_PATH_FILTER);
- if (srcs.length == 0) {
- success = true; // Nothing to move.
- } else {
- List<Future<Boolean>> futures = new LinkedList<>();
- final ExecutorService pool = Executors.newFixedThreadPool(
- conf.getIntVar(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT),
- new ThreadFactoryBuilder().setDaemon(true).setNameFormat("MoveDir-Thread-%d").build());
- /* Move files one by one because source is a subdirectory of destination */
- for (final FileStatus status : srcs) {
- futures.add(pool.submit(new Callable<Boolean>() {
- @Override
- public Boolean call() throws Exception {
- return destFs.rename(status.getPath(), destf);
- }
- }));
- }
- pool.shutdown();
- boolean allFutures = true;
- for (Future<Boolean> future : futures) {
- try {
- Boolean result = future.get();
- allFutures &= result;
- if (!result) {
- LOG.debug("Failed to rename.");
- pool.shutdownNow();
+
+ List<Future<Void>> futures = new LinkedList<>();
+ final ExecutorService pool = Executors.newFixedThreadPool(
+ conf.getIntVar(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT),
+ new ThreadFactoryBuilder().setDaemon(true).setNameFormat("MoveDir-Thread-%d").build());
+ /* Move files one by one because source is a subdirectory of destination */
+ for (final FileStatus status : srcs) {
+ futures.add(pool.submit(new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ SessionState.setCurrentSessionState(parentSession);
+ Path destPath = new Path(destf, status.getPath().getName());
+ try {
+ if(destFs.rename(status.getPath(), destf)) {
+ if (inheritPerms) {
+ HdfsUtils.setFullFileStatus(conf, desiredStatus, destFs, destPath, false);
+ }
+ } else {
+ throw new IOException("rename for src path: " + status.getPath() + " to dest path:"
+ + destPath + " returned false");
+ }
+ } catch (IOException ioe) {
+ LOG.error("Failed to rename/set permissions. Src path: {} Dest path: {}", status.getPath(), destPath);
+ throw ioe;
}
- } catch (Exception e) {
- LOG.debug("Failed to rename.", e.getMessage());
- pool.shutdownNow();
- throw new HiveException(e.getCause());
+ return null;
}
+ }));
+ }
+ pool.shutdown();
+ for (Future<Void> future : futures) {
+ try {
+ future.get();
+ } catch (Exception e) {
+ LOG.debug(e.getMessage());
+ pool.shutdownNow();
+ throw new HiveException(e.getCause());
}
- success = allFutures;
}
+ return true;
} else {
- success = destFs.rename(srcf, destf);
+ if (destFs.rename(srcf, destf)) {
+ if (inheritPerms) {
+ HdfsUtils.setFullFileStatus(conf, destStatus, destFs, destf, true);
+ }
+ return true;
+ }
+ return false;
}
}
}
-
- LOG.info((replace ? "Replacing src:" : "Renaming src: ") + srcf.toString()
- + ", dest: " + destf.toString() + ", Status:" + success);
} catch (IOException ioe) {
throw new HiveException("Unable to move source " + srcf + " to destination " + destf, ioe);
}
-
- if (success && inheritPerms) {
- try {
- HdfsUtils.setFullFileStatus(conf, destStatus, destFs, destf);
- } catch (IOException e) {
- LOG.warn("Error setting permission of file " + destf + ": "+ e.getMessage(), e);
- }
- }
- return success;
}
/**
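The rewritten moveFile submits each rename as a Callable<Void> that throws on failure, and the caller fails fast on the first bad future instead of AND-ing booleans. The following is a stripped-down sketch of that pattern, with a hard-coded pool size standing in for ConfVars.HIVE_MOVE_FILES_THREAD_COUNT and without the SessionState and permission-inheritance details:

import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public final class ParallelMoveSketch {
  public static void moveAll(final FileSystem fs, FileStatus[] srcs, final Path destf)
      throws IOException {
    ExecutorService pool = Executors.newFixedThreadPool(15,
        new ThreadFactoryBuilder().setDaemon(true).setNameFormat("MoveDir-Thread-%d").build());
    List<Future<Void>> futures = new LinkedList<>();
    for (final FileStatus status : srcs) {
      futures.add(pool.submit(new Callable<Void>() {
        @Override
        public Void call() throws Exception {
          // Each task renames one file and surfaces failure as an exception
          // instead of a boolean, so the caller can fail fast.
          if (!fs.rename(status.getPath(), destf)) {
            throw new IOException("rename returned false for " + status.getPath());
          }
          return null;
        }
      }));
    }
    pool.shutdown();
    for (Future<Void> future : futures) {
      try {
        future.get();
      } catch (Exception e) {
        pool.shutdownNow();  // cancel outstanding renames on the first failure
        throw new IOException(e.getCause());
      }
    }
  }
}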
http://git-wip-us.apache.org/repos/asf/hive/blob/107204a7/shims/common/src/main/java/org/apache/hadoop/hive/io/HdfsUtils.java
----------------------------------------------------------------------
diff --git a/shims/common/src/main/java/org/apache/hadoop/hive/io/HdfsUtils.java b/shims/common/src/main/java/org/apache/hadoop/hive/io/HdfsUtils.java
index e931156..c2060fc 100644
--- a/shims/common/src/main/java/org/apache/hadoop/hive/io/HdfsUtils.java
+++ b/shims/common/src/main/java/org/apache/hadoop/hive/io/HdfsUtils.java
@@ -57,10 +57,28 @@ public class HdfsUtils {
}
public static void setFullFileStatus(Configuration conf, HdfsUtils.HadoopFileStatus sourceStatus,
- FileSystem fs, Path target) throws IOException {
- FileStatus fStatus= sourceStatus.getFileStatus();
- String group = fStatus.getGroup();
- LOG.trace(sourceStatus.getFileStatus().toString());
+ FileSystem fs, Path target, boolean recursion) throws IOException {
+ FileStatus fStatus= sourceStatus.getFileStatus();
+ String group = fStatus.getGroup();
+ boolean aclEnabled = Objects.equal(conf.get("dfs.namenode.acls.enabled"), "true");
+ FsPermission sourcePerm = fStatus.getPermission();
+ List<AclEntry> aclEntries = null;
+ AclStatus aclStatus;
+ if (aclEnabled) {
+ aclStatus = sourceStatus.getAclStatus();
+ if (aclStatus != null) {
+ LOG.trace(aclStatus.toString());
+ aclEntries = aclStatus.getEntries();
+ removeBaseAclEntries(aclEntries);
+
+ //the ACL api's also expect the tradition user/group/other permission in the form of ACL
+ aclEntries.add(newAclEntry(AclEntryScope.ACCESS, AclEntryType.USER, sourcePerm.getUserAction()));
+ aclEntries.add(newAclEntry(AclEntryScope.ACCESS, AclEntryType.GROUP, sourcePerm.getGroupAction()));
+ aclEntries.add(newAclEntry(AclEntryScope.ACCESS, AclEntryType.OTHER, sourcePerm.getOtherAction()));
+ }
+ }
+
+ if (recursion) {
//use FsShell to change group, permissions, and extended ACL's recursively
FsShell fsShell = new FsShell();
fsShell.setConf(conf);
@@ -70,39 +88,40 @@ public class HdfsUtils {
if (group != null && !group.isEmpty()) {
run(fsShell, new String[]{"-chgrp", "-R", group, target.toString()});
}
-
- if (Objects.equal(conf.get("dfs.namenode.acls.enabled"), "true")) {
- //Attempt extended Acl operations only if its enabled, 8791but don't fail the operation regardless.
- try {
- AclStatus aclStatus = sourceStatus.getAclStatus();
- if (aclStatus != null) {
- LOG.trace(aclStatus.toString());
- List<AclEntry> aclEntries = aclStatus.getEntries();
- removeBaseAclEntries(aclEntries);
-
- //the ACL api's also expect the tradition user/group/other permission in the form of ACL
- FsPermission sourcePerm = fStatus.getPermission();
- aclEntries.add(newAclEntry(AclEntryScope.ACCESS, AclEntryType.USER, sourcePerm.getUserAction()));
- aclEntries.add(newAclEntry(AclEntryScope.ACCESS, AclEntryType.GROUP, sourcePerm.getGroupAction()));
- aclEntries.add(newAclEntry(AclEntryScope.ACCESS, AclEntryType.OTHER, sourcePerm.getOtherAction()));
-
+ if (aclEnabled) {
+ if (null != aclEntries) {
+ //Attempt extended Acl operations only if its enabled, 8791but don't fail the operation regardless.
+ try {
//construct the -setfacl command
- String aclEntry = Joiner.on(",").join(aclStatus.getEntries());
+ String aclEntry = Joiner.on(",").join(aclEntries);
run(fsShell, new String[]{"-setfacl", "-R", "--set", aclEntry, target.toString()});
+
+ } catch (Exception e) {
+ LOG.info("Skipping ACL inheritance: File system for path " + target + " " +
+ "does not support ACLs but dfs.namenode.acls.enabled is set to true. ");
+ LOG.debug("The details are: " + e, e);
}
- } catch (Exception e) {
- LOG.info("Skipping ACL inheritance: File system for path " + target + " " +
- "does not support ACLs but dfs.namenode.acls.enabled is set to true. ");
- LOG.debug("The details are: " + e, e);
}
} else {
- String permission = Integer.toString(fStatus.getPermission().toShort(), 8);
+ String permission = Integer.toString(sourcePerm.toShort(), 8);
run(fsShell, new String[]{"-chmod", "-R", permission, target.toString()});
}
} catch (Exception e) {
throw new IOException("Unable to set permissions of " + target, e);
}
+ } else {
+ if (group != null && !group.isEmpty()) {
+ fs.setOwner(target, null, group);
+ }
+ if (aclEnabled) {
+ if (null != aclEntries) {
+ fs.setAcl(target, aclEntries);
+ }
+ } else {
+ fs.setPermission(target, sourcePerm);
+ }
}
+ }
/**
* Create a new AclEntry with scope, type and permission (no name).
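When recursion is false, the new branch above skips FsShell entirely and applies group, ACLs, or permission bits with direct FileSystem calls on the single target path. A minimal standalone sketch of that branch, with group/aclEntries/perm standing in for values derived from the source HadoopFileStatus and aclEnabled mirroring dfs.namenode.acls.enabled:

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.fs.permission.FsPermission;

public final class SinglePathStatusSketch {
  public static void apply(FileSystem fs, Path target, String group,
      boolean aclEnabled, List<AclEntry> aclEntries, FsPermission perm) throws IOException {
    if (group != null && !group.isEmpty()) {
      fs.setOwner(target, null, group);   // keep the owner, change only the group
    }
    if (aclEnabled && aclEntries != null) {
      fs.setAcl(target, aclEntries);      // ACL list already carries user/group/other actions
    } else if (!aclEnabled) {
      fs.setPermission(target, perm);     // plain permission bits when ACLs are disabled
    }
  }
}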