Posted to issues@hive.apache.org by "ASF GitHub Bot (Jira)" <ji...@apache.org> on 2022/12/05 21:51:00 UTC
[jira] [Work logged] (HIVE-23559) Optimise Hive::moveAcidFiles for cloud storage
[ https://issues.apache.org/jira/browse/HIVE-23559?focusedWorklogId=831200&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-831200 ]
ASF GitHub Bot logged work on HIVE-23559:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 05/Dec/22 21:50
Start Date: 05/Dec/22 21:50
Worklog Time Spent: 10m
Work Description: ramesh0201 commented on code in PR #3795:
URL: https://github.com/apache/hive/pull/3795#discussion_r1040133418
##########
ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java:
##########
@@ -5208,55 +5208,94 @@ private static void moveAcidFiles(String deltaFileType, PathFilter pathFilter, F
}
LOG.debug("Acid move found " + deltaStats.length + " " + deltaFileType + " files");
+ List<Future<Void>> futures = new LinkedList<>();
+ final ExecutorService pool = conf.getInt(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT.varname, 25) > 0 ?
+ Executors.newFixedThreadPool(conf.getInt(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT.varname, 25),
+ new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Move-Acid-Files-Thread-%d").build()) : null;
+
for (FileStatus deltaStat : deltaStats) {
- Path deltaPath = deltaStat.getPath();
- // Create the delta directory. Don't worry if it already exists,
- // as that likely means another task got to it first. Then move each of the buckets.
- // it would be more efficient to try to move the delta with it's buckets but that is
- // harder to make race condition proof.
- Path deltaDest = new Path(dst, deltaPath.getName());
- try {
- if (!createdDeltaDirs.contains(deltaDest)) {
- try {
- if(fs.mkdirs(deltaDest)) {
- try {
- fs.rename(AcidUtils.OrcAcidVersion.getVersionFilePath(deltaStat.getPath()),
- AcidUtils.OrcAcidVersion.getVersionFilePath(deltaDest));
- } catch (FileNotFoundException fnf) {
- // There might be no side file. Skip in this case.
- }
+
+ if (null == pool) {
+ moveAcidFilesForDelta(deltaFileType, fs, dst, createdDeltaDirs, newFiles, deltaStat);
+ } else {
+ futures.add(pool.submit(new Callable<Void>() {
+ @Override
+ public Void call() throws HiveException {
+ try {
+ moveAcidFilesForDelta(deltaFileType, fs, dst, createdDeltaDirs, newFiles, deltaStat);
+ } catch (Exception e) {
+ final String poolMsg =
+ "Unable to move source " + deltaStat.getPath().getName() + " to destination " + dst.getName();
+ throw getHiveException(e, poolMsg);
}
- createdDeltaDirs.add(deltaDest);
- } catch (IOException swallowIt) {
- // Don't worry about this, as it likely just means it's already been created.
- LOG.info("Unable to create " + deltaFileType + " directory " + deltaDest +
- ", assuming it already exists: " + swallowIt.getMessage());
+ return null;
}
+ }));
+ }
+ }
Review Comment:
I think we need to handle thread interruption here. If the calling thread is interrupted while waiting on these futures, we should cancel the futures that are still running and re-interrupt the current thread.
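A minimal sketch of what that handling could look like, shown outside of Hive so that it runs on its own. The class `MoveTaskWaiter` and the methods `awaitAll` and `cancelAll` are hypothetical names for illustration, not code from the PR. The sketch throws a plain `IOException` where Hive would presumably use `HiveException`, and shutting the pool down in `finally` is an assumption, since the hunk above does not show how the pool is closed.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical helper, not part of Hive.
final class MoveTaskWaiter {

  private MoveTaskWaiter() {
  }

  /**
   * Waits for every submitted task to finish. If the waiting thread is
   * interrupted, or if any task fails, the remaining tasks are cancelled.
   * The pool is always shut down before returning.
   */
  static void awaitAll(ExecutorService pool, List<Future<Void>> futures) throws IOException {
    try {
      for (Future<Void> future : futures) {
        future.get();
      }
    } catch (InterruptedException ie) {
      cancelAll(futures);
      // Restore the interrupt flag so that callers further up can see it.
      Thread.currentThread().interrupt();
      throw new IOException("Interrupted while waiting for move tasks", ie);
    } catch (ExecutionException ee) {
      cancelAll(futures);
      // Report the task's own exception rather than the wrapper.
      throw new IOException("A move task failed", ee.getCause());
    } finally {
      // Assumption: the pool is owned by this call and is not reused.
      pool.shutdownNow();
    }
  }

  /** Cancels every future; cancelling a completed future is a no-op. */
  private static void cancelAll(List<Future<Void>> futures) {
    for (Future<Void> future : futures) {
      future.cancel(true);
    }
  }
}
```

With something along these lines, the loop in the hunk would only submit tasks, and a single call such as `MoveTaskWaiter.awaitAll(pool, futures)` after the loop would wait for them. Passing `true` to `Future.cancel` asks the executor to interrupt tasks that are already running, which only helps if the underlying file system calls respond to interruption.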
Issue Time Tracking
-------------------
Worklog Id: (was: 831200)
Time Spent: 0.5h (was: 20m)
> Optimise Hive::moveAcidFiles for cloud storage
> ----------------------------------------------
>
> Key: HIVE-23559
> URL: https://issues.apache.org/jira/browse/HIVE-23559
> Project: Hive
> Issue Type: Improvement
> Reporter: Rajesh Balamohan
> Assignee: Dmitriy Fingerman
> Priority: Major
> Labels: pull-request-available
> Time Spent: 0.5h
> Remaining Estimate: 0h
>
> [https://github.com/apache/hive/blob/master/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java#L4752]
> It ends up transferring the DELTA, DELETE_DELTA, and BASE prefixes sequentially from the staging location to the final location.
> This causes delays even for simple update statements that modify only a small number of records in cloud storage.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)