You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by pv...@apache.org on 2020/03/13 09:43:08 UTC
[hive] branch master updated: HIVE-22964: MM table split computation is very slow (Aditya Shah reviewed by Peter Vary)
This is an automated email from the ASF dual-hosted git repository.
pvary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 7e39a2c HIVE-22964: MM table split computation is very slow (Aditya Shah reviewed by Peter Vary)
7e39a2c is described below
commit 7e39a2c13711f9377c9ce1edb4224880421b1ea5
Author: Aditya Shah <ad...@qubole.com>
AuthorDate: Fri Mar 13 10:30:37 2020 +0100
HIVE-22964: MM table split computation is very slow (Aditya Shah reviewed by Peter Vary)
---
.../java/org/apache/hadoop/hive/conf/HiveConf.java | 7 ++++
.../apache/hadoop/hive/ql/io/HiveInputFormat.java | 49 ++++++++++++++++++++--
.../hadoop/hive/ql/io/orc/OrcInputFormat.java | 2 +-
3 files changed, 53 insertions(+), 5 deletions(-)
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index a18a6d7..8a9fe3c 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2094,6 +2094,10 @@ public class HiveConf extends Configuration {
"Allow synthetic file ID in splits on file systems that don't have a native one."),
HIVE_ORC_CACHE_STRIPE_DETAILS_MEMORY_SIZE("hive.orc.cache.stripe.details.mem.size", "256Mb",
new SizeValidator(), "Maximum size of orc splits cached in the client."),
+ /**
+ * @deprecated Use HiveConf.HIVE_COMPUTE_SPLITS_NUM_THREADS
+ */
+ @Deprecated
HIVE_ORC_COMPUTE_SPLITS_NUM_THREADS("hive.orc.compute.splits.num.threads", 10,
"How many threads orc should use to create splits in parallel."),
HIVE_ORC_CACHE_USE_SOFT_REFERENCES("hive.orc.cache.use.soft.references", false,
@@ -4827,6 +4831,9 @@ public class HiveConf extends Configuration {
new TimeValidator(TimeUnit.SECONDS),
"Timeout for Running Query in seconds. A nonpositive value means infinite. " +
"If the query timeout is also set by thrift API call, the smaller one will be taken."),
+ HIVE_COMPUTE_SPLITS_NUM_THREADS("hive.compute.splits.num.threads", 10,
+ "How many threads Input Format should use to create splits in parallel.",
+ HIVE_ORC_COMPUTE_SPLITS_NUM_THREADS.varname),
HIVE_EXEC_INPUT_LISTING_MAX_THREADS("hive.exec.input.listing.max.threads", 0, new SizeValidator(0L, true, 1024L, true),
"Maximum number of threads that Hive uses to list file information from file systems (recommended > 1 for blobstore)."),
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
index 233bd1e..96a48de 100755
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hive.ql.io;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
@@ -72,6 +73,7 @@ import java.io.DataOutput;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Iterator;
@@ -80,6 +82,12 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import static java.lang.Integer.min;
/**
* HiveInputFormat is a parameterized InputFormat which looks at the path name
@@ -478,7 +486,8 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable>
pushFilters(conf, tableScan, this.mrwork);
}
- List<Path> dirsWithFileOriginals = new ArrayList<>(), finalDirs = new ArrayList<>();
+ List<Path> dirsWithFileOriginals = Collections.synchronizedList(new ArrayList<>()),
+ finalDirs = Collections.synchronizedList(new ArrayList<>());
processPathsForMmRead(dirs, conf, validMmWriteIdList, finalDirs, dirsWithFileOriginals);
if (finalDirs.isEmpty() && dirsWithFileOriginals.isEmpty()) {
// This is for transactional tables.
@@ -577,9 +586,41 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable>
return;
}
boolean allowOriginals = HiveConf.getBoolVar(conf, ConfVars.HIVE_MM_ALLOW_ORIGINALS);
- for (Path dir : dirs) {
- processForWriteIdsForMmRead(
- dir, conf, validWriteIdList, allowOriginals, finalPaths, pathsWithFileOriginals);
+
+ int numThreads = min(HiveConf.getIntVar(conf, ConfVars.HIVE_COMPUTE_SPLITS_NUM_THREADS), dirs.size());
+ List<Future<Void>> pathFutures = new ArrayList<>();
+ ExecutorService pool = null;
+ if (numThreads > 1) {
+ pool = Executors.newFixedThreadPool(numThreads,
+ new ThreadFactoryBuilder().setDaemon(true).setNameFormat("MM-Split-Paths-%d").build());
+ }
+
+ try {
+ for (Path dir : dirs) {
+ if (pool != null) {
+ Future<Void> pathFuture = pool.submit(() -> {
+ processForWriteIdsForMmRead(dir, conf, validWriteIdList, allowOriginals, finalPaths, pathsWithFileOriginals);
+ return null;
+ });
+ pathFutures.add(pathFuture);
+ } else {
+ processForWriteIdsForMmRead(dir, conf, validWriteIdList, allowOriginals, finalPaths, pathsWithFileOriginals);
+ }
+ }
+ try {
+ for (Future<Void> pathFuture : pathFutures) {
+ pathFuture.get();
+ }
+ } catch (InterruptedException | ExecutionException e) {
+ for (Future<Void> future : pathFutures) {
+ future.cancel(true);
+ }
+ throw new IOException(e);
+ }
+ } finally {
+ if (pool != null) {
+ pool.shutdown();
+ }
}
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
index 7c8f479..1059cb2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
@@ -672,7 +672,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
splitStrategyBatchMs = HiveConf.getIntVar(conf, ConfVars.HIVE_ORC_SPLIT_DIRECTORY_BATCH_MS);
long cacheMemSize = HiveConf.getSizeVar(
conf, ConfVars.HIVE_ORC_CACHE_STRIPE_DETAILS_MEMORY_SIZE);
- int numThreads = HiveConf.getIntVar(conf, ConfVars.HIVE_ORC_COMPUTE_SPLITS_NUM_THREADS);
+ int numThreads = HiveConf.getIntVar(conf, ConfVars.HIVE_COMPUTE_SPLITS_NUM_THREADS);
boolean useSoftReference = HiveConf.getBoolVar(
conf, ConfVars.HIVE_ORC_CACHE_USE_SOFT_REFERENCES);