Posted to commits@hive.apache.org by ra...@apache.org on 2023/01/03 18:19:08 UTC

[hive] branch master updated: HIVE-23559: Optimise Hive::moveAcidFiles for cloud storage. (#3795)

This is an automated email from the ASF dual-hosted git repository.

rameshkumar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 67906e855da HIVE-23559: Optimise Hive::moveAcidFiles for cloud storage. (#3795)
67906e855da is described below

commit 67906e855da46835fe3c4bce22378670efc7b411
Author: Dmitriy Fingerman <dm...@gmail.com>
AuthorDate: Tue Jan 3 13:18:56 2023 -0500

    HIVE-23559: Optimise Hive::moveAcidFiles for cloud storage. (#3795)
    
    Co-authored-by: Dmitriy Fingerman <df...@cloudera.com>
---
 .../org/apache/hadoop/hive/ql/exec/MoveTask.java   |   2 +-
 .../org/apache/hadoop/hive/ql/metadata/Hive.java   | 156 ++++++++++++++-------
 2 files changed, 105 insertions(+), 53 deletions(-)
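
In short: moveAcidFiles used to create each delta directory and rename its bucket files strictly one after another. On HDFS that is acceptable because a rename is a cheap metadata operation, but on cloud object stores a rename is typically a copy-plus-delete against a remote service, so a commit touching many deltas serializes a long chain of slow calls. The patch fans the per-delta moves out over a daemon thread pool sized from ConfVars.HIVE_MOVE_FILES_THREAD_COUNT; configuring that value to 0 or below skips the pool and keeps the old serial behaviour.

Stripped of the Hive specifics, the concurrency pattern being introduced looks like the sketch below. This is illustrative only -- moveAll, rename and srcDstPairs are made-up names, not part of the patch:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;

    public class ParallelRename {

      // Fan independent, I/O-bound renames out over a fixed pool and surface
      // the first failure to the caller, mirroring what the patch does.
      static void moveAll(List<String[]> srcDstPairs, int threads) throws Exception {
        if (threads <= 0) {                    // pool disabled: old serial path
          for (String[] p : srcDstPairs) {
            rename(p[0], p[1]);
          }
          return;
        }
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        List<Future<Void>> futures = new ArrayList<>();
        for (String[] p : srcDstPairs) {
          futures.add(pool.submit((Callable<Void>) () -> {
            rename(p[0], p[1]);
            return null;
          }));
        }
        pool.shutdown();                       // no new work; queued tasks still run
        try {
          for (Future<Void> f : futures) {
            f.get();                           // task failure surfaces as ExecutionException
          }
        } catch (ExecutionException | InterruptedException e) {
          pool.shutdownNow();                  // first failure cancels the rest
          throw e;
        }
      }

      static void rename(String src, String dst) {
        // stand-in for FileSystem.rename(); each call is slow on object stores
      }
    }

The real change additionally uses Guava's ThreadFactoryBuilder to mark the pool threads as daemons and name them Move-Acid-Files-Thread-%d, so a stuck move cannot block JVM shutdown and the threads are easy to spot in a stack dump.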

diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
index a5a5ea793d6..555bd7fb5b6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
@@ -388,7 +388,7 @@ public class MoveTask extends Task<MoveWork> implements Serializable {
             FileSystem srcFs = sourcePath.getFileSystem(conf);
             FileStatus[] srcs = srcFs.globStatus(sourcePath);
             if(srcs != null) {
-              Hive.moveAcidFiles(srcFs, srcs, targetPath, null);
+              Hive.moveAcidFiles(srcFs, srcs, targetPath, null, conf);
             } else {
               LOG.debug("No files found to move from " + sourcePath + " to " + targetPath);
             }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index 238b24d2641..d9301c3ea4b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -5102,7 +5102,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
     // If we're moving files around for an ACID write then the rules and paths are all different.
     // You can blame this on Owen.
     if (isAcidIUD) {
-      moveAcidFiles(srcFs, srcs, destf, newFiles);
+      moveAcidFiles(srcFs, srcs, destf, newFiles, conf);
     } else {
       // For ACID non-bucketed case, the filenames have to be in the format consistent with INSERT/UPDATE/DELETE Ops,
       // i.e, like 000000_0, 000001_0_copy_1, 000002_0.gz etc.
@@ -5124,7 +5124,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
   }
 
   public static void moveAcidFiles(FileSystem fs, FileStatus[] stats, Path dst,
-                                    List<Path> newFiles) throws HiveException {
+                                    List<Path> newFiles, HiveConf conf) throws HiveException {
     // The layout for ACID files is table|partname/base|delta|delete_delta/bucket
     // We will always only be writing delta files ( except IOW which writes base_X/ ).
     // In the buckets created by FileSinkOperator
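
For reference, the layout that comment describes looks roughly like this on disk (the write ids in the directory names are illustrative):

    tbl/part=1/base_0000010/bucket_00000                 <- from INSERT OVERWRITE
    tbl/part=1/delta_0000012_0000012_0000/bucket_00000
    tbl/part=1/delete_delta_0000013_0000013_0000/bucket_00000

Each base/delta/delete_delta directory is a self-contained unit, which is what makes it safe to move different deltas from different threads.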
@@ -5185,78 +5185,130 @@ private void constructOneLBLocationMap(FileStatus fSta,
       for (FileStatus origBucketStat : origBucketStats) {
         Path origBucketPath = origBucketStat.getPath();
         moveAcidFiles(AcidUtils.DELTA_PREFIX, AcidUtils.deltaFileFilter,
-                fs, dst, origBucketPath, createdDeltaDirs, newFiles);
+                fs, dst, origBucketPath, createdDeltaDirs, newFiles, conf);
         moveAcidFiles(AcidUtils.DELETE_DELTA_PREFIX, AcidUtils.deleteEventDeltaDirFilter,
-                fs, dst,origBucketPath, createdDeltaDirs, newFiles);
+                fs, dst, origBucketPath, createdDeltaDirs, newFiles, conf);
         moveAcidFiles(AcidUtils.BASE_PREFIX, AcidUtils.baseFileFilter,//for Insert Overwrite
-                fs, dst, origBucketPath, createdDeltaDirs, newFiles);
+                fs, dst, origBucketPath, createdDeltaDirs, newFiles, conf);
       }
     }
   }
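
Note that each original bucket directory is scanned three times, once per prefix (delta_, delete_delta_, and base_ for insert-overwrite), and the new conf parameter rides along on all three calls, so each pass below gets its own pool.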
 
   private static void moveAcidFiles(String deltaFileType, PathFilter pathFilter, FileSystem fs,
                                     Path dst, Path origBucketPath, Set<Path> createdDeltaDirs,
-                                    List<Path> newFiles) throws HiveException {
-    LOG.debug("Acid move looking for " + deltaFileType + " files in bucket " + origBucketPath);
+                                    List<Path> newFiles, HiveConf conf) throws HiveException {
 
-    FileStatus[] deltaStats = null;
-    try {
-      deltaStats = fs.listStatus(origBucketPath, pathFilter);
-    } catch (IOException e) {
-      throw new HiveException("Unable to look for " + deltaFileType + " files in original bucket " +
-          origBucketPath.toUri().toString(), e);
-    }
-    LOG.debug("Acid move found " + deltaStats.length + " " + deltaFileType + " files");
-
-    for (FileStatus deltaStat : deltaStats) {
-      Path deltaPath = deltaStat.getPath();
-      // Create the delta directory.  Don't worry if it already exists,
-      // as that likely means another task got to it first.  Then move each of the buckets.
-      // it would be more efficient to try to move the delta with it's buckets but that is
-      // harder to make race condition proof.
-      Path deltaDest = new Path(dst, deltaPath.getName());
+    try {
+      LOG.debug("Acid move looking for " + deltaFileType + " files in bucket " + origBucketPath);
+
+      FileStatus[] deltaStats = null;
       try {
-        if (!createdDeltaDirs.contains(deltaDest)) {
-          try {
-            if(fs.mkdirs(deltaDest)) {
+        deltaStats = fs.listStatus(origBucketPath, pathFilter);
+      } catch (IOException e) {
+        throw new HiveException("Unable to look for " + deltaFileType + " files in original bucket " +
+                origBucketPath.toUri().toString(), e);
+      }
+      LOG.debug("Acid move found " + deltaStats.length + " " + deltaFileType + " files");
+
+      List<Future<Void>> futures = new LinkedList<>();
+      final ExecutorService pool = conf.getInt(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT.varname, 25) > 0 ?
+              Executors.newFixedThreadPool(conf.getInt(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT.varname, 25),
+                      new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Move-Acid-Files-Thread-%d").build()) : null;
+
+      Set<Path> createdDeltaDirsSync = Collections.synchronizedSet(createdDeltaDirs);
+      
+      for (FileStatus deltaStat : deltaStats) {
+
+        if (null == pool) {
+          moveAcidFilesForDelta(deltaFileType, fs, dst, createdDeltaDirsSync, newFiles, deltaStat);
+        } else {
+          futures.add(pool.submit(new Callable<Void>() {
+            @Override
+            public Void call() throws HiveException {
               try {
-                fs.rename(AcidUtils.OrcAcidVersion.getVersionFilePath(deltaStat.getPath()),
-                    AcidUtils.OrcAcidVersion.getVersionFilePath(deltaDest));
-              } catch (FileNotFoundException fnf) {
-                // There might be no side file. Skip in this case.
+                moveAcidFilesForDelta(deltaFileType, fs, dst, createdDeltaDirsSync, newFiles, deltaStat);
+              } catch (Exception e) {
+                final String poolMsg =
+                        "Unable to move source " + deltaStat.getPath().getName() + " to destination " + dst.getName();
+                throw getHiveException(e, poolMsg);
               }
+              return null;
             }
-            createdDeltaDirs.add(deltaDest);
-          } catch (IOException swallowIt) {
-            // Don't worry about this, as it likely just means it's already been created.
-            LOG.info("Unable to create " + deltaFileType + " directory " + deltaDest +
-                ", assuming it already exists: " + swallowIt.getMessage());
-          }
+          }));
         }
-        FileStatus[] bucketStats = fs.listStatus(deltaPath, AcidUtils.bucketFileFilter);
-        LOG.debug("Acid move found " + bucketStats.length + " bucket files");
-        for (FileStatus bucketStat : bucketStats) {
-          Path bucketSrc = bucketStat.getPath();
-          Path bucketDest = new Path(deltaDest, bucketSrc.getName());
-          final String msg = "Unable to move source " + bucketSrc + " to destination " +
-              bucketDest;
-          LOG.info("Moving bucket " + bucketSrc.toUri().toString() + " to " +
-              bucketDest.toUri().toString());
+      }
+
+      if (null != pool) {
+        pool.shutdown();
+        for (Future<Void> future : futures) {
           try {
-            fs.rename(bucketSrc, bucketDest);
-            if (newFiles != null) {
-              newFiles.add(bucketDest);
+            future.get();
+          } catch (InterruptedException | ExecutionException e) {
+            pool.shutdownNow();
+            if (e.getCause() instanceof IOException) {
+              throw (IOException) e.getCause();
             }
-          } catch (Exception e) {
-            throw getHiveException(e, msg);
+            if (e.getCause() instanceof HiveException) {
+              throw (HiveException) e.getCause();
+            }
+            throw handlePoolException(pool, e);
           }
         }
-      } catch (IOException e) {
-        throw new HiveException("Error moving acid files " + e.getMessage(), e);
       }
     }
+    catch (IOException e) {
+      throw new HiveException(e.getMessage(), e);
+    }
   }
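
Two details in the rewritten method are easy to miss. First, createdDeltaDirs is wrapped in Collections.synchronizedSet because worker threads both check and update it; the newFiles list is appended to from those same threads, so callers should pass either null or a thread-safe list. Second, the exception plumbing: a failed task is wrapped via getHiveException, future.get() resurfaces it as an ExecutionException, and the cause is unwrapped here -- a HiveException is rethrown as-is, while an IOException escapes to the enclosing try and is wrapped into a HiveException there, so the method still throws HiveException in every case.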
 
+  private static void moveAcidFilesForDelta(String deltaFileType, FileSystem fs,
+                                            Path dst, Set<Path> createdDeltaDirs,
+                                            List<Path> newFiles, FileStatus deltaStat) throws HiveException {
+
+    Path deltaPath = deltaStat.getPath();
+    // Create the delta directory.  Don't worry if it already exists,
+    // as that likely means another task got to it first.  Then move each of the buckets.
+    // It would be more efficient to move the delta with its buckets, but that is
+    // harder to make race-condition proof.
+    Path deltaDest = new Path(dst, deltaPath.getName());
+    try {
+      if (!createdDeltaDirs.contains(deltaDest)) {
+        try {
+          if (fs.mkdirs(deltaDest)) {
+            try {
+              fs.rename(AcidUtils.OrcAcidVersion.getVersionFilePath(deltaStat.getPath()),
+                      AcidUtils.OrcAcidVersion.getVersionFilePath(deltaDest));
+            } catch (FileNotFoundException fnf) {
+              // There might be no side file. Skip in this case.
+            }
+          }
+          createdDeltaDirs.add(deltaDest);
+        } catch (IOException swallowIt) {
+          // Don't worry about this, as it likely just means it's already been created.
+          LOG.info("Unable to create " + deltaFileType + " directory " + deltaDest +
+                  ", assuming it already exists: " + swallowIt.getMessage());
+        }
+      }
+      FileStatus[] bucketStats = fs.listStatus(deltaPath, AcidUtils.bucketFileFilter);
+      LOG.debug("Acid move found " + bucketStats.length + " bucket files");
+      for (FileStatus bucketStat : bucketStats) {
+        Path bucketSrc = bucketStat.getPath();
+        Path bucketDest = new Path(deltaDest, bucketSrc.getName());
+        LOG.info("Moving bucket " + bucketSrc.toUri().toString() + " to " +
+                bucketDest.toUri().toString());
+        try {
+          fs.rename(bucketSrc, bucketDest);
+          if (newFiles != null) {
+            newFiles.add(bucketDest);
+          }
+        } catch (Exception e) {
+          throw getHiveException(e, "Unable to move source " + bucketSrc + " to destination " + bucketDest);
+        }
+      }
+    } catch (IOException e) {
+      throw new HiveException("Error moving acid files " + e.getMessage(), e);
+    }
+  }
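
moveAcidFilesForDelta is essentially the body of the old per-delta loop, extracted so that each delta can run as an independent task. The logic is deliberately race-tolerant: mkdirs failing (or the version side file being absent) most likely means another task got there first, so both cases are skipped rather than treated as errors. A minimal sketch of that idempotent-create idiom, assuming a Hadoop FileSystem handle (fs, dir and created are illustrative names here):

    try {
      if (fs.mkdirs(dir)) {
        // we won the race: do the one-time setup (here, moving the version side file)
      }
      created.add(dir);        // remember the directory either way
    } catch (IOException swallowIt) {
      // most likely created concurrently by another task; later ops will fail if not
    }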
   /**
    * Replaces files in the partition with new data set specified by srcf. Works
    * by renaming directory of srcf to the destination file.