You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by rb...@apache.org on 2020/04/10 05:52:32 UTC

[hive] branch master updated: HIVE-23154: Fix race condition in Utilities::mvFileToFinalPath (Rajesh Balamohan reviewed by Ashutosh Chauhan)

This is an automated email from the ASF dual-hosted git repository.

rbalamohan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new f723f42  HIVE-23154: Fix race condition in Utilities::mvFileToFinalPath (Rajesh Balamohan reviewed by Ashutosh Chauhan)
f723f42 is described below

commit f723f42d13865d820099c66d2fac68195f46110b
Author: Rajesh Balamohan <rb...@apache.org>
AuthorDate: Fri Apr 10 11:22:17 2020 +0530

    HIVE-23154: Fix race condition in Utilities::mvFileToFinalPath (Rajesh Balamohan reviewed by Ashutosh Chauhan)
---
 .../org/apache/hadoop/hive/ql/exec/Utilities.java  | 205 ++++++++++++++++-----
 1 file changed, 161 insertions(+), 44 deletions(-)

diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index e25dc54..0304a5c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -1111,43 +1111,6 @@ public final class Utilities {
     }
   }
 
-  /**
-   * Moves files from src to dst if it is within the specified set of paths
-   * @param fs
-   * @param src
-   * @param dst
-   * @param filesToMove
-   * @throws IOException
-   * @throws HiveException
-   */
-  private static void moveSpecifiedFiles(FileSystem fs, Path src, Path dst, Set<Path> filesToMove)
-      throws IOException, HiveException {
-    if (!fs.exists(dst)) {
-      fs.mkdirs(dst);
-    }
-
-    FileStatus[] files = fs.listStatus(src);
-    for (FileStatus file : files) {
-      if (filesToMove.contains(file.getPath())) {
-        Utilities.moveFile(fs, file, dst);
-      } else if (file.isDir()) {
-        // Traverse directory contents.
-        // Directory nesting for dst needs to match src.
-        Path nestedDstPath = new Path(dst, file.getPath().getName());
-        Utilities.moveSpecifiedFiles(fs, file.getPath(), nestedDstPath, filesToMove);
-      }
-    }
-  }
-
-  private static void moveSpecifiedFileStatus(FileSystem fs, Path src, Path dst,
-      Set<FileStatus> filesToMove) throws IOException, HiveException {
-    Set<Path> filePaths = new HashSet<>();
-    for (FileStatus fstatus : filesToMove) {
-      filePaths.add(fstatus.getPath());
-    }
-    moveSpecifiedFiles(fs, src, dst, filePaths);
-  }
-
   private static void moveFile(FileSystem fs, FileStatus file, Path dst) throws IOException,
       HiveException {
     Path srcFilePath = file.getPath();
@@ -1198,6 +1161,53 @@ public final class Utilities {
   }
 
   /**
+   * Rename src to dst, or in the case dst already exists, move the direct
+   * children of src into dst. If there is an existing file with the same name,
+   * the new file's name will be appended with "_1", "_2", etc. The per-file
+   * moves are submitted to a thread pool and this method blocks until all of
+   * them have finished.
+   *
+   * @param conf
+   *          configuration used to size the move thread pool
+   * @param fs
+   *          the FileSystem where src and dst are on.
+   * @param src
+   *          the src directory
+   * @param dst
+   *          the target directory
+   * @throws IOException
+   *           if checking dst, renaming, or listing src fails
+   * @throws HiveException
+   *           if the rename is refused or any of the parallel moves fails
+   */
+  public static void renameOrMoveFilesInParallel(Configuration conf,
+      FileSystem fs, Path src, Path dst) throws IOException, HiveException {
+    if (!fs.exists(dst)) {
+      if (!fs.rename(src, dst)) {
+        throw new HiveException("Unable to move: " + src + " to: " + dst);
+      }
+      return;
+    }
+
+    // move files in parallel
+    LOG.info("Moving files from {}  to {}", src, dst);
+
+    // List the source before creating the pool: if the listing throws, no
+    // executor exists yet, so no worker threads can be leaked.
+    final FileStatus[] files = fs.listStatus(src);
+
+    final ExecutorService pool = createMoveThreadPool(conf);
+    final List<Future<Void>> futures = new LinkedList<>();
+    for (final FileStatus file : files) {
+      futures.add(pool.submit(new Callable<Void>() {
+        @Override
+        public Void call() throws HiveException {
+          try {
+            Utilities.moveFile(fs, file, dst);
+          } catch (Exception e) {
+            throw new HiveException(e);
+          }
+          return null;
+        }
+      }));
+    }
+
+    // Shuts the pool down and waits for every submitted move to complete.
+    shutdownAndCleanup(pool, futures);
+    LOG.info("Rename files from {}  to {} is complete", src, dst);
+  }
+
+
+  /**
    * The first group will contain the task id. The second group is the optional extension. The file
    * name looks like: "0_0" or "0_0.gz". There may be a leading prefix (tmp_). Since getTaskId() can
    * return an integer only - this should match a pure integer as well. {1,6} is used to limit
@@ -1497,18 +1507,13 @@ public final class Utilities {
           // for CTAS or Create MV statements
           perfLogger.PerfLogBegin("FileSinkOperator", "moveSpecifiedFileStatus");
           LOG.debug("CTAS/Create MV: Files being renamed:  " + filesKept.toString());
-          if (emptyBuckets.isEmpty()) {
-            fs.rename(tmpPath, specPath);
-          } else {
-            LOG.info("Duplicate files present. Moving files sequentially");
-            Utilities.moveSpecifiedFileStatus(fs, tmpPath, specPath, filesKept);
-          }
+          moveSpecifiedFilesInParallel(hconf, fs, tmpPath, specPath, filesKept);
           perfLogger.PerfLogEnd("FileSinkOperator", "moveSpecifiedFileStatus");
         } else {
           // for rest of the statement e.g. INSERT, LOAD etc
           perfLogger.PerfLogBegin("FileSinkOperator", "RenameOrMoveFiles");
           LOG.debug("Final renaming/moving. Source: " + tmpPath + " .Destination: " + specPath);
-          Utilities.renameOrMoveFiles(fs, tmpPath, specPath);
+          renameOrMoveFilesInParallel(hconf, fs, tmpPath, specPath);
           perfLogger.PerfLogEnd("FileSinkOperator", "RenameOrMoveFiles");
         }
       }
@@ -1520,6 +1525,118 @@ public final class Utilities {
     fs.delete(taskTmpPath, true);
   }
 
+  /**
+   * Move specified files to destination in parallel mode.
+   * Spins up a thread pool, schedules one move task per matching file found
+   * while walking srcPath, waits for all tasks and shuts down the pool.
+   *
+   * @param conf configuration used to size the move thread pool
+   * @param fs file system on which srcPath and destPath are accessed
+   * @param srcPath directory that is walked for files to move
+   * @param destPath directory the files are moved into
+   * @param filesToMove the files that should be moved; other files are skipped
+   * @throws HiveException if any scheduled move fails
+   * @throws IOException if walking srcPath or creating a destination directory fails
+   */
+  private static void moveSpecifiedFilesInParallel(Configuration conf, FileSystem fs,
+      Path srcPath, Path destPath, Set<FileStatus> filesToMove)
+      throws HiveException, IOException {
+
+    LOG.info("rename {} files from {} to dest {}",
+        filesToMove.size(), srcPath, destPath);
+    PerfLogger perfLogger = SessionState.getPerfLogger();
+    perfLogger.PerfLogBegin("FileSinkOperator", "moveSpecifiedFileStatus");
+
+    final ExecutorService pool = createMoveThreadPool(conf);
+
+    List<Future<Void>> futures = new LinkedList<>();
+    try {
+      moveSpecifiedFilesInParallel(fs, srcPath, destPath, filesToMove, futures, pool);
+    } catch (IOException | RuntimeException e) {
+      // The directory walk failed part-way through: stop the moves that were
+      // already scheduled and release the pool's threads before propagating,
+      // otherwise the executor would never be shut down.
+      cancelTasks(futures);
+      pool.shutdownNow();
+      throw e;
+    }
+
+    shutdownAndCleanup(pool, futures);
+    LOG.info("Completed rename from {} to {}", srcPath, destPath);
+
+    perfLogger.PerfLogEnd("FileSinkOperator", "moveSpecifiedFileStatus");
+  }
+
+  /**
+   * Walks src and schedules a move into dst for every entry that is contained
+   * in filesToMove. Directories that are not themselves in filesToMove are
+   * traversed recursively, with the same directory name appended to dst.
+   * This method only submits tasks; it does not wait for them.
+   *
+   * @param fs file system used for the existence check, listing and moves
+   * @param src directory whose entries are examined
+   * @param dst directory the matching entries are moved into; created if absent
+   * @param filesToMove entries that should be moved (membership is decided by
+   *          {@code Set.contains}, i.e. by FileStatus equality)
+   * @param futures List of futures; one is appended per scheduled move
+   * @param pool thread pool the move tasks are submitted to
+   * @throws IOException if checking, creating or listing a directory fails
+   */
+  private static void moveSpecifiedFilesInParallel(FileSystem fs,
+      Path src, Path dst, Set<FileStatus> filesToMove, List<Future<Void>> futures,
+      ExecutorService pool) throws IOException {
+    if (!fs.exists(dst)) {
+      LOG.info("Creating {}", dst);
+      fs.mkdirs(dst);
+    }
+
+    FileStatus[] files = fs.listStatus(src);
+    for (FileStatus fileStatus : files) {
+      if (filesToMove.contains(fileStatus)) {
+        futures.add(pool.submit(new Callable<Void>() {
+          @Override
+          public Void call() throws HiveException {
+            try {
+              LOG.debug("Moving from {} to {} ", fileStatus.getPath(), dst);
+              Utilities.moveFile(fs, fileStatus, dst);
+            } catch (Exception e) {
+              throw new HiveException(e);
+            }
+            return null;
+          }
+        }));
+      } else if (fileStatus.isDirectory()) {
+        // isDirectory() replaces the deprecated isDir().
+        // Traverse directory contents.
+        // Directory nesting for dst needs to match src.
+        Path nestedDstPath = new Path(dst, fileStatus.getPath().getName());
+        moveSpecifiedFilesInParallel(fs, fileStatus.getPath(), nestedDstPath,
+            filesToMove, futures, pool);
+      }
+    }
+  }
+
+  /**
+   * Builds the fixed-size pool used for parallel file moves. The size is read
+   * from HIVE_MOVE_FILES_THREAD_COUNT (falling back to 15 when unset) and is
+   * never less than one. Worker threads are daemons named "Move-Thread-N".
+   *
+   * @param conf configuration the thread count is read from
+   * @return a new executor; the caller is responsible for shutting it down
+   */
+  private static ExecutorService createMoveThreadPool(Configuration conf) {
+    final int configured = conf.getInt(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT.varname, 15);
+    // Guard against zero or negative settings: at least one worker is required.
+    final int poolSize = (configured < 1) ? 1 : configured;
+
+    ThreadFactoryBuilder factoryBuilder = new ThreadFactoryBuilder();
+    factoryBuilder.setDaemon(true);
+    factoryBuilder.setNameFormat("Move-Thread-%d");
+    return Executors.newFixedThreadPool(poolSize, factoryBuilder.build());
+  }
+
+  /**
+   * Stops the pool from accepting new work and waits for every given future.
+   * On the first failure the remaining futures are cancelled and the failure
+   * is rethrown wrapped in a HiveException.
+   *
+   * @param pool the executor to shut down; nothing is done when null
+   * @param futures the submitted tasks to wait for; null is treated as empty
+   * @throws HiveException if a task failed or the waiting thread was interrupted
+   */
+  private static void shutdownAndCleanup(ExecutorService pool,
+      List<Future<Void>> futures) throws HiveException {
+    if (pool == null) {
+      return;
+    }
+    // Orderly shutdown: already submitted tasks still run to completion.
+    pool.shutdown();
+
+    futures = (futures != null) ? futures : Collections.emptyList();
+    for (Future<Void> future : futures) {
+      try {
+        future.get();
+      } catch (InterruptedException e) {
+        // Restore the interrupt status so callers further up the stack can
+        // still observe that this thread was asked to stop.
+        Thread.currentThread().interrupt();
+        LOG.error("Error in moving files to destination", e);
+        cancelTasks(futures);
+        throw new HiveException(e);
+      } catch (ExecutionException e) {
+        LOG.error("Error in moving files to destination", e);
+        cancelTasks(futures);
+        throw new HiveException(e);
+      }
+    }
+  }
+
+  /**
+   * Cancel all futures, interrupting the ones that are currently running.
+   *
+   * @param futureList the futures to cancel
+   */
+  private static void cancelTasks(List<Future<Void>> futureList) {
+    // Parameterized element type instead of the raw Future used before.
+    for (Future<Void> future : futureList) {
+      future.cancel(true);
+    }
+  }
+
+
 
   /**
    * Check the existence of buckets according to bucket specification. Create empty buckets if