Posted to commits@hive.apache.org by ha...@apache.org on 2016/03/29 20:27:28 UTC
hive git commit: HIVE-12988 : Improve dynamic partition loading IV (Ashutosh Chauhan via Prasanth J)
Repository: hive
Updated Branches:
refs/heads/master 1de97bc5f -> a14ef8abe
HIVE-12988 : Improve dynamic partition loading IV (Ashutosh Chauhan via Prasanth J)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/a14ef8ab
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/a14ef8ab
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/a14ef8ab
Branch: refs/heads/master
Commit: a14ef8abe1df1516b8b9f486030bc3d584f940a9
Parents: 1de97bc
Author: Ashutosh Chauhan <ha...@apache.org>
Authored: Tue Feb 2 18:03:44 2016 -0800
Committer: Ashutosh Chauhan <ha...@apache.org>
Committed: Tue Mar 29 11:27:12 2016 -0700
----------------------------------------------------------------------
.../org/apache/hadoop/hive/conf/HiveConf.java | 4 +-
.../apache/hadoop/hive/ql/metadata/Hive.java | 252 +++++++++++--------
.../org/apache/hadoop/fs/ProxyFileSystem.java | 5 +-
3 files changed, 155 insertions(+), 106 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/a14ef8ab/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index b8870f2..f03c1ab 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2326,6 +2326,8 @@ public class HiveConf extends Configuration {
HIVE_SECURITY_COMMAND_WHITELIST("hive.security.command.whitelist", "set,reset,dfs,add,list,delete,reload,compile",
"Comma separated list of non-SQL Hive commands users are authorized to execute"),
+ HIVE_MOVE_FILES_THREAD_COUNT("hive.mv.files.thread", 25, new SizeValidator(1L, true, 1024L, true), "Number of threads"
+ + " used to move files in a move task"),
// If this is set all move tasks at the end of a multi-insert query will only begin once all
// outputs are ready
HIVE_MULTI_INSERT_MOVE_TASKS_SHARE_DEPENDENCIES(
@@ -2771,7 +2773,7 @@ public class HiveConf extends Configuration {
SPARK_RPC_SASL_MECHANISM("hive.spark.client.rpc.sasl.mechanisms", "DIGEST-MD5",
"Name of the SASL mechanism to use for authentication."),
SPARK_RPC_SERVER_ADDRESS("hive.spark.client.rpc.server.address", "",
- "The server address of HiverServer2 host to be used for communication between Hive client and remote Spark driver. " +
+ "The server address of HiverServer2 host to be used for communication between Hive client and remote Spark driver. " +
"Default is empty, which means the address will be determined in the same way as for hive.server2.thrift.bind.host." +
"This is only necessary if the host has mutiple network addresses and if a different network address other than " +
"hive.server2.thrift.bind.host is to be used."),
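For reference, the new knob can be read or overridden through the usual HiveConf accessors. A minimal sketch, assuming the patched HiveConf is on the classpath (the class name MoveThreadsExample is illustrative, not from the patch):

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;

public class MoveThreadsExample {
  public static void main(String[] args) {
    HiveConf conf = new HiveConf();
    // Default from this patch is 25; the SizeValidator bounds accepted values to [1, 1024].
    int threads = conf.getIntVar(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT);
    System.out.println("move-task threads: " + threads);

    // Programmatic override, equivalent to `set hive.mv.files.thread=10;` in a session.
    conf.setIntVar(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT, 10);
  }
}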
http://git-wip-us.apache.org/repos/asf/hive/blob/a14ef8ab/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index 6d27f55..c27481f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -32,19 +32,25 @@ import java.io.IOException;
import java.io.PrintStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
-import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import com.google.common.collect.ImmutableMap;
+
import javax.jdo.JDODataStoreException;
import org.apache.hadoop.conf.Configuration;
@@ -132,6 +138,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
/**
* This class has functions that implement meta data/DDL operations using calls
@@ -1504,7 +1511,7 @@ public class Hive {
isSrcLocal);
} else {
if (conf.getBoolVar(ConfVars.FIRE_EVENTS_FOR_DML) && !tbl.isTemporary() && oldPart != null) {
- newFiles = new ArrayList<>();
+ newFiles = Collections.synchronizedList(new ArrayList<Path>());
}
FileSystem fs = tbl.getDataLocation().getFileSystem(conf);
@@ -1751,9 +1758,13 @@ private void constructOneLBLocationMap(FileStatus fSta,
public void loadTable(Path loadPath, String tableName, boolean replace,
boolean isSrcLocal, boolean isSkewedStoreAsSubdir, boolean isAcid)
throws HiveException {
- List<Path> newFiles = new ArrayList<Path>();
+
+ List<Path> newFiles = null;
Table tbl = getTable(tableName);
HiveConf sessionConf = SessionState.getSessionConf();
+ if (conf.getBoolVar(ConfVars.FIRE_EVENTS_FOR_DML) && !tbl.isTemporary()) {
+ newFiles = Collections.synchronizedList(new ArrayList<Path>());
+ }
if (replace) {
Path tableDest = tbl.getPath();
replaceFiles(tableDest, loadPath, tableDest, tableDest, sessionConf, isSrcLocal);
@@ -2579,75 +2590,91 @@ private void constructOneLBLocationMap(FileStatus fSta,
}
}
- // for each file or directory in 'srcs', make mapping for every file in src to safe name in dest
- private static List<List<Path[]>> checkPaths(HiveConf conf, FileSystem fs,
- FileStatus[] srcs, FileSystem srcFs, Path destf, boolean replace)
- throws HiveException {
+ private static void copyFiles(final HiveConf conf, final FileSystem destFs,
+ FileStatus[] srcs, final FileSystem srcFs, final Path destf, final boolean isSrcLocal, final List<Path> newFiles)
+ throws HiveException {
- List<List<Path[]>> result = new ArrayList<List<Path[]>>();
+ final HadoopShims.HdfsFileStatus fullDestStatus;
try {
- FileStatus destStatus = !replace ? FileUtils.getFileStatusOrNull(fs, destf) : null;
- if (destStatus != null && !destStatus.isDir()) {
- throw new HiveException("checkPaths: destination " + destf
- + " should be a directory");
- }
- for (FileStatus src : srcs) {
- FileStatus[] items;
- if (src.isDir()) {
- items = srcFs.listStatus(src.getPath(), FileUtils.HIDDEN_FILES_PATH_FILTER);
- Arrays.sort(items);
+ fullDestStatus = ShimLoader.getHadoopShims().getFullFileStatus(conf, destFs, destf);
+ } catch (IOException e1) {
+ throw new HiveException(e1);
+ }
+
+ if (!fullDestStatus.getFileStatus().isDirectory()) {
+ throw new HiveException(destf + " is not a directory.");
+ }
+ final boolean inheritPerms = HiveConf.getBoolVar(conf,
+ HiveConf.ConfVars.HIVE_WAREHOUSE_SUBDIR_INHERIT_PERMS);
+ final List<Future<ObjectPair<Path, Path>>> futures = new LinkedList<>();
+ final ExecutorService pool = Executors.newFixedThreadPool(
+ conf.getIntVar(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT),
+ new ThreadFactoryBuilder().setDaemon(true).setNameFormat("MoveDir-Thread-%d").build());
+
+ for (FileStatus src : srcs) {
+ FileStatus[] files;
+ if (src.isDirectory()) {
+ try {
+ files = srcFs.listStatus(src.getPath(), FileUtils.HIDDEN_FILES_PATH_FILTER);
+ } catch (IOException e) {
+ pool.shutdownNow();
+ throw new HiveException(e);
+ }
+ } else {
+ files = new FileStatus[] {src};
+ }
+
+ for (FileStatus srcFile : files) {
+
+ final Path srcP = srcFile.getPath();
+ final boolean needToCopy = needToCopy(srcP, destf, srcFs, destFs);
+ // Strip off the file type, if any, so we don't make:
+ // 000000_0.gz -> 000000_0.gz_copy_1
+ final String name;
+ final String filetype;
+ String itemName = srcP.getName();
+ int index = itemName.lastIndexOf('.');
+ if (index >= 0) {
+ filetype = itemName.substring(index);
+ name = itemName.substring(0, index);
} else {
- items = new FileStatus[] {src};
+ name = itemName;
+ filetype = "";
}
-
- List<Path[]> srcToDest = new ArrayList<Path[]>();
- for (FileStatus item : items) {
-
- Path itemSource = item.getPath();
-
- if (Utilities.isTempPath(item)) {
- // This check is redundant because temp files are removed by
- // execution layer before
- // calling loadTable/Partition. But leaving it in just in case.
- srcFs.delete(itemSource, true);
- continue;
- }
-
- Path itemDest = new Path(destf, itemSource.getName());
-
- if (!replace) {
- // Strip off the file type, if any so we don't make:
- // 000000_0.gz -> 000000_0.gz_copy_1
- String name = itemSource.getName();
- String filetype;
- int index = name.lastIndexOf('.');
- if (index >= 0) {
- filetype = name.substring(index);
- name = name.substring(0, index);
+ futures.add(pool.submit(new Callable<ObjectPair<Path, Path>>() {
+ @Override
+ public ObjectPair<Path, Path> call() throws Exception {
+ Path destPath = new Path(destf, srcP.getName());
+ if (!needToCopy && !isSrcLocal) {
+ for (int counter = 1; !destFs.rename(srcP,destPath); counter++) {
+ destPath = new Path(destf, name + ("_copy_" + counter) + filetype);
+ }
} else {
- filetype = "";
+ destPath = mvFile(conf, srcP, destPath, isSrcLocal, srcFs, destFs, name, filetype);
}
- // It's possible that the file we're copying may have the same
- // relative name as an existing file in the "destf" directory.
- // So let's make a quick check to see if we can rename any
- // potential offenders so as to allow them to move into the
- // "destf" directory. The scheme is dead simple: simply tack
- // on "_copy_N" where N starts at 1 and works its way up until
- // we find a free space.
-
- // removed source file staging.. it's more confusing when failed.
- for (int counter = 1; fs.exists(itemDest) || destExists(result, itemDest); counter++) {
- itemDest = new Path(destf, name + ("_copy_" + counter) + filetype);
+
+ if (inheritPerms) {
+ ShimLoader.getHadoopShims().setFullFileStatus(conf, fullDestStatus, destFs, destf);
}
+ if (null != newFiles) {
+ newFiles.add(destPath);
+ }
+ return ObjectPair.create(srcP, destPath);
}
- srcToDest.add(new Path[]{itemSource, itemDest});
- }
- result.add(srcToDest);
+ }));
+ }
+ }
+ pool.shutdown();
+ for (Future<ObjectPair<Path, Path>> future : futures) {
+ try {
+ ObjectPair<Path, Path> pair = future.get();
+ LOG.debug("Moved src: {}", pair.getFirst().toString(), ", to dest: {}", pair.getSecond().toString());
+ } catch (Exception e) {
+ LOG.error("Failed to move: {}", e.getMessage());
+ pool.shutdownNow();
+ throw new HiveException(e.getCause());
}
- } catch (IOException e) {
- throw new HiveException("checkPaths: filesystem error in check phase. " + e.getMessage(), e);
}
- return result;
}
private static boolean destExists(List<List<Path[]>> result, Path proposed) {
@@ -2704,14 +2731,34 @@ private void constructOneLBLocationMap(FileStatus fSta,
return ShimLoader.getHadoopShims().getPathWithoutSchemeAndAuthority(path).toString();
}
+ private static Path mvFile(HiveConf conf, Path srcf, Path destf, boolean isSrcLocal,
+ FileSystem srcFs, FileSystem destFs, String srcName, String filetype) throws IOException {
+
+ for (int counter = 1; destFs.exists(destf); counter++) {
+ destf = new Path(destf.getParent(), srcName + ("_copy_" + counter) + filetype);
+ }
+ if (isSrcLocal) {
+ // For local src file, copy to hdfs
+ destFs.copyFromLocalFile(srcf, destf);
+ } else {
+ // Copy if across file systems or encryption zones.
+ LOG.info("Copying source " + srcf + " to " + destf + " because the source and destination are on different file systems or in different encryption zones.");
+ FileUtils.copy(srcFs, srcf, destFs, destf,
+ true, // delete source
+ false, // overwrite destination
+ conf);
+ }
+ return destf;
+ }
+
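The mvFile helper above centralizes the _copy_N collision scheme: set the extension aside, then probe _copy_1, _copy_2, ... until a free name turns up, so 000000_0.gz becomes 000000_0_copy_1.gz rather than 000000_0.gz_copy_1. A standalone sketch of that naming loop over java.nio.file (freeDest is an illustrative name, not from the patch):

import java.nio.file.Files;
import java.nio.file.Path;

public final class CopySuffix {
  /** Finds a free name in destDir for fileName, appending _copy_N before the extension. */
  static Path freeDest(Path destDir, String fileName) {
    int dot = fileName.lastIndexOf('.');
    String name = dot >= 0 ? fileName.substring(0, dot) : fileName;  // "000000_0"
    String filetype = dot >= 0 ? fileName.substring(dot) : "";       // ".gz"
    Path dest = destDir.resolve(fileName);
    for (int counter = 1; Files.exists(dest); counter++) {
      dest = destDir.resolve(name + "_copy_" + counter + filetype);  // 000000_0_copy_1.gz
    }
    return dest;
  }
}

Note that this exists-then-use probe is racy under concurrent writers; that is presumably why the no-copy path in copyFiles retries on a failed rename instead, which fails atomically when the destination already exists.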
// It is assumed that the parent directory of destf already exists when this
// method is called. When the replace value is true, this method works a little
// differently from the mv command: if destf is a directory, it replaces destf
// instead of moving under it. In this case, the replaced destf still preserves
// the original destf's permissions.
- public static boolean moveFile(HiveConf conf, Path srcf, Path destf,
+ public static boolean moveFile(HiveConf conf, Path srcf, final Path destf,
boolean replace, boolean isSrcLocal) throws HiveException {
boolean success = false;
- FileSystem srcFs, destFs;
+ final FileSystem srcFs, destFs;
try {
destFs = destf.getFileSystem(conf);
} catch (IOException e) {
@@ -2775,31 +2822,38 @@ private void constructOneLBLocationMap(FileStatus fSta,
FileStatus[] srcs = destFs.listStatus(srcf, FileUtils.HIDDEN_FILES_PATH_FILTER);
if (srcs.length == 0) {
success = true; // Nothing to move.
- }
- /* Move files one by one because source is a subdirectory of destination */
- for (FileStatus status : srcs) {
- Path destFile;
-
- /* Append the source filename to the destination directory */
- if (destFs.isDirectory(destf)) {
- destFile = new Path(destf, status.getPath().getName());
- } else {
- destFile = destf;
+ } else {
+ List<Future<Boolean>> futures = new LinkedList<>();
+ final ExecutorService pool = Executors.newFixedThreadPool(
+ conf.getIntVar(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT),
+ new ThreadFactoryBuilder().setDaemon(true).setNameFormat("MoveDir-Thread-%d").build());
+ /* Move files one by one because source is a subdirectory of destination */
+ for (final FileStatus status : srcs) {
+ futures.add(pool.submit(new Callable<Boolean>() {
+ @Override
+ public Boolean call() throws Exception {
+ return destFs.rename(status.getPath(), destf);
+ }
+ }));
}
-
- // Destination should be replaced, so we delete it first
- if (destFs.exists(destFile)) {
- if (!destFs.delete(destFile, true)) {
- throw new HiveException(String.format("File to replace could not be deleted: %s", destFile));
+ pool.shutdown();
+ boolean allFutures = true;
+ for (Future<Boolean> future : futures) {
+ try {
+ Boolean result = future.get();
+ allFutures &= result;
+ if (!result) {
+ LOG.debug("Failed to rename.");
+ pool.shutdownNow();
+ }
+ } catch (Exception e) {
+ LOG.debug("Failed to rename.", e.getMessage());
+ pool.shutdownNow();
+ throw new HiveException(e.getCause());
}
}
-
- if (!(destFs.rename(status.getPath(), destFile))) {
- throw new HiveException("Unable to move source " + status.getPath() + " to destination " + destf);
- }
+ success = allFutures;
}
-
- success = true;
} else {
success = destFs.rename(srcf, destf);
}
@@ -2825,8 +2879,9 @@ private void constructOneLBLocationMap(FileStatus fSta,
/**
* If moving across different FileSystems or different encryption zones, a file copy is needed instead of a rename.
* TODO- consider if need to do this for different file authority.
+ * @throws HiveException
*/
- static protected boolean needToCopy(Path srcf, Path destf, FileSystem srcFs, FileSystem destFs) throws HiveException, IOException {
+ static protected boolean needToCopy(Path srcf, Path destf, FileSystem srcFs, FileSystem destFs) throws HiveException {
//Check if different FileSystems
if (!srcFs.getClass().equals(destFs.getClass())) {
return true;
@@ -2834,8 +2889,12 @@ private void constructOneLBLocationMap(FileStatus fSta,
//Check if different encryption zones
HadoopShims.HdfsEncryptionShim hdfsEncryptionShim = SessionState.get().getHdfsEncryptionShim();
- return hdfsEncryptionShim != null && (hdfsEncryptionShim.isPathEncrypted(srcf) || hdfsEncryptionShim.isPathEncrypted(destf))
- && !hdfsEncryptionShim.arePathsOnSameEncryptionZone(srcf, destf);
+ try {
+ return hdfsEncryptionShim != null && (hdfsEncryptionShim.isPathEncrypted(srcf) || hdfsEncryptionShim.isPathEncrypted(destf))
+ && !hdfsEncryptionShim.arePathsOnSameEncryptionZone(srcf, destf);
+ } catch (IOException e) {
+ throw new HiveException(e);
+ }
}
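In plain terms, needToCopy answers: can this move be a cheap rename, or must bytes be copied? It copies when the source and destination FileSystem implementations differ, or when either path sits in an HDFS encryption zone and the two zones are not the same. A rough dependency-free sketch of the cross-filesystem half of that decision, comparing URI scheme and authority instead of FileSystem classes (an assumption; coarser than what the patch actually does):

import java.net.URI;

public final class MoveOrCopy {
  /** True when a rename cannot work and a byte copy is required. */
  static boolean needToCopy(URI src, URI dest) {
    boolean sameScheme = String.valueOf(src.getScheme()).equalsIgnoreCase(String.valueOf(dest.getScheme()));
    boolean sameAuthority = String.valueOf(src.getAuthority()).equalsIgnoreCase(String.valueOf(dest.getAuthority()));
    // The patch compares FileSystem implementation classes and encryption zones;
    // scheme+authority is a stand-in that needs no Hadoop dependency.
    return !(sameScheme && sameAuthority);
  }
  // Example: needToCopy(URI.create("hdfs://nn1/warehouse/t"), URI.create("s3a://bucket/t")) -> true
}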
/**
@@ -2886,22 +2945,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
if (isAcid) {
moveAcidFiles(srcFs, srcs, destf, newFiles);
} else {
- // check that source and target paths exist
- List<List<Path[]>> result = checkPaths(conf, fs, srcs, srcFs, destf, false);
- // move it, move it
- try {
- for (List<Path[]> sdpairs : result) {
- for (Path[] sdpair : sdpairs) {
- if (!moveFile(conf, sdpair[0], sdpair[1], false, isSrcLocal)) {
- throw new IOException("Cannot move " + sdpair[0] + " to "
- + sdpair[1]);
- }
- if (newFiles != null) newFiles.add(sdpair[1]);
- }
- }
- } catch (IOException e) {
- throw new HiveException("copyFiles: error while moving files!!! " + e.getMessage(), e);
- }
+ copyFiles(conf, fs, srcs, srcFs, destf, isSrcLocal, newFiles);
}
}
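Both rewritten paths (copyFiles and the subdirectory branch of moveFile) share one concurrency pattern: a fixed pool of daemon "MoveDir-Thread-%d" workers sized by hive.mv.files.thread, one Callable per file, then a drain of the Futures that calls shutdownNow() and rethrows on the first failure. A self-contained sketch of that pattern, substituting java.nio.file for Hadoop's FileSystem (POOL_SIZE and moveAll are illustrative names):

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public final class ParallelMove {
  private static final int POOL_SIZE = 25; // mirrors the hive.mv.files.thread default

  static void moveAll(List<Path> srcs, Path destDir) throws IOException {
    ExecutorService pool = Executors.newFixedThreadPool(POOL_SIZE, r -> {
      Thread t = new Thread(r, "MoveDir-Thread");
      t.setDaemon(true); // like ThreadFactoryBuilder().setDaemon(true) in the patch
      return t;
    });
    List<Future<Path>> futures = new ArrayList<>();
    for (Path src : srcs) {
      futures.add(pool.submit(() -> Files.move(src, destDir.resolve(src.getFileName()))));
    }
    pool.shutdown(); // accept no new tasks; queued moves keep running
    for (Future<Path> f : futures) {
      try {
        f.get(); // surfaces the first failed move
      } catch (Exception e) {
        pool.shutdownNow(); // cancel outstanding moves, as the patch does
        throw new IOException("move failed", e);
      }
    }
  }
}

The daemon flag matters here: if the client dies mid-move, the pool will not keep the JVM alive.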
http://git-wip-us.apache.org/repos/asf/hive/blob/a14ef8ab/shims/common/src/main/java/org/apache/hadoop/fs/ProxyFileSystem.java
----------------------------------------------------------------------
diff --git a/shims/common/src/main/java/org/apache/hadoop/fs/ProxyFileSystem.java b/shims/common/src/main/java/org/apache/hadoop/fs/ProxyFileSystem.java
index cb1e2b7..2c37a51 100644
--- a/shims/common/src/main/java/org/apache/hadoop/fs/ProxyFileSystem.java
+++ b/shims/common/src/main/java/org/apache/hadoop/fs/ProxyFileSystem.java
@@ -82,6 +82,7 @@ public class ProxyFileSystem extends FilterFileSystem {
* @return
* @throws IOException
*/
+ @Override
public Path resolvePath(final Path p) throws IOException {
// Return the fully-qualified path of path f resolving the path
// through any symlinks or mount point
@@ -174,7 +175,9 @@ public class ProxyFileSystem extends FilterFileSystem {
@Override
public boolean rename(Path src, Path dst) throws IOException {
- return super.rename(swizzleParamPath(src), swizzleParamPath(dst));
+ Path dest = swizzleParamPath(dst);
+ // Make sure we return false for an existing destination, as per the FileSystem API contract
+ return super.isFile(dest) ? false : super.rename(swizzleParamPath(src), dest);
}
@Override
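The ProxyFileSystem change pins down the FileSystem.rename contract: renaming onto an existing file must report false rather than clobber it, presumably because the wrapped local file system's rename can overwrite an existing file on POSIX. The parallel rename loop in copyFiles relies on that false return to trigger its _copy_N retry. A minimal sketch of the same guard over java.nio.file (GuardedRename is an illustrative name; only the existing-file check is modeled, and directory-destination semantics differ from Hadoop's):

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

public final class GuardedRename {
  /** Returns false when dst is already a regular file, mirroring the contract check above. */
  static boolean rename(Path src, Path dst) throws IOException {
    if (Files.isRegularFile(dst)) {
      return false; // same idea as: super.isFile(dest) ? false : super.rename(...)
    }
    Files.move(src, dst);
    return true;
  }
}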