Posted to gitbox@hive.apache.org by GitBox <gi...@apache.org> on 2022/02/25 06:40:36 UTC

[GitHub] [hive] cravani opened a new pull request #3053: HIVE-25980: Support HiveMetaStoreChecker.checkTable operation with multi-threaded

cravani opened a new pull request #3053:
URL: https://github.com/apache/hive/pull/3053


   <!--
   Thanks for sending a pull request!  Here are some tips for you:
     1. If this is your first time, please read our contributor guidelines: https://cwiki.apache.org/confluence/display/Hive/HowToContribute
     2. Ensure that you have created an issue on the Hive project JIRA: https://issues.apache.org/jira/projects/HIVE/summary
     3. Ensure you have added or run the appropriate tests for your PR: 
     4. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP]HIVE-XXXXX:  Your PR title ...'.
     5. Be sure to keep the PR description updated to reflect all changes.
     6. Please write your PR title to summarize what this PR proposes.
     7. If possible, provide a concise example to reproduce the issue for a faster review.
   
   -->
   
   ### What changes were proposed in this pull request?
   Make the [HiveMetaStoreChecker.checkTable](https://github.com/apache/hive/blob/master/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreChecker.java#L299) operation run multi-threaded and allow the user to configure the number of threads.
   
   
   ### Why are the changes needed?
   Currently [HiveMetaStoreChecker.checkTable](https://github.com/apache/hive/blob/master/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreChecker.java#L299) checks the existing partitions of a table serially as part of the MSCK operation; these checks can run in parallel to improve the overall performance of MSCK REPAIR TABLE queries.
   If the table has thousands of partitions on cloud storage (S3), this operation can take a long time. Running it multi-threaded improves performance.
   
   
   ### Does this PR introduce _any_ user-facing change?
   Yes, the PR introduces a new configuration property, with no change in behavior for the user by default.
   Property: hive.metastore.msck.fshandler.threads / metastore.msck.fshandler.threads. It defaults to 1, which preserves the current behavior; depending on user requirements it can be configured at the service or session level.
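   For illustration, a minimal sketch (assuming the `ConfVars` entry `METASTORE_MSCK_FS_HANDLER_THREADS_COUNT` that this patch proposes, and the standalone metastore jars on the classpath) of overriding the property programmatically and reading it back:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;

public class MsckThreadsConfigExample {
  public static void main(String[] args) {
    // Fresh metastore configuration; by default the new property would stay at 1.
    Configuration conf = MetastoreConf.newMetastoreConf();
    // Override at the service/session level, e.g. 8 filesystem-handler threads.
    conf.setInt("metastore.msck.fshandler.threads", 8);
    // The checker would read it back through the proposed ConfVars entry.
    int threads = MetastoreConf.getIntVar(conf,
        MetastoreConf.ConfVars.METASTORE_MSCK_FS_HANDLER_THREADS_COUNT);
    System.out.println("MSCK fs-handler threads: " + threads);
  }
}
```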
   
   
   ### How was this patch tested?
   For stability, a new clientpositive test case was added: msck_repair_multi_thread.q.
   Additionally, the patch was tested manually for performance:
   with 33k partitions, the MSCK operation took 2000 seconds without the patch versus 200 seconds with it.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscribe@hive.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: gitbox-unsubscribe@hive.apache.org
For additional commands, e-mail: gitbox-help@hive.apache.org


[GitHub] [hive] pvary commented on a change in pull request #3053: HIVE-25980: Support HiveMetaStoreChecker.checkTable operation with multi-threaded

Posted by GitBox <gi...@apache.org>.
pvary commented on a change in pull request #3053:
URL: https://github.com/apache/hive/pull/3053#discussion_r816484985



##########
File path: standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreChecker.java
##########
@@ -303,56 +308,103 @@ void checkTable(Table table, PartitionIterable parts, byte[] filterExp, CheckRes
     if (tablePath == null) {
       return;
     }
-    FileSystem fs = tablePath.getFileSystem(conf);
-    if (!fs.exists(tablePath)) {
+    final FileSystem[] fs = {tablePath.getFileSystem(conf)};
+    if (!fs[0].exists(tablePath)) {
       result.getTablesNotOnFs().add(table.getTableName());
       return;
     }
 
     Set<Path> partPaths = new HashSet<>();
 
-    // check that the partition folders exist on disk
-    for (Partition partition : parts) {
-      if (partition == null) {
-        // most likely the user specified an invalid partition
-        continue;
-      }
-      Path partPath = getDataLocation(table, partition);
-      if (partPath == null) {
-        continue;
-      }
-      fs = partPath.getFileSystem(conf);
+    int threadCount = MetastoreConf.getIntVar(conf, MetastoreConf.ConfVars.METASTORE_MSCK_FS_HANDLER_THREADS_COUNT);
 
-      CheckResult.PartitionResult prFromMetastore = new CheckResult.PartitionResult();
-      prFromMetastore.setPartitionName(getPartitionName(table, partition));
-      prFromMetastore.setTableName(partition.getTableName());
-      if (!fs.exists(partPath)) {
-        result.getPartitionsNotOnFs().add(prFromMetastore);
-      } else {
-        result.getCorrectPartitions().add(prFromMetastore);
-      }
+    Preconditions.checkArgument(!(threadCount < 1),"METASTORE_MSCK_FS_HANDLER_THREADS_COUNT cannot be less than 1");
+    Preconditions.checkArgument(!(threadCount > 30),"METASTORE_MSCK_FS_HANDLER_THREADS_COUNT cannot be more than 30");
 
-      if (partitionExpirySeconds > 0) {
-        long currentEpochSecs = Instant.now().getEpochSecond();
-        long createdTime = partition.getCreateTime();
-        long partitionAgeSeconds = currentEpochSecs - createdTime;
-        if (partitionAgeSeconds > partitionExpirySeconds) {
-          CheckResult.PartitionResult pr = new CheckResult.PartitionResult();
-          pr.setPartitionName(getPartitionName(table, partition));
-          pr.setTableName(partition.getTableName());
-          result.getExpiredPartitions().add(pr);
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("{}.{}.{}.{} expired. createdAt: {} current: {} age: {}s expiry: {}s", partition.getCatName(),
-                partition.getDbName(), partition.getTableName(), pr.getPartitionName(), createdTime, currentEpochSecs,
-                partitionAgeSeconds, partitionExpirySeconds);
-          }
+    LOG.debug("Running with threads "+threadCount);
+
+    // For Multi Threaded run, we do not want to wait for All partitions in queue to be processed,
+    // instead we run in batch to avoid OOM, we set Min and Max Pool Size = METASTORE_MSCK_FS_HANDLER_THREADS_COUNT
+    // and Waiting Queue size = METASTORE_MSCK_FS_HANDLER_THREADS_COUNT
+
+    final ExecutorService pool = new ThreadPoolExecutor(threadCount,
+            threadCount,
+            0L,
+            TimeUnit.MILLISECONDS,
+            new ArrayBlockingQueue<>(threadCount),
+            new ThreadPoolExecutor.CallerRunsPolicy());
+
+    try {
+      Queue<Future<String>> futures = new LinkedList<>();
+      // check that the partition folders exist on disk
+      for (Partition partition : parts) {

Review comment:
       Or it is run in the original thread
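
For context on this remark: with a bounded `ArrayBlockingQueue` and `CallerRunsPolicy`, a task rejected because the queue is full is executed by the submitting thread itself, which is what throttles submission. A small self-contained sketch of that behaviour (plain JDK classes, nothing Hive-specific):

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class CallerRunsDemo {
  public static void main(String[] args) throws InterruptedException {
    int threadCount = 2;
    // Same shape as the patch: fixed pool, bounded queue, CallerRunsPolicy.
    ThreadPoolExecutor pool = new ThreadPoolExecutor(threadCount, threadCount,
        0L, TimeUnit.MILLISECONDS,
        new ArrayBlockingQueue<>(threadCount),
        new ThreadPoolExecutor.CallerRunsPolicy());
    for (int i = 0; i < 10; i++) {
      final int task = i;
      // When both workers are busy and the queue is full, this Runnable
      // runs on the main (submitting) thread, naturally slowing submission.
      pool.execute(() -> System.out.println(
          "task " + task + " on " + Thread.currentThread().getName()));
    }
    pool.shutdown();
    pool.awaitTermination(1, TimeUnit.MINUTES);
  }
}
```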

##########
File path: standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreChecker.java
##########
@@ -303,56 +308,103 @@ void checkTable(Table table, PartitionIterable parts, byte[] filterExp, CheckRes
     if (tablePath == null) {
       return;
     }
-    FileSystem fs = tablePath.getFileSystem(conf);
-    if (!fs.exists(tablePath)) {
+    final FileSystem[] fs = {tablePath.getFileSystem(conf)};
+    if (!fs[0].exists(tablePath)) {
       result.getTablesNotOnFs().add(table.getTableName());
       return;
     }
 
     Set<Path> partPaths = new HashSet<>();
 
-    // check that the partition folders exist on disk
-    for (Partition partition : parts) {
-      if (partition == null) {
-        // most likely the user specified an invalid partition
-        continue;
-      }
-      Path partPath = getDataLocation(table, partition);
-      if (partPath == null) {
-        continue;
-      }
-      fs = partPath.getFileSystem(conf);
+    int threadCount = MetastoreConf.getIntVar(conf, MetastoreConf.ConfVars.METASTORE_MSCK_FS_HANDLER_THREADS_COUNT);
 
-      CheckResult.PartitionResult prFromMetastore = new CheckResult.PartitionResult();
-      prFromMetastore.setPartitionName(getPartitionName(table, partition));
-      prFromMetastore.setTableName(partition.getTableName());
-      if (!fs.exists(partPath)) {
-        result.getPartitionsNotOnFs().add(prFromMetastore);
-      } else {
-        result.getCorrectPartitions().add(prFromMetastore);
-      }
+    Preconditions.checkArgument(!(threadCount < 1),"METASTORE_MSCK_FS_HANDLER_THREADS_COUNT cannot be less than 1");
+    Preconditions.checkArgument(!(threadCount > 30),"METASTORE_MSCK_FS_HANDLER_THREADS_COUNT cannot be more than 30");
 
-      if (partitionExpirySeconds > 0) {
-        long currentEpochSecs = Instant.now().getEpochSecond();
-        long createdTime = partition.getCreateTime();
-        long partitionAgeSeconds = currentEpochSecs - createdTime;
-        if (partitionAgeSeconds > partitionExpirySeconds) {
-          CheckResult.PartitionResult pr = new CheckResult.PartitionResult();
-          pr.setPartitionName(getPartitionName(table, partition));
-          pr.setTableName(partition.getTableName());
-          result.getExpiredPartitions().add(pr);
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("{}.{}.{}.{} expired. createdAt: {} current: {} age: {}s expiry: {}s", partition.getCatName(),
-                partition.getDbName(), partition.getTableName(), pr.getPartitionName(), createdTime, currentEpochSecs,
-                partitionAgeSeconds, partitionExpirySeconds);
-          }
+    LOG.debug("Running with threads "+threadCount);
+
+    // For Multi Threaded run, we do not want to wait for All partitions in queue to be processed,
+    // instead we run in batch to avoid OOM, we set Min and Max Pool Size = METASTORE_MSCK_FS_HANDLER_THREADS_COUNT
+    // and Waiting Queue size = METASTORE_MSCK_FS_HANDLER_THREADS_COUNT
+
+    final ExecutorService pool = new ThreadPoolExecutor(threadCount,
+            threadCount,
+            0L,
+            TimeUnit.MILLISECONDS,
+            new ArrayBlockingQueue<>(threadCount),
+            new ThreadPoolExecutor.CallerRunsPolicy());
+
+    try {
+      Queue<Future<String>> futures = new LinkedList<>();
+      // check that the partition folders exist on disk
+      for (Partition partition : parts) {

Review comment:
       Check if the PartitionIterator is thread-safe
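
One way to sidestep the question (a sketch only, assuming nothing about `PartitionIterable` internals): let only the submitting thread touch the iterator and hand each already-materialized element to the pool, so the iterator never needs to be thread-safe:

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class SingleConsumerIterationDemo {
  public static void main(String[] args) throws InterruptedException {
    List<String> partitions = Arrays.asList("p=1", "p=2", "p=3");
    ExecutorService pool = Executors.newFixedThreadPool(2);
    // The for-each loop (the only consumer of the iterator) runs on this
    // thread; workers receive an already-materialized element, so the
    // iterator's thread-safety never comes into play.
    for (String partition : partitions) {
      final String p = partition;
      pool.submit(() -> System.out.println("checking " + p));
    }
    pool.shutdown();
    pool.awaitTermination(1, TimeUnit.MINUTES);
  }
}
```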






[GitHub] [hive] pvary commented on a change in pull request #3053: HIVE-25980: Support HiveMetaStoreChecker.checkTable operation with multi-threaded

Posted by GitBox <gi...@apache.org>.
pvary commented on a change in pull request #3053:
URL: https://github.com/apache/hive/pull/3053#discussion_r816818516



##########
File path: standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreChecker.java
##########
@@ -311,48 +315,96 @@ void checkTable(Table table, PartitionIterable parts, byte[] filterExp, CheckRes
 
     Set<Path> partPaths = new HashSet<>();
 
-    // check that the partition folders exist on disk
-    for (Partition partition : parts) {
-      if (partition == null) {
-        // most likely the user specified an invalid partition
-        continue;
-      }
-      Path partPath = getDataLocation(table, partition);
-      if (partPath == null) {
-        continue;
-      }
-      fs = partPath.getFileSystem(conf);
+    int threadCount = MetastoreConf.getIntVar(conf, MetastoreConf.ConfVars.METASTORE_MSCK_FS_HANDLER_THREADS_COUNT);
 
-      CheckResult.PartitionResult prFromMetastore = new CheckResult.PartitionResult();
-      prFromMetastore.setPartitionName(getPartitionName(table, partition));
-      prFromMetastore.setTableName(partition.getTableName());
-      if (!fs.exists(partPath)) {
-        result.getPartitionsNotOnFs().add(prFromMetastore);
-      } else {
-        result.getCorrectPartitions().add(prFromMetastore);
-      }
+    Preconditions.checkArgument(!(threadCount < 1),"METASTORE_MSCK_FS_HANDLER_THREADS_COUNT cannot be less than 1");
+    Preconditions.checkArgument(!(threadCount > 30),"METASTORE_MSCK_FS_HANDLER_THREADS_COUNT cannot be more than 30");
 
-      if (partitionExpirySeconds > 0) {
-        long currentEpochSecs = Instant.now().getEpochSecond();
-        long createdTime = partition.getCreateTime();
-        long partitionAgeSeconds = currentEpochSecs - createdTime;
-        if (partitionAgeSeconds > partitionExpirySeconds) {
-          CheckResult.PartitionResult pr = new CheckResult.PartitionResult();
-          pr.setPartitionName(getPartitionName(table, partition));
-          pr.setTableName(partition.getTableName());
-          result.getExpiredPartitions().add(pr);
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("{}.{}.{}.{} expired. createdAt: {} current: {} age: {}s expiry: {}s", partition.getCatName(),
-                partition.getDbName(), partition.getTableName(), pr.getPartitionName(), createdTime, currentEpochSecs,
-                partitionAgeSeconds, partitionExpirySeconds);
-          }
+    LOG.debug("Running with threads "+threadCount);
+
+    // For Multi Threaded run, we do not want to wait for All partitions in queue to be processed,
+    // instead we run in batch to avoid OOM, we set Min and Max Pool Size = METASTORE_MSCK_FS_HANDLER_THREADS_COUNT
+    // and Waiting Queue size = METASTORE_MSCK_FS_HANDLER_THREADS_COUNT
+
+    final ExecutorService pool = new ThreadPoolExecutor(threadCount,
+            threadCount,
+            0L,
+            TimeUnit.MILLISECONDS,
+            new ArrayBlockingQueue<>(threadCount),
+            new ThreadPoolExecutor.CallerRunsPolicy());
+
+    ArrayList<String> processedPartitions = new ArrayList<String>();;
+
+    try {
+      Queue<Future<String>> futures = new LinkedList<>();
+      // check that the partition folders exist on disk
+      for (Partition partition : parts) {
+        if (partition == null) {
+          // most likely the user specified an invalid partition
+          continue;
         }
-      }
+        Path partPath = getDataLocation(table, partition);
+        if (partPath == null) {
+          continue;
+        }
+        futures.add(pool.submit(new Callable() {
+          @Override
+          public Object call() throws Exception {
+            Path tempPartPath = partPath;
+            FileSystem tempFs = tempPartPath.getFileSystem(conf);
+            CheckResult.PartitionResult prFromMetastore = new CheckResult.PartitionResult();
+            String partName = getPartitionName(table, partition);
+            prFromMetastore.setPartitionName(partName);
+            prFromMetastore.setTableName(partition.getTableName());
+
+            synchronized (this) {
+              if (!tempFs.exists(tempPartPath)) {
+                result.getPartitionsNotOnFs().add(prFromMetastore);
+              } else {
+                result.getCorrectPartitions().add(prFromMetastore);
+              }
+            }
 
-      for (int i = 0; i < getPartitionSpec(table, partition).size(); i++) {
-        Path qualifiedPath = partPath.makeQualified(fs);
-        partPaths.add(qualifiedPath);
-        partPath = partPath.getParent();
+            if (partitionExpirySeconds > 0) {
+              long currentEpochSecs = Instant.now().getEpochSecond();
+              long createdTime = partition.getCreateTime();
+              long partitionAgeSeconds = currentEpochSecs - createdTime;
+              if (partitionAgeSeconds > partitionExpirySeconds) {
+                CheckResult.PartitionResult pr = new CheckResult.PartitionResult();
+                pr.setPartitionName(getPartitionName(table, partition));
+                pr.setTableName(partition.getTableName());
+                synchronized (result) {
+                  result.getExpiredPartitions().add(pr);
+                }
+                if (LOG.isDebugEnabled()) {
+                  LOG.debug("{}.{}.{}.{} expired. createdAt: {} current: {} age: {}s expiry: {}s", partition.getCatName(),
+                      partition.getDbName(), partition.getTableName(), pr.getPartitionName(), createdTime, currentEpochSecs,
+                      partitionAgeSeconds, partitionExpirySeconds);
+                }
+              }
+            }
+
+            synchronized (this) {
+              for (int i = 0; i < getPartitionSpec(table, partition).size(); i++) {
+                final Path qualifiedPath = tempPartPath.makeQualified(tempFs);
+                partPaths.add(qualifiedPath);
+                tempPartPath = tempPartPath.getParent();
+              }
+            }
+            return processedPartitions.add(partName);
+          }
+        }));
+      }
+      while (!futures.isEmpty()) {
+        futures.poll().get();
+      }
+    } catch (ExecutionException | InterruptedException e) {
+      LOG.error("Exception occurred while processing partitions "+e);

Review comment:
       LOG.error("Exception occurred while processing partitions", e);






[GitHub] [hive] pvary commented on a change in pull request #3053: HIVE-25980: Support HiveMetaStoreChecker.checkTable operation with multi-threaded

Posted by GitBox <gi...@apache.org>.
pvary commented on a change in pull request #3053:
URL: https://github.com/apache/hive/pull/3053#discussion_r816535345



##########
File path: standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreChecker.java
##########
@@ -303,56 +308,103 @@ void checkTable(Table table, PartitionIterable parts, byte[] filterExp, CheckRes
     if (tablePath == null) {
       return;
     }
-    FileSystem fs = tablePath.getFileSystem(conf);
-    if (!fs.exists(tablePath)) {
+    final FileSystem[] fs = {tablePath.getFileSystem(conf)};
+    if (!fs[0].exists(tablePath)) {
       result.getTablesNotOnFs().add(table.getTableName());
       return;
     }
 
     Set<Path> partPaths = new HashSet<>();
 
-    // check that the partition folders exist on disk
-    for (Partition partition : parts) {
-      if (partition == null) {
-        // most likely the user specified an invalid partition
-        continue;
-      }
-      Path partPath = getDataLocation(table, partition);
-      if (partPath == null) {
-        continue;
-      }
-      fs = partPath.getFileSystem(conf);
+    int threadCount = MetastoreConf.getIntVar(conf, MetastoreConf.ConfVars.METASTORE_MSCK_FS_HANDLER_THREADS_COUNT);
 
-      CheckResult.PartitionResult prFromMetastore = new CheckResult.PartitionResult();
-      prFromMetastore.setPartitionName(getPartitionName(table, partition));
-      prFromMetastore.setTableName(partition.getTableName());
-      if (!fs.exists(partPath)) {
-        result.getPartitionsNotOnFs().add(prFromMetastore);
-      } else {
-        result.getCorrectPartitions().add(prFromMetastore);
-      }
+    Preconditions.checkArgument(!(threadCount < 1),"METASTORE_MSCK_FS_HANDLER_THREADS_COUNT cannot be less than 1");
+    Preconditions.checkArgument(!(threadCount > 30),"METASTORE_MSCK_FS_HANDLER_THREADS_COUNT cannot be more than 30");
 
-      if (partitionExpirySeconds > 0) {
-        long currentEpochSecs = Instant.now().getEpochSecond();
-        long createdTime = partition.getCreateTime();
-        long partitionAgeSeconds = currentEpochSecs - createdTime;
-        if (partitionAgeSeconds > partitionExpirySeconds) {
-          CheckResult.PartitionResult pr = new CheckResult.PartitionResult();
-          pr.setPartitionName(getPartitionName(table, partition));
-          pr.setTableName(partition.getTableName());
-          result.getExpiredPartitions().add(pr);
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("{}.{}.{}.{} expired. createdAt: {} current: {} age: {}s expiry: {}s", partition.getCatName(),
-                partition.getDbName(), partition.getTableName(), pr.getPartitionName(), createdTime, currentEpochSecs,
-                partitionAgeSeconds, partitionExpirySeconds);
-          }
+    LOG.debug("Running with threads "+threadCount);
+
+    // For Multi Threaded run, we do not want to wait for All partitions in queue to be processed,
+    // instead we run in batch to avoid OOM, we set Min and Max Pool Size = METASTORE_MSCK_FS_HANDLER_THREADS_COUNT
+    // and Waiting Queue size = METASTORE_MSCK_FS_HANDLER_THREADS_COUNT
+
+    final ExecutorService pool = new ThreadPoolExecutor(threadCount,
+            threadCount,
+            0L,
+            TimeUnit.MILLISECONDS,
+            new ArrayBlockingQueue<>(threadCount),
+            new ThreadPoolExecutor.CallerRunsPolicy());
+
+    try {
+      Queue<Future<String>> futures = new LinkedList<>();
+      // check that the partition folders exist on disk
+      for (Partition partition : parts) {
+        if (partition == null) {
+          // most likely the user specified an invalid partition
+          continue;
         }
-      }
+        Path[] partPath = {getDataLocation(table, partition)};
+        if (partPath[0] == null) {
+          continue;
+        }
+        futures.add(pool.submit(new Callable() {
+          @Override
+          public Object call() throws Exception {
+            fs[0] = partPath[0].getFileSystem(conf);
+            CheckResult.PartitionResult prFromMetastore = new CheckResult.PartitionResult();
+            prFromMetastore.setPartitionName(getPartitionName(table, partition));
+            prFromMetastore.setTableName(partition.getTableName());
+            if (!fs[0].exists(partPath[0])) {
+              synchronized (result) {
+                result.getPartitionsNotOnFs().add(prFromMetastore);
+              }
+            } else {
+              synchronized (result) {
+                result.getCorrectPartitions().add(prFromMetastore);
+              }
+            }
+
+            if (partitionExpirySeconds > 0) {
+              long currentEpochSecs = Instant.now().getEpochSecond();
+              long createdTime = partition.getCreateTime();
+              long partitionAgeSeconds = currentEpochSecs - createdTime;
+              if (partitionAgeSeconds > partitionExpirySeconds) {
+                CheckResult.PartitionResult pr = new CheckResult.PartitionResult();
+                pr.setPartitionName(getPartitionName(table, partition));
+                pr.setTableName(partition.getTableName());
+                synchronized (result) {
+                  result.getExpiredPartitions().add(pr);
+                }
+                if (LOG.isDebugEnabled()) {
+                  LOG.debug("{}.{}.{}.{} expired. createdAt: {} current: {} age: {}s expiry: {}s", partition.getCatName(),
+                      partition.getDbName(), partition.getTableName(), pr.getPartitionName(), createdTime, currentEpochSecs,
+                      partitionAgeSeconds, partitionExpirySeconds);
+                }
+              }
+            }
 
-      for (int i = 0; i < getPartitionSpec(table, partition).size(); i++) {
-        Path qualifiedPath = partPath.makeQualified(fs);
-        partPaths.add(qualifiedPath);
-        partPath = partPath.getParent();
+            for (int i = 0; i < getPartitionSpec(table, partition).size(); i++) {
+              final Path qualifiedPath = partPath[0].makeQualified(fs[0]);
+              synchronized (partPaths) {
+                partPaths.add(qualifiedPath);
+              }
+              partPath[0] = partPath[0].getParent();
+            }
+            return "CheckTable task complete.";
+          }
+        }));
+      }
+      while (!futures.isEmpty()) {
+        String message = futures.poll().get();
+        LOG.debug(message);
+      }
+    } catch (ExecutionException e) {
+      e.printStackTrace();

Review comment:
       Never do something like this.
   Please log it or rethrow an exception instead.
   
   Nit: If you catch 2 types of exception and do the same thing with them you might just use:
   ```
   } catch (ExecutionException | InterruptedException e) {
   ```






[GitHub] [hive] shameersss1 commented on a change in pull request #3053: HIVE-25980: Reduce fs calls in HiveMetaStoreChecker.checkTable

Posted by GitBox <gi...@apache.org>.
shameersss1 commented on a change in pull request #3053:
URL: https://github.com/apache/hive/pull/3053#discussion_r828821293



##########
File path: standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreChecker.java
##########
@@ -468,6 +459,25 @@ void findUnknownPartitions(Table table, Set<Path> partPaths, byte[] filterExp,
         }
       }
     }
+    
+    Set<Path> partPathsInMS = partPaths;
+    // don't want the table dir
+    partPathsInMS.remove(tablePath);
+    // remove partition paths in partPathsInMS, to getPartitionsNotOnFs
+    partPathsInMS.removeAll(allPartDirs);

Review comment:
       allPartDirs here contains all partition paths that are on the FileSystem but not in the metastore. Removing them from partPathsInMS doesn't make sense.
   
   I think this operation should be done before allPartDirs.removeAll(partPaths);
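
To make the ordering point concrete, a small sketch with plain sets (names mirror the patch, values are made up): the metastore-only difference has to be taken before `allPartDirs` is reduced to the filesystem-only set:

```java
import java.util.HashSet;
import java.util.Set;

public class SetDifferenceOrderDemo {
  public static void main(String[] args) {
    Set<String> partPaths = new HashSet<>();     // partitions known to the metastore
    Set<String> allPartDirs = new HashSet<>();   // partition dirs found on the filesystem
    partPaths.add("/tbl/p=1");                   // in MS and on FS
    partPaths.add("/tbl/p=2");                   // in MS only
    allPartDirs.add("/tbl/p=1");
    allPartDirs.add("/tbl/p=3");                 // on FS only

    // Take the "in metastore but not on FS" difference first ...
    Set<String> partPathsInMS = new HashSet<>(partPaths);
    partPathsInMS.removeAll(allPartDirs);        // -> [/tbl/p=2] : partitionsNotOnFs
    // ... and only then reduce allPartDirs to the "on FS but not in MS" set.
    allPartDirs.removeAll(partPaths);            // -> [/tbl/p=3] : partitionsNotInMs
    System.out.println("not on FS: " + partPathsInMS);
    System.out.println("not in MS: " + allPartDirs);
  }
}
```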

##########
File path: standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreChecker.java
##########
@@ -468,6 +459,25 @@ void findUnknownPartitions(Table table, Set<Path> partPaths, byte[] filterExp,
         }
       }
     }
+    
+    Set<Path> partPathsInMS = partPaths;
+    // don't want the table dir
+    partPathsInMS.remove(tablePath);
+    // remove partition paths in partPathsInMS, to getPartitionsNotOnFs
+    partPathsInMS.removeAll(allPartDirs);
+    // There can be edge case where user can define partition directory outside of table directory
+    // to avoid eviction of such partitions
+    // we check for partition path not exists and add to result for getPartitionsNotOnFs.
+    for (Path partPath : partPathsInMS) {
+      FileSystem fs = partPath.getFileSystem(conf);

Review comment:
       As you pointed out, having a multi-threaded check of the partition files will be useful.

##########
File path: ql/src/test/queries/clientpositive/msck_repair_multi_thread.q
##########
@@ -0,0 +1,33 @@
+DROP TABLE IF EXISTS repairtable_hive_25980;

Review comment:
       The file name doesn't match the optimization we are doing.






[GitHub] [hive] pvary commented on a change in pull request #3053: HIVE-25980: Support HiveMetaStoreChecker.checkTable operation with multi-threaded

Posted by GitBox <gi...@apache.org>.
pvary commented on a change in pull request #3053:
URL: https://github.com/apache/hive/pull/3053#discussion_r816539245



##########
File path: standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreChecker.java
##########
@@ -303,56 +308,103 @@ void checkTable(Table table, PartitionIterable parts, byte[] filterExp, CheckRes
     if (tablePath == null) {
       return;
     }
-    FileSystem fs = tablePath.getFileSystem(conf);
-    if (!fs.exists(tablePath)) {
+    final FileSystem[] fs = {tablePath.getFileSystem(conf)};
+    if (!fs[0].exists(tablePath)) {
       result.getTablesNotOnFs().add(table.getTableName());
       return;
     }
 
     Set<Path> partPaths = new HashSet<>();
 
-    // check that the partition folders exist on disk
-    for (Partition partition : parts) {
-      if (partition == null) {
-        // most likely the user specified an invalid partition
-        continue;
-      }
-      Path partPath = getDataLocation(table, partition);
-      if (partPath == null) {
-        continue;
-      }
-      fs = partPath.getFileSystem(conf);
+    int threadCount = MetastoreConf.getIntVar(conf, MetastoreConf.ConfVars.METASTORE_MSCK_FS_HANDLER_THREADS_COUNT);
 
-      CheckResult.PartitionResult prFromMetastore = new CheckResult.PartitionResult();
-      prFromMetastore.setPartitionName(getPartitionName(table, partition));
-      prFromMetastore.setTableName(partition.getTableName());
-      if (!fs.exists(partPath)) {
-        result.getPartitionsNotOnFs().add(prFromMetastore);
-      } else {
-        result.getCorrectPartitions().add(prFromMetastore);
-      }
+    Preconditions.checkArgument(!(threadCount < 1),"METASTORE_MSCK_FS_HANDLER_THREADS_COUNT cannot be less than 1");
+    Preconditions.checkArgument(!(threadCount > 30),"METASTORE_MSCK_FS_HANDLER_THREADS_COUNT cannot be more than 30");
 
-      if (partitionExpirySeconds > 0) {
-        long currentEpochSecs = Instant.now().getEpochSecond();
-        long createdTime = partition.getCreateTime();
-        long partitionAgeSeconds = currentEpochSecs - createdTime;
-        if (partitionAgeSeconds > partitionExpirySeconds) {
-          CheckResult.PartitionResult pr = new CheckResult.PartitionResult();
-          pr.setPartitionName(getPartitionName(table, partition));
-          pr.setTableName(partition.getTableName());
-          result.getExpiredPartitions().add(pr);
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("{}.{}.{}.{} expired. createdAt: {} current: {} age: {}s expiry: {}s", partition.getCatName(),
-                partition.getDbName(), partition.getTableName(), pr.getPartitionName(), createdTime, currentEpochSecs,
-                partitionAgeSeconds, partitionExpirySeconds);
-          }
+    LOG.debug("Running with threads "+threadCount);
+
+    // For Multi Threaded run, we do not want to wait for All partitions in queue to be processed,
+    // instead we run in batch to avoid OOM, we set Min and Max Pool Size = METASTORE_MSCK_FS_HANDLER_THREADS_COUNT
+    // and Waiting Queue size = METASTORE_MSCK_FS_HANDLER_THREADS_COUNT
+
+    final ExecutorService pool = new ThreadPoolExecutor(threadCount,
+            threadCount,
+            0L,
+            TimeUnit.MILLISECONDS,
+            new ArrayBlockingQueue<>(threadCount),
+            new ThreadPoolExecutor.CallerRunsPolicy());
+
+    try {
+      Queue<Future<String>> futures = new LinkedList<>();
+      // check that the partition folders exist on disk
+      for (Partition partition : parts) {
+        if (partition == null) {
+          // most likely the user specified an invalid partition
+          continue;
         }
-      }
+        Path[] partPath = {getDataLocation(table, partition)};
+        if (partPath[0] == null) {
+          continue;
+        }
+        futures.add(pool.submit(new Callable() {
+          @Override
+          public Object call() throws Exception {
+            fs[0] = partPath[0].getFileSystem(conf);
+            CheckResult.PartitionResult prFromMetastore = new CheckResult.PartitionResult();
+            prFromMetastore.setPartitionName(getPartitionName(table, partition));
+            prFromMetastore.setTableName(partition.getTableName());
+            if (!fs[0].exists(partPath[0])) {
+              synchronized (result) {
+                result.getPartitionsNotOnFs().add(prFromMetastore);
+              }
+            } else {
+              synchronized (result) {
+                result.getCorrectPartitions().add(prFromMetastore);
+              }
+            }
+
+            if (partitionExpirySeconds > 0) {
+              long currentEpochSecs = Instant.now().getEpochSecond();
+              long createdTime = partition.getCreateTime();
+              long partitionAgeSeconds = currentEpochSecs - createdTime;
+              if (partitionAgeSeconds > partitionExpirySeconds) {
+                CheckResult.PartitionResult pr = new CheckResult.PartitionResult();
+                pr.setPartitionName(getPartitionName(table, partition));
+                pr.setTableName(partition.getTableName());
+                synchronized (result) {
+                  result.getExpiredPartitions().add(pr);
+                }
+                if (LOG.isDebugEnabled()) {
+                  LOG.debug("{}.{}.{}.{} expired. createdAt: {} current: {} age: {}s expiry: {}s", partition.getCatName(),
+                      partition.getDbName(), partition.getTableName(), pr.getPartitionName(), createdTime, currentEpochSecs,
+                      partitionAgeSeconds, partitionExpirySeconds);
+                }
+              }
+            }
 
-      for (int i = 0; i < getPartitionSpec(table, partition).size(); i++) {
-        Path qualifiedPath = partPath.makeQualified(fs);
-        partPaths.add(qualifiedPath);
-        partPath = partPath.getParent();
+            for (int i = 0; i < getPartitionSpec(table, partition).size(); i++) {
+              final Path qualifiedPath = partPath[0].makeQualified(fs[0]);
+              synchronized (partPaths) {

Review comment:
       Wouldn't it be better to take the `synchronized` lock once and do the loop inside?
   The JVM might do the same, but this should be a very fast loop, and acquiring/releasing a lock inside a loop can be counterproductive.
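
A sketch of the suggested shape, with one `synchronized` block wrapping the whole loop; the variable names mirror the patch but the class itself is illustrative only:

```java
import java.util.HashSet;
import java.util.Set;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class CoarseLockSketch {
  // Shared accumulator, as in the patch.
  private final Set<Path> partPaths = new HashSet<>();

  /** Adds the qualified partition path and its parents up to 'levels' ancestors. */
  void addQualifiedPaths(Path partPath, FileSystem fs, int levels) {
    // Acquire the lock once for the whole (short) loop instead of per iteration.
    synchronized (partPaths) {
      Path current = partPath;
      for (int i = 0; i < levels; i++) {
        partPaths.add(current.makeQualified(fs));
        current = current.getParent();
      }
    }
  }
}
```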






[GitHub] [hive] shameersss1 commented on a change in pull request #3053: HIVE-25980: Support HiveMetaStoreChecker.checkTable operation with multi-threaded

Posted by GitBox <gi...@apache.org>.
shameersss1 commented on a change in pull request #3053:
URL: https://github.com/apache/hive/pull/3053#discussion_r816597275



##########
File path: standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreChecker.java
##########
@@ -303,56 +308,103 @@ void checkTable(Table table, PartitionIterable parts, byte[] filterExp, CheckRes
     if (tablePath == null) {
       return;
     }
-    FileSystem fs = tablePath.getFileSystem(conf);
-    if (!fs.exists(tablePath)) {
+    final FileSystem[] fs = {tablePath.getFileSystem(conf)};
+    if (!fs[0].exists(tablePath)) {
       result.getTablesNotOnFs().add(table.getTableName());
       return;
     }
 
     Set<Path> partPaths = new HashSet<>();
 
-    // check that the partition folders exist on disk
-    for (Partition partition : parts) {
-      if (partition == null) {
-        // most likely the user specified an invalid partition
-        continue;
-      }
-      Path partPath = getDataLocation(table, partition);
-      if (partPath == null) {
-        continue;
-      }
-      fs = partPath.getFileSystem(conf);
+    int threadCount = MetastoreConf.getIntVar(conf, MetastoreConf.ConfVars.METASTORE_MSCK_FS_HANDLER_THREADS_COUNT);
 
-      CheckResult.PartitionResult prFromMetastore = new CheckResult.PartitionResult();
-      prFromMetastore.setPartitionName(getPartitionName(table, partition));
-      prFromMetastore.setTableName(partition.getTableName());
-      if (!fs.exists(partPath)) {
-        result.getPartitionsNotOnFs().add(prFromMetastore);
-      } else {
-        result.getCorrectPartitions().add(prFromMetastore);
-      }
+    Preconditions.checkArgument(!(threadCount < 1),"METASTORE_MSCK_FS_HANDLER_THREADS_COUNT cannot be less than 1");
+    Preconditions.checkArgument(!(threadCount > 30),"METASTORE_MSCK_FS_HANDLER_THREADS_COUNT cannot be more than 30");
 
-      if (partitionExpirySeconds > 0) {
-        long currentEpochSecs = Instant.now().getEpochSecond();
-        long createdTime = partition.getCreateTime();
-        long partitionAgeSeconds = currentEpochSecs - createdTime;
-        if (partitionAgeSeconds > partitionExpirySeconds) {
-          CheckResult.PartitionResult pr = new CheckResult.PartitionResult();
-          pr.setPartitionName(getPartitionName(table, partition));
-          pr.setTableName(partition.getTableName());
-          result.getExpiredPartitions().add(pr);
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("{}.{}.{}.{} expired. createdAt: {} current: {} age: {}s expiry: {}s", partition.getCatName(),
-                partition.getDbName(), partition.getTableName(), pr.getPartitionName(), createdTime, currentEpochSecs,
-                partitionAgeSeconds, partitionExpirySeconds);
-          }
+    LOG.debug("Running with threads "+threadCount);
+
+    // For Multi Threaded run, we do not want to wait for All partitions in queue to be processed,
+    // instead we run in batch to avoid OOM, we set Min and Max Pool Size = METASTORE_MSCK_FS_HANDLER_THREADS_COUNT
+    // and Waiting Queue size = METASTORE_MSCK_FS_HANDLER_THREADS_COUNT
+
+    final ExecutorService pool = new ThreadPoolExecutor(threadCount,
+            threadCount,
+            0L,
+            TimeUnit.MILLISECONDS,
+            new ArrayBlockingQueue<>(threadCount),
+            new ThreadPoolExecutor.CallerRunsPolicy());
+
+    try {
+      Queue<Future<String>> futures = new LinkedList<>();
+      // check that the partition folders exist on disk
+      for (Partition partition : parts) {
+        if (partition == null) {
+          // most likely the user specified an invalid partition
+          continue;
         }
-      }
+        Path[] partPath = {getDataLocation(table, partition)};
+        if (partPath[0] == null) {
+          continue;
+        }
+        futures.add(pool.submit(new Callable() {
+          @Override
+          public Object call() throws Exception {
+            fs[0] = partPath[0].getFileSystem(conf);
+            CheckResult.PartitionResult prFromMetastore = new CheckResult.PartitionResult();
+            prFromMetastore.setPartitionName(getPartitionName(table, partition));
+            prFromMetastore.setTableName(partition.getTableName());
+            if (!fs[0].exists(partPath[0])) {

Review comment:
       We anyhow scan all the partitions from the filesystem in https://github.com/apache/hive/blob/master/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreChecker.java#L398
   
   We could remove the fs.exists() check here and instead store all the partition paths in memory (or pass the iterator to the findUnknownPartitions method) and take the diff between the two sets, which gives us the partitions that are in the metastore but not on the filesystem.
   
   Note: I think this would be much faster than the multi-threaded approach!
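
A sketch of that idea with plain collections (illustrative names and paths): collect the metastore partition locations and the directories found by the single filesystem listing, then diff the two sets instead of issuing one `exists()` call per partition:

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

public class DiffInsteadOfExistsDemo {
  public static void main(String[] args) {
    // Partition locations registered in the metastore (gathered while iterating parts).
    Set<String> msPartitionPaths = new HashSet<>(Arrays.asList(
        "/warehouse/t/p=1", "/warehouse/t/p=2", "/warehouse/t/p=3"));
    // Partition directories returned by the one recursive listing of the table
    // directory that findUnknownPartitions already performs.
    Set<String> fsPartitionDirs = new HashSet<>(Arrays.asList(
        "/warehouse/t/p=1", "/warehouse/t/p=3", "/warehouse/t/p=4"));

    // In the metastore but missing on the filesystem: no per-partition exists() needed.
    Set<String> notOnFs = new HashSet<>(msPartitionPaths);
    notOnFs.removeAll(fsPartitionDirs);
    // On the filesystem but unknown to the metastore.
    Set<String> notInMs = new HashSet<>(fsPartitionDirs);
    notInMs.removeAll(msPartitionPaths);

    System.out.println("partitionsNotOnFs: " + notOnFs);   // [/warehouse/t/p=2]
    System.out.println("partitionsNotInMs: " + notInMs);   // [/warehouse/t/p=4]
  }
}
```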






[GitHub] [hive] pvary commented on a change in pull request #3053: HIVE-25980: Support HiveMetaStoreChecker.checkTable operation with multi-threaded

Posted by GitBox <gi...@apache.org>.
pvary commented on a change in pull request #3053:
URL: https://github.com/apache/hive/pull/3053#discussion_r816483649



##########
File path: standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreChecker.java
##########
@@ -303,56 +308,103 @@ void checkTable(Table table, PartitionIterable parts, byte[] filterExp, CheckRes
     if (tablePath == null) {
       return;
     }
-    FileSystem fs = tablePath.getFileSystem(conf);
-    if (!fs.exists(tablePath)) {
+    final FileSystem[] fs = {tablePath.getFileSystem(conf)};
+    if (!fs[0].exists(tablePath)) {

Review comment:
       Why is this change needed?

##########
File path: standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreChecker.java
##########
@@ -303,56 +308,103 @@ void checkTable(Table table, PartitionIterable parts, byte[] filterExp, CheckRes
     if (tablePath == null) {
       return;
     }
-    FileSystem fs = tablePath.getFileSystem(conf);
-    if (!fs.exists(tablePath)) {
+    final FileSystem[] fs = {tablePath.getFileSystem(conf)};

Review comment:
       Nit: formatting






[GitHub] [hive] pvary commented on a change in pull request #3053: HIVE-25980: Support HiveMetaStoreChecker.checkTable operation with multi-threaded

Posted by GitBox <gi...@apache.org>.
pvary commented on a change in pull request #3053:
URL: https://github.com/apache/hive/pull/3053#discussion_r816485713



##########
File path: standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreChecker.java
##########
@@ -303,56 +308,103 @@ void checkTable(Table table, PartitionIterable parts, byte[] filterExp, CheckRes
     if (tablePath == null) {
       return;
     }
-    FileSystem fs = tablePath.getFileSystem(conf);
-    if (!fs.exists(tablePath)) {
+    final FileSystem[] fs = {tablePath.getFileSystem(conf)};
+    if (!fs[0].exists(tablePath)) {
       result.getTablesNotOnFs().add(table.getTableName());
       return;
     }
 
     Set<Path> partPaths = new HashSet<>();
 
-    // check that the partition folders exist on disk
-    for (Partition partition : parts) {
-      if (partition == null) {
-        // most likely the user specified an invalid partition
-        continue;
-      }
-      Path partPath = getDataLocation(table, partition);
-      if (partPath == null) {
-        continue;
-      }
-      fs = partPath.getFileSystem(conf);
+    int threadCount = MetastoreConf.getIntVar(conf, MetastoreConf.ConfVars.METASTORE_MSCK_FS_HANDLER_THREADS_COUNT);
 
-      CheckResult.PartitionResult prFromMetastore = new CheckResult.PartitionResult();
-      prFromMetastore.setPartitionName(getPartitionName(table, partition));
-      prFromMetastore.setTableName(partition.getTableName());
-      if (!fs.exists(partPath)) {
-        result.getPartitionsNotOnFs().add(prFromMetastore);
-      } else {
-        result.getCorrectPartitions().add(prFromMetastore);
-      }
+    Preconditions.checkArgument(!(threadCount < 1),"METASTORE_MSCK_FS_HANDLER_THREADS_COUNT cannot be less than 1");
+    Preconditions.checkArgument(!(threadCount > 30),"METASTORE_MSCK_FS_HANDLER_THREADS_COUNT cannot be more than 30");
 
-      if (partitionExpirySeconds > 0) {
-        long currentEpochSecs = Instant.now().getEpochSecond();
-        long createdTime = partition.getCreateTime();
-        long partitionAgeSeconds = currentEpochSecs - createdTime;
-        if (partitionAgeSeconds > partitionExpirySeconds) {
-          CheckResult.PartitionResult pr = new CheckResult.PartitionResult();
-          pr.setPartitionName(getPartitionName(table, partition));
-          pr.setTableName(partition.getTableName());
-          result.getExpiredPartitions().add(pr);
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("{}.{}.{}.{} expired. createdAt: {} current: {} age: {}s expiry: {}s", partition.getCatName(),
-                partition.getDbName(), partition.getTableName(), pr.getPartitionName(), createdTime, currentEpochSecs,
-                partitionAgeSeconds, partitionExpirySeconds);
-          }
+    LOG.debug("Running with threads "+threadCount);
+

Review comment:
       Nit: spaces around the `+`






[GitHub] [hive] pvary commented on a change in pull request #3053: HIVE-25980: Support HiveMetaStoreChecker.checkTable operation with multi-threaded

Posted by GitBox <gi...@apache.org>.
pvary commented on a change in pull request #3053:
URL: https://github.com/apache/hive/pull/3053#discussion_r816537279



##########
File path: standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreChecker.java
##########
@@ -303,56 +308,103 @@ void checkTable(Table table, PartitionIterable parts, byte[] filterExp, CheckRes
     if (tablePath == null) {
       return;
     }
-    FileSystem fs = tablePath.getFileSystem(conf);
-    if (!fs.exists(tablePath)) {
+    final FileSystem[] fs = {tablePath.getFileSystem(conf)};
+    if (!fs[0].exists(tablePath)) {
       result.getTablesNotOnFs().add(table.getTableName());
       return;
     }
 
     Set<Path> partPaths = new HashSet<>();
 
-    // check that the partition folders exist on disk
-    for (Partition partition : parts) {
-      if (partition == null) {
-        // most likely the user specified an invalid partition
-        continue;
-      }
-      Path partPath = getDataLocation(table, partition);
-      if (partPath == null) {
-        continue;
-      }
-      fs = partPath.getFileSystem(conf);
+    int threadCount = MetastoreConf.getIntVar(conf, MetastoreConf.ConfVars.METASTORE_MSCK_FS_HANDLER_THREADS_COUNT);
 
-      CheckResult.PartitionResult prFromMetastore = new CheckResult.PartitionResult();
-      prFromMetastore.setPartitionName(getPartitionName(table, partition));
-      prFromMetastore.setTableName(partition.getTableName());
-      if (!fs.exists(partPath)) {
-        result.getPartitionsNotOnFs().add(prFromMetastore);
-      } else {
-        result.getCorrectPartitions().add(prFromMetastore);
-      }
+    Preconditions.checkArgument(!(threadCount < 1),"METASTORE_MSCK_FS_HANDLER_THREADS_COUNT cannot be less than 1");
+    Preconditions.checkArgument(!(threadCount > 30),"METASTORE_MSCK_FS_HANDLER_THREADS_COUNT cannot be more than 30");
 
-      if (partitionExpirySeconds > 0) {
-        long currentEpochSecs = Instant.now().getEpochSecond();
-        long createdTime = partition.getCreateTime();
-        long partitionAgeSeconds = currentEpochSecs - createdTime;
-        if (partitionAgeSeconds > partitionExpirySeconds) {
-          CheckResult.PartitionResult pr = new CheckResult.PartitionResult();
-          pr.setPartitionName(getPartitionName(table, partition));
-          pr.setTableName(partition.getTableName());
-          result.getExpiredPartitions().add(pr);
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("{}.{}.{}.{} expired. createdAt: {} current: {} age: {}s expiry: {}s", partition.getCatName(),
-                partition.getDbName(), partition.getTableName(), pr.getPartitionName(), createdTime, currentEpochSecs,
-                partitionAgeSeconds, partitionExpirySeconds);
-          }
+    LOG.debug("Running with threads "+threadCount);
+
+    // For Multi Threaded run, we do not want to wait for All partitions in queue to be processed,
+    // instead we run in batch to avoid OOM, we set Min and Max Pool Size = METASTORE_MSCK_FS_HANDLER_THREADS_COUNT
+    // and Waiting Queue size = METASTORE_MSCK_FS_HANDLER_THREADS_COUNT
+
+    final ExecutorService pool = new ThreadPoolExecutor(threadCount,
+            threadCount,
+            0L,
+            TimeUnit.MILLISECONDS,
+            new ArrayBlockingQueue<>(threadCount),
+            new ThreadPoolExecutor.CallerRunsPolicy());
+
+    try {
+      Queue<Future<String>> futures = new LinkedList<>();
+      // check that the partition folders exist on disk
+      for (Partition partition : parts) {
+        if (partition == null) {
+          // most likely the user specified an invalid partition
+          continue;
         }
-      }
+        Path[] partPath = {getDataLocation(table, partition)};

Review comment:
       Also, this might be problematic with concurrent execution - precisely because it is not final
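
A sketch of the safer pattern: capture the value in a fresh, effectively-final local per iteration before submitting, so tasks never share a mutable one-element array (`getDataLocation` is faked with a string here):

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class EffectivelyFinalCaptureDemo {
  public static void main(String[] args) throws InterruptedException {
    List<String> partitions = Arrays.asList("p=1", "p=2", "p=3");
    ExecutorService pool = Executors.newFixedThreadPool(2);
    for (String partition : partitions) {
      // One fresh, effectively-final local per iteration; each task sees its own
      // value, so there is no shared mutable Path[] to race on.
      final String partPath = "/warehouse/t/" + partition; // stand-in for getDataLocation(...)
      pool.submit(() -> System.out.println("checking " + partPath));
    }
    pool.shutdown();
    pool.awaitTermination(1, TimeUnit.MINUTES);
  }
}
```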






[GitHub] [hive] pvary commented on a change in pull request #3053: HIVE-25980: Support HiveMetaStoreChecker.checkTable operation with multi-threaded

Posted by GitBox <gi...@apache.org>.
pvary commented on a change in pull request #3053:
URL: https://github.com/apache/hive/pull/3053#discussion_r816817711



##########
File path: standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreChecker.java
##########
@@ -311,48 +315,96 @@ void checkTable(Table table, PartitionIterable parts, byte[] filterExp, CheckRes
 
     Set<Path> partPaths = new HashSet<>();
 
-    // check that the partition folders exist on disk
-    for (Partition partition : parts) {
-      if (partition == null) {
-        // most likely the user specified an invalid partition
-        continue;
-      }
-      Path partPath = getDataLocation(table, partition);
-      if (partPath == null) {
-        continue;
-      }
-      fs = partPath.getFileSystem(conf);
+    int threadCount = MetastoreConf.getIntVar(conf, MetastoreConf.ConfVars.METASTORE_MSCK_FS_HANDLER_THREADS_COUNT);
 
-      CheckResult.PartitionResult prFromMetastore = new CheckResult.PartitionResult();
-      prFromMetastore.setPartitionName(getPartitionName(table, partition));
-      prFromMetastore.setTableName(partition.getTableName());
-      if (!fs.exists(partPath)) {
-        result.getPartitionsNotOnFs().add(prFromMetastore);
-      } else {
-        result.getCorrectPartitions().add(prFromMetastore);
-      }
+    Preconditions.checkArgument(!(threadCount < 1),"METASTORE_MSCK_FS_HANDLER_THREADS_COUNT cannot be less than 1");
+    Preconditions.checkArgument(!(threadCount > 30),"METASTORE_MSCK_FS_HANDLER_THREADS_COUNT cannot be more than 30");
 
-      if (partitionExpirySeconds > 0) {
-        long currentEpochSecs = Instant.now().getEpochSecond();
-        long createdTime = partition.getCreateTime();
-        long partitionAgeSeconds = currentEpochSecs - createdTime;
-        if (partitionAgeSeconds > partitionExpirySeconds) {
-          CheckResult.PartitionResult pr = new CheckResult.PartitionResult();
-          pr.setPartitionName(getPartitionName(table, partition));
-          pr.setTableName(partition.getTableName());
-          result.getExpiredPartitions().add(pr);
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("{}.{}.{}.{} expired. createdAt: {} current: {} age: {}s expiry: {}s", partition.getCatName(),
-                partition.getDbName(), partition.getTableName(), pr.getPartitionName(), createdTime, currentEpochSecs,
-                partitionAgeSeconds, partitionExpirySeconds);
-          }
+    LOG.debug("Running with threads "+threadCount);

Review comment:
       Please use ` LOG.debug("Running with threads "+threadCount);`

##########
File path: standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreChecker.java
##########
@@ -311,48 +315,96 @@ void checkTable(Table table, PartitionIterable parts, byte[] filterExp, CheckRes
 
     Set<Path> partPaths = new HashSet<>();
 
-    // check that the partition folders exist on disk
-    for (Partition partition : parts) {
-      if (partition == null) {
-        // most likely the user specified an invalid partition
-        continue;
-      }
-      Path partPath = getDataLocation(table, partition);
-      if (partPath == null) {
-        continue;
-      }
-      fs = partPath.getFileSystem(conf);
+    int threadCount = MetastoreConf.getIntVar(conf, MetastoreConf.ConfVars.METASTORE_MSCK_FS_HANDLER_THREADS_COUNT);
 
-      CheckResult.PartitionResult prFromMetastore = new CheckResult.PartitionResult();
-      prFromMetastore.setPartitionName(getPartitionName(table, partition));
-      prFromMetastore.setTableName(partition.getTableName());
-      if (!fs.exists(partPath)) {
-        result.getPartitionsNotOnFs().add(prFromMetastore);
-      } else {
-        result.getCorrectPartitions().add(prFromMetastore);
-      }
+    Preconditions.checkArgument(!(threadCount < 1),"METASTORE_MSCK_FS_HANDLER_THREADS_COUNT cannot be less than 1");
+    Preconditions.checkArgument(!(threadCount > 30),"METASTORE_MSCK_FS_HANDLER_THREADS_COUNT cannot be more than 30");
 
-      if (partitionExpirySeconds > 0) {
-        long currentEpochSecs = Instant.now().getEpochSecond();
-        long createdTime = partition.getCreateTime();
-        long partitionAgeSeconds = currentEpochSecs - createdTime;
-        if (partitionAgeSeconds > partitionExpirySeconds) {
-          CheckResult.PartitionResult pr = new CheckResult.PartitionResult();
-          pr.setPartitionName(getPartitionName(table, partition));
-          pr.setTableName(partition.getTableName());
-          result.getExpiredPartitions().add(pr);
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("{}.{}.{}.{} expired. createdAt: {} current: {} age: {}s expiry: {}s", partition.getCatName(),
-                partition.getDbName(), partition.getTableName(), pr.getPartitionName(), createdTime, currentEpochSecs,
-                partitionAgeSeconds, partitionExpirySeconds);
-          }
+    LOG.debug("Running with threads "+threadCount);

Review comment:
       Please use `LOG.debug("Running with threads {}", threadCount);`






[GitHub] [hive] pvary commented on a change in pull request #3053: HIVE-25980: Support HiveMetaStoreChecker.checkTable operation with multi-threaded

Posted by GitBox <gi...@apache.org>.
pvary commented on a change in pull request #3053:
URL: https://github.com/apache/hive/pull/3053#discussion_r816531781



##########
File path: standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreChecker.java
##########
@@ -303,56 +308,103 @@ void checkTable(Table table, PartitionIterable parts, byte[] filterExp, CheckRes
     if (tablePath == null) {
       return;
     }
-    FileSystem fs = tablePath.getFileSystem(conf);
-    if (!fs.exists(tablePath)) {
+    final FileSystem[] fs = {tablePath.getFileSystem(conf)};

Review comment:
      Is this just a trick to get `final` to accept the reassignment?
   Wouldn't it be better for readability to use separate variables instead?
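
   To illustrate the readability point, a sketch of the alternative (the names are made up, not the PR's code): each loop iteration binds the value to its own effectively final local, which the task captures directly, so no one-element array is needed.

   ```java
   import java.util.List;
   import java.util.concurrent.Callable;
   import java.util.concurrent.ExecutorService;

   // Sketch only: 'partitionLocations' and the check body are illustrative.
   class EffectivelyFinalSketch {
     void submitChecks(ExecutorService pool, List<String> partitionLocations) {
       for (String location : partitionLocations) {
         // 'location' is effectively final, so the lambda can capture it directly;
         // no single-element array is required to smuggle a mutable reference in.
         pool.submit((Callable<Boolean>) () -> location != null && !location.isEmpty());
       }
     }
   }
   ```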






[GitHub] [hive] shameersss1 commented on a change in pull request #3053: HIVE-25980: Support HiveMetaStoreChecker.checkTable operation with multi-threaded

Posted by GitBox <gi...@apache.org>.
shameersss1 commented on a change in pull request #3053:
URL: https://github.com/apache/hive/pull/3053#discussion_r816597275



##########
File path: standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreChecker.java
##########
@@ -303,56 +308,103 @@ void checkTable(Table table, PartitionIterable parts, byte[] filterExp, CheckRes
     if (tablePath == null) {
       return;
     }
-    FileSystem fs = tablePath.getFileSystem(conf);
-    if (!fs.exists(tablePath)) {
+    final FileSystem[] fs = {tablePath.getFileSystem(conf)};
+    if (!fs[0].exists(tablePath)) {
       result.getTablesNotOnFs().add(table.getTableName());
       return;
     }
 
     Set<Path> partPaths = new HashSet<>();
 
-    // check that the partition folders exist on disk
-    for (Partition partition : parts) {
-      if (partition == null) {
-        // most likely the user specified an invalid partition
-        continue;
-      }
-      Path partPath = getDataLocation(table, partition);
-      if (partPath == null) {
-        continue;
-      }
-      fs = partPath.getFileSystem(conf);
+    int threadCount = MetastoreConf.getIntVar(conf, MetastoreConf.ConfVars.METASTORE_MSCK_FS_HANDLER_THREADS_COUNT);
 
-      CheckResult.PartitionResult prFromMetastore = new CheckResult.PartitionResult();
-      prFromMetastore.setPartitionName(getPartitionName(table, partition));
-      prFromMetastore.setTableName(partition.getTableName());
-      if (!fs.exists(partPath)) {
-        result.getPartitionsNotOnFs().add(prFromMetastore);
-      } else {
-        result.getCorrectPartitions().add(prFromMetastore);
-      }
+    Preconditions.checkArgument(!(threadCount < 1),"METASTORE_MSCK_FS_HANDLER_THREADS_COUNT cannot be less than 1");
+    Preconditions.checkArgument(!(threadCount > 30),"METASTORE_MSCK_FS_HANDLER_THREADS_COUNT cannot be more than 30");
 
-      if (partitionExpirySeconds > 0) {
-        long currentEpochSecs = Instant.now().getEpochSecond();
-        long createdTime = partition.getCreateTime();
-        long partitionAgeSeconds = currentEpochSecs - createdTime;
-        if (partitionAgeSeconds > partitionExpirySeconds) {
-          CheckResult.PartitionResult pr = new CheckResult.PartitionResult();
-          pr.setPartitionName(getPartitionName(table, partition));
-          pr.setTableName(partition.getTableName());
-          result.getExpiredPartitions().add(pr);
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("{}.{}.{}.{} expired. createdAt: {} current: {} age: {}s expiry: {}s", partition.getCatName(),
-                partition.getDbName(), partition.getTableName(), pr.getPartitionName(), createdTime, currentEpochSecs,
-                partitionAgeSeconds, partitionExpirySeconds);
-          }
+    LOG.debug("Running with threads "+threadCount);
+
+    // For Multi Threaded run, we do not want to wait for All partitions in queue to be processed,
+    // instead we run in batch to avoid OOM, we set Min and Max Pool Size = METASTORE_MSCK_FS_HANDLER_THREADS_COUNT
+    // and Waiting Queue size = METASTORE_MSCK_FS_HANDLER_THREADS_COUNT
+
+    final ExecutorService pool = new ThreadPoolExecutor(threadCount,
+            threadCount,
+            0L,
+            TimeUnit.MILLISECONDS,
+            new ArrayBlockingQueue<>(threadCount),
+            new ThreadPoolExecutor.CallerRunsPolicy());
+
+    try {
+      Queue<Future<String>> futures = new LinkedList<>();
+      // check that the partition folders exist on disk
+      for (Partition partition : parts) {
+        if (partition == null) {
+          // most likely the user specified an invalid partition
+          continue;
         }
-      }
+        Path[] partPath = {getDataLocation(table, partition)};
+        if (partPath[0] == null) {
+          continue;
+        }
+        futures.add(pool.submit(new Callable() {
+          @Override
+          public Object call() throws Exception {
+            fs[0] = partPath[0].getFileSystem(conf);
+            CheckResult.PartitionResult prFromMetastore = new CheckResult.PartitionResult();
+            prFromMetastore.setPartitionName(getPartitionName(table, partition));
+            prFromMetastore.setTableName(partition.getTableName());
+            if (!fs[0].exists(partPath[0])) {

Review comment:
      We are already scanning all the partitions from the filesystem in https://github.com/apache/hive/blob/master/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreChecker.java#L398
   
   We could remove the fs.exists() check here and either keep all the partition paths in memory or pass the iterator to the findUnknownPartitions method, then take the difference between the two sets; that difference gives us the partitions that are in the metastore but not on the filesystem.
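
   A rough sketch of the set-difference idea, assuming both sides have already been collected as qualified location strings (the names are illustrative):

   ```java
   import java.util.HashSet;
   import java.util.Set;

   // Sketch only: inputs are assumed to be fully qualified partition locations.
   class PartitionDiffSketch {
     Set<String> partitionsNotOnFs(Set<String> partPathsInMetastore, Set<String> partDirsOnFs) {
       Set<String> missing = new HashSet<>(partPathsInMetastore);
       // Whatever is registered in the metastore but has no matching directory
       // on the filesystem is a candidate for "partition not on FS".
       missing.removeAll(partDirsOnFs);
       return missing;
     }
   }
   ```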






[GitHub] [hive] shameersss1 commented on a change in pull request #3053: HIVE-25980: Reduce fs calls in HiveMetaStoreChecker.checkTable

Posted by GitBox <gi...@apache.org>.
shameersss1 commented on a change in pull request #3053:
URL: https://github.com/apache/hive/pull/3053#discussion_r828821803



##########
File path: standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreChecker.java
##########
@@ -468,6 +459,25 @@ void findUnknownPartitions(Table table, Set<Path> partPaths, byte[] filterExp,
         }
       }
     }
+    
+    Set<Path> partPathsInMS = partPaths;
+    // don't want the table dir
+    partPathsInMS.remove(tablePath);
+    // remove partition paths in partPathsInMS, to getPartitionsNotOnFs
+    partPathsInMS.removeAll(allPartDirs);
+    // There can be edge case where user can define partition directory outside of table directory
+    // to avoid eviction of such partitions
+    // we check for partition path not exists and add to result for getPartitionsNotOnFs.
+    for (Path partPath : partPathsInMS) {
+      FileSystem fs = partPath.getFileSystem(conf);

Review comment:
      As you pointed out, having a multi-threaded check of the partition paths here would be useful.
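
   A hedged sketch of what a parallel existence check for those leftover paths could look like (the executor wiring and error handling are simplified and not taken from the PR):

   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.Future;

   import org.apache.hadoop.conf.Configuration;
   import org.apache.hadoop.fs.Path;

   // Sketch only: returns the metastore paths whose directories are missing on the filesystem.
   class ParallelExistsSketch {
     List<Path> findMissing(ExecutorService pool, Configuration conf, List<Path> partPathsInMS)
         throws Exception {
       List<Future<Path>> futures = new ArrayList<>();
       for (Path partPath : partPathsInMS) {
         futures.add(pool.submit(() ->
             partPath.getFileSystem(conf).exists(partPath) ? null : partPath));
       }
       List<Path> missing = new ArrayList<>();
       for (Future<Path> future : futures) {
         Path path = future.get();   // null means the directory exists
         if (path != null) {
           missing.add(path);
         }
       }
       return missing;
     }
   }
   ```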






[GitHub] [hive] pvary commented on a change in pull request #3053: HIVE-25980: Support HiveMetaStoreChecker.checkTable operation with multi-threaded

Posted by GitBox <gi...@apache.org>.
pvary commented on a change in pull request #3053:
URL: https://github.com/apache/hive/pull/3053#discussion_r816484677



##########
File path: standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreChecker.java
##########
@@ -303,56 +308,103 @@ void checkTable(Table table, PartitionIterable parts, byte[] filterExp, CheckRes
     if (tablePath == null) {
       return;
     }
-    FileSystem fs = tablePath.getFileSystem(conf);
-    if (!fs.exists(tablePath)) {
+    final FileSystem[] fs = {tablePath.getFileSystem(conf)};
+    if (!fs[0].exists(tablePath)) {
       result.getTablesNotOnFs().add(table.getTableName());
       return;
     }
 
     Set<Path> partPaths = new HashSet<>();
 
-    // check that the partition folders exist on disk
-    for (Partition partition : parts) {
-      if (partition == null) {
-        // most likely the user specified an invalid partition
-        continue;
-      }
-      Path partPath = getDataLocation(table, partition);
-      if (partPath == null) {
-        continue;
-      }
-      fs = partPath.getFileSystem(conf);
+    int threadCount = MetastoreConf.getIntVar(conf, MetastoreConf.ConfVars.METASTORE_MSCK_FS_HANDLER_THREADS_COUNT);
 
-      CheckResult.PartitionResult prFromMetastore = new CheckResult.PartitionResult();
-      prFromMetastore.setPartitionName(getPartitionName(table, partition));
-      prFromMetastore.setTableName(partition.getTableName());
-      if (!fs.exists(partPath)) {
-        result.getPartitionsNotOnFs().add(prFromMetastore);
-      } else {
-        result.getCorrectPartitions().add(prFromMetastore);
-      }
+    Preconditions.checkArgument(!(threadCount < 1),"METASTORE_MSCK_FS_HANDLER_THREADS_COUNT cannot be less than 1");
+    Preconditions.checkArgument(!(threadCount > 30),"METASTORE_MSCK_FS_HANDLER_THREADS_COUNT cannot be more than 30");
 
-      if (partitionExpirySeconds > 0) {
-        long currentEpochSecs = Instant.now().getEpochSecond();
-        long createdTime = partition.getCreateTime();
-        long partitionAgeSeconds = currentEpochSecs - createdTime;
-        if (partitionAgeSeconds > partitionExpirySeconds) {
-          CheckResult.PartitionResult pr = new CheckResult.PartitionResult();
-          pr.setPartitionName(getPartitionName(table, partition));
-          pr.setTableName(partition.getTableName());
-          result.getExpiredPartitions().add(pr);
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("{}.{}.{}.{} expired. createdAt: {} current: {} age: {}s expiry: {}s", partition.getCatName(),
-                partition.getDbName(), partition.getTableName(), pr.getPartitionName(), createdTime, currentEpochSecs,
-                partitionAgeSeconds, partitionExpirySeconds);
-          }
+    LOG.debug("Running with threads "+threadCount);
+
+    // For Multi Threaded run, we do not want to wait for All partitions in queue to be processed,
+    // instead we run in batch to avoid OOM, we set Min and Max Pool Size = METASTORE_MSCK_FS_HANDLER_THREADS_COUNT
+    // and Waiting Queue size = METASTORE_MSCK_FS_HANDLER_THREADS_COUNT
+
+    final ExecutorService pool = new ThreadPoolExecutor(threadCount,
+            threadCount,
+            0L,
+            TimeUnit.MILLISECONDS,
+            new ArrayBlockingQueue<>(threadCount),
+            new ThreadPoolExecutor.CallerRunsPolicy());
+
+    try {
+      Queue<Future<String>> futures = new LinkedList<>();
+      // check that the partition folders exist on disk
+      for (Partition partition : parts) {

Review comment:
      Please check whether the PartitionIterator is thread-safe.
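
   If the PartitionIterator turns out not to be thread-safe, one hedged way to sidestep the question is to keep the iteration itself on the calling thread and only hand already-fetched elements to the pool (generic sketch, not the PR's code):

   ```java
   import java.util.Iterator;
   import java.util.concurrent.ExecutorService;
   import java.util.function.Consumer;

   // Sketch only: the iterator is advanced by the caller thread alone,
   // so its thread-safety never comes into play; workers only see the element.
   class SingleThreadedIterationSketch {
     <T> void drainToPool(Iterable<T> items, ExecutorService pool, Consumer<T> work) {
       Iterator<T> iterator = items.iterator();
       while (iterator.hasNext()) {
         T item = iterator.next();               // caller thread only
         pool.submit(() -> work.accept(item));   // worker never touches the iterator
       }
     }
   }
   ```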






[GitHub] [hive] shameersss1 commented on pull request #3053: HIVE-25980: Reduce fs calls in HiveMetaStoreChecker.checkTable

Posted by GitBox <gi...@apache.org>.
shameersss1 commented on pull request #3053:
URL: https://github.com/apache/hive/pull/3053#issuecomment-1070436169


   Please squash all the commits into a single commit.




[GitHub] [hive] shameersss1 commented on a change in pull request #3053: HIVE-25980: Reduce fs calls in HiveMetaStoreChecker.checkTable

Posted by GitBox <gi...@apache.org>.
shameersss1 commented on a change in pull request #3053:
URL: https://github.com/apache/hive/pull/3053#discussion_r830751017



##########
File path: standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreChecker.java
##########
@@ -310,6 +310,7 @@ void checkTable(Table table, PartitionIterable parts, byte[] filterExp, CheckRes
     }
 
     Set<Path> partPaths = new HashSet<>();
+    Set<Path> partialPartPaths = new HashSet<>();

Review comment:
       Can we optimize this without using any extra space?






[GitHub] [hive] pvary commented on a change in pull request #3053: HIVE-25980: Support HiveMetaStoreChecker.checkTable operation with multi-threaded

Posted by GitBox <gi...@apache.org>.
pvary commented on a change in pull request #3053:
URL: https://github.com/apache/hive/pull/3053#discussion_r816483649



##########
File path: standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreChecker.java
##########
@@ -303,56 +308,103 @@ void checkTable(Table table, PartitionIterable parts, byte[] filterExp, CheckRes
     if (tablePath == null) {
       return;
     }
-    FileSystem fs = tablePath.getFileSystem(conf);
-    if (!fs.exists(tablePath)) {
+    final FileSystem[] fs = {tablePath.getFileSystem(conf)};
+    if (!fs[0].exists(tablePath)) {

Review comment:
      Why is this change needed?






[GitHub] [hive] pvary commented on a change in pull request #3053: HIVE-25980: Support HiveMetaStoreChecker.checkTable operation with multi-threaded

Posted by GitBox <gi...@apache.org>.
pvary commented on a change in pull request #3053:
URL: https://github.com/apache/hive/pull/3053#discussion_r816540070



##########
File path: standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreChecker.java
##########
@@ -303,56 +308,103 @@ void checkTable(Table table, PartitionIterable parts, byte[] filterExp, CheckRes
     if (tablePath == null) {
       return;
     }
-    FileSystem fs = tablePath.getFileSystem(conf);
-    if (!fs.exists(tablePath)) {
+    final FileSystem[] fs = {tablePath.getFileSystem(conf)};
+    if (!fs[0].exists(tablePath)) {
       result.getTablesNotOnFs().add(table.getTableName());
       return;
     }
 
     Set<Path> partPaths = new HashSet<>();
 
-    // check that the partition folders exist on disk
-    for (Partition partition : parts) {
-      if (partition == null) {
-        // most likely the user specified an invalid partition
-        continue;
-      }
-      Path partPath = getDataLocation(table, partition);
-      if (partPath == null) {
-        continue;
-      }
-      fs = partPath.getFileSystem(conf);
+    int threadCount = MetastoreConf.getIntVar(conf, MetastoreConf.ConfVars.METASTORE_MSCK_FS_HANDLER_THREADS_COUNT);
 
-      CheckResult.PartitionResult prFromMetastore = new CheckResult.PartitionResult();
-      prFromMetastore.setPartitionName(getPartitionName(table, partition));
-      prFromMetastore.setTableName(partition.getTableName());
-      if (!fs.exists(partPath)) {
-        result.getPartitionsNotOnFs().add(prFromMetastore);
-      } else {
-        result.getCorrectPartitions().add(prFromMetastore);
-      }
+    Preconditions.checkArgument(!(threadCount < 1),"METASTORE_MSCK_FS_HANDLER_THREADS_COUNT cannot be less than 1");
+    Preconditions.checkArgument(!(threadCount > 30),"METASTORE_MSCK_FS_HANDLER_THREADS_COUNT cannot be more than 30");
 
-      if (partitionExpirySeconds > 0) {
-        long currentEpochSecs = Instant.now().getEpochSecond();
-        long createdTime = partition.getCreateTime();
-        long partitionAgeSeconds = currentEpochSecs - createdTime;
-        if (partitionAgeSeconds > partitionExpirySeconds) {
-          CheckResult.PartitionResult pr = new CheckResult.PartitionResult();
-          pr.setPartitionName(getPartitionName(table, partition));
-          pr.setTableName(partition.getTableName());
-          result.getExpiredPartitions().add(pr);
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("{}.{}.{}.{} expired. createdAt: {} current: {} age: {}s expiry: {}s", partition.getCatName(),
-                partition.getDbName(), partition.getTableName(), pr.getPartitionName(), createdTime, currentEpochSecs,
-                partitionAgeSeconds, partitionExpirySeconds);
-          }
+    LOG.debug("Running with threads "+threadCount);
+
+    // For Multi Threaded run, we do not want to wait for All partitions in queue to be processed,
+    // instead we run in batch to avoid OOM, we set Min and Max Pool Size = METASTORE_MSCK_FS_HANDLER_THREADS_COUNT
+    // and Waiting Queue size = METASTORE_MSCK_FS_HANDLER_THREADS_COUNT
+
+    final ExecutorService pool = new ThreadPoolExecutor(threadCount,
+            threadCount,
+            0L,
+            TimeUnit.MILLISECONDS,
+            new ArrayBlockingQueue<>(threadCount),
+            new ThreadPoolExecutor.CallerRunsPolicy());
+
+    try {
+      Queue<Future<String>> futures = new LinkedList<>();
+      // check that the partition folders exist on disk
+      for (Partition partition : parts) {
+        if (partition == null) {
+          // most likely the user specified an invalid partition
+          continue;
         }
-      }
+        Path[] partPath = {getDataLocation(table, partition)};
+        if (partPath[0] == null) {
+          continue;
+        }
+        futures.add(pool.submit(new Callable() {
+          @Override
+          public Object call() throws Exception {
+            fs[0] = partPath[0].getFileSystem(conf);
+            CheckResult.PartitionResult prFromMetastore = new CheckResult.PartitionResult();
+            prFromMetastore.setPartitionName(getPartitionName(table, partition));
+            prFromMetastore.setTableName(partition.getTableName());
+            if (!fs[0].exists(partPath[0])) {
+              synchronized (result) {
+                result.getPartitionsNotOnFs().add(prFromMetastore);
+              }
+            } else {
+              synchronized (result) {
+                result.getCorrectPartitions().add(prFromMetastore);
+              }
+            }
+
+            if (partitionExpirySeconds > 0) {
+              long currentEpochSecs = Instant.now().getEpochSecond();
+              long createdTime = partition.getCreateTime();
+              long partitionAgeSeconds = currentEpochSecs - createdTime;
+              if (partitionAgeSeconds > partitionExpirySeconds) {
+                CheckResult.PartitionResult pr = new CheckResult.PartitionResult();
+                pr.setPartitionName(getPartitionName(table, partition));
+                pr.setTableName(partition.getTableName());
+                synchronized (result) {
+                  result.getExpiredPartitions().add(pr);
+                }
+                if (LOG.isDebugEnabled()) {
+                  LOG.debug("{}.{}.{}.{} expired. createdAt: {} current: {} age: {}s expiry: {}s", partition.getCatName(),
+                      partition.getDbName(), partition.getTableName(), pr.getPartitionName(), createdTime, currentEpochSecs,
+                      partitionAgeSeconds, partitionExpirySeconds);
+                }
+              }
+            }
 
-      for (int i = 0; i < getPartitionSpec(table, partition).size(); i++) {
-        Path qualifiedPath = partPath.makeQualified(fs);
-        partPaths.add(qualifiedPath);
-        partPath = partPath.getParent();
+            for (int i = 0; i < getPartitionSpec(table, partition).size(); i++) {
+              final Path qualifiedPath = partPath[0].makeQualified(fs[0]);
+              synchronized (partPaths) {
+                partPaths.add(qualifiedPath);
+              }
+              partPath[0] = partPath[0].getParent();
+            }
+            return "CheckTable task complete.";
+          }
+        }));
+      }
+      while (!futures.isEmpty()) {
+        String message = futures.poll().get();
+        LOG.debug(message);
+      }
+    } catch (ExecutionException e) {
+      e.printStackTrace();
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+    } finally {
+      LOG.info("Partition exists check complete.");

Review comment:
       Nit: Maybe print the result, so we have better info when reading the logs?






[GitHub] [hive] pvary commented on a change in pull request #3053: HIVE-25980: Support HiveMetaStoreChecker.checkTable operation with multi-threaded

Posted by GitBox <gi...@apache.org>.
pvary commented on a change in pull request #3053:
URL: https://github.com/apache/hive/pull/3053#discussion_r816536599



##########
File path: standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreChecker.java
##########
@@ -303,56 +308,103 @@ void checkTable(Table table, PartitionIterable parts, byte[] filterExp, CheckRes
     if (tablePath == null) {
       return;
     }
-    FileSystem fs = tablePath.getFileSystem(conf);
-    if (!fs.exists(tablePath)) {
+    final FileSystem[] fs = {tablePath.getFileSystem(conf)};
+    if (!fs[0].exists(tablePath)) {
       result.getTablesNotOnFs().add(table.getTableName());
       return;
     }
 
     Set<Path> partPaths = new HashSet<>();
 
-    // check that the partition folders exist on disk
-    for (Partition partition : parts) {
-      if (partition == null) {
-        // most likely the user specified an invalid partition
-        continue;
-      }
-      Path partPath = getDataLocation(table, partition);
-      if (partPath == null) {
-        continue;
-      }
-      fs = partPath.getFileSystem(conf);
+    int threadCount = MetastoreConf.getIntVar(conf, MetastoreConf.ConfVars.METASTORE_MSCK_FS_HANDLER_THREADS_COUNT);
 
-      CheckResult.PartitionResult prFromMetastore = new CheckResult.PartitionResult();
-      prFromMetastore.setPartitionName(getPartitionName(table, partition));
-      prFromMetastore.setTableName(partition.getTableName());
-      if (!fs.exists(partPath)) {
-        result.getPartitionsNotOnFs().add(prFromMetastore);
-      } else {
-        result.getCorrectPartitions().add(prFromMetastore);
-      }
+    Preconditions.checkArgument(!(threadCount < 1),"METASTORE_MSCK_FS_HANDLER_THREADS_COUNT cannot be less than 1");
+    Preconditions.checkArgument(!(threadCount > 30),"METASTORE_MSCK_FS_HANDLER_THREADS_COUNT cannot be more than 30");
 
-      if (partitionExpirySeconds > 0) {
-        long currentEpochSecs = Instant.now().getEpochSecond();
-        long createdTime = partition.getCreateTime();
-        long partitionAgeSeconds = currentEpochSecs - createdTime;
-        if (partitionAgeSeconds > partitionExpirySeconds) {
-          CheckResult.PartitionResult pr = new CheckResult.PartitionResult();
-          pr.setPartitionName(getPartitionName(table, partition));
-          pr.setTableName(partition.getTableName());
-          result.getExpiredPartitions().add(pr);
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("{}.{}.{}.{} expired. createdAt: {} current: {} age: {}s expiry: {}s", partition.getCatName(),
-                partition.getDbName(), partition.getTableName(), pr.getPartitionName(), createdTime, currentEpochSecs,
-                partitionAgeSeconds, partitionExpirySeconds);
-          }
+    LOG.debug("Running with threads "+threadCount);
+
+    // For Multi Threaded run, we do not want to wait for All partitions in queue to be processed,
+    // instead we run in batch to avoid OOM, we set Min and Max Pool Size = METASTORE_MSCK_FS_HANDLER_THREADS_COUNT
+    // and Waiting Queue size = METASTORE_MSCK_FS_HANDLER_THREADS_COUNT
+
+    final ExecutorService pool = new ThreadPoolExecutor(threadCount,
+            threadCount,
+            0L,
+            TimeUnit.MILLISECONDS,
+            new ArrayBlockingQueue<>(threadCount),
+            new ThreadPoolExecutor.CallerRunsPolicy());
+
+    try {
+      Queue<Future<String>> futures = new LinkedList<>();
+      // check that the partition folders exist on disk
+      for (Partition partition : parts) {
+        if (partition == null) {
+          // most likely the user specified an invalid partition
+          continue;
         }
-      }
+        Path[] partPath = {getDataLocation(table, partition)};
+        if (partPath[0] == null) {
+          continue;
+        }
+        futures.add(pool.submit(new Callable() {
+          @Override
+          public Object call() throws Exception {
+            fs[0] = partPath[0].getFileSystem(conf);

Review comment:
       Why are we reusing the `fs[0]` object?






[GitHub] [hive] pvary commented on a change in pull request #3053: HIVE-25980: Support HiveMetaStoreChecker.checkTable operation with multi-threaded

Posted by GitBox <gi...@apache.org>.
pvary commented on a change in pull request #3053:
URL: https://github.com/apache/hive/pull/3053#discussion_r816484342



##########
File path: standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreChecker.java
##########
@@ -303,56 +308,103 @@ void checkTable(Table table, PartitionIterable parts, byte[] filterExp, CheckRes
     if (tablePath == null) {
       return;
     }
-    FileSystem fs = tablePath.getFileSystem(conf);
-    if (!fs.exists(tablePath)) {
+    final FileSystem[] fs = {tablePath.getFileSystem(conf)};
+    if (!fs[0].exists(tablePath)) {
       result.getTablesNotOnFs().add(table.getTableName());
       return;
     }
 
     Set<Path> partPaths = new HashSet<>();
 
-    // check that the partition folders exist on disk
-    for (Partition partition : parts) {
-      if (partition == null) {
-        // most likely the user specified an invalid partition
-        continue;
-      }
-      Path partPath = getDataLocation(table, partition);
-      if (partPath == null) {
-        continue;
-      }
-      fs = partPath.getFileSystem(conf);
+    int threadCount = MetastoreConf.getIntVar(conf, MetastoreConf.ConfVars.METASTORE_MSCK_FS_HANDLER_THREADS_COUNT);
 
-      CheckResult.PartitionResult prFromMetastore = new CheckResult.PartitionResult();
-      prFromMetastore.setPartitionName(getPartitionName(table, partition));
-      prFromMetastore.setTableName(partition.getTableName());
-      if (!fs.exists(partPath)) {
-        result.getPartitionsNotOnFs().add(prFromMetastore);
-      } else {
-        result.getCorrectPartitions().add(prFromMetastore);
-      }
+    Preconditions.checkArgument(!(threadCount < 1),"METASTORE_MSCK_FS_HANDLER_THREADS_COUNT cannot be less than 1");
+    Preconditions.checkArgument(!(threadCount > 30),"METASTORE_MSCK_FS_HANDLER_THREADS_COUNT cannot be more than 30");
 
-      if (partitionExpirySeconds > 0) {
-        long currentEpochSecs = Instant.now().getEpochSecond();
-        long createdTime = partition.getCreateTime();
-        long partitionAgeSeconds = currentEpochSecs - createdTime;
-        if (partitionAgeSeconds > partitionExpirySeconds) {
-          CheckResult.PartitionResult pr = new CheckResult.PartitionResult();
-          pr.setPartitionName(getPartitionName(table, partition));
-          pr.setTableName(partition.getTableName());
-          result.getExpiredPartitions().add(pr);
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("{}.{}.{}.{} expired. createdAt: {} current: {} age: {}s expiry: {}s", partition.getCatName(),
-                partition.getDbName(), partition.getTableName(), pr.getPartitionName(), createdTime, currentEpochSecs,
-                partitionAgeSeconds, partitionExpirySeconds);
-          }
+    LOG.debug("Running with threads "+threadCount);
+
+    // For Multi Threaded run, we do not want to wait for All partitions in queue to be processed,
+    // instead we run in batch to avoid OOM, we set Min and Max Pool Size = METASTORE_MSCK_FS_HANDLER_THREADS_COUNT
+    // and Waiting Queue size = METASTORE_MSCK_FS_HANDLER_THREADS_COUNT
+
+    final ExecutorService pool = new ThreadPoolExecutor(threadCount,
+            threadCount,
+            0L,
+            TimeUnit.MILLISECONDS,
+            new ArrayBlockingQueue<>(threadCount),
+            new ThreadPoolExecutor.CallerRunsPolicy());
+
+    try {
+      Queue<Future<String>> futures = new LinkedList<>();
+      // check that the partition folders exist on disk
+      for (Partition partition : parts) {
+        if (partition == null) {
+          // most likely the user specified an invalid partition
+          continue;
         }
-      }
+        Path[] partPath = {getDataLocation(table, partition)};

Review comment:
       Nit: formatting 






[GitHub] [hive] pvary commented on a change in pull request #3053: HIVE-25980: Support HiveMetaStoreChecker.checkTable operation with multi-threaded

Posted by GitBox <gi...@apache.org>.
pvary commented on a change in pull request #3053:
URL: https://github.com/apache/hive/pull/3053#discussion_r816485993



##########
File path: common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
##########
@@ -949,6 +949,8 @@ private static void populateLlapDaemonVarsSet(Set<String> llapDaemonVarsSetLocal
         "Used to avoid all of the proxies and object copies in the metastore.  Note, if this is " +
             "set, you MUST use a local metastore (hive.metastore.uris must be empty) otherwise " +
             "undefined and most likely undesired behavior will result"),
+    METASTORE_MSCK_FS_HANDLER_THREADS_COUNT("hive.metastore.msck.fshandler.threads", 1, new  SizeValidator(1L, true, 30L, true),
+            "Number of threads to be allocated for metastore handler for msck fs operations."),

Review comment:
      Nit: add the min and max values to the description as well.
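
   For example, the description could end with something like `"Minimum value is 1 and maximum value is 30."` (wording is only a suggestion; the bounds themselves come from the SizeValidator shown in the diff above).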






[GitHub] [hive] pvary commented on a change in pull request #3053: HIVE-25980: Support HiveMetaStoreChecker.checkTable operation with multi-threaded

Posted by GitBox <gi...@apache.org>.
pvary commented on a change in pull request #3053:
URL: https://github.com/apache/hive/pull/3053#discussion_r816534259



##########
File path: standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreChecker.java
##########
@@ -303,56 +308,103 @@ void checkTable(Table table, PartitionIterable parts, byte[] filterExp, CheckRes
     if (tablePath == null) {
       return;
     }
-    FileSystem fs = tablePath.getFileSystem(conf);
-    if (!fs.exists(tablePath)) {
+    final FileSystem[] fs = {tablePath.getFileSystem(conf)};
+    if (!fs[0].exists(tablePath)) {
       result.getTablesNotOnFs().add(table.getTableName());
       return;
     }
 
     Set<Path> partPaths = new HashSet<>();
 
-    // check that the partition folders exist on disk
-    for (Partition partition : parts) {
-      if (partition == null) {
-        // most likely the user specified an invalid partition
-        continue;
-      }
-      Path partPath = getDataLocation(table, partition);
-      if (partPath == null) {
-        continue;
-      }
-      fs = partPath.getFileSystem(conf);
+    int threadCount = MetastoreConf.getIntVar(conf, MetastoreConf.ConfVars.METASTORE_MSCK_FS_HANDLER_THREADS_COUNT);
 
-      CheckResult.PartitionResult prFromMetastore = new CheckResult.PartitionResult();
-      prFromMetastore.setPartitionName(getPartitionName(table, partition));
-      prFromMetastore.setTableName(partition.getTableName());
-      if (!fs.exists(partPath)) {
-        result.getPartitionsNotOnFs().add(prFromMetastore);
-      } else {
-        result.getCorrectPartitions().add(prFromMetastore);
-      }
+    Preconditions.checkArgument(!(threadCount < 1),"METASTORE_MSCK_FS_HANDLER_THREADS_COUNT cannot be less than 1");
+    Preconditions.checkArgument(!(threadCount > 30),"METASTORE_MSCK_FS_HANDLER_THREADS_COUNT cannot be more than 30");
 
-      if (partitionExpirySeconds > 0) {
-        long currentEpochSecs = Instant.now().getEpochSecond();
-        long createdTime = partition.getCreateTime();
-        long partitionAgeSeconds = currentEpochSecs - createdTime;
-        if (partitionAgeSeconds > partitionExpirySeconds) {
-          CheckResult.PartitionResult pr = new CheckResult.PartitionResult();
-          pr.setPartitionName(getPartitionName(table, partition));
-          pr.setTableName(partition.getTableName());
-          result.getExpiredPartitions().add(pr);
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("{}.{}.{}.{} expired. createdAt: {} current: {} age: {}s expiry: {}s", partition.getCatName(),
-                partition.getDbName(), partition.getTableName(), pr.getPartitionName(), createdTime, currentEpochSecs,
-                partitionAgeSeconds, partitionExpirySeconds);
-          }
+    LOG.debug("Running with threads "+threadCount);
+
+    // For Multi Threaded run, we do not want to wait for All partitions in queue to be processed,
+    // instead we run in batch to avoid OOM, we set Min and Max Pool Size = METASTORE_MSCK_FS_HANDLER_THREADS_COUNT
+    // and Waiting Queue size = METASTORE_MSCK_FS_HANDLER_THREADS_COUNT
+
+    final ExecutorService pool = new ThreadPoolExecutor(threadCount,
+            threadCount,
+            0L,
+            TimeUnit.MILLISECONDS,
+            new ArrayBlockingQueue<>(threadCount),
+            new ThreadPoolExecutor.CallerRunsPolicy());
+
+    try {
+      Queue<Future<String>> futures = new LinkedList<>();
+      // check that the partition folders exist on disk
+      for (Partition partition : parts) {
+        if (partition == null) {
+          // most likely the user specified an invalid partition
+          continue;
         }
-      }
+        Path[] partPath = {getDataLocation(table, partition)};
+        if (partPath[0] == null) {
+          continue;
+        }
+        futures.add(pool.submit(new Callable() {
+          @Override
+          public Object call() throws Exception {
+            fs[0] = partPath[0].getFileSystem(conf);
+            CheckResult.PartitionResult prFromMetastore = new CheckResult.PartitionResult();
+            prFromMetastore.setPartitionName(getPartitionName(table, partition));
+            prFromMetastore.setTableName(partition.getTableName());
+            if (!fs[0].exists(partPath[0])) {
+              synchronized (result) {
+                result.getPartitionsNotOnFs().add(prFromMetastore);
+              }
+            } else {
+              synchronized (result) {
+                result.getCorrectPartitions().add(prFromMetastore);
+              }
+            }
+
+            if (partitionExpirySeconds > 0) {
+              long currentEpochSecs = Instant.now().getEpochSecond();
+              long createdTime = partition.getCreateTime();
+              long partitionAgeSeconds = currentEpochSecs - createdTime;
+              if (partitionAgeSeconds > partitionExpirySeconds) {
+                CheckResult.PartitionResult pr = new CheckResult.PartitionResult();
+                pr.setPartitionName(getPartitionName(table, partition));
+                pr.setTableName(partition.getTableName());
+                synchronized (result) {
+                  result.getExpiredPartitions().add(pr);
+                }
+                if (LOG.isDebugEnabled()) {
+                  LOG.debug("{}.{}.{}.{} expired. createdAt: {} current: {} age: {}s expiry: {}s", partition.getCatName(),
+                      partition.getDbName(), partition.getTableName(), pr.getPartitionName(), createdTime, currentEpochSecs,
+                      partitionAgeSeconds, partitionExpirySeconds);
+                }
+              }
+            }
 
-      for (int i = 0; i < getPartitionSpec(table, partition).size(); i++) {
-        Path qualifiedPath = partPath.makeQualified(fs);
-        partPaths.add(qualifiedPath);
-        partPath = partPath.getParent();
+            for (int i = 0; i < getPartitionSpec(table, partition).size(); i++) {
+              final Path qualifiedPath = partPath[0].makeQualified(fs[0]);
+              synchronized (partPaths) {
+                partPaths.add(qualifiedPath);
+              }
+              partPath[0] = partPath[0].getParent();
+            }
+            return "CheckTable task complete.";

Review comment:
      Maybe include something about the path/partition/table, which would be useful in the later debug message. As it stands, this message is not good for anything other than counting the successful checks 😄








[GitHub] [hive] pvary commented on a change in pull request #3053: HIVE-25980: Support HiveMetaStoreChecker.checkTable operation with multi-threaded

Posted by GitBox <gi...@apache.org>.
pvary commented on a change in pull request #3053:
URL: https://github.com/apache/hive/pull/3053#discussion_r816819306



##########
File path: standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreChecker.java
##########
@@ -311,48 +315,96 @@ void checkTable(Table table, PartitionIterable parts, byte[] filterExp, CheckRes
 
     Set<Path> partPaths = new HashSet<>();
 
-    // check that the partition folders exist on disk
-    for (Partition partition : parts) {
-      if (partition == null) {
-        // most likely the user specified an invalid partition
-        continue;
-      }
-      Path partPath = getDataLocation(table, partition);
-      if (partPath == null) {
-        continue;
-      }
-      fs = partPath.getFileSystem(conf);
+    int threadCount = MetastoreConf.getIntVar(conf, MetastoreConf.ConfVars.METASTORE_MSCK_FS_HANDLER_THREADS_COUNT);
 
-      CheckResult.PartitionResult prFromMetastore = new CheckResult.PartitionResult();
-      prFromMetastore.setPartitionName(getPartitionName(table, partition));
-      prFromMetastore.setTableName(partition.getTableName());
-      if (!fs.exists(partPath)) {
-        result.getPartitionsNotOnFs().add(prFromMetastore);
-      } else {
-        result.getCorrectPartitions().add(prFromMetastore);
-      }
+    Preconditions.checkArgument(!(threadCount < 1),"METASTORE_MSCK_FS_HANDLER_THREADS_COUNT cannot be less than 1");
+    Preconditions.checkArgument(!(threadCount > 30),"METASTORE_MSCK_FS_HANDLER_THREADS_COUNT cannot be more than 30");
 
-      if (partitionExpirySeconds > 0) {
-        long currentEpochSecs = Instant.now().getEpochSecond();
-        long createdTime = partition.getCreateTime();
-        long partitionAgeSeconds = currentEpochSecs - createdTime;
-        if (partitionAgeSeconds > partitionExpirySeconds) {
-          CheckResult.PartitionResult pr = new CheckResult.PartitionResult();
-          pr.setPartitionName(getPartitionName(table, partition));
-          pr.setTableName(partition.getTableName());
-          result.getExpiredPartitions().add(pr);
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("{}.{}.{}.{} expired. createdAt: {} current: {} age: {}s expiry: {}s", partition.getCatName(),
-                partition.getDbName(), partition.getTableName(), pr.getPartitionName(), createdTime, currentEpochSecs,
-                partitionAgeSeconds, partitionExpirySeconds);
-          }
+    LOG.debug("Running with threads "+threadCount);
+
+    // For Multi Threaded run, we do not want to wait for All partitions in queue to be processed,
+    // instead we run in batch to avoid OOM, we set Min and Max Pool Size = METASTORE_MSCK_FS_HANDLER_THREADS_COUNT
+    // and Waiting Queue size = METASTORE_MSCK_FS_HANDLER_THREADS_COUNT
+
+    final ExecutorService pool = new ThreadPoolExecutor(threadCount,
+            threadCount,
+            0L,
+            TimeUnit.MILLISECONDS,
+            new ArrayBlockingQueue<>(threadCount),
+            new ThreadPoolExecutor.CallerRunsPolicy());
+
+    ArrayList<String> processedPartitions = new ArrayList<String>();;
+
+    try {
+      Queue<Future<String>> futures = new LinkedList<>();
+      // check that the partition folders exist on disk
+      for (Partition partition : parts) {
+        if (partition == null) {
+          // most likely the user specified an invalid partition
+          continue;
         }
-      }
+        Path partPath = getDataLocation(table, partition);
+        if (partPath == null) {
+          continue;
+        }
+        futures.add(pool.submit(new Callable() {
+          @Override
+          public Object call() throws Exception {
+            Path tempPartPath = partPath;
+            FileSystem tempFs = tempPartPath.getFileSystem(conf);
+            CheckResult.PartitionResult prFromMetastore = new CheckResult.PartitionResult();
+            String partName = getPartitionName(table, partition);
+            prFromMetastore.setPartitionName(partName);
+            prFromMetastore.setTableName(partition.getTableName());
+
+            synchronized (this) {
+              if (!tempFs.exists(tempPartPath)) {
+                result.getPartitionsNotOnFs().add(prFromMetastore);
+              } else {
+                result.getCorrectPartitions().add(prFromMetastore);
+              }
+            }
 
-      for (int i = 0; i < getPartitionSpec(table, partition).size(); i++) {
-        Path qualifiedPath = partPath.makeQualified(fs);
-        partPaths.add(qualifiedPath);
-        partPath = partPath.getParent();
+            if (partitionExpirySeconds > 0) {
+              long currentEpochSecs = Instant.now().getEpochSecond();
+              long createdTime = partition.getCreateTime();
+              long partitionAgeSeconds = currentEpochSecs - createdTime;
+              if (partitionAgeSeconds > partitionExpirySeconds) {
+                CheckResult.PartitionResult pr = new CheckResult.PartitionResult();
+                pr.setPartitionName(getPartitionName(table, partition));
+                pr.setTableName(partition.getTableName());
+                synchronized (result) {
+                  result.getExpiredPartitions().add(pr);
+                }
+                if (LOG.isDebugEnabled()) {
+                  LOG.debug("{}.{}.{}.{} expired. createdAt: {} current: {} age: {}s expiry: {}s", partition.getCatName(),
+                      partition.getDbName(), partition.getTableName(), pr.getPartitionName(), createdTime, currentEpochSecs,
+                      partitionAgeSeconds, partitionExpirySeconds);
+                }
+              }
+            }
+
+            synchronized (this) {
+              for (int i = 0; i < getPartitionSpec(table, partition).size(); i++) {
+                final Path qualifiedPath = tempPartPath.makeQualified(tempFs);
+                partPaths.add(qualifiedPath);
+                tempPartPath = tempPartPath.getParent();
+              }
+            }
+            return processedPartitions.add(partName);
+          }
+        }));
+      }
+      while (!futures.isEmpty()) {
+        futures.poll().get();
+      }
+    } catch (ExecutionException | InterruptedException e) {
+      LOG.error("Exception occurred while processing partitions "+e);
+    }
+    finally {
+      LOG.debug("Partition exists check complete for table "+table.getCatName()+"."+table.getDbName()+"."+table.getTableName()+" partitions "+processedPartitions);

Review comment:
       `LOG.debug("Partition exists check complete for table {}.{}.{} partitions {}", table.getCatName(), table.getDbName(), table.getTableName(), processedPartitions);`






[GitHub] [hive] pvary commented on a change in pull request #3053: HIVE-25980: Support HiveMetaStoreChecker.checkTable operation with multi-threaded

Posted by GitBox <gi...@apache.org>.
pvary commented on a change in pull request #3053:
URL: https://github.com/apache/hive/pull/3053#discussion_r814583976



##########
File path: standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreChecker.java
##########
@@ -303,56 +304,132 @@ void checkTable(Table table, PartitionIterable parts, byte[] filterExp, CheckRes
     if (tablePath == null) {
       return;
     }
-    FileSystem fs = tablePath.getFileSystem(conf);
-    if (!fs.exists(tablePath)) {
+    final FileSystem[] fs = {tablePath.getFileSystem(conf)};
+    if (!fs[0].exists(tablePath)) {
       result.getTablesNotOnFs().add(table.getTableName());
       return;
     }
 
     Set<Path> partPaths = new HashSet<>();
 
-    // check that the partition folders exist on disk
-    for (Partition partition : parts) {
-      if (partition == null) {
-        // most likely the user specified an invalid partition
-        continue;
-      }
-      Path partPath = getDataLocation(table, partition);
-      if (partPath == null) {
-        continue;
-      }
-      fs = partPath.getFileSystem(conf);
+    int threadCount = MetastoreConf.getIntVar(conf, MetastoreConf.ConfVars.METASTORE_MSCK_FS_HANDLER_THREADS_COUNT);
+
+    final ExecutorService pool = (threadCount > 1) ?
+        Executors.newFixedThreadPool(threadCount,
+            new ThreadFactoryBuilder()
+                .setDaemon(true)
+                .setNameFormat("CheckTable-PartitionOptimizer-%d").build()) : null;
 
-      CheckResult.PartitionResult prFromMetastore = new CheckResult.PartitionResult();
-      prFromMetastore.setPartitionName(getPartitionName(table, partition));
-      prFromMetastore.setTableName(partition.getTableName());
-      if (!fs.exists(partPath)) {
-        result.getPartitionsNotOnFs().add(prFromMetastore);
+    try {
+      Queue<Future<String>> futures = new LinkedList<>();
+      if (pool != null) {
+        // check that the partition folders exist on disk using multi-thread
+        for (Partition partition : parts) {

Review comment:
      I think this will fetch all of the partitions from the partition iterator immediately and keep them in memory.
   
   The goal of the partition iterator was to prevent OOM for big tables with a huge number of partitions. We do not want every partition in memory at once, so the iterator fetches them in batches, and once a batch is no longer used we let the GC take care of it.
   
   With this change I expect that we create a `Future` immediately for every partition and keep all of the partitions in memory until all of the checks have finished.
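
   One hedged way to keep memory bounded regardless of how the iterator batches things is to cap the number of in-flight futures and drain them as the loop progresses (generic sketch with illustrative names, not the PR's code):

   ```java
   import java.util.ArrayDeque;
   import java.util.Deque;
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.Future;
   import java.util.function.Consumer;

   // Sketch only: at most 'maxInFlight' submitted-but-unfinished tasks exist at a time,
   // so neither the futures nor the captured elements accumulate without bound.
   class BoundedSubmissionSketch {
     <T> void process(Iterable<T> items, ExecutorService pool, int maxInFlight, Consumer<T> work)
         throws Exception {
       Deque<Future<?>> inFlight = new ArrayDeque<>();
       for (T item : items) {
         inFlight.add(pool.submit(() -> work.accept(item)));
         while (inFlight.size() >= maxInFlight) {
           inFlight.poll().get();   // block on the oldest task before submitting more
         }
       }
       while (!inFlight.isEmpty()) {
         inFlight.poll().get();     // wait for the remaining tail
       }
     }
   }
   ```

   The loop never runs more than `maxInFlight` elements ahead of the already-completed checks, so it plays well with an iterator that fetches partitions in batches.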






[GitHub] [hive] pvary commented on a change in pull request #3053: HIVE-25980: Support HiveMetaStoreChecker.checkTable operation with multi-threaded

Posted by GitBox <gi...@apache.org>.
pvary commented on a change in pull request #3053:
URL: https://github.com/apache/hive/pull/3053#discussion_r816482750



##########
File path: standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreChecker.java
##########
@@ -303,56 +308,103 @@ void checkTable(Table table, PartitionIterable parts, byte[] filterExp, CheckRes
     if (tablePath == null) {
       return;
     }
-    FileSystem fs = tablePath.getFileSystem(conf);
-    if (!fs.exists(tablePath)) {
+    final FileSystem[] fs = {tablePath.getFileSystem(conf)};

Review comment:
       Nit: formatting






[GitHub] [hive] pvary commented on a change in pull request #3053: HIVE-25980: Support HiveMetaStoreChecker.checkTable operation with multi-threaded

Posted by GitBox <gi...@apache.org>.
pvary commented on a change in pull request #3053:
URL: https://github.com/apache/hive/pull/3053#discussion_r816484985



##########
File path: standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreChecker.java
##########
@@ -303,56 +308,103 @@ void checkTable(Table table, PartitionIterable parts, byte[] filterExp, CheckRes
     if (tablePath == null) {
       return;
     }
-    FileSystem fs = tablePath.getFileSystem(conf);
-    if (!fs.exists(tablePath)) {
+    final FileSystem[] fs = {tablePath.getFileSystem(conf)};
+    if (!fs[0].exists(tablePath)) {
       result.getTablesNotOnFs().add(table.getTableName());
       return;
     }
 
     Set<Path> partPaths = new HashSet<>();
 
-    // check that the partition folders exist on disk
-    for (Partition partition : parts) {
-      if (partition == null) {
-        // most likely the user specified an invalid partition
-        continue;
-      }
-      Path partPath = getDataLocation(table, partition);
-      if (partPath == null) {
-        continue;
-      }
-      fs = partPath.getFileSystem(conf);
+    int threadCount = MetastoreConf.getIntVar(conf, MetastoreConf.ConfVars.METASTORE_MSCK_FS_HANDLER_THREADS_COUNT);
 
-      CheckResult.PartitionResult prFromMetastore = new CheckResult.PartitionResult();
-      prFromMetastore.setPartitionName(getPartitionName(table, partition));
-      prFromMetastore.setTableName(partition.getTableName());
-      if (!fs.exists(partPath)) {
-        result.getPartitionsNotOnFs().add(prFromMetastore);
-      } else {
-        result.getCorrectPartitions().add(prFromMetastore);
-      }
+    Preconditions.checkArgument(!(threadCount < 1),"METASTORE_MSCK_FS_HANDLER_THREADS_COUNT cannot be less than 1");
+    Preconditions.checkArgument(!(threadCount > 30),"METASTORE_MSCK_FS_HANDLER_THREADS_COUNT cannot be more than 30");
 
-      if (partitionExpirySeconds > 0) {
-        long currentEpochSecs = Instant.now().getEpochSecond();
-        long createdTime = partition.getCreateTime();
-        long partitionAgeSeconds = currentEpochSecs - createdTime;
-        if (partitionAgeSeconds > partitionExpirySeconds) {
-          CheckResult.PartitionResult pr = new CheckResult.PartitionResult();
-          pr.setPartitionName(getPartitionName(table, partition));
-          pr.setTableName(partition.getTableName());
-          result.getExpiredPartitions().add(pr);
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("{}.{}.{}.{} expired. createdAt: {} current: {} age: {}s expiry: {}s", partition.getCatName(),
-                partition.getDbName(), partition.getTableName(), pr.getPartitionName(), createdTime, currentEpochSecs,
-                partitionAgeSeconds, partitionExpirySeconds);
-          }
+    LOG.debug("Running with threads "+threadCount);
+
+    // For Multi Threaded run, we do not want to wait for All partitions in queue to be processed,
+    // instead we run in batch to avoid OOM, we set Min and Max Pool Size = METASTORE_MSCK_FS_HANDLER_THREADS_COUNT
+    // and Waiting Queue size = METASTORE_MSCK_FS_HANDLER_THREADS_COUNT
+
+    final ExecutorService pool = new ThreadPoolExecutor(threadCount,
+            threadCount,
+            0L,
+            TimeUnit.MILLISECONDS,
+            new ArrayBlockingQueue<>(threadCount),
+            new ThreadPoolExecutor.CallerRunsPolicy());
+
+    try {
+      Queue<Future<String>> futures = new LinkedList<>();
+      // check that the partition folders exist on disk
+      for (Partition partition : parts) {

Review comment:
       Or it is run in the original thread
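
    For context on that remark, a small standalone sketch of how a bounded `ArrayBlockingQueue` plus `CallerRunsPolicy` behaves: once the worker threads and the queue are both full, the task executes on the thread that called `submit()`, i.e. the original checker thread, which is what throttles submission. The class and the sleeping task below are made up for illustration; only the executor configuration mirrors the one quoted above.

    ```java
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;

    public class CallerRunsSketch {
      public static void main(String[] args) throws InterruptedException {
        int threadCount = 2;
        ExecutorService pool = new ThreadPoolExecutor(
            threadCount, threadCount,
            0L, TimeUnit.MILLISECONDS,
            new ArrayBlockingQueue<>(threadCount),      // bounded waiting queue
            new ThreadPoolExecutor.CallerRunsPolicy()); // overflow runs in the submitter

        for (int i = 0; i < 10; i++) {
          final int task = i;
          pool.submit(() -> {
            // When both the workers and the queue are full, this lambda executes
            // on the thread calling submit(), i.e. the "original" checker thread.
            System.out.println("task " + task + " on " + Thread.currentThread().getName());
            try {
              Thread.sleep(100);
            } catch (InterruptedException e) {
              Thread.currentThread().interrupt();
            }
          });
        }
        pool.shutdown();
        pool.awaitTermination(1, TimeUnit.MINUTES);
      }
    }
    ```

    Run as-is, the overflow tasks print the main thread's name, showing the caller-runs behavior.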






[GitHub] [hive] pvary commented on a change in pull request #3053: HIVE-25980: Support HiveMetaStoreChecker.checkTable operation with multi-threaded

Posted by GitBox <gi...@apache.org>.
pvary commented on a change in pull request #3053:
URL: https://github.com/apache/hive/pull/3053#discussion_r816530943



##########
File path: standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreChecker.java
##########
@@ -303,56 +308,103 @@ void checkTable(Table table, PartitionIterable parts, byte[] filterExp, CheckRes
     if (tablePath == null) {
       return;
     }
-    FileSystem fs = tablePath.getFileSystem(conf);
-    if (!fs.exists(tablePath)) {
+    final FileSystem[] fs = {tablePath.getFileSystem(conf)};
+    if (!fs[0].exists(tablePath)) {
       result.getTablesNotOnFs().add(table.getTableName());
       return;
     }
 
     Set<Path> partPaths = new HashSet<>();
 
-    // check that the partition folders exist on disk
-    for (Partition partition : parts) {
-      if (partition == null) {
-        // most likely the user specified an invalid partition
-        continue;
-      }
-      Path partPath = getDataLocation(table, partition);
-      if (partPath == null) {
-        continue;
-      }
-      fs = partPath.getFileSystem(conf);
+    int threadCount = MetastoreConf.getIntVar(conf, MetastoreConf.ConfVars.METASTORE_MSCK_FS_HANDLER_THREADS_COUNT);
 
-      CheckResult.PartitionResult prFromMetastore = new CheckResult.PartitionResult();
-      prFromMetastore.setPartitionName(getPartitionName(table, partition));
-      prFromMetastore.setTableName(partition.getTableName());
-      if (!fs.exists(partPath)) {
-        result.getPartitionsNotOnFs().add(prFromMetastore);
-      } else {
-        result.getCorrectPartitions().add(prFromMetastore);
-      }
+    Preconditions.checkArgument(!(threadCount < 1),"METASTORE_MSCK_FS_HANDLER_THREADS_COUNT cannot be less than 1");
+    Preconditions.checkArgument(!(threadCount > 30),"METASTORE_MSCK_FS_HANDLER_THREADS_COUNT cannot be more than 30");
 
-      if (partitionExpirySeconds > 0) {
-        long currentEpochSecs = Instant.now().getEpochSecond();
-        long createdTime = partition.getCreateTime();
-        long partitionAgeSeconds = currentEpochSecs - createdTime;
-        if (partitionAgeSeconds > partitionExpirySeconds) {
-          CheckResult.PartitionResult pr = new CheckResult.PartitionResult();
-          pr.setPartitionName(getPartitionName(table, partition));
-          pr.setTableName(partition.getTableName());
-          result.getExpiredPartitions().add(pr);
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("{}.{}.{}.{} expired. createdAt: {} current: {} age: {}s expiry: {}s", partition.getCatName(),
-                partition.getDbName(), partition.getTableName(), pr.getPartitionName(), createdTime, currentEpochSecs,
-                partitionAgeSeconds, partitionExpirySeconds);
-          }
+    LOG.debug("Running with threads "+threadCount);
+
+    // For Multi Threaded run, we do not want to wait for All partitions in queue to be processed,
+    // instead we run in batch to avoid OOM, we set Min and Max Pool Size = METASTORE_MSCK_FS_HANDLER_THREADS_COUNT
+    // and Waiting Queue size = METASTORE_MSCK_FS_HANDLER_THREADS_COUNT
+
+    final ExecutorService pool = new ThreadPoolExecutor(threadCount,
+            threadCount,
+            0L,
+            TimeUnit.MILLISECONDS,
+            new ArrayBlockingQueue<>(threadCount),
+            new ThreadPoolExecutor.CallerRunsPolicy());
+
+    try {
+      Queue<Future<String>> futures = new LinkedList<>();
+      // check that the partition folders exist on disk
+      for (Partition partition : parts) {
+        if (partition == null) {
+          // most likely the user specified an invalid partition
+          continue;
         }
-      }
+        Path[] partPath = {getDataLocation(table, partition)};

Review comment:
       Using arrays to get around the `final` requirement seems really strange to me.
   While it works, we might want to be more explicit in the code, which would help future readability.
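
    A hedged sketch of the more explicit alternative: assign the value once to an ordinary local and let the lambda capture that, instead of wrapping it in a one-element array. The types and helper below are placeholders rather than the real checker code.

    ```java
    import java.util.concurrent.Callable;

    public class FinalCaptureSketch {
      // Illustrative stand-in; not the HiveMetaStoreChecker helper.
      static String getDataLocation(String table, String partition) {
        return "/warehouse/" + table + "/" + partition;
      }

      public static void main(String[] args) throws Exception {
        String table = "t";
        String partition = "p=1";

        // Instead of Path[] partPath = {getDataLocation(...)} just to satisfy the
        // effectively-final rule, assign once to a plain final local and capture it.
        final String partPath = getDataLocation(table, partition);

        Callable<Boolean> check = () -> partPath != null && !partPath.isEmpty();
        System.out.println("partition path ok: " + check.call());
      }
    }
    ```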

##########
File path: standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreChecker.java
##########
@@ -303,56 +308,103 @@ void checkTable(Table table, PartitionIterable parts, byte[] filterExp, CheckRes
     if (tablePath == null) {
       return;
     }
-    FileSystem fs = tablePath.getFileSystem(conf);
-    if (!fs.exists(tablePath)) {
+    final FileSystem[] fs = {tablePath.getFileSystem(conf)};
+    if (!fs[0].exists(tablePath)) {
       result.getTablesNotOnFs().add(table.getTableName());
       return;
     }
 
     Set<Path> partPaths = new HashSet<>();
 
-    // check that the partition folders exist on disk
-    for (Partition partition : parts) {
-      if (partition == null) {
-        // most likely the user specified an invalid partition
-        continue;
-      }
-      Path partPath = getDataLocation(table, partition);
-      if (partPath == null) {
-        continue;
-      }
-      fs = partPath.getFileSystem(conf);
+    int threadCount = MetastoreConf.getIntVar(conf, MetastoreConf.ConfVars.METASTORE_MSCK_FS_HANDLER_THREADS_COUNT);
 
-      CheckResult.PartitionResult prFromMetastore = new CheckResult.PartitionResult();
-      prFromMetastore.setPartitionName(getPartitionName(table, partition));
-      prFromMetastore.setTableName(partition.getTableName());
-      if (!fs.exists(partPath)) {
-        result.getPartitionsNotOnFs().add(prFromMetastore);
-      } else {
-        result.getCorrectPartitions().add(prFromMetastore);
-      }
+    Preconditions.checkArgument(!(threadCount < 1),"METASTORE_MSCK_FS_HANDLER_THREADS_COUNT cannot be less than 1");
+    Preconditions.checkArgument(!(threadCount > 30),"METASTORE_MSCK_FS_HANDLER_THREADS_COUNT cannot be more than 30");
 
-      if (partitionExpirySeconds > 0) {
-        long currentEpochSecs = Instant.now().getEpochSecond();
-        long createdTime = partition.getCreateTime();
-        long partitionAgeSeconds = currentEpochSecs - createdTime;
-        if (partitionAgeSeconds > partitionExpirySeconds) {
-          CheckResult.PartitionResult pr = new CheckResult.PartitionResult();
-          pr.setPartitionName(getPartitionName(table, partition));
-          pr.setTableName(partition.getTableName());
-          result.getExpiredPartitions().add(pr);
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("{}.{}.{}.{} expired. createdAt: {} current: {} age: {}s expiry: {}s", partition.getCatName(),
-                partition.getDbName(), partition.getTableName(), pr.getPartitionName(), createdTime, currentEpochSecs,
-                partitionAgeSeconds, partitionExpirySeconds);
-          }
+    LOG.debug("Running with threads "+threadCount);
+

Review comment:
       Nit: spaces around the `+`



