You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink was not preserved in this plain-text copy).
Posted to commits@hive.apache.org by da...@apache.org on 2023/04/11 00:19:50 UTC
[hive] branch master updated: HIVE-27143: Optimize HCatStorer moveTask (#4177)
This is an automated email from the ASF dual-hosted git repository.
daijy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 43491dbd75b HIVE-27143: Optimize HCatStorer moveTask (#4177)
43491dbd75b is described below
commit 43491dbd75b83daa755438eb6f43cf6e6b47b1c1
Author: yigress <10...@users.noreply.github.com>
AuthorDate: Mon Apr 10 17:19:38 2023 -0700
HIVE-27143: Optimize HCatStorer moveTask (#4177)
* HIVE-27143: Optimize HCatStorer moveTask
* fix custom dynamic partition
---
.../mapreduce/FileOutputCommitterContainer.java | 230 +++++++++++----------
1 file changed, 123 insertions(+), 107 deletions(-)
diff --git a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileOutputCommitterContainer.java b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileOutputCommitterContainer.java
index ef3c1afc457..476c60e53af 100644
--- a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileOutputCommitterContainer.java
+++ b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileOutputCommitterContainer.java
@@ -19,23 +19,34 @@
package org.apache.hive.hcatalog.mapreduce;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
-
+import java.util.Queue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hive.common.FileUtils;
import org.apache.hadoop.hive.common.StatsSetupConst;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.io.HdfsUtils;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.Warehouse;
@@ -225,6 +236,15 @@ class FileOutputCommitterContainer extends OutputCommitterContainer {
}
}
+ public static final PathFilter HIDDEN_FILES_PATH_FILTER = new PathFilter() {
+ @Override
+ public boolean accept(Path p) {
+ String name = p.getName();
+ boolean filtered = name.equals(TEMP_DIR_NAME) || name.equals(LOGS_DIR_NAME) || name.equals(SUCCEEDED_FILE_NAME);
+ return !filtered;
+ }
+ };
+
public static final String SUCCEEDED_FILE_NAME = "_SUCCESS";
static final String SUCCESSFUL_JOB_OUTPUT_DIR_MARKER =
"mapreduce.fileoutputcommitter.marksuccessfuljobs";
@@ -367,10 +387,11 @@ class FileOutputCommitterContainer extends OutputCommitterContainer {
partPath = new Path(finalLocn);
} else {
partPath = new Path(partLocnRoot);
+ FileSystem partFs = partPath.getFileSystem(context.getConfiguration());
int i = 0;
for (FieldSchema partKey : table.getPartitionKeys()) {
if (i++ != 0) {
- fs.mkdirs(partPath); // Attempt to make the path in case it does not exist before we check
+ partFs.mkdirs(partPath); // Attempt to make the path in case it does not exist before we check
HdfsUtils.setFullFileStatus(conf, status, status.getFileStatus().getGroup(), fs,
partPath, false);
}
@@ -380,7 +401,8 @@ class FileOutputCommitterContainer extends OutputCommitterContainer {
// Do not need to set the status on the partition directory. We will do it later recursively.
// See: end of the registerPartitions method
- fs.mkdirs(partPath);
+ FileSystem partFs = partPath.getFileSystem(context.getConfiguration());
+ partFs.mkdirs(partPath);
// Set the location in the StorageDescriptor
if (dynamicPartitioningUsed) {
@@ -467,131 +489,129 @@ class FileOutputCommitterContainer extends OutputCommitterContainer {
/**
* Move all of the files from the temp directory to the final location
- * @param fs the output file system
- * @param file the file to move
+ * @param srcf the file to move
* @param srcDir the source directory
* @param destDir the target directory
- * @param dryRun - a flag that simply tests if this move would succeed or not based
- * on whether other files exist where we're trying to copy
+ * @param immutable - whether table is immutable.
* @throws java.io.IOException
*/
- private void moveTaskOutputs(FileSystem fs, Path file, Path srcDir,
- Path destDir, final boolean dryRun, boolean immutable
- ) throws IOException {
+ private void moveTaskOutputs(final Configuration conf, Path srcf, Path srcDir,
+ Path destDir, boolean immutable) throws IOException {
if(LOG.isDebugEnabled()) {
LOG.debug("moveTaskOutputs "
- + file + " from: " + srcDir + " to: " + destDir
- + " dry: " + dryRun + " immutable: " + immutable);
+ + srcf + " from: " + srcDir + " to: " + destDir + " immutable: " + immutable);
}
if (dynamicPartitioningUsed) {
immutable = true; // Making sure we treat dynamic partitioning jobs as if they were immutable.
}
- if (file.getName().equals(TEMP_DIR_NAME) || file.getName().equals(LOGS_DIR_NAME) || file.getName().equals(SUCCEEDED_FILE_NAME)) {
- return;
- }
+ final FileSystem srcFs = srcf.getFileSystem(conf);
+ final FileSystem destFs = destDir.getFileSystem(conf);
+ final boolean canRename = srcFs.getUri().equals(destFs.getUri());
- final Path finalOutputPath = getFinalPath(fs, file, srcDir, destDir, immutable);
- FileStatus fileStatus = fs.getFileStatus(file);
+ if (destFs.exists(destDir) && !destFs.getFileStatus(destDir).isDirectory()) {
+ throw new HCatException(ErrorType.ERROR_MOVE_FAILED, "Destination is not directory " + destDir);
+ }
- if (!fileStatus.isDir()) {
- if (dryRun){
- if (immutable){
- // Dryrun checks are meaningless for mutable table - we should always succeed
- // unless there is a runtime IOException.
- LOG.debug("Testing if moving file: [{}] to [{}] would cause a problem", file, finalOutputPath);
- if (fs.exists(finalOutputPath)) {
- throw new HCatException(ErrorType.ERROR_MOVE_FAILED, "Data already exists in "
- + finalOutputPath + ", duplicate publish not possible.");
- }
- }
- } else {
- LOG.debug("Moving file: [{}] to [{}]", file, finalOutputPath);
- // Make sure the parent directory exists. It is not an error
- // to recreate an existing directory
- fs.mkdirs(finalOutputPath.getParent());
- if (!fs.rename(file, finalOutputPath)) {
- if (!fs.delete(finalOutputPath, true)) {
- throw new HCatException(ErrorType.ERROR_MOVE_FAILED, "Failed to delete existing path " + finalOutputPath);
- }
- if (!fs.rename(file, finalOutputPath)) {
- throw new HCatException(ErrorType.ERROR_MOVE_FAILED, "Failed to move output to " + finalOutputPath);
- }
- }
+ LinkedList<Pair<Path, Path>> moves = new LinkedList<>();
+ if (customDynamicLocationUsed) {
+ if (immutable && destFs.exists(destDir) &&
+ !org.apache.hadoop.hive.metastore.utils.FileUtils.isDirEmpty(destFs, destDir)) {
+ throw new HCatException(ErrorType.ERROR_DUPLICATE_PARTITION,
+ "Data already exists in " + destDir
+ + ", duplicate publish not possible.");
}
+ moves.add(Pair.of(srcf, destDir));
} else {
-
- FileStatus[] children = fs.listStatus(file);
- FileStatus firstChild = null;
- if (children != null) {
- int index=0;
- while (index < children.length) {
- if ( !children[index].getPath().getName().equals(TEMP_DIR_NAME)
- && !children[index].getPath().getName().equals(LOGS_DIR_NAME)
- && !children[index].getPath().getName().equals(SUCCEEDED_FILE_NAME)) {
- firstChild = children[index];
- break;
- }
- index++;
- }
+ Queue<FileStatus> srcQ = new LinkedList<>();
+ FileStatus[] contents = srcFs.listStatus(srcf, HIDDEN_FILES_PATH_FILTER);
+ if (contents.length == 0) {
+ // nothing to move
+ return;
}
- if(firstChild!=null && firstChild.isDir()) {
- // If the first child is directory, then rest would be directory too according to HCatalog dir structure
- // recurse in that case
- for (FileStatus child : children) {
- moveTaskOutputs(fs, child.getPath(), srcDir, destDir, dryRun, immutable);
+ Collections.addAll(srcQ, contents);
+
+ while (!srcQ.isEmpty()) {
+ FileStatus srcStatus = srcQ.remove();
+ Path srcF = srcStatus.getPath();
+ final Path finalOutputPath = getFinalPath(destFs, srcF, srcDir, destDir, immutable);
+ if (immutable && destFs.exists(finalOutputPath) &&
+ !org.apache.hadoop.hive.metastore.utils.FileUtils.isDirEmpty(destFs, finalOutputPath)) {
+ throw new HCatException(ErrorType.ERROR_DUPLICATE_PARTITION,
+ "Data already exists in " + finalOutputPath
+ + ", duplicate publish not possible.");
}
- } else {
-
- if (!dryRun) {
- if (dynamicPartitioningUsed) {
-
- // Optimization: if the first child is file, we have reached the leaf directory, move the parent directory itself
- // instead of moving each file under the directory. See HCATALOG-538
- // Note for future Append implementation : This optimization is another reason dynamic
- // partitioning is currently incompatible with append on mutable tables.
-
- final Path parentDir = finalOutputPath.getParent();
- // Create the directory
- Path placeholder = new Path(parentDir, "_placeholder" + String.valueOf(Math.random()));
- if (fs.mkdirs(parentDir)) {
- // It is weird but we need a placeholder,
- // otherwise rename cannot move file to the right place
- fs.create(placeholder).close();
- }
- LOG.debug("Moving directory: {} to {}", file, parentDir);
-
+ if (srcStatus.isDirectory()) {
+ if (canRename && dynamicPartitioningUsed) {
+ // If it is partition, move the partition directory instead of each file.
// If custom dynamic location provided, need to rename to final output path
+ final Path parentDir = finalOutputPath.getParent();
Path dstPath = !customDynamicLocationUsed ? parentDir : finalOutputPath;
- if (!fs.rename(file, dstPath)) {
- final String msg = "Failed to move file: " + file + " to " + dstPath;
- LOG.error(msg);
- throw new HCatException(ErrorType.ERROR_MOVE_FAILED, msg);
- }
- fs.delete(placeholder, false);
+ moves.add(Pair.of(srcF, dstPath));
} else {
-
- // In case of no partition we have to move each file
- for (FileStatus child : children) {
- moveTaskOutputs(fs, child.getPath(), srcDir, destDir, dryRun, immutable);
- }
-
+ Collections.addAll(srcQ, srcFs.listStatus(srcF, HIDDEN_FILES_PATH_FILTER));
}
-
} else {
- if(immutable && fs.exists(finalOutputPath) &&
- !org.apache.hadoop.hive.metastore.utils.FileUtils.isDirEmpty(fs, finalOutputPath)) {
+ moves.add(Pair.of(srcF, finalOutputPath));
+ }
+ }
+ }
- throw new HCatException(ErrorType.ERROR_DUPLICATE_PARTITION, "Data already exists in " + finalOutputPath
- + ", duplicate publish not possible.");
- }
+ if (moves.isEmpty()) {
+ return;
+ }
+ final List<Future<Pair<Path, Path>>> futures = new LinkedList<>();
+ final ExecutorService pool = conf.getInt(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT.varname, 25) > 0 ?
+ Executors.newFixedThreadPool(conf.getInt(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT.varname, 25),
+ new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Move-Thread-%d").build()) : null;
+
+ for (final Pair<Path, Path> pair: moves){
+ Path srcP = pair.getLeft();
+ Path dstP = pair.getRight();
+ final String msg = "Unable to move source " + srcP + " to destination " + dstP;
+ if (null==pool) {
+ moveFile(srcFs, srcP, destFs, dstP, conf, canRename);
+ } else {
+ futures.add(pool.submit(new Callable<Pair<Path, Path>>() {
+ @Override
+ public Pair<Path, Path> call() throws IOException {
+ if (moveFile(srcFs, srcP, destFs, dstP, conf, canRename)) {
+ return pair;
+ } else {
+ throw new HCatException(ErrorType.ERROR_MOVE_FAILED, msg);
+ }
+ }
+ }));
+ }
+ }
+ if (null != pool) {
+ pool.shutdown();
+ for (Future<Pair<Path, Path>> future : futures) {
+ try {
+ Pair<Path, Path> pair = future.get();
+ LOG.debug("Moved src: {}, to dest: {}", pair.getLeft().toString(), pair.getRight().toString());
+ } catch (Exception e) {
+ LOG.error("Failed to move {}", e.getMessage());
+ pool.shutdownNow();
+ throw new HCatException(ErrorType.ERROR_MOVE_FAILED, e.getMessage());
}
}
}
}
+ private boolean moveFile(FileSystem srcFs, Path srcf, FileSystem destFs, Path destf, Configuration conf, boolean canRename) throws IOException {
+ boolean moved;
+ if (canRename) {
+ destFs.mkdirs(destf.getParent());
+ moved = srcFs.rename(srcf, destf);
+ } else {
+ moved = FileUtil.copy(srcFs, srcf, destFs, destf, true, false, conf);
+ }
+ return moved;
+ }
+
/**
* Find the final name of a given output file, given the output directory
* and the work directory. If immutable, attempt to create file of name
@@ -750,7 +770,7 @@ class FileOutputCommitterContainer extends OutputCommitterContainer {
// Move data from temp directory the actual table directory
// No metastore operation required.
Path src = new Path(jobInfo.getLocation());
- moveTaskOutputs(fs, src, src, tblPath, false, table.isImmutable());
+ moveTaskOutputs(conf, src, src, tblPath, table.isImmutable());
if (!src.equals(tblPath)) {
fs.delete(src, true);
}
@@ -813,8 +833,7 @@ class FileOutputCommitterContainer extends OutputCommitterContainer {
// check here for each dir we're copying out, to see if it
// already exists, error out if so.
// Also, treat dyn-writes as writes to immutable tables.
- moveTaskOutputs(fs, src, src, tblPath, true, true); // dryRun = true, immutable = true
- moveTaskOutputs(fs, src, src, tblPath, false, true);
+ moveTaskOutputs(conf, src, src, tblPath, table.isImmutable());
if (!src.equals(tblPath)){
fs.delete(src, true);
}
@@ -854,8 +873,7 @@ class FileOutputCommitterContainer extends OutputCommitterContainer {
Partition p = partitionsToAdd.get(0);
Path src = new Path(jobInfo.getLocation());
Path dest = new Path(p.getSd().getLocation());
- moveTaskOutputs(fs, src, src, dest, true, table.isImmutable());
- moveTaskOutputs(fs,src,src,dest,false,table.isImmutable());
+ moveTaskOutputs(conf, src, src, dest, table.isImmutable());
if (!src.equals(dest)){
if (src.toString().matches(".*" + Path.SEPARATOR + SCRATCH_DIR_NAME + "\\d\\.?\\d+.*")){
// src is scratch directory, need to trim the part key value pairs from path
@@ -903,8 +921,7 @@ class FileOutputCommitterContainer extends OutputCommitterContainer {
// Dynamic partitioning usecase
if (!customDynamicLocationUsed) {
Path src = new Path(ptnRootLocation);
- moveTaskOutputs(fs, src, src, tblPath, true, true); // dryRun = true, immutable = true
- moveTaskOutputs(fs, src, src, tblPath, false, true);
+ moveTaskOutputs(conf, src, src, tblPath, true);
if (!src.equals(tblPath)){
fs.delete(src, true);
}
@@ -956,8 +973,7 @@ class FileOutputCommitterContainer extends OutputCommitterContainer {
for (Entry<String, Map<String, String>> entry : partitionsDiscoveredByPath.entrySet()) {
Path src = new Path(entry.getKey());
Path destPath = new Path(getFinalDynamicPartitionDestination(table, entry.getValue(), jobInfo));
- moveTaskOutputs(fs, src, src, destPath, true, true); // dryRun = true, immutable = true
- moveTaskOutputs(fs, src, src, destPath, false, true);
+ moveTaskOutputs(conf, src, src, destPath, true);
}
// delete the parent temp directory of all custom dynamic partitions
Path parentPath = new Path(getCustomPartitionRootLocation(jobInfo, conf));