You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink was not preserved in this plain-text copy).
Posted to commits@hive.apache.org by ha...@apache.org on 2017/02/08 21:36:47 UTC
hive git commit: HIVE-15803 : msck can hang when nested partitions are present (Rajesh Balamohan via Ashutosh Chauhan)
Repository: hive
Updated Branches:
refs/heads/master 441b29e10 -> 19a6831b9
HIVE-15803 : msck can hang when nested partitions are present (Rajesh Balamohan via Ashutosh Chauhan)
Signed-off-by: Ashutosh Chauhan <ha...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/19a6831b
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/19a6831b
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/19a6831b
Branch: refs/heads/master
Commit: 19a6831b9c5fe3911872bcab1b974f3baa4e1db9
Parents: 441b29e
Author: Rajesh Balamohan <rb...@apache.org>
Authored: Tue Feb 7 19:16:00 2017 -0800
Committer: Ashutosh Chauhan <ha...@apache.org>
Committed: Wed Feb 8 13:35:52 2017 -0800
----------------------------------------------------------------------
.../hive/ql/metadata/HiveMetaStoreChecker.java | 45 +++++++++++++++-----
.../clientpositive/msck_repair_batchsize.q | 10 +++++
.../clientpositive/msck_repair_batchsize.q.out | 25 +++++++++++
3 files changed, 70 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/19a6831b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMetaStoreChecker.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMetaStoreChecker.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMetaStoreChecker.java
index 57f731f..7c94c95 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMetaStoreChecker.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMetaStoreChecker.java
@@ -28,7 +28,6 @@ import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
@@ -400,24 +399,31 @@ public class HiveMetaStoreChecker {
* Specify how deep the search goes.
* @throws IOException
* Thrown if we can't get lists from the fs.
- * @throws HiveException
+ * @throws HiveException
*/
private void checkPartitionDirs(Path basePath, Set<Path> allDirs, int maxDepth) throws IOException, HiveException {
ConcurrentLinkedQueue<Path> basePaths = new ConcurrentLinkedQueue<>();
basePaths.add(basePath);
- Set<Path> dirSet = Collections.newSetFromMap(new ConcurrentHashMap<Path, Boolean>());
+ Set<Path> dirSet = Collections.newSetFromMap(new ConcurrentHashMap<Path, Boolean>());
// Here we just reuse the THREAD_COUNT configuration for
// HIVE_MOVE_FILES_THREAD_COUNT
- final ExecutorService pool = conf.getInt(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT.varname, 25) > 0 ? Executors
- .newFixedThreadPool(conf.getInt(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT.varname, 25),
- new ThreadFactoryBuilder().setDaemon(true).setNameFormat("MSCK-GetPaths-%d").build())
- : null;
+ int poolSize = conf.getInt(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT.varname, 15);
+
+ // Check if too low config is provided for move files. 2x CPU is reasonable max count.
+ poolSize = poolSize == 0 ? poolSize : Math.max(poolSize,
+ Runtime.getRuntime().availableProcessors() * 2);
+
+ // Fixed thread pool on need basis
+ final ThreadPoolExecutor pool = poolSize > 0 ? (ThreadPoolExecutor)
+ Executors.newFixedThreadPool(poolSize,
+ new ThreadFactoryBuilder().setDaemon(true).setNameFormat("MSCK-GetPaths-%d").build()) : null;
+
if (pool == null) {
LOG.debug("Not-using threaded version of MSCK-GetPaths");
} else {
LOG.debug("Using threaded version of MSCK-GetPaths with number of threads "
- + ((ThreadPoolExecutor) pool).getPoolSize());
+ + pool.getMaximumPoolSize());
}
checkPartitionDirs(pool, basePaths, dirSet, basePath.getFileSystem(conf), maxDepth, maxDepth);
if (pool != null) {
@@ -427,11 +433,30 @@ public class HiveMetaStoreChecker {
}
// process the basePaths in parallel and then the next level of basePaths
- private void checkPartitionDirs(final ExecutorService pool,
+ private void checkPartitionDirs(final ThreadPoolExecutor pool,
final ConcurrentLinkedQueue<Path> basePaths, final Set<Path> allDirs,
final FileSystem fs, final int depth, final int maxDepth) throws IOException, HiveException {
final ConcurrentLinkedQueue<Path> nextLevel = new ConcurrentLinkedQueue<>();
- if (null == pool) {
+
+ // Check if thread pool can be used.
+ boolean useThreadPool = false;
+ if (pool != null) {
+ synchronized (pool) {
+ // In case of recursive calls, it is possible to deadlock with TP. Check TP usage here.
+ if (pool.getActiveCount() < pool.getMaximumPoolSize()) {
+ useThreadPool = true;
+ }
+
+ if (!useThreadPool) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Not using threadPool as active count:" + pool.getActiveCount()
+ + ", max:" + pool.getMaximumPoolSize());
+ }
+ }
+ }
+ }
+
+ if (null == pool || !useThreadPool) {
for (final Path path : basePaths) {
FileStatus[] statuses = fs.listStatus(path, FileUtils.HIDDEN_FILES_PATH_FILTER);
boolean fileFound = false;
http://git-wip-us.apache.org/repos/asf/hive/blob/19a6831b/ql/src/test/queries/clientpositive/msck_repair_batchsize.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/msck_repair_batchsize.q b/ql/src/test/queries/clientpositive/msck_repair_batchsize.q
index 06e4507..e56e97a 100644
--- a/ql/src/test/queries/clientpositive/msck_repair_batchsize.q
+++ b/ql/src/test/queries/clientpositive/msck_repair_batchsize.q
@@ -20,3 +20,13 @@ MSCK REPAIR TABLE default.repairtable;
MSCK TABLE repairtable;
DROP TABLE default.repairtable;
+
+
+dfs ${system:test.dfs.mkdir} -p ${system:test.tmp.dir}/apps/hive/warehouse/test.db/repairtable/p1=c/p2=a/p3=b;
+CREATE TABLE `repairtable`( `col` string) PARTITIONED BY ( `p1` string, `p2` string) location '${system:test.tmp.dir}/apps/hive/warehouse/test.db/repairtable/';
+
+dfs -touchz ${system:test.tmp.dir}/apps/hive/warehouse/test.db/repairtable/p1=c/p2=a/p3=b/datafile;
+set hive.mv.files.thread=1;
+MSCK TABLE repairtable;
+
+DROP TABLE default.repairtable;
http://git-wip-us.apache.org/repos/asf/hive/blob/19a6831b/ql/src/test/results/clientpositive/msck_repair_batchsize.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/msck_repair_batchsize.q.out b/ql/src/test/results/clientpositive/msck_repair_batchsize.q.out
index a0180b7..ba99024 100644
--- a/ql/src/test/results/clientpositive/msck_repair_batchsize.q.out
+++ b/ql/src/test/results/clientpositive/msck_repair_batchsize.q.out
@@ -47,3 +47,28 @@ POSTHOOK: query: DROP TABLE default.repairtable
POSTHOOK: type: DROPTABLE
POSTHOOK: Input: default@repairtable
POSTHOOK: Output: default@repairtable
+#### A masked pattern was here ####
+PREHOOK: type: CREATETABLE
+#### A masked pattern was here ####
+PREHOOK: Output: database:default
+PREHOOK: Output: default@repairtable
+#### A masked pattern was here ####
+POSTHOOK: type: CREATETABLE
+#### A masked pattern was here ####
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@repairtable
+PREHOOK: query: MSCK TABLE repairtable
+PREHOOK: type: MSCK
+PREHOOK: Output: default@repairtable
+POSTHOOK: query: MSCK TABLE repairtable
+POSTHOOK: type: MSCK
+POSTHOOK: Output: default@repairtable
+Partitions not in metastore: repairtable:p1=c/p2=a
+PREHOOK: query: DROP TABLE default.repairtable
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@repairtable
+PREHOOK: Output: default@repairtable
+POSTHOOK: query: DROP TABLE default.repairtable
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: default@repairtable
+POSTHOOK: Output: default@repairtable