You are viewing a plain-text version of this content. The link to its canonical version was a hyperlink in the original page and is not preserved in this copy.
Posted to commits@hive.apache.org by da...@apache.org on 2023/04/11 00:19:50 UTC

[hive] branch master updated: HIVE-27143: Optimize HCatStorer moveTask (#4177)

This is an automated email from the ASF dual-hosted git repository.

daijy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 43491dbd75b HIVE-27143: Optimize HCatStorer moveTask (#4177)
43491dbd75b is described below

commit 43491dbd75b83daa755438eb6f43cf6e6b47b1c1
Author: yigress <10...@users.noreply.github.com>
AuthorDate: Mon Apr 10 17:19:38 2023 -0700

    HIVE-27143: Optimize HCatStorer moveTask (#4177)
    
    * HIVE-27143: Optimize HCatStorer moveTask
    
    * fix custom dynamic partition
---
 .../mapreduce/FileOutputCommitterContainer.java    | 230 +++++++++++----------
 1 file changed, 123 insertions(+), 107 deletions(-)

diff --git a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileOutputCommitterContainer.java b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileOutputCommitterContainer.java
index ef3c1afc457..476c60e53af 100644
--- a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileOutputCommitterContainer.java
+++ b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileOutputCommitterContainer.java
@@ -19,23 +19,34 @@
 
 package org.apache.hive.hcatalog.mapreduce;
 
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import java.io.IOException;
 import java.net.URI;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
-
+import java.util.Queue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.hive.common.FileUtils;
 import org.apache.hadoop.hive.common.StatsSetupConst;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.io.HdfsUtils;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.metastore.Warehouse;
@@ -225,6 +236,15 @@ class FileOutputCommitterContainer extends OutputCommitterContainer {
     }
   }
 
+  public static final PathFilter HIDDEN_FILES_PATH_FILTER = new PathFilter() {
+    @Override
+    public boolean accept(Path p) {
+      String name = p.getName();
+      boolean filtered = name.equals(TEMP_DIR_NAME) || name.equals(LOGS_DIR_NAME) || name.equals(SUCCEEDED_FILE_NAME);
+      return !filtered;
+    }
+  };
+
   public static final String SUCCEEDED_FILE_NAME = "_SUCCESS";
   static final String SUCCESSFUL_JOB_OUTPUT_DIR_MARKER =
     "mapreduce.fileoutputcommitter.marksuccessfuljobs";
@@ -367,10 +387,11 @@ class FileOutputCommitterContainer extends OutputCommitterContainer {
       partPath = new Path(finalLocn);
     } else {
       partPath = new Path(partLocnRoot);
+      FileSystem partFs = partPath.getFileSystem(context.getConfiguration());
       int i = 0;
       for (FieldSchema partKey : table.getPartitionKeys()) {
         if (i++ != 0) {
-          fs.mkdirs(partPath); // Attempt to make the path in case it does not exist before we check
+          partFs.mkdirs(partPath); // Attempt to make the path in case it does not exist before we check
           HdfsUtils.setFullFileStatus(conf, status, status.getFileStatus().getGroup(), fs,
               partPath, false);
         }
@@ -380,7 +401,8 @@ class FileOutputCommitterContainer extends OutputCommitterContainer {
 
     // Do not need to set the status on the partition directory. We will do it later recursively.
     // See: end of the registerPartitions method
-    fs.mkdirs(partPath);
+    FileSystem partFs = partPath.getFileSystem(context.getConfiguration());
+    partFs.mkdirs(partPath);
 
     // Set the location in the StorageDescriptor
     if (dynamicPartitioningUsed) {
@@ -467,131 +489,129 @@ class FileOutputCommitterContainer extends OutputCommitterContainer {
 
   /**
    * Move all of the files from the temp directory to the final location
-   * @param fs the output file system
-   * @param file the file to move
+   * @param srcf the file to move
    * @param srcDir the source directory
    * @param destDir the target directory
-   * @param dryRun - a flag that simply tests if this move would succeed or not based
-   *                 on whether other files exist where we're trying to copy
+   * @param immutable - whether table is immutable.
    * @throws java.io.IOException
    */
-  private void moveTaskOutputs(FileSystem fs, Path file, Path srcDir,
-                 Path destDir, final boolean dryRun, boolean immutable
-      ) throws IOException {
+  private void moveTaskOutputs(final Configuration conf, Path srcf, Path srcDir,
+      Path destDir, boolean immutable) throws IOException {
     if(LOG.isDebugEnabled()) {
       LOG.debug("moveTaskOutputs "
-          + file + " from: " + srcDir + " to: " + destDir
-          + " dry: " + dryRun + " immutable: " + immutable);
+          + srcf + " from: " + srcDir + " to: " + destDir + " immutable: " + immutable);
     }
 
     if (dynamicPartitioningUsed) {
       immutable = true; // Making sure we treat dynamic partitioning jobs as if they were immutable.
     }
 
-    if (file.getName().equals(TEMP_DIR_NAME) || file.getName().equals(LOGS_DIR_NAME) || file.getName().equals(SUCCEEDED_FILE_NAME)) {
-      return;
-    }
+    final FileSystem srcFs = srcf.getFileSystem(conf);
+    final FileSystem destFs = destDir.getFileSystem(conf);
+    final boolean canRename = srcFs.getUri().equals(destFs.getUri());
 
-    final Path finalOutputPath = getFinalPath(fs, file, srcDir, destDir, immutable);
-    FileStatus fileStatus = fs.getFileStatus(file);
+    if (destFs.exists(destDir) && !destFs.getFileStatus(destDir).isDirectory()) {
+      throw new HCatException(ErrorType.ERROR_MOVE_FAILED, "Destination is not directory " + destDir);
+    }
 
-    if (!fileStatus.isDir()) {
-      if (dryRun){
-        if (immutable){
-          // Dryrun checks are meaningless for mutable table - we should always succeed
-          // unless there is a runtime IOException.
-          LOG.debug("Testing if moving file: [{}] to [{}] would cause a problem", file, finalOutputPath);
-          if (fs.exists(finalOutputPath)) {
-            throw new HCatException(ErrorType.ERROR_MOVE_FAILED, "Data already exists in "
-                + finalOutputPath + ", duplicate publish not possible.");
-          }
-        }
-      } else {
-        LOG.debug("Moving file: [{}] to [{}]", file, finalOutputPath);
-        // Make sure the parent directory exists.  It is not an error
-        // to recreate an existing directory
-        fs.mkdirs(finalOutputPath.getParent());
-        if (!fs.rename(file, finalOutputPath)) {
-          if (!fs.delete(finalOutputPath, true)) {
-            throw new HCatException(ErrorType.ERROR_MOVE_FAILED, "Failed to delete existing path " + finalOutputPath);
-          }
-          if (!fs.rename(file, finalOutputPath)) {
-            throw new HCatException(ErrorType.ERROR_MOVE_FAILED, "Failed to move output to " + finalOutputPath);
-          }
-        }
+    LinkedList<Pair<Path, Path>> moves = new LinkedList<>();
+    if (customDynamicLocationUsed) {
+      if (immutable && destFs.exists(destDir) &&
+          !org.apache.hadoop.hive.metastore.utils.FileUtils.isDirEmpty(destFs, destDir)) {
+        throw new HCatException(ErrorType.ERROR_DUPLICATE_PARTITION,
+            "Data already exists in " + destDir
+                + ", duplicate publish not possible.");
       }
+      moves.add(Pair.of(srcf, destDir));
     } else {
-
-      FileStatus[] children = fs.listStatus(file);
-      FileStatus firstChild = null;
-      if (children != null) {
-        int index=0;
-        while (index < children.length) {
-          if ( !children[index].getPath().getName().equals(TEMP_DIR_NAME)
-              && !children[index].getPath().getName().equals(LOGS_DIR_NAME)
-              && !children[index].getPath().getName().equals(SUCCEEDED_FILE_NAME)) {
-            firstChild = children[index];
-            break;
-          }
-          index++;
-        }
+      Queue<FileStatus> srcQ = new LinkedList<>();
+      FileStatus[] contents = srcFs.listStatus(srcf, HIDDEN_FILES_PATH_FILTER);
+      if (contents.length == 0) {
+        // nothing to move
+        return;
       }
-      if(firstChild!=null && firstChild.isDir()) {
-        // If the first child is directory, then rest would be directory too according to HCatalog dir structure
-        // recurse in that case
-        for (FileStatus child : children) {
-          moveTaskOutputs(fs, child.getPath(), srcDir, destDir, dryRun, immutable);
+      Collections.addAll(srcQ, contents);
+
+      while (!srcQ.isEmpty()) {
+        FileStatus srcStatus = srcQ.remove();
+        Path srcF = srcStatus.getPath();
+        final Path finalOutputPath = getFinalPath(destFs, srcF, srcDir, destDir, immutable);
+        if (immutable && destFs.exists(finalOutputPath) &&
+            !org.apache.hadoop.hive.metastore.utils.FileUtils.isDirEmpty(destFs, finalOutputPath)) {
+          throw new HCatException(ErrorType.ERROR_DUPLICATE_PARTITION,
+              "Data already exists in " + finalOutputPath
+                  + ", duplicate publish not possible.");
         }
-      } else {
-
-        if (!dryRun) {
-          if (dynamicPartitioningUsed) {
-
-            // Optimization: if the first child is file, we have reached the leaf directory, move the parent directory itself
-            // instead of moving each file under the directory. See HCATALOG-538
-            // Note for future Append implementation : This optimization is another reason dynamic
-            // partitioning is currently incompatible with append on mutable tables.
-
-            final Path parentDir = finalOutputPath.getParent();
-            // Create the directory
-            Path placeholder = new Path(parentDir, "_placeholder" + String.valueOf(Math.random()));
-            if (fs.mkdirs(parentDir)) {
-              // It is weird but we need a placeholder,
-              // otherwise rename cannot move file to the right place
-              fs.create(placeholder).close();
-            }
-            LOG.debug("Moving directory: {} to {}", file, parentDir);
-
+        if (srcStatus.isDirectory()) {
+          if (canRename && dynamicPartitioningUsed) {
+            // If it is partition, move the partition directory instead of each file.
             // If custom dynamic location provided, need to rename to final output path
+            final Path parentDir = finalOutputPath.getParent();
             Path dstPath = !customDynamicLocationUsed ? parentDir : finalOutputPath;
-            if (!fs.rename(file, dstPath)) {
-              final String msg = "Failed to move file: " + file + " to " + dstPath;
-              LOG.error(msg);
-              throw new HCatException(ErrorType.ERROR_MOVE_FAILED, msg);
-            }
-            fs.delete(placeholder, false);
+            moves.add(Pair.of(srcF, dstPath));
           } else {
-
-            // In case of no partition we have to move each file
-            for (FileStatus child : children) {
-              moveTaskOutputs(fs, child.getPath(), srcDir, destDir, dryRun, immutable);
-            }
-
+            Collections.addAll(srcQ, srcFs.listStatus(srcF, HIDDEN_FILES_PATH_FILTER));
           }
-
         } else {
-          if(immutable && fs.exists(finalOutputPath) &&
-              !org.apache.hadoop.hive.metastore.utils.FileUtils.isDirEmpty(fs, finalOutputPath)) {
+          moves.add(Pair.of(srcF, finalOutputPath));
+        }
+      }
+    }
 
-            throw new HCatException(ErrorType.ERROR_DUPLICATE_PARTITION, "Data already exists in " + finalOutputPath
-                + ", duplicate publish not possible.");
-          }
+    if (moves.isEmpty()) {
+      return;
+    }
 
+    final List<Future<Pair<Path, Path>>> futures = new LinkedList<>();
+    final ExecutorService pool = conf.getInt(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT.varname, 25) > 0 ?
+        Executors.newFixedThreadPool(conf.getInt(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT.varname, 25),
+            new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Move-Thread-%d").build()) : null;
+
+    for (final Pair<Path, Path> pair: moves){
+      Path srcP = pair.getLeft();
+      Path dstP = pair.getRight();
+      final String msg = "Unable to move source " + srcP + " to destination " + dstP;
+      if (null==pool) {
+        moveFile(srcFs, srcP, destFs, dstP, conf, canRename);
+      } else {
+        futures.add(pool.submit(new Callable<Pair<Path, Path>>() {
+          @Override
+          public Pair<Path, Path> call() throws IOException {
+            if (moveFile(srcFs, srcP, destFs, dstP, conf, canRename)) {
+              return pair;
+            } else {
+              throw new HCatException(ErrorType.ERROR_MOVE_FAILED, msg);
+            }
+          }
+        }));
+      }
+    }
+    if (null != pool) {
+      pool.shutdown();
+      for (Future<Pair<Path, Path>> future : futures) {
+        try {
+          Pair<Path, Path> pair = future.get();
+          LOG.debug("Moved src: {}, to dest: {}", pair.getLeft().toString(), pair.getRight().toString());
+        } catch (Exception e) {
+          LOG.error("Failed to move {}", e.getMessage());
+          pool.shutdownNow();
+          throw new HCatException(ErrorType.ERROR_MOVE_FAILED, e.getMessage());
         }
       }
     }
   }
 
+  private boolean moveFile(FileSystem srcFs, Path srcf, FileSystem destFs, Path destf, Configuration conf, boolean canRename) throws IOException {
+    boolean moved;
+    if (canRename) {
+      destFs.mkdirs(destf.getParent());
+      moved = srcFs.rename(srcf, destf);
+    } else {
+      moved = FileUtil.copy(srcFs, srcf, destFs, destf, true, false, conf);
+    }
+    return moved;
+  }
+
   /**
    * Find the final name of a given output file, given the output directory
    * and the work directory. If immutable, attempt to create file of name
@@ -750,7 +770,7 @@ class FileOutputCommitterContainer extends OutputCommitterContainer {
         // Move data from temp directory the actual table directory
         // No metastore operation required.
         Path src = new Path(jobInfo.getLocation());
-        moveTaskOutputs(fs, src, src, tblPath, false, table.isImmutable());
+        moveTaskOutputs(conf, src, src, tblPath, table.isImmutable());
         if (!src.equals(tblPath)) {
           fs.delete(src, true);
         }
@@ -813,8 +833,7 @@ class FileOutputCommitterContainer extends OutputCommitterContainer {
           // check here for each dir we're copying out, to see if it
           // already exists, error out if so.
           // Also, treat dyn-writes as writes to immutable tables.
-          moveTaskOutputs(fs, src, src, tblPath, true, true); // dryRun = true, immutable = true
-          moveTaskOutputs(fs, src, src, tblPath, false, true);
+          moveTaskOutputs(conf, src, src, tblPath, table.isImmutable());
           if (!src.equals(tblPath)){
             fs.delete(src, true);
           }
@@ -854,8 +873,7 @@ class FileOutputCommitterContainer extends OutputCommitterContainer {
             Partition p = partitionsToAdd.get(0);
             Path src = new Path(jobInfo.getLocation());
             Path dest = new Path(p.getSd().getLocation());
-            moveTaskOutputs(fs, src, src, dest, true, table.isImmutable());
-            moveTaskOutputs(fs,src,src,dest,false,table.isImmutable());
+            moveTaskOutputs(conf, src, src, dest, table.isImmutable());
             if (!src.equals(dest)){
               if (src.toString().matches(".*" + Path.SEPARATOR + SCRATCH_DIR_NAME + "\\d\\.?\\d+.*")){
                 // src is scratch directory, need to trim the part key value pairs from path
@@ -903,8 +921,7 @@ class FileOutputCommitterContainer extends OutputCommitterContainer {
             // Dynamic partitioning usecase
             if (!customDynamicLocationUsed) {
               Path src = new Path(ptnRootLocation);
-              moveTaskOutputs(fs, src, src, tblPath, true, true); // dryRun = true, immutable = true
-              moveTaskOutputs(fs, src, src, tblPath, false, true);
+              moveTaskOutputs(conf, src, src, tblPath, true);
               if (!src.equals(tblPath)){
                 fs.delete(src, true);
               }
@@ -956,8 +973,7 @@ class FileOutputCommitterContainer extends OutputCommitterContainer {
     for (Entry<String, Map<String, String>> entry : partitionsDiscoveredByPath.entrySet()) {
       Path src = new Path(entry.getKey());
       Path destPath = new Path(getFinalDynamicPartitionDestination(table, entry.getValue(), jobInfo));
-      moveTaskOutputs(fs, src, src, destPath, true, true); // dryRun = true, immutable = true
-      moveTaskOutputs(fs, src, src, destPath, false, true);
+      moveTaskOutputs(conf, src, src, destPath, true);
     }
     // delete the parent temp directory of all custom dynamic partitions
     Path parentPath = new Path(getCustomPartitionRootLocation(jobInfo, conf));