Posted to commits@hive.apache.org by sp...@apache.org on 2017/02/28 17:54:28 UTC
hive git commit: HIVE-15879: Fix HiveMetaStoreChecker.checkPartitionDirs method (Vihang Karajgaonkar, reviewed by Rajesh Balamohan, Sergio Pena)
Repository: hive
Updated Branches:
refs/heads/master 84fdc1c7c -> 95da916eb
HIVE-15879: Fix HiveMetaStoreChecker.checkPartitionDirs method (Vihang Karajgaonkar, reviewed by Rajesh Balamohan, Sergio Pena)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/95da916e
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/95da916e
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/95da916e
Branch: refs/heads/master
Commit: 95da916eb249cc0351bf4b94ec6c00b77db03cf3
Parents: 84fdc1c
Author: Vihang Karajgaonkar <vi...@cloudera.com>
Authored: Tue Feb 28 11:53:34 2017 -0600
Committer: Sergio Pena <se...@cloudera.com>
Committed: Tue Feb 28 11:53:34 2017 -0600
----------------------------------------------------------------------
.../hive/ql/metadata/HiveMetaStoreChecker.java | 265 +++++++++++--------
.../ql/metadata/TestHiveMetaStoreChecker.java | 260 +++++++++++++++++-
2 files changed, 408 insertions(+), 117 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/95da916e/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMetaStoreChecker.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMetaStoreChecker.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMetaStoreChecker.java
index 7c94c95..5d2ae2c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMetaStoreChecker.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMetaStoreChecker.java
@@ -19,19 +19,20 @@ package org.apache.hadoop.hive.ql.metadata;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.Queue;
import java.util.Set;
import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Sets;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.slf4j.Logger;
@@ -403,16 +404,11 @@ public class HiveMetaStoreChecker {
*/
private void checkPartitionDirs(Path basePath, Set<Path> allDirs, int maxDepth) throws IOException, HiveException {
- ConcurrentLinkedQueue<Path> basePaths = new ConcurrentLinkedQueue<>();
- basePaths.add(basePath);
- Set<Path> dirSet = Collections.newSetFromMap(new ConcurrentHashMap<Path, Boolean>());
- // Here we just reuse the THREAD_COUNT configuration for
- // HIVE_MOVE_FILES_THREAD_COUNT
int poolSize = conf.getInt(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT.varname, 15);
// Check if too low config is provided for move files. 2x CPU is reasonable max count.
poolSize = poolSize == 0 ? poolSize : Math.max(poolSize,
- Runtime.getRuntime().availableProcessors() * 2);
+ getMinPoolSize());
// Fixed thread pool on need basis
final ThreadPoolExecutor pool = poolSize > 0 ? (ThreadPoolExecutor)
@@ -421,135 +417,176 @@ public class HiveMetaStoreChecker {
if (pool == null) {
LOG.debug("Not-using threaded version of MSCK-GetPaths");
+ Queue<Path> basePaths = new LinkedList<>();
+ basePaths.add(basePath);
+ checkPartitionDirsSingleThreaded(basePaths, allDirs, basePath.getFileSystem(conf), maxDepth,
+ maxDepth);
} else {
- LOG.debug("Using threaded version of MSCK-GetPaths with number of threads "
+ LOG.debug("Using multi-threaded version of MSCK-GetPaths with number of threads "
+ pool.getMaximumPoolSize());
+ checkPartitionDirsInParallel((ThreadPoolExecutor) pool, basePath, allDirs,
+ basePath.getFileSystem(conf), maxDepth);
}
- checkPartitionDirs(pool, basePaths, dirSet, basePath.getFileSystem(conf), maxDepth, maxDepth);
if (pool != null) {
pool.shutdown();
}
- allDirs.addAll(dirSet);
}
- // process the basePaths in parallel and then the next level of basePaths
- private void checkPartitionDirs(final ThreadPoolExecutor pool,
- final ConcurrentLinkedQueue<Path> basePaths, final Set<Path> allDirs,
- final FileSystem fs, final int depth, final int maxDepth) throws IOException, HiveException {
- final ConcurrentLinkedQueue<Path> nextLevel = new ConcurrentLinkedQueue<>();
+ @VisibleForTesting
+ int getMinPoolSize() {
+ return Runtime.getRuntime().availableProcessors() * 2;
+ }
- // Check if thread pool can be used.
- boolean useThreadPool = false;
- if (pool != null) {
- synchronized (pool) {
- // In case of recursive calls, it is possible to deadlock with TP. Check TP usage here.
- if (pool.getActiveCount() < pool.getMaximumPoolSize()) {
- useThreadPool = true;
- }
+ private final class PathDepthInfoCallable implements Callable<Path> {
+ private final int maxDepth;
+ private final FileSystem fs;
+ private final ConcurrentLinkedQueue<PathDepthInfo> pendingPaths;
+ private final boolean throwException;
+ private final PathDepthInfo pd;
+
+ private PathDepthInfoCallable(PathDepthInfo pd, int maxDepth, FileSystem fs,
+ ConcurrentLinkedQueue<PathDepthInfo> basePaths) {
+ this.maxDepth = maxDepth;
+ this.pd = pd;
+ this.fs = fs;
+ this.pendingPaths = basePaths;
+ this.throwException = "throw"
+ .equals(HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_MSCK_PATH_VALIDATION));
+ }
- if (!useThreadPool) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Not using threadPool as active count:" + pool.getActiveCount()
- + ", max:" + pool.getMaximumPoolSize());
- }
- }
- }
+ @Override
+ public Path call() throws Exception {
+ return processPathDepthInfo(pd);
}
- if (null == pool || !useThreadPool) {
- for (final Path path : basePaths) {
- FileStatus[] statuses = fs.listStatus(path, FileUtils.HIDDEN_FILES_PATH_FILTER);
- boolean fileFound = false;
- for (FileStatus status : statuses) {
- if (status.isDirectory()) {
- nextLevel.add(status.getPath());
- } else {
- fileFound = true;
- }
+ private Path processPathDepthInfo(final PathDepthInfo pd)
+ throws IOException, HiveException, InterruptedException {
+ final Path currentPath = pd.p;
+ final int currentDepth = pd.depth;
+ FileStatus[] fileStatuses = fs.listStatus(currentPath, FileUtils.HIDDEN_FILES_PATH_FILTER);
+ // found no files under a sub-directory of the table base path; it is possible that the table
+ // is empty and hence there are no partition sub-directories created under the base path
+ if (fileStatuses.length == 0 && currentDepth > 0 && currentDepth < maxDepth) {
+ // since maxDepth is not yet reached, we are missing partition
+ // columns in currentPath
+ if (throwException) {
+ throw new HiveException(
+ "MSCK is missing partition columns under " + currentPath.toString());
+ } else {
+ LOG.warn("MSCK is missing partition columns under " + currentPath.toString());
}
- if (depth != 0) {
- // we are in the middle of the search and we find a file
- if (fileFound) {
- if ("throw".equals(HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_MSCK_PATH_VALIDATION))) {
+ } else {
+ // found files under currentPath; add each one to the queue if it is a directory
+ for (FileStatus fileStatus : fileStatuses) {
+ if (!fileStatus.isDirectory() && currentDepth < maxDepth) {
+ // found a file at depth which is less than number of partition keys
+ if (throwException) {
throw new HiveException(
- "MSCK finds a file rather than a folder when it searches for " + path.toString());
- } else {
- LOG.warn("MSCK finds a file rather than a folder when it searches for "
- + path.toString());
- }
- }
- if (!nextLevel.isEmpty()) {
- checkPartitionDirs(pool, nextLevel, allDirs, fs, depth - 1, maxDepth);
- } else if (depth != maxDepth) {
- // since nextLevel is empty, we are missing partition columns.
- if ("throw".equals(HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_MSCK_PATH_VALIDATION))) {
- throw new HiveException("MSCK is missing partition columns under " + path.toString());
+ "MSCK finds a file rather than a directory when it searches for "
+ + fileStatus.getPath().toString());
} else {
- LOG.warn("MSCK is missing partition columns under " + path.toString());
+ LOG.warn("MSCK finds a file rather than a directory when it searches for "
+ + fileStatus.getPath().toString());
}
+ } else if (fileStatus.isDirectory() && currentDepth < maxDepth) {
+ // add sub-directory to the work queue if maxDepth is not yet reached
+ pendingPaths.add(new PathDepthInfo(fileStatus.getPath(), currentDepth + 1));
}
- } else {
- allDirs.add(path);
+ }
+ if (currentDepth == maxDepth) {
+ return currentPath;
}
}
- } else {
- final List<Future<Void>> futures = new LinkedList<>();
- for (final Path path : basePaths) {
- futures.add(pool.submit(new Callable<Void>() {
- @Override
- public Void call() throws Exception {
- FileStatus[] statuses = fs.listStatus(path, FileUtils.HIDDEN_FILES_PATH_FILTER);
- boolean fileFound = false;
- for (FileStatus status : statuses) {
- if (status.isDirectory()) {
- nextLevel.add(status.getPath());
- } else {
- fileFound = true;
- }
- }
- if (depth != 0) {
- // we are in the middle of the search and we find a file
- if (fileFound) {
- if ("throw".equals(HiveConf.getVar(conf,
- HiveConf.ConfVars.HIVE_MSCK_PATH_VALIDATION))) {
- throw new HiveException(
- "MSCK finds a file rather than a folder when it searches for "
- + path.toString());
- } else {
- LOG.warn("MSCK finds a file rather than a folder when it searches for "
- + path.toString());
- }
- }
- if (!nextLevel.isEmpty()) {
- checkPartitionDirs(pool, nextLevel, allDirs, fs, depth - 1, maxDepth);
- } else if (depth != maxDepth) {
- // since nextLevel is empty, we are missing partition columns.
- if ("throw".equals(HiveConf.getVar(conf,
- HiveConf.ConfVars.HIVE_MSCK_PATH_VALIDATION))) {
- throw new HiveException("MSCK is missing partition columns under "
- + path.toString());
- } else {
- LOG.warn("MSCK is missing partition columns under " + path.toString());
- }
- }
- } else {
- allDirs.add(path);
- }
- return null;
+ return null;
+ }
+ }
+
+ private static class PathDepthInfo {
+ private final Path p;
+ private final int depth;
+ PathDepthInfo(Path p, int depth) {
+ this.p = p;
+ this.depth = depth;
+ }
+ }
+
+ private void checkPartitionDirsInParallel(final ThreadPoolExecutor pool,
+ final Path basePath, final Set<Path> result,
+ final FileSystem fs, final int maxDepth) throws HiveException {
+ try {
+ Queue<Future<Path>> futures = new LinkedList<Future<Path>>();
+ ConcurrentLinkedQueue<PathDepthInfo> nextLevel = new ConcurrentLinkedQueue<>();
+ nextLevel.add(new PathDepthInfo(basePath, 0));
+//Uses a level-parallel implementation of BFS. Recursive DFS implementations
+//have an issue where the pool can run out of threads if the number of
+//nested sub-directory levels exceeds the pool size.
+//Using two queues is simpler than a single queue, since a single queue would
+//need notify()/wait() style mechanisms to let the free worker threads know
+//when new levels are discovered, which can potentially lead to bugs if
+//not done right
+ while(!nextLevel.isEmpty()) {
+ ConcurrentLinkedQueue<PathDepthInfo> tempQueue = new ConcurrentLinkedQueue<>();
+ //process each level in parallel
+ while(!nextLevel.isEmpty()) {
+ futures.add(
+ pool.submit(new PathDepthInfoCallable(nextLevel.poll(), maxDepth, fs, tempQueue)));
+ }
+ while(!futures.isEmpty()) {
+ Path p = futures.poll().get();
+ if (p != null) {
+ result.add(p);
}
- }));
+ }
+//update nextLevel with the sub-directories newly discovered above
+ nextLevel = tempQueue;
}
- for (Future<Void> future : futures) {
- try {
- future.get();
- } catch (Exception e) {
- LOG.error(e.getMessage());
- pool.shutdownNow();
- throw new HiveException(e.getCause());
+ } catch (InterruptedException | ExecutionException e) {
+ LOG.error(e.getMessage());
+ pool.shutdownNow();
+ throw new HiveException(e.getCause());
+ }
+ }
+
+ /*
+ * The original recursive implementation works well for the single-threaded use-case, but it has
+ * limitations if we attempt to parallelize it directly
+ */
+ private void checkPartitionDirsSingleThreaded(Queue<Path> basePaths, final Set<Path> allDirs,
+ final FileSystem fs, final int depth, final int maxDepth) throws IOException, HiveException {
+ final Queue<Path> nextLevel = new LinkedList<>();
+ for (final Path path : basePaths) {
+ FileStatus[] statuses = fs.listStatus(path, FileUtils.HIDDEN_FILES_PATH_FILTER);
+ boolean fileFound = false;
+ for (FileStatus status : statuses) {
+ if (status.isDirectory()) {
+ nextLevel.add(status.getPath());
+ } else {
+ fileFound = true;
}
}
- if (!nextLevel.isEmpty() && depth != 0) {
- checkPartitionDirs(pool, nextLevel, allDirs, fs, depth - 1, maxDepth);
+ if (depth != 0) {
+ // we are in the middle of the search and we find a file
+ if (fileFound) {
+ if ("throw".equals(HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_MSCK_PATH_VALIDATION))) {
+ throw new HiveException(
+ "MSCK finds a file rather than a folder when it searches for " + path.toString());
+ } else {
+ LOG.warn("MSCK finds a file rather than a folder when it searches for "
+ + path.toString());
+ }
+ }
+ if (!nextLevel.isEmpty()) {
+ checkPartitionDirsSingleThreaded(nextLevel, allDirs, fs, depth - 1, maxDepth);
+ } else if (depth != maxDepth) {
+ // since nextLevel is empty, we are missing partition columns.
+ if ("throw".equals(HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_MSCK_PATH_VALIDATION))) {
+ throw new HiveException("MSCK is missing partition columns under " + path.toString());
+ } else {
+ LOG.warn("MSCK is missing partition columns under " + path.toString());
+ }
+ }
+ } else {
+ allDirs.add(path);
}
}
}
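----------------------------------------------------------------------
For reference, a minimal self-contained sketch of the two-queue, level-order
BFS that checkPartitionDirsInParallel implements above. It uses java.io.File in
place of Hadoop's FileSystem so it runs standalone; the class and method names
(LevelParallelBfs, dirsAtDepth) and the sample path are illustrative
assumptions, not part of the patch.

import java.io.File;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class LevelParallelBfs {

  // pairs a path with its depth below the base path, like PathDepthInfo above
  private static final class PathDepth {
    final File path;
    final int depth;
    PathDepth(File path, int depth) { this.path = path; this.depth = depth; }
  }

  // returns all directories found exactly maxDepth levels below basePath
  static Set<File> dirsAtDepth(File basePath, int maxDepth, int poolSize)
      throws InterruptedException, ExecutionException {
    ExecutorService pool = Executors.newFixedThreadPool(poolSize);
    Set<File> result = new HashSet<>();
    ConcurrentLinkedQueue<PathDepth> nextLevel = new ConcurrentLinkedQueue<>();
    nextLevel.add(new PathDepth(basePath, 0));
    try {
      // process one level at a time: workers only add to the next level's
      // queue and never wait on other workers, so the pool cannot deadlock
      // however deep the directory tree is
      while (!nextLevel.isEmpty()) {
        final ConcurrentLinkedQueue<PathDepth> tempQueue = new ConcurrentLinkedQueue<>();
        Queue<Future<File>> futures = new LinkedList<>();
        while (!nextLevel.isEmpty()) {
          final PathDepth pd = nextLevel.poll();
          futures.add(pool.submit((Callable<File>) () -> {
            if (pd.depth == maxDepth) {
              return pd.path; // candidate partition directory
            }
            File[] children = pd.path.listFiles(File::isDirectory);
            if (children != null) {
              for (File child : children) {
                tempQueue.add(new PathDepth(child, pd.depth + 1));
              }
            }
            return null;
          }));
        }
        while (!futures.isEmpty()) {
          File dir = futures.poll().get(); // worker exceptions surface here
          if (dir != null) {
            result.add(dir);
          }
        }
        nextLevel = tempQueue; // descend one level
      }
    } finally {
      pool.shutdown();
    }
    return result;
  }

  public static void main(String[] args) throws Exception {
    // e.g. collect part1=*/part2=* directories two levels under a table path
    for (File dir : dirsAtDepth(new File("/tmp/warehouse/t1"), 2, 4)) {
      System.out.println(dir);
    }
  }
}

The key property of the fix is visible in the inner loop: a worker's only
interaction with other work is tempQueue.add(), so no task ever blocks on a
Future produced by another task. That is what exhausted the pool in the old
recursive version once the number of nested partition levels exceeded the
pool size.
----------------------------------------------------------------------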
http://git-wip-us.apache.org/repos/asf/hive/blob/95da916e/ql/src/test/org/apache/hadoop/hive/ql/metadata/TestHiveMetaStoreChecker.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/metadata/TestHiveMetaStoreChecker.java b/ql/src/test/org/apache/hadoop/hive/ql/metadata/TestHiveMetaStoreChecker.java
index 35f52cd..f95c130 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/metadata/TestHiveMetaStoreChecker.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/metadata/TestHiveMetaStoreChecker.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hive.ql.metadata;
+import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
@@ -29,6 +30,7 @@ import junit.framework.TestCase;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
@@ -37,8 +39,8 @@ import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat;
import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hadoop.mapred.TextInputFormat;
-import org.apache.hadoop.util.Shell;
import org.apache.thrift.TException;
+import org.mockito.Mockito;
/**
* TestHiveMetaStoreChecker.
@@ -63,6 +65,8 @@ public class TestHiveMetaStoreChecker extends TestCase {
protected void setUp() throws Exception {
super.setUp();
hive = Hive.get();
+ hive.getConf().setIntVar(HiveConf.ConfVars.HIVE_MOVE_FILES_THREAD_COUNT, 15);
+ hive.getConf().set(HiveConf.ConfVars.HIVE_MSCK_PATH_VALIDATION.varname, "throw");
checker = new HiveMetaStoreChecker(hive);
partCols = new ArrayList<FieldSchema>();
@@ -104,7 +108,6 @@ public class TestHiveMetaStoreChecker extends TestCase {
public void testTableCheck() throws HiveException, MetaException,
IOException, TException, AlreadyExistsException {
-
CheckResult result = new CheckResult();
checker.checkMetastore(dbName, null, null, result);
// we haven't added anything so should return an all ok
@@ -194,7 +197,6 @@ public class TestHiveMetaStoreChecker extends TestCase {
public void testPartitionsCheck() throws HiveException, MetaException,
IOException, TException, AlreadyExistsException {
-
Database db = new Database();
db.setName(dbName);
hive.createDatabase(db);
@@ -311,4 +313,256 @@ public class TestHiveMetaStoreChecker extends TestCase {
hive.dropDatabase(dbName);
assertFalse(fs.exists(fakeTable));
}
+
+ /*
+ * Test multi-threaded implementation of checker to find missing partitions
+ */
+ public void testPartitionsNotInMs() throws HiveException, AlreadyExistsException, IOException {
+ Table testTable = createPartitionedTestTable(dbName, tableName, 2, 0);
+ // add 10 partitions on the filesystem
+ createPartitionsDirectoriesOnFS(testTable, 10);
+ CheckResult result = new CheckResult();
+ checker.checkMetastore(dbName, tableName, null, result);
+ assertEquals(Collections.<String>emptySet(), result.getTablesNotInMs());
+ assertEquals(Collections.<String>emptySet(), result.getTablesNotOnFs());
+ assertEquals(Collections.<String>emptySet(), result.getPartitionsNotOnFs());
+ assertEquals(10, result.getPartitionsNotInMs().size());
+ }
+
+ /*
+ * Tests single threaded implementation of checkMetastore
+ */
+ public void testSingleThreadedCheckMetastore()
+ throws HiveException, AlreadyExistsException, IOException {
+ // set num of threads to 0 so that single-threaded checkMetastore is called
+ hive.getConf().setIntVar(HiveConf.ConfVars.HIVE_MOVE_FILES_THREAD_COUNT, 0);
+ Table testTable = createPartitionedTestTable(dbName, tableName, 2, 0);
+ // add 10 partitions on the filesystem
+ createPartitionsDirectoriesOnFS(testTable, 10);
+ CheckResult result = new CheckResult();
+ checker.checkMetastore(dbName, tableName, null, result);
+ assertEquals(Collections.<String> emptySet(), result.getTablesNotInMs());
+ assertEquals(Collections.<String> emptySet(), result.getTablesNotOnFs());
+ assertEquals(Collections.<String> emptySet(), result.getPartitionsNotOnFs());
+ assertEquals(10, result.getPartitionsNotInMs().size());
+ }
+
+ /**
+ * Tests single threaded implementation for deeply nested partitioned tables
+ *
+ * @throws HiveException
+ * @throws AlreadyExistsException
+ * @throws IOException
+ */
+ public void testSingleThreadedDeeplyNestedTables()
+ throws HiveException, AlreadyExistsException, IOException {
+ // set num of threads to 0 so that single-threaded checkMetastore is called
+ hive.getConf().setIntVar(HiveConf.ConfVars.HIVE_MOVE_FILES_THREAD_COUNT, 0);
+ // currently HiveMetaStoreChecker uses a minimum pool size of 2*numOfProcs
+ // no other easy way to set it deterministically for this test case
+ checker = Mockito.spy(checker);
+ Mockito.when(checker.getMinPoolSize()).thenReturn(2);
+ int poolSize = checker.getMinPoolSize();
+ // create a deeply nested table which has more partition keys than the pool size
+ Table testTable = createPartitionedTestTable(dbName, tableName, poolSize + 2, 0);
+ // add 10 partitions on the filesystem
+ createPartitionsDirectoriesOnFS(testTable, 10);
+ CheckResult result = new CheckResult();
+ checker.checkMetastore(dbName, tableName, null, result);
+ assertEquals(Collections.<String> emptySet(), result.getTablesNotInMs());
+ assertEquals(Collections.<String> emptySet(), result.getTablesNotOnFs());
+ assertEquals(Collections.<String> emptySet(), result.getPartitionsNotOnFs());
+ assertEquals(10, result.getPartitionsNotInMs().size());
+ }
+
+ /**
+ * Tests the case when the number of partition keys are more than the threadpool size.
+ *
+ * @throws HiveException
+ * @throws AlreadyExistsException
+ * @throws IOException
+ */
+ public void testDeeplyNestedPartitionedTables()
+ throws HiveException, AlreadyExistsException, IOException {
+ // currently HiveMetaStoreChecker uses a minimum pool size of 2*numOfProcs
+ // no other easy way to set it deterministically for this test case
+ int poolSize = checker.getMinPoolSize();
+ checker = Mockito.spy(checker);
+ Mockito.when(checker.getMinPoolSize()).thenReturn(2);
+ // create a deeply nested table which has more partition keys than the pool size
+ Table testTable = createPartitionedTestTable(dbName, tableName, poolSize + 2, 0);
+ // add 10 partitions on the filesystem
+ createPartitionsDirectoriesOnFS(testTable, 10);
+ CheckResult result = new CheckResult();
+ checker.checkMetastore(dbName, tableName, null, result);
+ assertEquals(Collections.<String> emptySet(), result.getTablesNotInMs());
+ assertEquals(Collections.<String> emptySet(), result.getTablesNotOnFs());
+ assertEquals(Collections.<String> emptySet(), result.getPartitionsNotOnFs());
+ assertEquals(10, result.getPartitionsNotInMs().size());
+ }
+
+ /**
+ * Test if checker throws HiveException when there is a dummy directory present at a nested level
+ * of sub-directories
+ * @throws AlreadyExistsException
+ * @throws IOException
+ * @throws HiveException
+ */
+ public void testErrorForMissingPartitionColumn() throws AlreadyExistsException, IOException, HiveException {
+ Table testTable = createPartitionedTestTable(dbName, tableName, 2, 0);
+ // add 10 partitions on the filesystem
+ createPartitionsDirectoriesOnFS(testTable, 10);
+ //create a fake directory to throw exception
+ StringBuilder sb = new StringBuilder(testTable.getDataLocation().toString());
+ sb.append(Path.SEPARATOR);
+ sb.append("dummyPart=error");
+ createDirectory(sb.toString());
+ //check result now
+ CheckResult result = new CheckResult();
+ try {
+ checker.checkMetastore(dbName, tableName, null, result);
+ } catch (Exception e) {
+ assertTrue("Expected exception HiveException got " + e.getClass(), e instanceof HiveException);
+ }
+ createFile(sb.toString(), "dummyFile");
+ result = new CheckResult();
+ try {
+ checker.checkMetastore(dbName, tableName, null, result);
+ } catch (Exception e) {
+ assertTrue("Expected exception HiveException got " + e.getClass(), e instanceof HiveException);
+ }
+ }
+
+ /*
+ * Test if the single-threaded implementation of checker throws HiveException when there is a
+ * dummy directory present at a nested level
+ */
+ public void testErrorForMissingPartitionsSingleThreaded()
+ throws AlreadyExistsException, HiveException, IOException {
+ // set num of threads to 0 so that single-threaded checkMetastore is called
+ hive.getConf().setIntVar(HiveConf.ConfVars.HIVE_MOVE_FILES_THREAD_COUNT, 0);
+ Table testTable = createPartitionedTestTable(dbName, tableName, 2, 0);
+ // add 10 partitions on the filesystem
+ createPartitionsDirectoriesOnFS(testTable, 10);
+ // create a fake directory to throw exception
+ StringBuilder sb = new StringBuilder(testTable.getDataLocation().toString());
+ sb.append(Path.SEPARATOR);
+ sb.append("dummyPart=error");
+ createDirectory(sb.toString());
+ // check result now
+ CheckResult result = new CheckResult();
+ try {
+ checker.checkMetastore(dbName, tableName, null, result);
+ } catch (Exception e) {
+ assertTrue("Expected exception HiveException got " + e.getClass(),
+ e instanceof HiveException);
+ }
+ createFile(sb.toString(), "dummyFile");
+ result = new CheckResult();
+ try {
+ checker.checkMetastore(dbName, tableName, null, result);
+ } catch (Exception e) {
+ assertTrue("Expected exception HiveException got " + e.getClass(),
+ e instanceof HiveException);
+ }
+ }
+ /**
+ * Creates a test partitioned table with the required level of nested partitions and number of
+ * partitions
+ *
+ * @param dbName - Database name
+ * @param tableName - Table name
+ * @param numOfPartKeys - Number of partition keys (nested levels of sub-directories in base table
+ * path)
+ * @param valuesPerPartition - If greater than 0 creates valuesPerPartition dummy partitions
+ * @return the Table object created and fetched back from the metastore
+ * @throws AlreadyExistsException
+ * @throws HiveException
+ */
+ private Table createPartitionedTestTable(String dbName, String tableName, int numOfPartKeys,
+ int valuesPerPartition) throws AlreadyExistsException, HiveException {
+ Database db = new Database();
+ db.setName(dbName);
+ hive.createDatabase(db);
+
+ Table table = new Table(dbName, tableName);
+ table.setDbName(dbName);
+ table.setInputFormatClass(TextInputFormat.class);
+ table.setOutputFormatClass(HiveIgnoreKeyTextOutputFormat.class);
+ // create partition key schema
+ ArrayList<FieldSchema> partKeys = new ArrayList<FieldSchema>();
+ for (int i = 1; i <= numOfPartKeys; i++) {
+ String partName = "part" + String.valueOf(i);
+ partKeys.add(new FieldSchema(partName, serdeConstants.STRING_TYPE_NAME, ""));
+ }
+ table.setPartCols(partKeys);
+ // create table
+ hive.createTable(table);
+ table = hive.getTable(dbName, tableName);
+ if (valuesPerPartition == 0) {
+ return table;
+ }
+ // create partition specs
+ ArrayList<Map<String, String>> partitionSpecs = new ArrayList<Map<String, String>>();
+ for (int partKeyIndex = 0; partKeyIndex < numOfPartKeys; partKeyIndex++) {
+ String partName = partKeys.get(partKeyIndex).getName();
+ Map<String, String> partMap = new HashMap<>();
+ for (int val = 1; val <= valuesPerPartition; val++) {
+ partMap.put(partName, String.valueOf(val));
+ }
+ partitionSpecs.add(partMap);
+ }
+
+ // create partitions
+ for (Map<String, String> partSpec : partitionSpecs) {
+ hive.createPartition(table, partSpec);
+ }
+
+ List<Partition> partitions = hive.getPartitions(table);
+ assertEquals(numOfPartKeys * valuesPerPartition, partitions.size());
+ return table;
+ }
+
+ /**
+ * Creates partition sub-directories for a given table on the file system. Used to test the
+ * use-cases when partitions for the table are not present in the metastore db
+ *
+ * @param table - Table which provides the base locations and partition specs for creating the
+ * sub-directories
+ * @param numPartitions - Number of partitions to be created
+ * @throws IOException
+ */
+ private void createPartitionsDirectoriesOnFS(Table table, int numPartitions) throws IOException {
+ String path = table.getDataLocation().toString();
+ fs = table.getPath().getFileSystem(hive.getConf());
+ int numPartKeys = table.getPartitionKeys().size();
+ for (int i = 0; i < numPartitions; i++) {
+ StringBuilder partPath = new StringBuilder(path);
+ partPath.append(Path.SEPARATOR);
+ for (int j = 0; j < numPartKeys; j++) {
+ FieldSchema field = table.getPartitionKeys().get(j);
+ partPath.append(field.getName());
+ partPath.append('=');
+ partPath.append("val_");
+ partPath.append(i);
+ if (j < (numPartKeys - 1)) {
+ partPath.append(Path.SEPARATOR);
+ }
+ }
+ createDirectory(partPath.toString());
+ }
+ }
+
+ private void createFile(String partPath, String filename) throws IOException {
+ Path part = new Path(partPath);
+ fs.mkdirs(part);
+ fs.createNewFile(new Path(partPath + Path.SEPARATOR + filename));
+ fs.deleteOnExit(part);
+ }
+
+ private void createDirectory(String partPath) throws IOException {
+ Path part = new Path(partPath);
+ fs.mkdirs(part);
+ fs.deleteOnExit(part);
+ }
}
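----------------------------------------------------------------------
For reference, a minimal sketch of the Mockito spy pattern the new tests rely
on to make the minimum pool size deterministic. Checker and effectivePoolSize
below are illustrative stand-ins for HiveMetaStoreChecker and its pool-size
clamping, not the real classes.

import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;

public class SpyPoolSizeExample {

  static class Checker {
    // mirrors the @VisibleForTesting hook added in this patch
    int getMinPoolSize() {
      return Runtime.getRuntime().availableProcessors() * 2;
    }

    // mirrors the patched clamping: a non-zero configured value is raised to
    // at least the minimum pool size
    int effectivePoolSize(int configured) {
      return configured == 0 ? 0 : Math.max(configured, getMinPoolSize());
    }
  }

  public static void main(String[] args) {
    Checker checker = spy(new Checker());
    // doReturn(..).when(..) stubs without invoking the real method; the
    // when(checker.getMinPoolSize()).thenReturn(2) form used in the tests
    // also works here because getMinPoolSize has no side effects
    doReturn(2).when(checker).getMinPoolSize();
    System.out.println(checker.effectivePoolSize(15)); // 15
    System.out.println(checker.effectivePoolSize(1));  // 2, raised to the stubbed minimum
  }
}
----------------------------------------------------------------------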