You are viewing a plain text version of this content; the canonical link to the original message was removed during plain-text conversion and is available in the HTML version of the archive.
Posted to commits@hive.apache.org by rb...@apache.org on 2020/04/10 05:52:32 UTC
[hive] branch master updated: HIVE-23154: Fix race condition in Utilities::mvFileToFinalPath (Rajesh Balamohan reviewed by Ashutosh Chauhan)
This is an automated email from the ASF dual-hosted git repository.
rbalamohan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new f723f42 HIVE-23154: Fix race condition in Utilities::mvFileToFinalPath (Rajesh Balamohan reviewed by Ashutosh Chauhan)
f723f42 is described below
commit f723f42d13865d820099c66d2fac68195f46110b
Author: Rajesh Balamohan <rb...@apache.org>
AuthorDate: Fri Apr 10 11:22:17 2020 +0530
HIVE-23154: Fix race condition in Utilities::mvFileToFinalPath (Rajesh Balamohan reviewed by Ashutosh Chauhan)
---
.../org/apache/hadoop/hive/ql/exec/Utilities.java | 205 ++++++++++++++++-----
1 file changed, 161 insertions(+), 44 deletions(-)
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index e25dc54..0304a5c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -1111,43 +1111,6 @@ public final class Utilities {
}
}
- /**
- * Moves files from src to dst if it is within the specified set of paths
- * @param fs
- * @param src
- * @param dst
- * @param filesToMove
- * @throws IOException
- * @throws HiveException
- */
- private static void moveSpecifiedFiles(FileSystem fs, Path src, Path dst, Set<Path> filesToMove)
- throws IOException, HiveException {
- if (!fs.exists(dst)) {
- fs.mkdirs(dst);
- }
-
- FileStatus[] files = fs.listStatus(src);
- for (FileStatus file : files) {
- if (filesToMove.contains(file.getPath())) {
- Utilities.moveFile(fs, file, dst);
- } else if (file.isDir()) {
- // Traverse directory contents.
- // Directory nesting for dst needs to match src.
- Path nestedDstPath = new Path(dst, file.getPath().getName());
- Utilities.moveSpecifiedFiles(fs, file.getPath(), nestedDstPath, filesToMove);
- }
- }
- }
-
- private static void moveSpecifiedFileStatus(FileSystem fs, Path src, Path dst,
- Set<FileStatus> filesToMove) throws IOException, HiveException {
- Set<Path> filePaths = new HashSet<>();
- for (FileStatus fstatus : filesToMove) {
- filePaths.add(fstatus.getPath());
- }
- moveSpecifiedFiles(fs, src, dst, filePaths);
- }
-
private static void moveFile(FileSystem fs, FileStatus file, Path dst) throws IOException,
HiveException {
Path srcFilePath = file.getPath();
@@ -1198,6 +1161,53 @@ public final class Utilities {
}
/**
+ * Rename src to dst, or in the case dst already exists, move files in src
+ * to dst. If there is an existing file with the same name, the new file's
+ * name will be appended with "_1", "_2", etc. Happens in parallel mode.
+ *
+ * @param conf
+ *
+ * @param fs
+ * the FileSystem where src and dst are on.
+ * @param src
+ * the src directory
+ * @param dst
+ * the target directory
+ * @throws IOException
+ */
+ public static void renameOrMoveFilesInParallel(Configuration conf,
+ FileSystem fs, Path src, Path dst) throws IOException, HiveException {
+ if (!fs.exists(dst)) {
+ if (!fs.rename(src, dst)) {
+ throw new HiveException("Unable to move: " + src + " to: " + dst);
+ }
+ } else {
+ // move files in parallel
+ LOG.info("Moving files from {} to {}", src, dst);
+ final ExecutorService pool = createMoveThreadPool(conf);
+ List<Future<Void>> futures = new LinkedList<>();
+
+ final FileStatus[] files = fs.listStatus(src);
+ for (FileStatus file : files) {
+ futures.add(pool.submit(new Callable<Void>() {
+ @Override
+ public Void call() throws HiveException {
+ try {
+ Utilities.moveFile(fs, file, dst);
+ } catch (Exception e) {
+ throw new HiveException(e);
+ }
+ return null;
+ }
+ }));
+ }
+
+ shutdownAndCleanup(pool, futures);
+ LOG.info("Rename files from {} to {} is complete", src, dst);
+ }
+ }
+
+ /**
* The first group will contain the task id. The second group is the optional extension. The file
* name looks like: "0_0" or "0_0.gz". There may be a leading prefix (tmp_). Since getTaskId() can
* return an integer only - this should match a pure integer as well. {1,6} is used to limit
@@ -1497,18 +1507,13 @@ public final class Utilities {
// for CTAS or Create MV statements
perfLogger.PerfLogBegin("FileSinkOperator", "moveSpecifiedFileStatus");
LOG.debug("CTAS/Create MV: Files being renamed: " + filesKept.toString());
- if (emptyBuckets.isEmpty()) {
- fs.rename(tmpPath, specPath);
- } else {
- LOG.info("Duplicate files present. Moving files sequentially");
- Utilities.moveSpecifiedFileStatus(fs, tmpPath, specPath, filesKept);
- }
+ moveSpecifiedFilesInParallel(hconf, fs, tmpPath, specPath, filesKept);
perfLogger.PerfLogEnd("FileSinkOperator", "moveSpecifiedFileStatus");
} else {
// for rest of the statement e.g. INSERT, LOAD etc
perfLogger.PerfLogBegin("FileSinkOperator", "RenameOrMoveFiles");
LOG.debug("Final renaming/moving. Source: " + tmpPath + " .Destination: " + specPath);
- Utilities.renameOrMoveFiles(fs, tmpPath, specPath);
+ renameOrMoveFilesInParallel(hconf, fs, tmpPath, specPath);
perfLogger.PerfLogEnd("FileSinkOperator", "RenameOrMoveFiles");
}
}
@@ -1520,6 +1525,118 @@ public final class Utilities {
fs.delete(taskTmpPath, true);
}
+ /**
+ * move specified files to destination in parallel mode.
+ * Spins up multiple threads, schedules transfer and shuts down the pool.
+ *
+ * @param conf
+ * @param fs
+ * @param srcPath
+ * @param destPath
+ * @param filesToMove
+ * @throws HiveException
+ * @throws IOException
+ */
+ private static void moveSpecifiedFilesInParallel(Configuration conf, FileSystem fs,
+ Path srcPath, Path destPath, Set<FileStatus> filesToMove)
+ throws HiveException, IOException {
+
+ LOG.info("rename {} files from {} to dest {}",
+ filesToMove.size(), srcPath, destPath);
+ PerfLogger perfLogger = SessionState.getPerfLogger();
+ perfLogger.PerfLogBegin("FileSinkOperator", "moveSpecifiedFileStatus");
+
+ final ExecutorService pool = createMoveThreadPool(conf);
+
+ List<Future<Void>> futures = new LinkedList<>();
+ moveSpecifiedFilesInParallel(fs, srcPath, destPath, filesToMove, futures, pool);
+
+ shutdownAndCleanup(pool, futures);
+ LOG.info("Completed rename from {} to {}", srcPath, destPath);
+
+ perfLogger.PerfLogEnd("FileSinkOperator", "moveSpecifiedFileStatus");
+ }
+
+ /**
+ * Moves files from src to dst if it is within the specified set of paths
+ * @param fs
+ * @param src
+ * @param dst
+ * @param filesToMove
+ * @param futures List of futures
+ * @param pool thread pool
+ * @throws IOException
+ */
+ private static void moveSpecifiedFilesInParallel(FileSystem fs,
+ Path src, Path dst, Set<FileStatus> filesToMove, List<Future<Void>> futures,
+ ExecutorService pool) throws IOException {
+ if (!fs.exists(dst)) {
+ LOG.info("Creating {}", dst);
+ fs.mkdirs(dst);
+ }
+
+ FileStatus[] files = fs.listStatus(src);
+ for (FileStatus fileStatus : files) {
+ if (filesToMove.contains(fileStatus)) {
+ futures.add(pool.submit(new Callable<Void>() {
+ @Override
+ public Void call() throws HiveException {
+ try {
+ LOG.debug("Moving from {} to {} ", fileStatus.getPath(), dst);
+ Utilities.moveFile(fs, fileStatus, dst);
+ } catch (Exception e) {
+ throw new HiveException(e);
+ }
+ return null;
+ }
+ }));
+ } else if (fileStatus.isDir()) {
+ // Traverse directory contents.
+ // Directory nesting for dst needs to match src.
+ Path nestedDstPath = new Path(dst, fileStatus.getPath().getName());
+ moveSpecifiedFilesInParallel(fs, fileStatus.getPath(), nestedDstPath,
+ filesToMove, futures, pool);
+ }
+ }
+ }
+
+ private static ExecutorService createMoveThreadPool(Configuration conf) {
+ int threads = Math.max(conf.getInt(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT.varname, 15), 1);
+ return Executors.newFixedThreadPool(threads,
+ new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Move-Thread-%d").build());
+ }
+
+ private static void shutdownAndCleanup(ExecutorService pool,
+ List<Future<Void>> futures) throws HiveException {
+ if (pool == null) {
+ return;
+ }
+ pool.shutdown();
+
+ futures = (futures != null) ? futures : Collections.emptyList();
+ for (Future<Void> future : futures) {
+ try {
+ future.get();
+ } catch (InterruptedException | ExecutionException e) {
+ LOG.error("Error in moving files to destination", e);
+ cancelTasks(futures);
+ throw new HiveException(e);
+ }
+ }
+ }
+
+ /**
+ * cancel all futures.
+ *
+ * @param futureList
+ */
+ private static void cancelTasks(List<Future<Void>> futureList) {
+ for (Future future : futureList) {
+ future.cancel(true);
+ }
+ }
+
+
/**
* Check the existence of buckets according to bucket specification. Create empty buckets if