You are viewing a plain text version of this content; the canonical (HTML) version is available from the mailing-list archive.
Posted to commits@hive.apache.org by ra...@apache.org on 2023/01/03 18:19:08 UTC
[hive] branch master updated: HIVE-23559: Optimise Hive::moveAcidFiles for cloud storage. (#3795)
This is an automated email from the ASF dual-hosted git repository.
rameshkumar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 67906e855da HIVE-23559: Optimise Hive::moveAcidFiles for cloud storage. (#3795)
67906e855da is described below
commit 67906e855da46835fe3c4bce22378670efc7b411
Author: Dmitriy Fingerman <dm...@gmail.com>
AuthorDate: Tue Jan 3 13:18:56 2023 -0500
HIVE-23559: Optimise Hive::moveAcidFiles for cloud storage. (#3795)
Co-authored-by: Dmitriy Fingerman <df...@cloudera.com>
---
.../org/apache/hadoop/hive/ql/exec/MoveTask.java | 2 +-
.../org/apache/hadoop/hive/ql/metadata/Hive.java | 156 ++++++++++++++-------
2 files changed, 105 insertions(+), 53 deletions(-)
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
index a5a5ea793d6..555bd7fb5b6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
@@ -388,7 +388,7 @@ public class MoveTask extends Task<MoveWork> implements Serializable {
FileSystem srcFs = sourcePath.getFileSystem(conf);
FileStatus[] srcs = srcFs.globStatus(sourcePath);
if(srcs != null) {
- Hive.moveAcidFiles(srcFs, srcs, targetPath, null);
+ Hive.moveAcidFiles(srcFs, srcs, targetPath, null, conf);
} else {
LOG.debug("No files found to move from " + sourcePath + " to " + targetPath);
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index 238b24d2641..d9301c3ea4b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -5102,7 +5102,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
// If we're moving files around for an ACID write then the rules and paths are all different.
// You can blame this on Owen.
if (isAcidIUD) {
- moveAcidFiles(srcFs, srcs, destf, newFiles);
+ moveAcidFiles(srcFs, srcs, destf, newFiles, conf);
} else {
// For ACID non-bucketed case, the filenames have to be in the format consistent with INSERT/UPDATE/DELETE Ops,
// i.e, like 000000_0, 000001_0_copy_1, 000002_0.gz etc.
@@ -5124,7 +5124,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
}
public static void moveAcidFiles(FileSystem fs, FileStatus[] stats, Path dst,
- List<Path> newFiles) throws HiveException {
+ List<Path> newFiles, HiveConf conf) throws HiveException {
// The layout for ACID files is table|partname/base|delta|delete_delta/bucket
// We will always only be writing delta files ( except IOW which writes base_X/ ).
// In the buckets created by FileSinkOperator
@@ -5185,78 +5185,130 @@ private void constructOneLBLocationMap(FileStatus fSta,
for (FileStatus origBucketStat : origBucketStats) {
Path origBucketPath = origBucketStat.getPath();
moveAcidFiles(AcidUtils.DELTA_PREFIX, AcidUtils.deltaFileFilter,
- fs, dst, origBucketPath, createdDeltaDirs, newFiles);
+ fs, dst, origBucketPath, createdDeltaDirs, newFiles, conf);
moveAcidFiles(AcidUtils.DELETE_DELTA_PREFIX, AcidUtils.deleteEventDeltaDirFilter,
- fs, dst,origBucketPath, createdDeltaDirs, newFiles);
+ fs, dst,origBucketPath, createdDeltaDirs, newFiles, conf);
moveAcidFiles(AcidUtils.BASE_PREFIX, AcidUtils.baseFileFilter,//for Insert Overwrite
- fs, dst, origBucketPath, createdDeltaDirs, newFiles);
+ fs, dst, origBucketPath, createdDeltaDirs, newFiles, conf);
}
}
}
private static void moveAcidFiles(String deltaFileType, PathFilter pathFilter, FileSystem fs,
Path dst, Path origBucketPath, Set<Path> createdDeltaDirs,
- List<Path> newFiles) throws HiveException {
- LOG.debug("Acid move looking for " + deltaFileType + " files in bucket " + origBucketPath);
+ List<Path> newFiles, HiveConf conf) throws HiveException {
- FileStatus[] deltaStats = null;
- try {
- deltaStats = fs.listStatus(origBucketPath, pathFilter);
- } catch (IOException e) {
- throw new HiveException("Unable to look for " + deltaFileType + " files in original bucket " +
- origBucketPath.toUri().toString(), e);
- }
- LOG.debug("Acid move found " + deltaStats.length + " " + deltaFileType + " files");
-
- for (FileStatus deltaStat : deltaStats) {
- Path deltaPath = deltaStat.getPath();
- // Create the delta directory. Don't worry if it already exists,
- // as that likely means another task got to it first. Then move each of the buckets.
- // it would be more efficient to try to move the delta with it's buckets but that is
- // harder to make race condition proof.
- Path deltaDest = new Path(dst, deltaPath.getName());
+ try{
+ LOG.debug("Acid move looking for " + deltaFileType + " files in bucket " + origBucketPath);
+
+ FileStatus[] deltaStats = null;
try {
- if (!createdDeltaDirs.contains(deltaDest)) {
- try {
- if(fs.mkdirs(deltaDest)) {
+ deltaStats = fs.listStatus(origBucketPath, pathFilter);
+ } catch (IOException e) {
+ throw new HiveException("Unable to look for " + deltaFileType + " files in original bucket " +
+ origBucketPath.toUri().toString(), e);
+ }
+ LOG.debug("Acid move found " + deltaStats.length + " " + deltaFileType + " files");
+
+ List<Future<Void>> futures = new LinkedList<>();
+ final ExecutorService pool = conf.getInt(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT.varname, 25) > 0 ?
+ Executors.newFixedThreadPool(conf.getInt(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT.varname, 25),
+ new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Move-Acid-Files-Thread-%d").build()) : null;
+
+ Set<Path> createdDeltaDirsSync = Collections.synchronizedSet(createdDeltaDirs);
+
+ for (FileStatus deltaStat : deltaStats) {
+
+ if (null == pool) {
+ moveAcidFilesForDelta(deltaFileType, fs, dst, createdDeltaDirsSync, newFiles, deltaStat);
+ } else {
+ futures.add(pool.submit(new Callable<Void>() {
+ @Override
+ public Void call() throws HiveException {
try {
- fs.rename(AcidUtils.OrcAcidVersion.getVersionFilePath(deltaStat.getPath()),
- AcidUtils.OrcAcidVersion.getVersionFilePath(deltaDest));
- } catch (FileNotFoundException fnf) {
- // There might be no side file. Skip in this case.
+ moveAcidFilesForDelta(deltaFileType, fs, dst, createdDeltaDirsSync, newFiles, deltaStat);
+ } catch (Exception e) {
+ final String poolMsg =
+ "Unable to move source " + deltaStat.getPath().getName() + " to destination " + dst.getName();
+ throw getHiveException(e, poolMsg);
}
+ return null;
}
- createdDeltaDirs.add(deltaDest);
- } catch (IOException swallowIt) {
- // Don't worry about this, as it likely just means it's already been created.
- LOG.info("Unable to create " + deltaFileType + " directory " + deltaDest +
- ", assuming it already exists: " + swallowIt.getMessage());
- }
+ }));
}
- FileStatus[] bucketStats = fs.listStatus(deltaPath, AcidUtils.bucketFileFilter);
- LOG.debug("Acid move found " + bucketStats.length + " bucket files");
- for (FileStatus bucketStat : bucketStats) {
- Path bucketSrc = bucketStat.getPath();
- Path bucketDest = new Path(deltaDest, bucketSrc.getName());
- final String msg = "Unable to move source " + bucketSrc + " to destination " +
- bucketDest;
- LOG.info("Moving bucket " + bucketSrc.toUri().toString() + " to " +
- bucketDest.toUri().toString());
+ }
+
+ if (null != pool) {
+ pool.shutdown();
+ for (Future<Void> future : futures) {
try {
- fs.rename(bucketSrc, bucketDest);
- if (newFiles != null) {
- newFiles.add(bucketDest);
+ future.get();
+ } catch (InterruptedException | ExecutionException e) {
+ pool.shutdownNow();
+ if (e.getCause() instanceof IOException) {
+ throw (IOException) e.getCause();
}
- } catch (Exception e) {
- throw getHiveException(e, msg);
+ if (e.getCause() instanceof HiveException) {
+ throw (HiveException) e.getCause();
+ }
+ throw handlePoolException(pool, e);
}
}
- } catch (IOException e) {
- throw new HiveException("Error moving acid files " + e.getMessage(), e);
}
}
+ catch (IOException e) {
+ throw new HiveException(e.getMessage(), e);
+ }
}
+ private static void moveAcidFilesForDelta(String deltaFileType, FileSystem fs,
+ Path dst, Set<Path> createdDeltaDirs,
+ List<Path> newFiles, FileStatus deltaStat) throws HiveException {
+
+ Path deltaPath = deltaStat.getPath();
+ // Create the delta directory. Don't worry if it already exists,
+ // as that likely means another task got to it first. Then move each of the buckets.
+ // it would be more efficient to try to move the delta with it's buckets but that is
+ // harder to make race condition proof.
+ Path deltaDest = new Path(dst, deltaPath.getName());
+ try {
+ if (!createdDeltaDirs.contains(deltaDest)) {
+ try {
+ if(fs.mkdirs(deltaDest)) {
+ try {
+ fs.rename(AcidUtils.OrcAcidVersion.getVersionFilePath(deltaStat.getPath()),
+ AcidUtils.OrcAcidVersion.getVersionFilePath(deltaDest));
+ } catch (FileNotFoundException fnf) {
+ // There might be no side file. Skip in this case.
+ }
+ }
+ createdDeltaDirs.add(deltaDest);
+ } catch (IOException swallowIt) {
+ // Don't worry about this, as it likely just means it's already been created.
+ LOG.info("Unable to create " + deltaFileType + " directory " + deltaDest +
+ ", assuming it already exists: " + swallowIt.getMessage());
+ }
+ }
+ FileStatus[] bucketStats = fs.listStatus(deltaPath, AcidUtils.bucketFileFilter);
+ LOG.debug("Acid move found " + bucketStats.length + " bucket files");
+ for (FileStatus bucketStat : bucketStats) {
+ Path bucketSrc = bucketStat.getPath();
+ Path bucketDest = new Path(deltaDest, bucketSrc.getName());
+ LOG.info("Moving bucket " + bucketSrc.toUri().toString() + " to " +
+ bucketDest.toUri().toString());
+ try {
+ fs.rename(bucketSrc, bucketDest);
+ if (newFiles != null) {
+ newFiles.add(bucketDest);
+ }
+ } catch (Exception e) {
+ throw getHiveException(e, "Unable to move source " + bucketSrc + " to destination " + bucketDest);
+ }
+ }
+ } catch (IOException e) {
+ throw new HiveException("Error moving acid files " + e.getMessage(), e);
+ }
+ }
/**
* Replaces files in the partition with new data set specified by srcf. Works
* by renaming directory of srcf to the destination file.