You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by pv...@apache.org on 2020/03/13 09:43:08 UTC

[hive] branch master updated: HIVE-22964: MM table split computation is very slow (Aditya Shah reviewed by Peter Vary)

This is an automated email from the ASF dual-hosted git repository.

pvary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 7e39a2c  HIVE-22964: MM table split computation is very slow (Aditya Shah reviewed by Peter Vary)
7e39a2c is described below

commit 7e39a2c13711f9377c9ce1edb4224880421b1ea5
Author: Aditya Shah <ad...@qubole.com>
AuthorDate: Fri Mar 13 10:30:37 2020 +0100

    HIVE-22964: MM table split computation is very slow (Aditya Shah reviewed by Peter Vary)
---
 .../java/org/apache/hadoop/hive/conf/HiveConf.java |  7 ++++
 .../apache/hadoop/hive/ql/io/HiveInputFormat.java  | 49 ++++++++++++++++++++--
 .../hadoop/hive/ql/io/orc/OrcInputFormat.java      |  2 +-
 3 files changed, 53 insertions(+), 5 deletions(-)

diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index a18a6d7..8a9fe3c 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2094,6 +2094,10 @@ public class HiveConf extends Configuration {
         "Allow synthetic file ID in splits on file systems that don't have a native one."),
     HIVE_ORC_CACHE_STRIPE_DETAILS_MEMORY_SIZE("hive.orc.cache.stripe.details.mem.size", "256Mb",
         new SizeValidator(), "Maximum size of orc splits cached in the client."),
+    /**
+     * @deprecated Use {@link ConfVars#HIVE_COMPUTE_SPLITS_NUM_THREADS} instead.
+     */
+    @Deprecated
     HIVE_ORC_COMPUTE_SPLITS_NUM_THREADS("hive.orc.compute.splits.num.threads", 10,
         "How many threads orc should use to create splits in parallel."),
     HIVE_ORC_CACHE_USE_SOFT_REFERENCES("hive.orc.cache.use.soft.references", false,
@@ -4827,6 +4831,9 @@ public class HiveConf extends Configuration {
         new TimeValidator(TimeUnit.SECONDS),
         "Timeout for Running Query in seconds. A nonpositive value means infinite. " +
         "If the query timeout is also set by thrift API call, the smaller one will be taken."),
+    HIVE_COMPUTE_SPLITS_NUM_THREADS("hive.compute.splits.num.threads", 10,
+            "How many threads Input Format should use to create splits in parallel.",
+            HIVE_ORC_COMPUTE_SPLITS_NUM_THREADS.varname),
     HIVE_EXEC_INPUT_LISTING_MAX_THREADS("hive.exec.input.listing.max.threads", 0, new  SizeValidator(0L, true, 1024L, true),
         "Maximum number of threads that Hive uses to list file information from file systems (recommended > 1 for blobstore)."),
 
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
index 233bd1e..96a48de 100755
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hive.ql.io;
 
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
@@ -72,6 +73,7 @@ import java.io.DataOutput;
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -80,6 +82,12 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import static java.lang.Integer.min;
 
 /**
  * HiveInputFormat is a parameterized InputFormat which looks at the path name
@@ -478,7 +486,8 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable>
       pushFilters(conf, tableScan, this.mrwork);
     }
 
-    List<Path> dirsWithFileOriginals = new ArrayList<>(), finalDirs = new ArrayList<>();
+    List<Path> dirsWithFileOriginals = Collections.synchronizedList(new ArrayList<>()),
+            finalDirs = Collections.synchronizedList(new ArrayList<>());
     processPathsForMmRead(dirs, conf, validMmWriteIdList, finalDirs, dirsWithFileOriginals);
     if (finalDirs.isEmpty() && dirsWithFileOriginals.isEmpty()) {
       // This is for transactional tables.
@@ -577,9 +586,41 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable>
       return;
     }
     boolean allowOriginals = HiveConf.getBoolVar(conf, ConfVars.HIVE_MM_ALLOW_ORIGINALS);
-    for (Path dir : dirs) {
-      processForWriteIdsForMmRead(
-          dir, conf, validWriteIdList, allowOriginals, finalPaths, pathsWithFileOriginals);
+
+    int numThreads = min(HiveConf.getIntVar(conf, ConfVars.HIVE_COMPUTE_SPLITS_NUM_THREADS), dirs.size());
+    List<Future<Void>> pathFutures = new ArrayList<>();
+    ExecutorService pool = null;
+    if (numThreads > 1) {
+      pool = Executors.newFixedThreadPool(numThreads,
+              new ThreadFactoryBuilder().setDaemon(true).setNameFormat("MM-Split-Paths-%d").build());
+    }
+
+    try {
+      for (Path dir : dirs) {
+        if (pool != null) {
+          Future<Void> pathFuture = pool.submit(() -> {
+            processForWriteIdsForMmRead(dir, conf, validWriteIdList, allowOriginals, finalPaths, pathsWithFileOriginals);
+            return null;
+          });
+          pathFutures.add(pathFuture);
+        } else {
+          processForWriteIdsForMmRead(dir, conf, validWriteIdList, allowOriginals, finalPaths, pathsWithFileOriginals);
+        }
+      }
+      try {
+        for (Future<Void> pathFuture : pathFutures) {
+          pathFuture.get();
+        }
+      } catch (InterruptedException | ExecutionException e) {
+        for (Future<Void> future : pathFutures) {
+          future.cancel(true);
+        }
+        throw new IOException(e);
+      }
+    } finally {
+      if (pool != null) {
+        pool.shutdown();
+      }
     }
   }
 
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
index 7c8f479..1059cb2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
@@ -672,7 +672,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
       splitStrategyBatchMs = HiveConf.getIntVar(conf, ConfVars.HIVE_ORC_SPLIT_DIRECTORY_BATCH_MS);
       long cacheMemSize = HiveConf.getSizeVar(
           conf, ConfVars.HIVE_ORC_CACHE_STRIPE_DETAILS_MEMORY_SIZE);
-      int numThreads = HiveConf.getIntVar(conf, ConfVars.HIVE_ORC_COMPUTE_SPLITS_NUM_THREADS);
+      int numThreads = HiveConf.getIntVar(conf, ConfVars.HIVE_COMPUTE_SPLITS_NUM_THREADS);
       boolean useSoftReference = HiveConf.getBoolVar(
           conf, ConfVars.HIVE_ORC_CACHE_USE_SOFT_REFERENCES);