You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by jc...@apache.org on 2019/07/20 01:12:52 UTC

[hive] branch master updated: HIVE-21711: Regression caused by HIVE-21279 for blobstorage fs (Vineet Garg, reviewed by Prasanth Jayachandran)

This is an automated email from the ASF dual-hosted git repository.

jcamacho pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new ac78f79  HIVE-21711: Regression caused by HIVE-21279 for blobstorage fs (Vineet Garg, reviewed by Prasanth Jayachandran)
ac78f79 is described below

commit ac78f79bacd7b7e1b6c3ac294b2d776f8c2e5cd7
Author: Vineet Garg <vg...@apache.org>
AuthorDate: Fri Jul 19 18:12:30 2019 -0700

    HIVE-21711: Regression caused by HIVE-21279 for blobstorage fs (Vineet Garg, reviewed by Prasanth Jayachandran)
---
 .../org/apache/hadoop/hive/ql/exec/Utilities.java  | 44 +++++++++++++++++++++-
 1 file changed, 42 insertions(+), 2 deletions(-)

diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index 4372663..1d32ba0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -92,6 +92,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hive.common.BlobStorageUtils;
 import org.apache.hadoop.hive.common.FileUtils;
 import org.apache.hadoop.hive.common.HiveInterruptCallback;
 import org.apache.hadoop.hive.common.HiveInterruptUtils;
@@ -1101,6 +1102,43 @@ public final class Utilities {
     }
   }
 
+  /**
+   * Recursively moves the entries under src that are listed in filesToMove into dst.
+   * @param fs file system used to list src, create dst and perform the moves
+   * @param src directory whose entries are listed and examined
+   * @param dst target directory; created with mkdirs if it does not already exist
+   * @param filesToMove paths to move; unlisted directories are traversed, unlisted files skipped
+   * @throws IOException propagated from the file system calls or from Utilities.moveFile
+   * @throws HiveException propagated from Utilities.moveFile
+   */
+  private static void moveSpecifiedFiles(FileSystem fs, Path src, Path dst, Set<Path> filesToMove)
+      throws IOException, HiveException {
+    if (!fs.exists(dst)) {
+      fs.mkdirs(dst);
+    }
+
+    FileStatus[] files = fs.listStatus(src);
+    for (FileStatus file : files) {
+      if (filesToMove.contains(file.getPath())) {
+        Utilities.moveFile(fs, file, dst);
+      } else if (file.isDir()) {
+        // Entry itself is not listed: descend into it and look for listed paths below.
+        // The destination mirrors the source nesting by reusing the directory name under dst.
+        Path nestedDstPath = new Path(dst, file.getPath().getName());
+        Utilities.moveSpecifiedFiles(fs, file.getPath(), nestedDstPath, filesToMove);
+      }
+    }
+  }
+
+  private static void moveSpecifiedFileStatus(FileSystem fs, Path src, Path dst,
+      Set<FileStatus> filesToMove) throws IOException, HiveException {
+    Set<Path> filePaths = new HashSet<>(); // paths extracted from filesToMove
+    for (FileStatus fstatus : filesToMove) {
+      filePaths.add(fstatus.getPath());
+    }
+    moveSpecifiedFiles(fs, src, dst, filePaths); // delegate to the Path-based variant
+  }
+
   private static void moveFile(FileSystem fs, FileStatus file, Path dst) throws IOException,
       HiveException {
     Path srcFilePath = file.getPath();
@@ -1356,7 +1394,6 @@ public final class Utilities {
   private static boolean shouldAvoidRename(FileSinkDesc conf, Configuration hConf) {
     // we are avoiding rename/move only if following conditions are met
     //  * execution engine is tez
-    //  * query cache is disabled
     //  * if it is select query
     if (conf != null && conf.getIsQuery() && conf.getFilesToFetch() != null
         && HiveConf.getVar(hConf, ConfVars.HIVE_EXECUTION_ENGINE).equalsIgnoreCase("tez")){
@@ -1394,10 +1431,11 @@ public final class Utilities {
     //       3) Rename/move the temp directory to specPath
 
     FileSystem fs = specPath.getFileSystem(hconf);
+    boolean isBlobStorage = BlobStorageUtils.isBlobStorageFileSystem(hconf, fs);
     Path tmpPath = Utilities.toTempPath(specPath);
     Path taskTmpPath = Utilities.toTaskTempPath(specPath);
     if (success) {
-      if (!shouldAvoidRename(conf, hconf) && fs.exists(tmpPath)) {
+      if (!shouldAvoidRename(conf, hconf) && fs.exists(tmpPath) && !isBlobStorage) {
         //   1) Rename tmpPath to a new directory name to prevent additional files
         //      from being added by runaway processes.
         Path tmpPathOriginal = tmpPath;
@@ -1435,6 +1473,8 @@ public final class Utilities {
         if(shouldAvoidRename(conf, hconf)){
           LOG.debug("Skipping rename/move files. Files to be kept are: " + filesKept.toString());
           conf.getFilesToFetch().addAll(filesKept);
+        } else if (isBlobStorage) {
+          Utilities.moveSpecifiedFileStatus(fs, tmpPath, specPath, filesKept);
         } else {
           perfLogger.PerfLogBegin("FileSinkOperator", "RenameOrMoveFiles");
           Utilities.renameOrMoveFiles(fs, tmpPath, specPath);