Posted to issues@hive.apache.org by "ASF GitHub Bot (Jira)" <ji...@apache.org> on 2022/12/05 21:51:00 UTC

[jira] [Work logged] (HIVE-23559) Optimise Hive::moveAcidFiles for cloud storage

     [ https://issues.apache.org/jira/browse/HIVE-23559?focusedWorklogId=831200&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-831200 ]

ASF GitHub Bot logged work on HIVE-23559:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 05/Dec/22 21:50
            Start Date: 05/Dec/22 21:50
    Worklog Time Spent: 10m 
      Work Description: ramesh0201 commented on code in PR #3795:
URL: https://github.com/apache/hive/pull/3795#discussion_r1040133418


##########
ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java:
##########
@@ -5208,55 +5208,94 @@ private static void moveAcidFiles(String deltaFileType, PathFilter pathFilter, F
     }
     LOG.debug("Acid move found " + deltaStats.length + " " + deltaFileType + " files");
 
+    List<Future<Void>> futures = new LinkedList<>();
+    final ExecutorService pool = conf.getInt(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT.varname, 25) > 0 ?
+            Executors.newFixedThreadPool(conf.getInt(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT.varname, 25),
+                    new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Move-Acid-Files-Thread-%d").build()) : null;
+
     for (FileStatus deltaStat : deltaStats) {
-      Path deltaPath = deltaStat.getPath();
-      // Create the delta directory.  Don't worry if it already exists,
-      // as that likely means another task got to it first.  Then move each of the buckets.
-      // it would be more efficient to try to move the delta with it's buckets but that is
-      // harder to make race condition proof.
-      Path deltaDest = new Path(dst, deltaPath.getName());
-      try {
-        if (!createdDeltaDirs.contains(deltaDest)) {
-          try {
-            if(fs.mkdirs(deltaDest)) {
-              try {
-                fs.rename(AcidUtils.OrcAcidVersion.getVersionFilePath(deltaStat.getPath()),
-                    AcidUtils.OrcAcidVersion.getVersionFilePath(deltaDest));
-              } catch (FileNotFoundException fnf) {
-                // There might be no side file. Skip in this case.
-              }
+
+      if (null == pool) {
+        moveAcidFilesForDelta(deltaFileType, fs, dst, createdDeltaDirs, newFiles, deltaStat);
+      } else {
+        futures.add(pool.submit(new Callable<Void>() {
+          @Override
+          public Void call() throws HiveException {
+            try {
+              moveAcidFilesForDelta(deltaFileType, fs, dst, createdDeltaDirs, newFiles, deltaStat);
+            } catch (Exception e) {
+              final String poolMsg =
+                      "Unable to move source " + deltaStat.getPath().getName() + " to destination " + dst.getName();
+              throw getHiveException(e, poolMsg);
             }
-            createdDeltaDirs.add(deltaDest);
-          } catch (IOException swallowIt) {
-            // Don't worry about this, as it likely just means it's already been created.
-            LOG.info("Unable to create " + deltaFileType + " directory " + deltaDest +
-                ", assuming it already exists: " + swallowIt.getMessage());
+            return null;
           }
+        }));
+      }
+    }

Review Comment:
   I think we need to handle thread interruption here. If the thread waiting on these futures is interrupted, we might need to cancel the running futures and interrupt the current thread again so the interruption is not lost.
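
   A minimal sketch of what this could look like, assuming the futures are awaited in a later loop that is not shown in this hunk. `MoveFutures` and `awaitAll` are hypothetical names, not part of Hive, and `IOException` stands in for `HiveException` only to keep the example self-contained.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical helper, not taken from the PR.
final class MoveFutures {

  private MoveFutures() {
  }

  // Waits for every submitted move. On interruption or on the first failed
  // task, cancels the remaining futures and always shuts the pool down.
  static void awaitAll(ExecutorService pool, List<Future<Void>> futures) throws IOException {
    try {
      for (Future<Void> future : futures) {
        future.get();
      }
    } catch (InterruptedException e) {
      cancelAll(futures);
      // Restore the interrupt flag so callers further up can still see it.
      Thread.currentThread().interrupt();
      throw new IOException("Interrupted while waiting for file moves", e);
    } catch (ExecutionException e) {
      cancelAll(futures);
      // The real code would wrap the cause in a HiveException (assumption).
      throw new IOException("A file move task failed", e.getCause());
    } finally {
      pool.shutdownNow();
    }
  }

  // cancel(true) interrupts tasks that are already running; it has no
  // effect on futures that have already completed.
  private static void cancelAll(List<Future<Void>> futures) {
    for (Future<Void> future : futures) {
      future.cancel(true);
    }
  }
}
```

   Calling `shutdownNow()` in the `finally` block also keeps worker threads from lingering after a failure, although whether the pool should be shut down here or by the caller is a design choice for the PR author.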





Issue Time Tracking
-------------------

    Worklog Id:     (was: 831200)
    Time Spent: 0.5h  (was: 20m)

> Optimise Hive::moveAcidFiles for cloud storage
> ----------------------------------------------
>
>                 Key: HIVE-23559
>                 URL: https://issues.apache.org/jira/browse/HIVE-23559
>             Project: Hive
>          Issue Type: Improvement
>            Reporter: Rajesh Balamohan
>            Assignee: Dmitriy Fingerman
>            Priority: Major
>              Labels: pull-request-available
>          Time Spent: 0.5h
>  Remaining Estimate: 0h
>
> [https://github.com/apache/hive/blob/master/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java#L4752]
> It ends up transferring DELTA, DELETE_DELTA, BASE prefixes sequentially from staging to final location.
> This causes delays even with simple update statements that update a small number of records in cloud storage.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)