Posted to commits@hive.apache.org by ha...@apache.org on 2017/02/08 21:36:47 UTC

hive git commit: HIVE-15803 : msck can hang when nested partitions are present (Rajesh Balamohan via Ashutosh Chauhan)

Repository: hive
Updated Branches:
  refs/heads/master 441b29e10 -> 19a6831b9


HIVE-15803 : msck can hang when nested partitions are present (Rajesh Balamohan via Ashutosh Chauhan)

Signed-off-by: Ashutosh Chauhan <ha...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/19a6831b
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/19a6831b
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/19a6831b

Branch: refs/heads/master
Commit: 19a6831b9c5fe3911872bcab1b974f3baa4e1db9
Parents: 441b29e
Author: Rajesh Balamohan <rb...@apache.org>
Authored: Tue Feb 7 19:16:00 2017 -0800
Committer: Ashutosh Chauhan <ha...@apache.org>
Committed: Wed Feb 8 13:35:52 2017 -0800

----------------------------------------------------------------------
 .../hive/ql/metadata/HiveMetaStoreChecker.java  | 45 +++++++++++++++-----
 .../clientpositive/msck_repair_batchsize.q      | 10 +++++
 .../clientpositive/msck_repair_batchsize.q.out  | 25 +++++++++++
 3 files changed, 70 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
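
Why MSCK could hang: as the patch's own comment notes further down, recursive
calls can deadlock the thread pool. With nested partition directories, a
pooled task ends up waiting on child listings that a saturated fixed-size
pool can never schedule. A minimal standalone sketch of that failure mode
(illustrative only; the class name and partition string are invented, not
Hive code):

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;

    public class NestedSubmitDeadlock {
      public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        Future<String> parent = pool.submit(() -> {
          // The parent occupies the pool's only worker while it waits.
          Future<String> child = pool.submit(() -> "p1=c/p2=a/p3=b");
          return child.get(); // never returns: no free thread for the child
        });
        try {
          parent.get(2, TimeUnit.SECONDS);
        } catch (TimeoutException expected) {
          System.out.println("hung -- the MSCK symptom with nested partitions");
        } finally {
          pool.shutdownNow();
        }
      }
    }

With a single-thread pool, parent.get() times out instead of returning: the
hang this patch avoids by checking pool saturation before going parallel.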


http://git-wip-us.apache.org/repos/asf/hive/blob/19a6831b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMetaStoreChecker.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMetaStoreChecker.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMetaStoreChecker.java
index 57f731f..7c94c95 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMetaStoreChecker.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMetaStoreChecker.java
@@ -28,7 +28,6 @@ import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.ThreadPoolExecutor;
@@ -400,24 +399,31 @@ public class HiveMetaStoreChecker {
    *          Specify how deep the search goes.
    * @throws IOException
    *           Thrown if we can't get lists from the fs.
-   * @throws HiveException 
+   * @throws HiveException
    */
 
   private void checkPartitionDirs(Path basePath, Set<Path> allDirs, int maxDepth) throws IOException, HiveException {
     ConcurrentLinkedQueue<Path> basePaths = new ConcurrentLinkedQueue<>();
     basePaths.add(basePath);
-    Set<Path> dirSet = Collections.newSetFromMap(new ConcurrentHashMap<Path, Boolean>());    
+    Set<Path> dirSet = Collections.newSetFromMap(new ConcurrentHashMap<Path, Boolean>());
     // Here we just reuse the THREAD_COUNT configuration for
     // HIVE_MOVE_FILES_THREAD_COUNT
-    final ExecutorService pool = conf.getInt(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT.varname, 25) > 0 ? Executors
-        .newFixedThreadPool(conf.getInt(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT.varname, 25),
-            new ThreadFactoryBuilder().setDaemon(true).setNameFormat("MSCK-GetPaths-%d").build())
-            : null;
+    int poolSize = conf.getInt(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT.varname, 15);
+
+    // Guard against a too-low configured value; 2x the CPU count is a reasonable floor.
+    poolSize = poolSize == 0 ? poolSize : Math.max(poolSize,
+        Runtime.getRuntime().availableProcessors() * 2);
+
+    // Create the fixed thread pool only when a positive size is configured
+    final ThreadPoolExecutor pool = poolSize > 0 ? (ThreadPoolExecutor)
+        Executors.newFixedThreadPool(poolSize,
+            new ThreadFactoryBuilder().setDaemon(true).setNameFormat("MSCK-GetPaths-%d").build()) : null;
+
     if (pool == null) {
       LOG.debug("Not-using threaded version of MSCK-GetPaths");
     } else {
       LOG.debug("Using threaded version of MSCK-GetPaths with number of threads "
-          + ((ThreadPoolExecutor) pool).getPoolSize());
+          + pool.getMaximumPoolSize());
     }
     checkPartitionDirs(pool, basePaths, dirSet, basePath.getFileSystem(conf), maxDepth, maxDepth);
     if (pool != null) {
@@ -427,11 +433,30 @@ public class HiveMetaStoreChecker {
   }
 
   // process the basePaths in parallel and then the next level of basePaths
-  private void checkPartitionDirs(final ExecutorService pool,
+  private void checkPartitionDirs(final ThreadPoolExecutor pool,
       final ConcurrentLinkedQueue<Path> basePaths, final Set<Path> allDirs,
       final FileSystem fs, final int depth, final int maxDepth) throws IOException, HiveException {
     final ConcurrentLinkedQueue<Path> nextLevel = new ConcurrentLinkedQueue<>();
-    if (null == pool) {
+
+    // Check if thread pool can be used.
+    boolean useThreadPool = false;
+    if (pool != null) {
+      synchronized (pool) {
+        // Recursive calls can deadlock the thread pool; check its usage before submitting.
+        if (pool.getActiveCount() < pool.getMaximumPoolSize()) {
+          useThreadPool = true;
+        }
+
+        if (!useThreadPool) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Not using threadPool as active count:" + pool.getActiveCount()
+                + ", max:" + pool.getMaximumPoolSize());
+          }
+        }
+      }
+    }
+
+    if (null == pool || !useThreadPool) {
       for (final Path path : basePaths) {
         FileStatus[] statuses = fs.listStatus(path, FileUtils.HIDDEN_FILES_PATH_FILTER);
         boolean fileFound = false;

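The guard added above reduces to: ask the pool whether a worker is still idle
before handing it more work, and process inline when it is saturated. A
condensed sketch of that pattern (class and method names are illustrative,
not Hive's):

    import java.util.concurrent.Executors;
    import java.util.concurrent.ThreadPoolExecutor;

    public class PoolGuardSketch {
      private final ThreadPoolExecutor pool =
          (ThreadPoolExecutor) Executors.newFixedThreadPool(4);

      void process(Runnable work) {
        boolean useThreadPool;
        synchronized (pool) {
          // Mirrors the patch: go async only while a worker is idle.
          useThreadPool = pool.getActiveCount() < pool.getMaximumPoolSize();
        }
        if (useThreadPool) {
          pool.submit(work); // capacity available: run asynchronously
        } else {
          work.run();        // saturated: run inline so recursion progresses
        }
      }
    }

Note that getActiveCount() is a snapshot, so the check is advisory; a stale
reading only means some work runs inline, costing parallelism rather than
correctness.
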
http://git-wip-us.apache.org/repos/asf/hive/blob/19a6831b/ql/src/test/queries/clientpositive/msck_repair_batchsize.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/msck_repair_batchsize.q b/ql/src/test/queries/clientpositive/msck_repair_batchsize.q
index 06e4507..e56e97a 100644
--- a/ql/src/test/queries/clientpositive/msck_repair_batchsize.q
+++ b/ql/src/test/queries/clientpositive/msck_repair_batchsize.q
@@ -20,3 +20,13 @@ MSCK REPAIR TABLE default.repairtable;
 MSCK TABLE repairtable;
 
 DROP TABLE default.repairtable;
+
+
+dfs  ${system:test.dfs.mkdir} -p ${system:test.tmp.dir}/apps/hive/warehouse/test.db/repairtable/p1=c/p2=a/p3=b;
+CREATE TABLE `repairtable`( `col` string) PARTITIONED BY (  `p1` string,  `p2` string) location '${system:test.tmp.dir}/apps/hive/warehouse/test.db/repairtable/';
+
+dfs -touchz ${system:test.tmp.dir}/apps/hive/warehouse/test.db/repairtable/p1=c/p2=a/p3=b/datafile;
+set hive.mv.files.thread=1;
+MSCK TABLE repairtable;
+
+DROP TABLE default.repairtable;

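The added test exercises exactly the hang-prone setup: the on-disk tree
(p1=c/p2=a/p3=b) is one level deeper than the table's two partition keys,
and hive.mv.files.thread=1 is the configuration that previously produced a
single-threaded pool. With the patch, the pool size is raised to at least
twice the CPU count and the saturation check falls back to serial listing,
so MSCK completes instead of deadlocking.
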
http://git-wip-us.apache.org/repos/asf/hive/blob/19a6831b/ql/src/test/results/clientpositive/msck_repair_batchsize.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/msck_repair_batchsize.q.out b/ql/src/test/results/clientpositive/msck_repair_batchsize.q.out
index a0180b7..ba99024 100644
--- a/ql/src/test/results/clientpositive/msck_repair_batchsize.q.out
+++ b/ql/src/test/results/clientpositive/msck_repair_batchsize.q.out
@@ -47,3 +47,28 @@ POSTHOOK: query: DROP TABLE default.repairtable
 POSTHOOK: type: DROPTABLE
 POSTHOOK: Input: default@repairtable
 POSTHOOK: Output: default@repairtable
+#### A masked pattern was here ####
+PREHOOK: type: CREATETABLE
+#### A masked pattern was here ####
+PREHOOK: Output: database:default
+PREHOOK: Output: default@repairtable
+#### A masked pattern was here ####
+POSTHOOK: type: CREATETABLE
+#### A masked pattern was here ####
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@repairtable
+PREHOOK: query: MSCK TABLE repairtable
+PREHOOK: type: MSCK
+PREHOOK: Output: default@repairtable
+POSTHOOK: query: MSCK TABLE repairtable
+POSTHOOK: type: MSCK
+POSTHOOK: Output: default@repairtable
+Partitions not in metastore:	repairtable:p1=c/p2=a
+PREHOOK: query: DROP TABLE default.repairtable
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@repairtable
+PREHOOK: Output: default@repairtable
+POSTHOOK: query: DROP TABLE default.repairtable
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: default@repairtable
+POSTHOOK: Output: default@repairtable