You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by jc...@apache.org on 2019/07/20 01:12:52 UTC
[hive] branch master updated: HIVE-21711: Regression caused by HIVE-21279 for blobstorage fs (Vineet Garg, reviewed by Prasanth Jayachandran)
This is an automated email from the ASF dual-hosted git repository.
jcamacho pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new ac78f79 HIVE-21711: Regression caused by HIVE-21279 for blobstorage fs (Vineet Garg, reviewed by Prasanth Jayachandran)
ac78f79 is described below
commit ac78f79bacd7b7e1b6c3ac294b2d776f8c2e5cd7
Author: Vineet Garg <vg...@apache.org>
AuthorDate: Fri Jul 19 18:12:30 2019 -0700
HIVE-21711: Regression caused by HIVE-21279 for blobstorage fs (Vineet Garg, reviewed by Prasanth Jayachandran)
---
.../org/apache/hadoop/hive/ql/exec/Utilities.java | 44 +++++++++++++++++++++-
1 file changed, 42 insertions(+), 2 deletions(-)
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index 4372663..1d32ba0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -92,6 +92,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hive.common.BlobStorageUtils;
import org.apache.hadoop.hive.common.FileUtils;
import org.apache.hadoop.hive.common.HiveInterruptCallback;
import org.apache.hadoop.hive.common.HiveInterruptUtils;
@@ -1101,6 +1102,43 @@ public final class Utilities {
}
}
+  /**
+   * Moves each entry under src whose path is in filesToMove into dst, keeping src's nesting.
+   * @param fs file system that holds both src and dst
+   * @param src directory to scan; entries not in filesToMove are left where they are
+   * @param dst target directory; created if missing, as is a mirror of each traversed subdir
+   * @param filesToMove paths to move, matched exactly against the paths fs.listStatus returns
+   * @throws IOException on file system errors (listing, mkdirs failure, or from moveFile)
+   * @throws HiveException propagated from moveFile
+   */
+  private static void moveSpecifiedFiles(FileSystem fs, Path src, Path dst, Set<Path> filesToMove)
+      throws IOException, HiveException {
+    if (!fs.exists(dst) && !fs.mkdirs(dst)) {
+      throw new IOException("Cannot create directory " + dst);
+    }
+
+    FileStatus[] files = fs.listStatus(src);
+    for (FileStatus file : files) {
+      if (filesToMove.contains(file.getPath())) {
+        Utilities.moveFile(fs, file, dst);
+      } else if (file.isDirectory()) {
+        // Not selected as a whole: descend so selected entries below it are moved to the
+        // same relative location under dst (dst's nesting must mirror src's).
+        Path nestedDstPath = new Path(dst, file.getPath().getName());
+        Utilities.moveSpecifiedFiles(fs, file.getPath(), nestedDstPath, filesToMove);
+      }
+    }
+  }
+
+  /** Variant of moveSpecifiedFiles that takes the selection as FileStatus objects. */
+  private static void moveSpecifiedFileStatus(FileSystem fs, Path src, Path dst,
+      Set<FileStatus> filesToMove) throws IOException, HiveException {
+    // Membership is checked by Path during the traversal, so keep only the paths.
+    Set<Path> pathsToMove = new HashSet<>();
+    filesToMove.forEach(status -> pathsToMove.add(status.getPath()));
+    moveSpecifiedFiles(fs, src, dst, pathsToMove);
+  }
+
private static void moveFile(FileSystem fs, FileStatus file, Path dst) throws IOException,
HiveException {
Path srcFilePath = file.getPath();
@@ -1356,7 +1394,6 @@ public final class Utilities {
private static boolean shouldAvoidRename(FileSinkDesc conf, Configuration hConf) {
// we are avoiding rename/move only if following conditions are met
// * execution engine is tez
- // * query cache is disabled
// * if it is select query
if (conf != null && conf.getIsQuery() && conf.getFilesToFetch() != null
&& HiveConf.getVar(hConf, ConfVars.HIVE_EXECUTION_ENGINE).equalsIgnoreCase("tez")){
@@ -1394,10 +1431,11 @@ public final class Utilities {
// 3) Rename/move the temp directory to specPath
FileSystem fs = specPath.getFileSystem(hconf);
+ boolean isBlobStorage = BlobStorageUtils.isBlobStorageFileSystem(hconf, fs);
Path tmpPath = Utilities.toTempPath(specPath);
Path taskTmpPath = Utilities.toTaskTempPath(specPath);
if (success) {
- if (!shouldAvoidRename(conf, hconf) && fs.exists(tmpPath)) {
+ if (!shouldAvoidRename(conf, hconf) && fs.exists(tmpPath) && !isBlobStorage) {
// 1) Rename tmpPath to a new directory name to prevent additional files
// from being added by runaway processes.
Path tmpPathOriginal = tmpPath;
@@ -1435,6 +1473,8 @@ public final class Utilities {
if(shouldAvoidRename(conf, hconf)){
LOG.debug("Skipping rename/move files. Files to be kept are: " + filesKept.toString());
conf.getFilesToFetch().addAll(filesKept);
+ } else if (isBlobStorage) {
+ Utilities.moveSpecifiedFileStatus(fs, tmpPath, specPath, filesKept);
} else {
perfLogger.PerfLogBegin("FileSinkOperator", "RenameOrMoveFiles");
Utilities.renameOrMoveFiles(fs, tmpPath, specPath);