Posted to gitbox@hive.apache.org by GitBox <gi...@apache.org> on 2020/07/17 17:09:18 UTC

[GitHub] [hive] adesh-rao opened a new pull request #1275: HIVE-23324: Parallelise compaction directory cleaning process

adesh-rao opened a new pull request #1275:
URL: https://github.com/apache/hive/pull/1275


   ## NOTICE
   
   Please create an issue in ASF JIRA before opening a pull request,
   and set the pull request title to start with the corresponding
   JIRA issue number (e.g. HIVE-XXXXX: Fix a typo in YYY).
   For more details, please see https://cwiki.apache.org/confluence/display/Hive/HowToContribute
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: gitbox-unsubscribe@hive.apache.org
For additional commands, e-mail: gitbox-help@hive.apache.org


[GitHub] [hive] adesh-rao commented on a change in pull request #1275: HIVE-23324: Parallelise compaction directory cleaning process

Posted by GitBox <gi...@apache.org>.
adesh-rao commented on a change in pull request #1275:
URL: https://github.com/apache/hive/pull/1275#discussion_r458254600



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
##########
@@ -113,6 +122,8 @@ public void run() {
         }
       }
     } while (!stop.get());
+
+    ((ExecutorService)cleanerExecutor).shutdown();

Review comment:
       Modified.






[GitHub] [hive] belugabehr edited a comment on pull request #1275: HIVE-23324: Parallelise compaction directory cleaning process

Posted by GitBox <gi...@apache.org>.
belugabehr edited a comment on pull request #1275:
URL: https://github.com/apache/hive/pull/1275#issuecomment-662082780


   Using the `Void` type:
   
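   ```
   import java.util.concurrent.CompletionService;
   import java.util.concurrent.ExecutionException;
   import java.util.concurrent.ExecutorCompletionService;
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.Executors;
   
   // Hypothetical wrapper class so the example compiles on its own
   public class CompletionServiceVoidExample
   {
     public static void main(String[] args) throws InterruptedException
     {
       ExecutorService service = Executors.newFixedThreadPool(2);
       // Void: the tasks are run only for their side effects
       CompletionService<Void> completionService = new ExecutorCompletionService<>(service);
   
       for (int i = 0; i < 10; i++)
       {
         final int t = i;
         completionService.submit(() -> {
           clean("File" + t);
           return null; // a Callable<Void> must still return a value
         });
       }
   
       // take() yields futures in completion order; get() rethrows any task failure
       for (int i = 0; i < 10; ++i) {
         try {
           completionService.take().get();
         } catch (ExecutionException e) {
           System.err.println("Clean task failed: " + e.getCause());
         }
       }
       service.shutdown();
     }
   
     public static String clean(String path)
     {
       return "Cleaned: " + path;
     }
   }
   ```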
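For comparison, here is a minimal sketch of the same fan-out-and-wait pattern written with `CompletableFuture.runAsync` and `CompletableFuture.allOf(...).join()`. This is the style the PR's Cleaner change uses. Because `runAsync` accepts a `Runnable`, there is no need for a `Void` result type or a `return null`. `RunAsyncExample` and `cleanAll` are hypothetical names, and `clean` mirrors the toy method above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class RunAsyncExample {
  // Hypothetical stand-in for the toy clean(String) method above.
  static String clean(String path) {
    return "Cleaned: " + path;
  }

  // Submits one task per path to the pool, then blocks until all of them have finished.
  static List<String> cleanAll(List<String> paths, int threads) {
    ExecutorService service = Executors.newFixedThreadPool(threads);
    ConcurrentLinkedQueue<String> results = new ConcurrentLinkedQueue<>();
    try {
      List<CompletableFuture<Void>> futures = new ArrayList<>();
      for (String path : paths) {
        // runAsync takes a Runnable, so the task has no value to return.
        futures.add(CompletableFuture.runAsync(() -> results.add(clean(path)), service));
      }
      // allOf(...).join() waits for every task; if a task threw,
      // join() rethrows the failure wrapped in a CompletionException.
      CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
    } finally {
      service.shutdown();
    }
    return new ArrayList<>(results);
  }

  public static void main(String[] args) {
    List<String> paths = new ArrayList<>();
    for (int i = 0; i < 10; i++) {
      paths.add("File" + i);
    }
    System.out.println(cleanAll(paths, 2).size()); // prints 10
  }
}
```

The two approaches differ in one respect. `CompletionService.take()` hands back futures in completion order. `allOf(...).join()` just waits for everything and surfaces a failure as a `CompletionException`.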




[GitHub] [hive] adesh-rao commented on a change in pull request #1275: HIVE-23324: Parallelise compaction directory cleaning process

Posted by GitBox <gi...@apache.org>.
adesh-rao commented on a change in pull request #1275:
URL: https://github.com/apache/hive/pull/1275#discussion_r459433077



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
##########
@@ -66,53 +69,62 @@
   private long cleanerCheckInterval = 0;
 
   private ReplChangeManager replChangeManager;
+  private ExecutorService cleanerExecutor;
 
   @Override
   public void init(AtomicBoolean stop) throws Exception {
     super.init(stop);
     replChangeManager = ReplChangeManager.getInstance(conf);
-  }
-
-  @Override
-  public void run() {
     if (cleanerCheckInterval == 0) {
       cleanerCheckInterval = conf.getTimeVar(
-          HiveConf.ConfVars.HIVE_COMPACTOR_CLEANER_RUN_INTERVAL, TimeUnit.MILLISECONDS);
+              HiveConf.ConfVars.HIVE_COMPACTOR_CLEANER_RUN_INTERVAL, TimeUnit.MILLISECONDS);
     }
+    cleanerExecutor = CompactorUtil.createExecutorWithThreadFactory(
+            conf.getIntVar(HiveConf.ConfVars.HIVE_COMPACTOR_CLEANER_REQUEST_QUEUE),
+            COMPACTOR_CLEANER_THREAD_NAME_FORMAT);
+  }
 
-    do {
-      TxnStore.MutexAPI.LockHandle handle = null;
-      long startedAt = -1;
-      // Make sure nothing escapes this run method and kills the metastore at large,
-      // so wrap it in a big catch Throwable statement.
-      try {
-        handle = txnHandler.getMutexAPI().acquireLock(TxnStore.MUTEX_KEY.Cleaner.name());
-        startedAt = System.currentTimeMillis();
-        long minOpenTxnId = txnHandler.findMinOpenTxnIdForCleaner();
-        for(CompactionInfo compactionInfo : txnHandler.findReadyToClean()) {
-          clean(compactionInfo, minOpenTxnId);
-        }
-      } catch (Throwable t) {
-        LOG.error("Caught an exception in the main loop of compactor cleaner, " +
-            StringUtils.stringifyException(t));
-      }
-      finally {
-        if (handle != null) {
-          handle.releaseLocks();
-        }
-      }
-      // Now, go back to bed until it's time to do this again
-      long elapsedTime = System.currentTimeMillis() - startedAt;
-      if (elapsedTime >= cleanerCheckInterval || stop.get())  {
-        continue;
-      } else {
+  @Override
+  public void run() {
+    try {
+      do {
+        TxnStore.MutexAPI.LockHandle handle = null;
+        long startedAt = -1;
+        // Make sure nothing escapes this run method and kills the metastore at large,
+        // so wrap it in a big catch Throwable statement.
         try {
-          Thread.sleep(cleanerCheckInterval - elapsedTime);
-        } catch (InterruptedException ie) {
-          // What can I do about it?
+          handle = txnHandler.getMutexAPI().acquireLock(TxnStore.MUTEX_KEY.Cleaner.name());
+          startedAt = System.currentTimeMillis();
+          long minOpenTxnId = txnHandler.findMinOpenTxnIdForCleaner();
+          List<CompletableFuture> cleanerList = new ArrayList<>();
+          for(CompactionInfo compactionInfo : txnHandler.findReadyToClean()) {
+            cleanerList.add(CompletableFuture.runAsync(CompactorUtil.ThrowingRunnable.unchecked(() ->
+                    clean(compactionInfo, minOpenTxnId)), cleanerExecutor));
+          }
+          CompletableFuture.allOf(cleanerList.toArray(new CompletableFuture[0])).join();
+        } catch (Throwable t) {
+          LOG.error("Caught an exception in the main loop of compactor cleaner, " +
+                  StringUtils.stringifyException(t));
+        } finally {
+          if (handle != null) {
+            handle.releaseLocks();
+          }
+        }
+        // Now, go back to bed until it's time to do this again
+        long elapsedTime = System.currentTimeMillis() - startedAt;
+        if (elapsedTime < cleanerCheckInterval && !stop.get()) {

Review comment:
       @deniskuzZ This condition has been modified. Your previous comment is attached to the outdated code.
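The rewritten guard, `elapsedTime < cleanerCheckInterval && !stop.get()`, is the De Morgan complement of the original `if (elapsedTime >= cleanerCheckInterval || stop.get()) continue; else sleep` branch. The loop should therefore sleep under exactly the same conditions as before. Below is a small sketch that checks the two forms agree on a grid of inputs. The class and method names are hypothetical.

```java
class SleepConditionCheck {
  // Original form: skip the sleep when the interval has already elapsed or a stop was requested.
  static boolean sleepsOriginal(long elapsedTime, long interval, boolean stop) {
    if (elapsedTime >= interval || stop) {
      return false; // the do-while just "continue"s to its condition check
    } else {
      return true;  // falls through to Thread.sleep(interval - elapsedTime)
    }
  }

  // Rewritten form from the PR: sleep only when time remains and no stop was requested.
  static boolean sleepsRewritten(long elapsedTime, long interval, boolean stop) {
    return elapsedTime < interval && !stop;
  }

  // By De Morgan's law, !(a >= b || s) == (a < b && !s); check a small grid of inputs.
  static boolean equivalentOnGrid() {
    for (long elapsed = 0; elapsed <= 5; elapsed++) {
      for (long interval = 0; interval <= 4; interval++) {
        for (boolean stop : new boolean[] {false, true}) {
          if (sleepsOriginal(elapsed, interval, stop) != sleepsRewritten(elapsed, interval, stop)) {
            return false;
          }
        }
      }
    }
    return true;
  }

  public static void main(String[] args) {
    System.out.println(equivalentOnGrid()); // prints true
  }
}
```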






[GitHub] [hive] deniskuzZ commented on a change in pull request #1275: HIVE-23324: Parallelise compaction directory cleaning process

Posted by GitBox <gi...@apache.org>.
deniskuzZ commented on a change in pull request #1275:
URL: https://github.com/apache/hive/pull/1275#discussion_r458967846



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
##########
@@ -89,23 +93,28 @@ public void run() {
         handle = txnHandler.getMutexAPI().acquireLock(TxnStore.MUTEX_KEY.Cleaner.name());
         startedAt = System.currentTimeMillis();
         long minOpenTxnId = txnHandler.findMinOpenTxnIdForCleaner();
+        List<CompletableFuture> cleanerList = new ArrayList<>();
         for(CompactionInfo compactionInfo : txnHandler.findReadyToClean()) {
-          clean(compactionInfo, minOpenTxnId);
+          cleanerList.add(CompletableFuture.runAsync(CompactorUtil.ThrowingRunnable.unchecked(() ->
+                  clean(compactionInfo, minOpenTxnId)), cleanerExecutor));
         }
+        CompletableFuture.allOf(cleanerList.toArray(new CompletableFuture[0])).join();
       } catch (Throwable t) {
         LOG.error("Caught an exception in the main loop of compactor cleaner, " +
-            StringUtils.stringifyException(t));
-      }
-      finally {
+                StringUtils.stringifyException(t));
+        if (cleanerExecutor != null) {

Review comment:
       @adesh-rao, sorry, I think you originally placed it in the right spot: at the end of the run() method.
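Here is a minimal sketch of the placement being discussed. It makes some simplifying assumptions. The pool is created once in `init()`. Each loop iteration catches its own failures. `shutdownNow()` runs in a `finally` around the whole loop, so it happens exactly once, when `run()` returns, instead of in the per-iteration `catch`.

`PeriodicCleaner`, the fixed batch of four tasks, and the `cleaned` counter are hypothetical stand-ins for the Cleaner and `findReadyToClean()`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

class PeriodicCleaner implements Runnable {
  private final AtomicBoolean stop;
  final AtomicInteger cleaned = new AtomicInteger();
  ExecutorService executor;

  PeriodicCleaner(AtomicBoolean stop) {
    this.stop = stop;
  }

  // Create the pool once, analogous to Cleaner.init() in the PR.
  void init(int threads) {
    executor = Executors.newFixedThreadPool(threads);
  }

  @Override
  public void run() {
    try {
      do {
        try {
          List<CompletableFuture<Void>> futures = new ArrayList<>();
          for (int i = 0; i < 4; i++) { // hypothetical stand-in for findReadyToClean()
            futures.add(CompletableFuture.runAsync(cleaned::incrementAndGet, executor));
          }
          CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
        } catch (Throwable t) {
          // A failed iteration is logged; the loop and the pool keep going.
          System.err.println("Cleaner iteration failed: " + t);
        }
        // (sleeping until the next interval is omitted here)
      } while (!stop.get());
    } finally {
      // Shut the pool down exactly once, when run() itself is about to return.
      executor.shutdownNow();
    }
  }

  public static void main(String[] args) {
    PeriodicCleaner cleaner = new PeriodicCleaner(new AtomicBoolean(true)); // stop after one pass
    cleaner.init(2);
    cleaner.run();
    System.out.println(cleaner.cleaned.get() + " " + cleaner.executor.isShutdown()); // prints "4 true"
  }
}
```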






[GitHub] [hive] adesh-rao commented on a change in pull request #1275: HIVE-23324: Parallelise compaction directory cleaning process

Posted by GitBox <gi...@apache.org>.
adesh-rao commented on a change in pull request #1275:
URL: https://github.com/apache/hive/pull/1275#discussion_r458096765



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
##########
@@ -64,13 +67,15 @@
   static final private String CLASS_NAME = Cleaner.class.getName();
   static final private Logger LOG = LoggerFactory.getLogger(CLASS_NAME);
   private long cleanerCheckInterval = 0;
+  private Executor cleanerExecutor;
 
   private ReplChangeManager replChangeManager;
 
   @Override
   public void init(AtomicBoolean stop) throws Exception {
     super.init(stop);
     replChangeManager = ReplChangeManager.getInstance(conf);
+    cleanerExecutor = Executors.newFixedThreadPool(conf.getIntVar(HiveConf.ConfVars.HIVE_COMPACTOR_CLEANER_REQUEST_QUEUE));

Review comment:
       I just saw the approach in Worker.java; it looks much cleaner. I will make the changes in this PR only (for both the Cleaner and the Initiator) and update it once it is ready.
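This thread does not show `CompactorUtil.createExecutorWithThreadFactory(int, String)` itself. The sketch below is a JDK-only guess at what such a helper might do, assuming it builds a fixed-size pool whose threads are named from a format string. One of the `init()` hunks quoted further down uses Guava's `ThreadFactoryBuilder` for this; the plain `ThreadFactory` lambda here imitates it.

The class name, the daemon setting, and the `"Cleaner-executor-thread-%d"` format are all assumptions.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

class ExecutorFactorySketch {
  // Hypothetical helper: fixed pool whose threads are named String.format(nameFormat, n).
  static ExecutorService createExecutorWithThreadFactory(int poolSize, String nameFormat) {
    AtomicInteger counter = new AtomicInteger();
    ThreadFactory factory = runnable -> {
      Thread thread = new Thread(runnable, String.format(nameFormat, counter.getAndIncrement()));
      // Assumption: daemon threads, so a forgotten shutdown() cannot keep the JVM alive.
      thread.setDaemon(true);
      return thread;
    };
    return Executors.newFixedThreadPool(poolSize, factory);
  }

  public static void main(String[] args) throws Exception {
    ExecutorService pool = createExecutorWithThreadFactory(2, "Cleaner-executor-thread-%d");
    String name = pool.submit(() -> Thread.currentThread().getName()).get();
    System.out.println(name); // prints Cleaner-executor-thread-0
    pool.shutdown();
  }
}
```

Named threads make the Cleaner's workers easy to spot in thread dumps, which is the main practical benefit of a custom factory here.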






[GitHub] [hive] deniskuzZ commented on a change in pull request #1275: HIVE-23324: Parallelise compaction directory cleaning process

Posted by GitBox <gi...@apache.org>.
deniskuzZ commented on a change in pull request #1275:
URL: https://github.com/apache/hive/pull/1275#discussion_r458629144



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
##########
@@ -149,17 +156,25 @@ public void run() {
               String runAs = resolveUserToRunAs(tblNameOwners, t, p);
               /* checkForCompaction includes many file metadata checks and may be expensive.
                * Therefore, using a thread pool here and running checkForCompactions in parallel */
-              compactionList.add(CompletableFuture.runAsync(ThrowingRunnable.unchecked(() ->
-                  scheduleCompactionIfRequired(ci, t, p, runAs)), compactionExecutor));
+              completionService.submit(() -> {
+                ThrowingRunnable.unchecked(() -> scheduleCompactionIfRequired(ci, t, p, runAs));
+                return null;

Review comment:
       What's the reasoning behind this change? Returning null doesn't make the code any more readable.
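There is also a correctness detail in the quoted hunk. The original line passes `ThrowingRunnable.unchecked(...)` to `CompletableFuture.runAsync(Runnable, Executor)`, so it presumably returns a `Runnable`. If so, calling it inside the `Callable` only builds the task; `scheduleCompactionIfRequired` would not run unless `.run()` is invoked on the result.

The sketch below illustrates the difference using a hypothetical re-implementation of such a wrapper; the real `CompactorUtil.ThrowingRunnable` is not shown in this thread. `schedule` is a stand-in for `scheduleCompactionIfRequired`.

```java
import java.util.concurrent.atomic.AtomicInteger;

class UncheckedRunnableSketch {
  // Hypothetical wrapper in the spirit of CompactorUtil.ThrowingRunnable.
  @FunctionalInterface
  interface ThrowingRunnable<E extends Exception> {
    void run() throws E;

    // Adapts a throwing lambda to a plain Runnable, rethrowing checked exceptions unchecked.
    static Runnable unchecked(ThrowingRunnable<?> r) {
      return () -> {
        try {
          r.run();
        } catch (RuntimeException e) {
          throw e;
        } catch (Exception e) {
          throw new RuntimeException(e);
        }
      };
    }
  }

  static final AtomicInteger calls = new AtomicInteger();

  // Hypothetical stand-in for scheduleCompactionIfRequired(...).
  static void schedule() throws Exception {
    calls.incrementAndGet();
  }

  // Mirrors the quoted hunk: the wrapper is created but never run.
  static int buildOnly() {
    calls.set(0);
    ThrowingRunnable.unchecked(UncheckedRunnableSketch::schedule);
    return calls.get();
  }

  // The wrapped work executes only when run() is called on the returned Runnable.
  static int buildAndRun() {
    calls.set(0);
    ThrowingRunnable.unchecked(UncheckedRunnableSketch::schedule).run();
    return calls.get();
  }

  public static void main(String[] args) {
    System.out.println(buildOnly() + " " + buildAndRun()); // prints "0 1"
  }
}
```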






[GitHub] [hive] adesh-rao commented on a change in pull request #1275: HIVE-23324: Parallelise compaction directory cleaning process

Posted by GitBox <gi...@apache.org>.
adesh-rao commented on a change in pull request #1275:
URL: https://github.com/apache/hive/pull/1275#discussion_r458751459



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
##########
@@ -89,23 +93,28 @@ public void run() {
         handle = txnHandler.getMutexAPI().acquireLock(TxnStore.MUTEX_KEY.Cleaner.name());
         startedAt = System.currentTimeMillis();
         long minOpenTxnId = txnHandler.findMinOpenTxnIdForCleaner();
+        List<CompletableFuture> cleanerList = new ArrayList<>();
         for(CompactionInfo compactionInfo : txnHandler.findReadyToClean()) {
-          clean(compactionInfo, minOpenTxnId);
+          cleanerList.add(CompletableFuture.runAsync(CompactorUtil.ThrowingRunnable.unchecked(() ->
+                  clean(compactionInfo, minOpenTxnId)), cleanerExecutor));
         }
+        CompletableFuture.allOf(cleanerList.toArray(new CompletableFuture[0])).join();
       } catch (Throwable t) {
         LOG.error("Caught an exception in the main loop of compactor cleaner, " +
-            StringUtils.stringifyException(t));
-      }
-      finally {
+                StringUtils.stringifyException(t));
+        if (cleanerExecutor != null) {
+          cleanerExecutor.shutdownNow();
+          cleanerExecutor = CompactorUtil.createExecutorWithThreadFactory(
+                  conf.getIntVar(HiveConf.ConfVars.HIVE_COMPACTOR_CLEANER_REQUEST_QUEUE), threadNameFormat);
+        }
+      } finally {
         if (handle != null) {
           handle.releaseLocks();
         }
       }
       // Now, go back to bed until it's time to do this again
       long elapsedTime = System.currentTimeMillis() - startedAt;
-      if (elapsedTime >= cleanerCheckInterval || stop.get())  {
-        continue;
-      } else {
+      if (!(elapsedTime >= cleanerCheckInterval || stop.get())) {

Review comment:
       Fixed.






[GitHub] [hive] deniskuzZ commented on a change in pull request #1275: HIVE-23324: Parallelise compaction directory cleaning process

Posted by GitBox <gi...@apache.org>.
deniskuzZ commented on a change in pull request #1275:
URL: https://github.com/apache/hive/pull/1275#discussion_r460706465



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
##########
@@ -66,53 +69,62 @@
   private long cleanerCheckInterval = 0;
 
   private ReplChangeManager replChangeManager;
+  private ExecutorService cleanerExecutor;
 
   @Override
   public void init(AtomicBoolean stop) throws Exception {
     super.init(stop);
     replChangeManager = ReplChangeManager.getInstance(conf);
-  }
-
-  @Override
-  public void run() {
     if (cleanerCheckInterval == 0) {

Review comment:
       I think this `if` check is redundant.






[GitHub] [hive] deniskuzZ commented on a change in pull request #1275: HIVE-23324: Parallelise compaction directory cleaning process

Posted by GitBox <gi...@apache.org>.
deniskuzZ commented on a change in pull request #1275:
URL: https://github.com/apache/hive/pull/1275#discussion_r462318529



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
##########
@@ -66,53 +69,60 @@
   private long cleanerCheckInterval = 0;
 
   private ReplChangeManager replChangeManager;
+  private ExecutorService cleanerExecutor;
 
   @Override
   public void init(AtomicBoolean stop) throws Exception {
     super.init(stop);
     replChangeManager = ReplChangeManager.getInstance(conf);
+    cleanerCheckInterval = conf.getTimeVar(
+            HiveConf.ConfVars.HIVE_COMPACTOR_CLEANER_RUN_INTERVAL, TimeUnit.MILLISECONDS);
+    cleanerExecutor = CompactorUtil.createExecutorWithThreadFactory(
+            conf.getIntVar(HiveConf.ConfVars.HIVE_COMPACTOR_CLEANER_REQUEST_QUEUE),
+            COMPACTOR_CLEANER_THREAD_NAME_FORMAT);
   }
 
   @Override
   public void run() {
-    if (cleanerCheckInterval == 0) {
-      cleanerCheckInterval = conf.getTimeVar(
-          HiveConf.ConfVars.HIVE_COMPACTOR_CLEANER_RUN_INTERVAL, TimeUnit.MILLISECONDS);
-    }
-
-    do {
-      TxnStore.MutexAPI.LockHandle handle = null;
-      long startedAt = -1;
-      // Make sure nothing escapes this run method and kills the metastore at large,
-      // so wrap it in a big catch Throwable statement.
-      try {
-        handle = txnHandler.getMutexAPI().acquireLock(TxnStore.MUTEX_KEY.Cleaner.name());
-        startedAt = System.currentTimeMillis();
-        long minOpenTxnId = txnHandler.findMinOpenTxnIdForCleaner();
-        for(CompactionInfo compactionInfo : txnHandler.findReadyToClean()) {
-          clean(compactionInfo, minOpenTxnId);
-        }
-      } catch (Throwable t) {
-        LOG.error("Caught an exception in the main loop of compactor cleaner, " +
-            StringUtils.stringifyException(t));
-      }
-      finally {
-        if (handle != null) {
-          handle.releaseLocks();
-        }
-      }
-      // Now, go back to bed until it's time to do this again
-      long elapsedTime = System.currentTimeMillis() - startedAt;
-      if (elapsedTime >= cleanerCheckInterval || stop.get())  {
-        continue;
-      } else {
+    try {
+      do {
+        TxnStore.MutexAPI.LockHandle handle = null;
+        long startedAt = -1;
+        // Make sure nothing escapes this run method and kills the metastore at large,
+        // so wrap it in a big catch Throwable statement.
         try {
-          Thread.sleep(cleanerCheckInterval - elapsedTime);
-        } catch (InterruptedException ie) {
-          // What can I do about it?
+          handle = txnHandler.getMutexAPI().acquireLock(TxnStore.MUTEX_KEY.Cleaner.name());
+          startedAt = System.currentTimeMillis();
+          long minOpenTxnId = txnHandler.findMinOpenTxnIdForCleaner();
+          List<CompletableFuture> cleanerList = new ArrayList<>();
+          for(CompactionInfo compactionInfo : txnHandler.findReadyToClean()) {
+            cleanerList.add(CompletableFuture.runAsync(CompactorUtil.ThrowingRunnable.unchecked(() ->
+                    clean(compactionInfo, minOpenTxnId)), cleanerExecutor));
+          }
+          CompletableFuture.allOf(cleanerList.toArray(new CompletableFuture[0])).join();
+        } catch (Throwable t) {
+          LOG.error("Caught an exception in the main loop of compactor cleaner, " +
+                  StringUtils.stringifyException(t));
+        } finally {
+          if (handle != null) {
+            handle.releaseLocks();
+          }
         }
+        // Now, go back to bed until it's time to do this again
+        long elapsedTime = System.currentTimeMillis() - startedAt;
+        if (elapsedTime < cleanerCheckInterval && !stop.get()) {
+          try {
+            Thread.sleep(cleanerCheckInterval - elapsedTime);
+          } catch (InterruptedException ie) {

Review comment:
       Do you think it's OK to have an uninterruptible thread? Currently, if someone interrupts the Cleaner thread, it will just go straight on to the next clean attempt.
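One common way to keep such a loop interruptible is to restore the thread's interrupt flag and leave the loop, rather than swallowing `InterruptedException`. The sketch below assumes an interrupt should be treated like a stop request. The `InterruptibleLoop` class and its `iterations` counter are hypothetical.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

class InterruptibleLoop implements Runnable {
  final AtomicBoolean stop = new AtomicBoolean(false);
  final AtomicInteger iterations = new AtomicInteger();
  private final long checkIntervalMs;

  InterruptibleLoop(long checkIntervalMs) {
    this.checkIntervalMs = checkIntervalMs;
  }

  @Override
  public void run() {
    do {
      long startedAt = System.currentTimeMillis();
      iterations.incrementAndGet(); // stand-in for one cleaning pass
      long elapsedTime = System.currentTimeMillis() - startedAt;
      if (elapsedTime < checkIntervalMs && !stop.get()) {
        try {
          Thread.sleep(checkIntervalMs - elapsedTime);
        } catch (InterruptedException ie) {
          // Restore the flag so callers can observe it, then exit instead of starting another pass.
          Thread.currentThread().interrupt();
          return;
        }
      }
    } while (!stop.get());
  }

  // Starts the loop with a long interval, interrupts it, and reports whether it exited after one pass.
  static boolean exitsOnInterrupt() {
    InterruptibleLoop loop = new InterruptibleLoop(60_000);
    Thread worker = new Thread(loop);
    worker.start();
    worker.interrupt();
    try {
      worker.join(5_000);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    }
    return !worker.isAlive() && loop.iterations.get() == 1;
  }

  public static void main(String[] args) {
    System.out.println(exitsOnInterrupt()); // prints true
  }
}
```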






[GitHub] [hive] adesh-rao commented on a change in pull request #1275: HIVE-23324: Parallelise compaction directory cleaning process

Posted by GitBox <gi...@apache.org>.
adesh-rao commented on a change in pull request #1275:
URL: https://github.com/apache/hive/pull/1275#discussion_r458674847



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
##########
@@ -64,13 +72,23 @@
   static final private String CLASS_NAME = Cleaner.class.getName();
   static final private Logger LOG = LoggerFactory.getLogger(CLASS_NAME);
   private long cleanerCheckInterval = 0;
+  private ExecutorService cleanerExecutor;
+  private CompletionService<Void> completionService;
 
   private ReplChangeManager replChangeManager;
 
   @Override
   public void init(AtomicBoolean stop) throws Exception {
     super.init(stop);
     replChangeManager = ReplChangeManager.getInstance(conf);
+    ThreadFactory threadFactory = new ThreadFactoryBuilder()

Review comment:
       Done.






[GitHub] [hive] deniskuzZ commented on a change in pull request #1275: HIVE-23324: Parallelise compaction directory cleaning process

Posted by GitBox <gi...@apache.org>.
deniskuzZ commented on a change in pull request #1275:
URL: https://github.com/apache/hive/pull/1275#discussion_r458624026



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
##########
@@ -182,6 +197,10 @@ public void run() {
     } catch (Throwable t) {
       LOG.error("Caught an exception in the main loop of compactor initiator, exiting " +
           StringUtils.stringifyException(t));
+    } finally {
+      if (compactionExecutor != null) {
+        compactionExecutor.shutdownNow();

Review comment:
       What's the reasoning behind this? If you want to call it, do it before releasing the lock!
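Below is a sketch of the ordering being asked for within a single locked section. It assumes the goal is that no compaction task is still running once the mutex is released. The pool is shut down and awaited before `releaseLocks()` is called.

`LockHandle` is a hypothetical stand-in for `TxnStore.MutexAPI.LockHandle`. The event list only records the order in which things happen.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class ShutdownBeforeUnlock {
  // Hypothetical stand-in for TxnStore.MutexAPI.LockHandle.
  interface LockHandle {
    void releaseLocks();
  }

  static List<String> runOnce() {
    List<String> events = Collections.synchronizedList(new ArrayList<>());
    ExecutorService executor = Executors.newFixedThreadPool(2);
    LockHandle handle = () -> events.add("lock released");
    try {
      executor.submit(() -> events.add("task ran"));
    } finally {
      executor.shutdown();
      try {
        // Wait for in-flight tasks so that none of them runs after the lock is gone.
        if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
          executor.shutdownNow();
        }
      } catch (InterruptedException ie) {
        executor.shutdownNow();
        Thread.currentThread().interrupt();
      }
      events.add("executor stopped");
      handle.releaseLocks(); // released last
    }
    return events;
  }

  public static void main(String[] args) {
    System.out.println(runOnce()); // prints [task ran, executor stopped, lock released]
  }
}
```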






[GitHub] [hive] adesh-rao commented on a change in pull request #1275: HIVE-23324: Parallelise compaction directory cleaning process

Posted by GitBox <gi...@apache.org>.
adesh-rao commented on a change in pull request #1275:
URL: https://github.com/apache/hive/pull/1275#discussion_r461306773



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
##########
@@ -66,53 +69,62 @@
   private long cleanerCheckInterval = 0;
 
   private ReplChangeManager replChangeManager;
+  private ExecutorService cleanerExecutor;
 
   @Override
   public void init(AtomicBoolean stop) throws Exception {
     super.init(stop);
     replChangeManager = ReplChangeManager.getInstance(conf);
-  }
-
-  @Override
-  public void run() {
     if (cleanerCheckInterval == 0) {

Review comment:
       Yes, this is redundant. Removed it.

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
##########
@@ -66,53 +69,60 @@
   private long cleanerCheckInterval = 0;
 
   private ReplChangeManager replChangeManager;
+  private ExecutorService cleanerExecutor;
 
   @Override
   public void init(AtomicBoolean stop) throws Exception {
     super.init(stop);
     replChangeManager = ReplChangeManager.getInstance(conf);
+    cleanerCheckInterval = conf.getTimeVar(
+            HiveConf.ConfVars.HIVE_COMPACTOR_CLEANER_RUN_INTERVAL, TimeUnit.MILLISECONDS);
+    cleanerExecutor = CompactorUtil.createExecutorWithThreadFactory(
+            conf.getIntVar(HiveConf.ConfVars.HIVE_COMPACTOR_CLEANER_REQUEST_QUEUE),
+            COMPACTOR_CLEANER_THREAD_NAME_FORMAT);
   }
 
   @Override
   public void run() {
-    if (cleanerCheckInterval == 0) {
-      cleanerCheckInterval = conf.getTimeVar(
-          HiveConf.ConfVars.HIVE_COMPACTOR_CLEANER_RUN_INTERVAL, TimeUnit.MILLISECONDS);
-    }
-
-    do {
-      TxnStore.MutexAPI.LockHandle handle = null;
-      long startedAt = -1;
-      // Make sure nothing escapes this run method and kills the metastore at large,
-      // so wrap it in a big catch Throwable statement.
-      try {
-        handle = txnHandler.getMutexAPI().acquireLock(TxnStore.MUTEX_KEY.Cleaner.name());
-        startedAt = System.currentTimeMillis();
-        long minOpenTxnId = txnHandler.findMinOpenTxnIdForCleaner();
-        for(CompactionInfo compactionInfo : txnHandler.findReadyToClean()) {
-          clean(compactionInfo, minOpenTxnId);
-        }
-      } catch (Throwable t) {
-        LOG.error("Caught an exception in the main loop of compactor cleaner, " +
-            StringUtils.stringifyException(t));
-      }
-      finally {
-        if (handle != null) {
-          handle.releaseLocks();
-        }
-      }
-      // Now, go back to bed until it's time to do this again
-      long elapsedTime = System.currentTimeMillis() - startedAt;
-      if (elapsedTime >= cleanerCheckInterval || stop.get())  {
-        continue;
-      } else {
+    try {
+      do {
+        TxnStore.MutexAPI.LockHandle handle = null;
+        long startedAt = -1;
+        // Make sure nothing escapes this run method and kills the metastore at large,
+        // so wrap it in a big catch Throwable statement.
         try {
-          Thread.sleep(cleanerCheckInterval - elapsedTime);
-        } catch (InterruptedException ie) {
-          // What can I do about it?
+          handle = txnHandler.getMutexAPI().acquireLock(TxnStore.MUTEX_KEY.Cleaner.name());
+          startedAt = System.currentTimeMillis();
+          long minOpenTxnId = txnHandler.findMinOpenTxnIdForCleaner();
+          List<CompletableFuture> cleanerList = new ArrayList<>();
+          for(CompactionInfo compactionInfo : txnHandler.findReadyToClean()) {
+            cleanerList.add(CompletableFuture.runAsync(CompactorUtil.ThrowingRunnable.unchecked(() ->
+                    clean(compactionInfo, minOpenTxnId)), cleanerExecutor));
+          }
+          CompletableFuture.allOf(cleanerList.toArray(new CompletableFuture[0])).join();
+        } catch (Throwable t) {
+          LOG.error("Caught an exception in the main loop of compactor cleaner, " +
+                  StringUtils.stringifyException(t));
+        } finally {
+          if (handle != null) {
+            handle.releaseLocks();
+          }
         }
+        // Now, go back to bed until it's time to do this again
+        long elapsedTime = System.currentTimeMillis() - startedAt;
+        if (elapsedTime < cleanerCheckInterval && !stop.get()) {
+          try {
+            Thread.sleep(cleanerCheckInterval - elapsedTime);
+          } catch (InterruptedException ie) {

Review comment:
       I don't think it is redundant.
   
   In Initiator, the try-catch (catching `Throwable`) wraps the complete `while` loop. Since `Thread.sleep` is inside that loop, the inner catch was redundant in Initiator. (If `sleep` throws an `InterruptedException`, we get out of the `while` loop and the Initiator exits.)
   
   In Cleaner, the try-catch (catching `Throwable`) wraps only the main logic for cleaning directories, which sits inside the `while` loop (as opposed to wrapping the complete `while` loop, as in Initiator). Here, even if `sleep` throws an `InterruptedException`, Cleaner won't exit, because of the separate try-catch around `sleep` inside the `while` loop.
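A minimal sketch of the scoping argument above, using a simplified loop in place of the real Cleaner and Initiator (`SleepScopeDemo` and its methods are hypothetical). `Thread.sleep` throws `InterruptedException` immediately if the thread's interrupt flag is already set, so each iteration simulates an interrupt arriving before the sleep: with the catch inside the loop (Cleaner-style) the loop keeps going, while with the catch around the whole loop (Initiator-style) the first interrupt ends it.

```java
public class SleepScopeDemo {
  // Catch inside the loop (Cleaner-style): an interrupt during sleep is swallowed
  // and the loop runs all of its iterations.
  public static int innerCatchIterations(int maxIterations) {
    int iterations = 0;
    do {
      iterations++;
      Thread.currentThread().interrupt(); // simulate an interrupt before sleeping
      try {
        Thread.sleep(10);
      } catch (InterruptedException ie) {
        // Swallowed here (sleep clears the interrupt flag), so the loop continues.
      }
    } while (iterations < maxIterations);
    return iterations;
  }

  // Catch around the loop (Initiator-style): the first interrupt escapes the loop.
  public static int outerCatchIterations(int maxIterations) {
    int iterations = 0;
    try {
      do {
        iterations++;
        Thread.currentThread().interrupt(); // simulate an interrupt before sleeping
        Thread.sleep(10);
      } while (iterations < maxIterations);
    } catch (InterruptedException ie) {
      // Reached after the first iteration; the loop has already been abandoned.
    }
    return iterations;
  }
}
```

With three iterations requested, the inner-catch version completes all three and the outer-catch version stops after one.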

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
##########
@@ -66,53 +69,60 @@
   private long cleanerCheckInterval = 0;
 
   private ReplChangeManager replChangeManager;
+  private ExecutorService cleanerExecutor;
 
   @Override
   public void init(AtomicBoolean stop) throws Exception {
     super.init(stop);
     replChangeManager = ReplChangeManager.getInstance(conf);
+    cleanerCheckInterval = conf.getTimeVar(
+            HiveConf.ConfVars.HIVE_COMPACTOR_CLEANER_RUN_INTERVAL, TimeUnit.MILLISECONDS);
+    cleanerExecutor = CompactorUtil.createExecutorWithThreadFactory(
+            conf.getIntVar(HiveConf.ConfVars.HIVE_COMPACTOR_CLEANER_REQUEST_QUEUE),
+            COMPACTOR_CLEANER_THREAD_NAME_FORMAT);
   }
 
   @Override
   public void run() {
-    if (cleanerCheckInterval == 0) {
-      cleanerCheckInterval = conf.getTimeVar(
-          HiveConf.ConfVars.HIVE_COMPACTOR_CLEANER_RUN_INTERVAL, TimeUnit.MILLISECONDS);
-    }
-
-    do {
-      TxnStore.MutexAPI.LockHandle handle = null;
-      long startedAt = -1;
-      // Make sure nothing escapes this run method and kills the metastore at large,
-      // so wrap it in a big catch Throwable statement.
-      try {
-        handle = txnHandler.getMutexAPI().acquireLock(TxnStore.MUTEX_KEY.Cleaner.name());
-        startedAt = System.currentTimeMillis();
-        long minOpenTxnId = txnHandler.findMinOpenTxnIdForCleaner();
-        for(CompactionInfo compactionInfo : txnHandler.findReadyToClean()) {
-          clean(compactionInfo, minOpenTxnId);
-        }
-      } catch (Throwable t) {
-        LOG.error("Caught an exception in the main loop of compactor cleaner, " +
-            StringUtils.stringifyException(t));
-      }
-      finally {
-        if (handle != null) {
-          handle.releaseLocks();
-        }
-      }
-      // Now, go back to bed until it's time to do this again
-      long elapsedTime = System.currentTimeMillis() - startedAt;
-      if (elapsedTime >= cleanerCheckInterval || stop.get())  {
-        continue;
-      } else {
+    try {
+      do {
+        TxnStore.MutexAPI.LockHandle handle = null;
+        long startedAt = -1;
+        // Make sure nothing escapes this run method and kills the metastore at large,
+        // so wrap it in a big catch Throwable statement.
         try {
-          Thread.sleep(cleanerCheckInterval - elapsedTime);
-        } catch (InterruptedException ie) {
-          // What can I do about it?
+          handle = txnHandler.getMutexAPI().acquireLock(TxnStore.MUTEX_KEY.Cleaner.name());
+          startedAt = System.currentTimeMillis();
+          long minOpenTxnId = txnHandler.findMinOpenTxnIdForCleaner();
+          List<CompletableFuture> cleanerList = new ArrayList<>();
+          for(CompactionInfo compactionInfo : txnHandler.findReadyToClean()) {
+            cleanerList.add(CompletableFuture.runAsync(CompactorUtil.ThrowingRunnable.unchecked(() ->
+                    clean(compactionInfo, minOpenTxnId)), cleanerExecutor));
+          }
+          CompletableFuture.allOf(cleanerList.toArray(new CompletableFuture[0])).join();
+        } catch (Throwable t) {
+          LOG.error("Caught an exception in the main loop of compactor cleaner, " +
+                  StringUtils.stringifyException(t));
+        } finally {
+          if (handle != null) {
+            handle.releaseLocks();
+          }
         }
+        // Now, go back to bed until it's time to do this again
+        long elapsedTime = System.currentTimeMillis() - startedAt;
+        if (elapsedTime < cleanerCheckInterval && !stop.get()) {
+          try {
+            Thread.sleep(cleanerCheckInterval - elapsedTime);
+          } catch (InterruptedException ie) {

Review comment:
       There is no catch in Cleaner, it's just try-finally. 






[GitHub] [hive] belugabehr commented on a change in pull request #1275: HIVE-23324: Parallelise compaction directory cleaning process

Posted by GitBox <gi...@apache.org>.
belugabehr commented on a change in pull request #1275:
URL: https://github.com/apache/hive/pull/1275#discussion_r458136670



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
##########
@@ -113,6 +122,8 @@ public void run() {
         }
       }
     } while (!stop.get());
+
+    ((ExecutorService)cleanerExecutor).shutdown();

Review comment:
       Why not just declare `cleanerExecutor` as an `ExecutorService` to start with?
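A small sketch of the suggestion, using a hypothetical `CleanerExecutorHolder` class rather than the real Cleaner. The `Executor` interface only declares `execute(Runnable)`, so shutting a pool down through an `Executor`-typed field needs a cast; an `ExecutorService`-typed field exposes `shutdown()` directly.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CleanerExecutorHolder {
  // Typed as ExecutorService rather than Executor, so lifecycle methods such as
  // shutdown() and isShutdown() are available without a cast.
  private final ExecutorService cleanerExecutor;

  public CleanerExecutorHolder(int poolSize) {
    cleanerExecutor = Executors.newFixedThreadPool(poolSize);
  }

  public void stop() {
    cleanerExecutor.shutdown(); // no ((ExecutorService) ...) cast needed
  }

  public boolean isStopped() {
    return cleanerExecutor.isShutdown();
  }
}
```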






[GitHub] [hive] deniskuzZ commented on a change in pull request #1275: HIVE-23324: Parallelise compaction directory cleaning process

Posted by GitBox <gi...@apache.org>.
deniskuzZ commented on a change in pull request #1275:
URL: https://github.com/apache/hive/pull/1275#discussion_r458654179



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
##########
@@ -80,39 +98,58 @@ public void run() {
           HiveConf.ConfVars.HIVE_COMPACTOR_CLEANER_RUN_INTERVAL, TimeUnit.MILLISECONDS);
     }
 
-    do {
-      TxnStore.MutexAPI.LockHandle handle = null;
-      long startedAt = -1;
-      // Make sure nothing escapes this run method and kills the metastore at large,
-      // so wrap it in a big catch Throwable statement.
-      try {
-        handle = txnHandler.getMutexAPI().acquireLock(TxnStore.MUTEX_KEY.Cleaner.name());
-        startedAt = System.currentTimeMillis();
-        long minOpenTxnId = txnHandler.findMinOpenTxnIdForCleaner();
-        for(CompactionInfo compactionInfo : txnHandler.findReadyToClean()) {
-          clean(compactionInfo, minOpenTxnId);
+    try {
+      do {
+        TxnStore.MutexAPI.LockHandle handle = null;
+        long startedAt = -1;
+        // Make sure nothing escapes this run method and kills the metastore at large,
+        // so wrap it in a big catch Throwable statement.
+        try {
+          handle = txnHandler.getMutexAPI().acquireLock(TxnStore.MUTEX_KEY.Cleaner.name());
+          startedAt = System.currentTimeMillis();
+          long minOpenTxnId = txnHandler.findMinOpenTxnIdForCleaner();
+          int count = 0;
+          for(CompactionInfo compactionInfo : txnHandler.findReadyToClean()) {
+            completionService.submit(() -> {
+              ThrowingRunnable.unchecked(() -> clean(compactionInfo, minOpenTxnId));
+              return null;
+            });
+            count++;
+          }
+
+          for(int i=0; i<count; i++) {
+            try {
+              completionService.take().get();
+            } catch (InterruptedException| ExecutionException ignore) {
+              // What should we do here?
+            }
+          }
+        } catch (Throwable t) {
+          LOG.error("Caught an exception in the main loop of compactor cleaner, " +
+                  StringUtils.stringifyException(t));
         }
-      } catch (Throwable t) {
-        LOG.error("Caught an exception in the main loop of compactor cleaner, " +
-            StringUtils.stringifyException(t));
-      }
-      finally {
-        if (handle != null) {
-          handle.releaseLocks();
+        finally {
+          if (handle != null) {
+            handle.releaseLocks();
+          }
         }
-      }
-      // Now, go back to bed until it's time to do this again
-      long elapsedTime = System.currentTimeMillis() - startedAt;
-      if (elapsedTime >= cleanerCheckInterval || stop.get())  {
-        continue;
-      } else {
-        try {
-          Thread.sleep(cleanerCheckInterval - elapsedTime);
-        } catch (InterruptedException ie) {
-          // What can I do about it?
+        // Now, go back to bed until it's time to do this again
+        long elapsedTime = System.currentTimeMillis() - startedAt;
+        if (elapsedTime >= cleanerCheckInterval || stop.get())  {

Review comment:
       Could we refactor this block by negating the `if` condition and removing the `continue` branch?






[GitHub] [hive] adesh-rao commented on a change in pull request #1275: HIVE-23324: Parallelise compaction directory cleaning process

Posted by GitBox <gi...@apache.org>.
adesh-rao commented on a change in pull request #1275:
URL: https://github.com/apache/hive/pull/1275#discussion_r462133018



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
##########
@@ -66,53 +69,60 @@
   private long cleanerCheckInterval = 0;
 
   private ReplChangeManager replChangeManager;
+  private ExecutorService cleanerExecutor;
 
   @Override
   public void init(AtomicBoolean stop) throws Exception {
     super.init(stop);
     replChangeManager = ReplChangeManager.getInstance(conf);
+    cleanerCheckInterval = conf.getTimeVar(
+            HiveConf.ConfVars.HIVE_COMPACTOR_CLEANER_RUN_INTERVAL, TimeUnit.MILLISECONDS);
+    cleanerExecutor = CompactorUtil.createExecutorWithThreadFactory(
+            conf.getIntVar(HiveConf.ConfVars.HIVE_COMPACTOR_CLEANER_REQUEST_QUEUE),
+            COMPACTOR_CLEANER_THREAD_NAME_FORMAT);
   }
 
   @Override
   public void run() {
-    if (cleanerCheckInterval == 0) {
-      cleanerCheckInterval = conf.getTimeVar(
-          HiveConf.ConfVars.HIVE_COMPACTOR_CLEANER_RUN_INTERVAL, TimeUnit.MILLISECONDS);
-    }
-
-    do {
-      TxnStore.MutexAPI.LockHandle handle = null;
-      long startedAt = -1;
-      // Make sure nothing escapes this run method and kills the metastore at large,
-      // so wrap it in a big catch Throwable statement.
-      try {
-        handle = txnHandler.getMutexAPI().acquireLock(TxnStore.MUTEX_KEY.Cleaner.name());
-        startedAt = System.currentTimeMillis();
-        long minOpenTxnId = txnHandler.findMinOpenTxnIdForCleaner();
-        for(CompactionInfo compactionInfo : txnHandler.findReadyToClean()) {
-          clean(compactionInfo, minOpenTxnId);
-        }
-      } catch (Throwable t) {
-        LOG.error("Caught an exception in the main loop of compactor cleaner, " +
-            StringUtils.stringifyException(t));
-      }
-      finally {
-        if (handle != null) {
-          handle.releaseLocks();
-        }
-      }
-      // Now, go back to bed until it's time to do this again
-      long elapsedTime = System.currentTimeMillis() - startedAt;
-      if (elapsedTime >= cleanerCheckInterval || stop.get())  {
-        continue;
-      } else {
+    try {
+      do {
+        TxnStore.MutexAPI.LockHandle handle = null;
+        long startedAt = -1;
+        // Make sure nothing escapes this run method and kills the metastore at large,
+        // so wrap it in a big catch Throwable statement.
         try {
-          Thread.sleep(cleanerCheckInterval - elapsedTime);
-        } catch (InterruptedException ie) {
-          // What can I do about it?
+          handle = txnHandler.getMutexAPI().acquireLock(TxnStore.MUTEX_KEY.Cleaner.name());
+          startedAt = System.currentTimeMillis();
+          long minOpenTxnId = txnHandler.findMinOpenTxnIdForCleaner();
+          List<CompletableFuture> cleanerList = new ArrayList<>();
+          for(CompactionInfo compactionInfo : txnHandler.findReadyToClean()) {
+            cleanerList.add(CompletableFuture.runAsync(CompactorUtil.ThrowingRunnable.unchecked(() ->
+                    clean(compactionInfo, minOpenTxnId)), cleanerExecutor));
+          }
+          CompletableFuture.allOf(cleanerList.toArray(new CompletableFuture[0])).join();
+        } catch (Throwable t) {
+          LOG.error("Caught an exception in the main loop of compactor cleaner, " +
+                  StringUtils.stringifyException(t));
+        } finally {
+          if (handle != null) {
+            handle.releaseLocks();
+          }
         }
+        // Now, go back to bed until it's time to do this again
+        long elapsedTime = System.currentTimeMillis() - startedAt;
+        if (elapsedTime < cleanerCheckInterval && !stop.get()) {
+          try {
+            Thread.sleep(cleanerCheckInterval - elapsedTime);
+          } catch (InterruptedException ie) {

Review comment:
       Added the logging in catch.

##########
File path: ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
##########
@@ -274,6 +285,55 @@ public void droppedPartition() throws Exception {
     ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
     Assert.assertEquals(0, rsp.getCompactsSize());
   }
+
+  @Test
+  public void processCompactionCandidatesInParallel() throws Exception {
+    Table t = newTable("default", "camipc", true);
+    List<Partition> partitions = new ArrayList<>();
+    Partition p = null;
+    for(int i=0; i<10; i++) {
+      p = newPartition(t, "today" + i);
+
+      addBaseFile(t, p, 20L, 20);
+      addDeltaFile(t, p, 21L, 22L, 2);
+      addDeltaFile(t, p, 23L, 24L, 2);
+      addDeltaFile(t, p, 21L, 24L, 4);
+      partitions.add(p);
+    }
+    burnThroughTransactions("default", "camipc", 25);

Review comment:
       Done.






[GitHub] [hive] adesh-rao commented on a change in pull request #1275: HIVE-23324: Parallelise compaction directory cleaning process

Posted by GitBox <gi...@apache.org>.
adesh-rao commented on a change in pull request #1275:
URL: https://github.com/apache/hive/pull/1275#discussion_r458744425



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
##########
@@ -166,6 +167,11 @@ public void run() {
         } catch (Throwable t) {
           LOG.error("Initiator loop caught unexpected exception this time through the loop: " +
               StringUtils.stringifyException(t));
+          if (compactionExecutor != null) {

Review comment:
       Same as above. This is to restore the number of threads in case the executor's threads are lost. The same is done in Worker.java.
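For context on keeping the thread count stable: the JDK documentation for `Executors.newFixedThreadPool` states that if a thread terminates due to a failure before shutdown, a new one takes its place when needed to execute subsequent tasks. The sketch below (hypothetical class, not Hive code) lets tasks fail via `execute`, which terminates the worker threads that ran them, and then checks that later tasks still run. The custom thread factory only silences the default uncaught-exception output.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class FixedPoolRecovery {
  // Returns true if tasks submitted after several thread-killing failures still complete.
  public static boolean survivesWorkerFailures() throws InterruptedException {
    ExecutorService pool = Executors.newFixedThreadPool(2, runnable -> {
      Thread thread = new Thread(runnable);
      thread.setUncaughtExceptionHandler((t, e) -> { }); // keep the demo output quiet
      return thread;
    });
    try {
      // execute() (unlike submit() or runAsync()) lets the exception escape the task,
      // which terminates the worker thread that ran it.
      for (int i = 0; i < 4; i++) {
        pool.execute(() -> {
          throw new IllegalStateException("simulated failure");
        });
      }
      CountDownLatch done = new CountDownLatch(4);
      for (int i = 0; i < 4; i++) {
        pool.execute(done::countDown);
      }
      // The pool starts replacement workers, so these tasks still run.
      return done.await(10, TimeUnit.SECONDS);
    } finally {
      pool.shutdownNow();
    }
  }
}
```

This only shows the JDK pool's own recovery; it does not address other reasons to recreate an executor, such as abandoning tasks that hang.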






[GitHub] [hive] adesh-rao commented on a change in pull request #1275: HIVE-23324: Parallelise compaction directory cleaning process

Posted by GitBox <gi...@apache.org>.
adesh-rao commented on a change in pull request #1275:
URL: https://github.com/apache/hive/pull/1275#discussion_r459433801



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
##########
@@ -87,6 +87,9 @@
   public void run() {
     // Make sure nothing escapes this run method and kills the metastore at large,
     // so wrap it in a big catch Throwable statement.
+    ExecutorService compactionExecutor = CompactorUtil.createExecutorWithThreadFactory(

Review comment:
       Done. 

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
##########
@@ -79,7 +82,9 @@ public void run() {
       cleanerCheckInterval = conf.getTimeVar(
           HiveConf.ConfVars.HIVE_COMPACTOR_CLEANER_RUN_INTERVAL, TimeUnit.MILLISECONDS);
     }
-
+    ExecutorService cleanerExecutor = CompactorUtil.createExecutorWithThreadFactory(

Review comment:
       Done.






[GitHub] [hive] deniskuzZ commented on a change in pull request #1275: HIVE-23324: Parallelise compaction directory cleaning process

Posted by GitBox <gi...@apache.org>.
deniskuzZ commented on a change in pull request #1275:
URL: https://github.com/apache/hive/pull/1275#discussion_r460704756



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
##########
@@ -89,23 +93,28 @@ public void run() {
         handle = txnHandler.getMutexAPI().acquireLock(TxnStore.MUTEX_KEY.Cleaner.name());
         startedAt = System.currentTimeMillis();
         long minOpenTxnId = txnHandler.findMinOpenTxnIdForCleaner();
+        List<CompletableFuture> cleanerList = new ArrayList<>();
         for(CompactionInfo compactionInfo : txnHandler.findReadyToClean()) {
-          clean(compactionInfo, minOpenTxnId);
+          cleanerList.add(CompletableFuture.runAsync(CompactorUtil.ThrowingRunnable.unchecked(() ->
+                  clean(compactionInfo, minOpenTxnId)), cleanerExecutor));
         }
+        CompletableFuture.allOf(cleanerList.toArray(new CompletableFuture[0])).join();
       } catch (Throwable t) {
         LOG.error("Caught an exception in the main loop of compactor cleaner, " +
-            StringUtils.stringifyException(t));
-      }
-      finally {
+                StringUtils.stringifyException(t));
+        if (cleanerExecutor != null) {

Review comment:
       @adesh-rao, exception from (see how it's done in Initiator)           
             try {
               Thread.sleep(cleanerCheckInterval - elapsedTime);
             } catch (InterruptedException ie) {
               // What can I do about it?
             }
   The rest looks good to me.






[GitHub] [hive] deniskuzZ commented on a change in pull request #1275: HIVE-23324: Parallelise compaction directory cleaning process

Posted by GitBox <gi...@apache.org>.
deniskuzZ commented on a change in pull request #1275:
URL: https://github.com/apache/hive/pull/1275#discussion_r458704339



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
##########
@@ -89,23 +93,28 @@ public void run() {
         handle = txnHandler.getMutexAPI().acquireLock(TxnStore.MUTEX_KEY.Cleaner.name());
         startedAt = System.currentTimeMillis();
         long minOpenTxnId = txnHandler.findMinOpenTxnIdForCleaner();
+        List<CompletableFuture> cleanerList = new ArrayList<>();
         for(CompactionInfo compactionInfo : txnHandler.findReadyToClean()) {
-          clean(compactionInfo, minOpenTxnId);
+          cleanerList.add(CompletableFuture.runAsync(CompactorUtil.ThrowingRunnable.unchecked(() ->
+                  clean(compactionInfo, minOpenTxnId)), cleanerExecutor));
         }
+        CompletableFuture.allOf(cleanerList.toArray(new CompletableFuture[0])).join();
       } catch (Throwable t) {
         LOG.error("Caught an exception in the main loop of compactor cleaner, " +
-            StringUtils.stringifyException(t));
-      }
-      finally {
+                StringUtils.stringifyException(t));
+        if (cleanerExecutor != null) {
+          cleanerExecutor.shutdownNow();
+          cleanerExecutor = CompactorUtil.createExecutorWithThreadFactory(
+                  conf.getIntVar(HiveConf.ConfVars.HIVE_COMPACTOR_CLEANER_REQUEST_QUEUE), threadNameFormat);
+        }
+      } finally {
         if (handle != null) {
           handle.releaseLocks();
         }
       }
       // Now, go back to bed until it's time to do this again
       long elapsedTime = System.currentTimeMillis() - startedAt;
-      if (elapsedTime >= cleanerCheckInterval || stop.get())  {
-        continue;
-      } else {
+      if (!(elapsedTime >= cleanerCheckInterval || stop.get())) {

Review comment:
       Could you move the negation inside: `elapsedTime < cleanerCheckInterval && !stop.get()`?
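The two forms are equivalent by De Morgan's law: `!(a || b)` is `!a && !b`. A quick exhaustive check over a small grid of inputs (hypothetical class and method names, with the interval fixed at 5 for illustration) confirms that the refactored sleep condition is exactly the complement of the original skip condition.

```java
public class SleepConditionCheck {
  // Original form: skip the sleep (the old "continue" branch) when this is true.
  public static boolean skipSleep(long elapsedTime, long interval, boolean stopped) {
    return elapsedTime >= interval || stopped;
  }

  // Refactored form: sleep only when this is true, i.e. !(a || b) == !a && !b.
  public static boolean shouldSleep(long elapsedTime, long interval, boolean stopped) {
    return elapsedTime < interval && !stopped;
  }

  // Checks that the two forms are exact complements over a small input grid.
  public static boolean formsAreComplements() {
    long interval = 5;
    for (long elapsed = 0; elapsed <= 10; elapsed++) {
      for (boolean stopped : new boolean[] {false, true}) {
        if (shouldSleep(elapsed, interval, stopped) == skipSleep(elapsed, interval, stopped)) {
          return false;
        }
      }
    }
    return true;
  }
}
```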






[GitHub] [hive] pvary commented on a change in pull request #1275: HIVE-23324: Parallelise compaction directory cleaning process

Posted by GitBox <gi...@apache.org>.
pvary commented on a change in pull request #1275:
URL: https://github.com/apache/hive/pull/1275#discussion_r458611187



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
##########
@@ -64,13 +72,23 @@
   static final private String CLASS_NAME = Cleaner.class.getName();
   static final private Logger LOG = LoggerFactory.getLogger(CLASS_NAME);
   private long cleanerCheckInterval = 0;
+  private ExecutorService cleanerExecutor;
+  private CompletionService<Void> completionService;
 
   private ReplChangeManager replChangeManager;
 
   @Override
   public void init(AtomicBoolean stop) throws Exception {
     super.init(stop);
     replChangeManager = ReplChangeManager.getInstance(conf);
+    ThreadFactory threadFactory = new ThreadFactoryBuilder()

Review comment:
       Or maybe make this a utility method that creates the executor, like:
   WhateverUtil.createExecutor(String name, int size) {}
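A sketch of such a helper using only the JDK. The class and method names are hypothetical (the later diffs in this PR call a `CompactorUtil.createExecutorWithThreadFactory` helper with a pool size and a thread-name format, whose body is not shown here). Making the threads daemons is an assumption of this sketch, so that an idle pool does not keep the JVM alive.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

public final class ExecutorUtil {
  private ExecutorUtil() {
  }

  // Names threads from a String.format pattern with one %d, e.g. "cleaner-%d",
  // numbering them from 0. Daemon threads are an assumption of this sketch.
  public static ThreadFactory namedDaemonThreadFactory(String nameFormat) {
    AtomicInteger counter = new AtomicInteger();
    return runnable -> {
      Thread thread = new Thread(runnable, String.format(nameFormat, counter.getAndIncrement()));
      thread.setDaemon(true);
      return thread;
    };
  }

  // Fixed-size pool whose threads use the naming scheme above.
  public static ExecutorService createExecutor(String nameFormat, int size) {
    return Executors.newFixedThreadPool(size, namedDaemonThreadFactory(nameFormat));
  }
}
```

Named threads make it easy to tell cleaner workers apart in thread dumps and logs.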






[GitHub] [hive] adesh-rao commented on a change in pull request #1275: HIVE-23324: Parallelise compaction directory cleaning process

Posted by GitBox <gi...@apache.org>.
adesh-rao commented on a change in pull request #1275:
URL: https://github.com/apache/hive/pull/1275#discussion_r458254344



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
##########
@@ -64,13 +67,15 @@
   static final private String CLASS_NAME = Cleaner.class.getName();
   static final private Logger LOG = LoggerFactory.getLogger(CLASS_NAME);
   private long cleanerCheckInterval = 0;
+  private Executor cleanerExecutor;
 
   private ReplChangeManager replChangeManager;
 
   @Override
   public void init(AtomicBoolean stop) throws Exception {
     super.init(stop);
     replChangeManager = ReplChangeManager.getInstance(conf);
+    cleanerExecutor = Executors.newFixedThreadPool(conf.getIntVar(HiveConf.ConfVars.HIVE_COMPACTOR_CLEANER_REQUEST_QUEUE));

Review comment:
       Done. 






[GitHub] [hive] deniskuzZ commented on a change in pull request #1275: HIVE-23324: Parallelise compaction directory cleaning process

Posted by GitBox <gi...@apache.org>.
deniskuzZ commented on a change in pull request #1275:
URL: https://github.com/apache/hive/pull/1275#discussion_r458964924



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
##########
@@ -182,6 +197,10 @@ public void run() {
     } catch (Throwable t) {
       LOG.error("Caught an exception in the main loop of compactor initiator, exiting " +
           StringUtils.stringifyException(t));
+    } finally {
+      if (compactionExecutor != null) {
+        compactionExecutor.shutdownNow();

Review comment:
       @adesh-rao, sorry, that was actually the right spot; that's basically the end of thread execution.
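A simplified model of the structure being discussed (hypothetical names, not the actual Initiator code): the loop runs inside a try that catches `Throwable`, and the executor is shut down in `finally`. That block runs once, when `run()` is about to end, whether the loop finished normally or an exception escaped it.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class RunLoopShutdown {
  // Simplified model of a run() method: catch Throwable around the whole loop,
  // and shut the executor down in finally so it is released on every exit path.
  public static ExecutorService runOnce(boolean failInsideLoop) {
    ExecutorService executor = Executors.newFixedThreadPool(2);
    try {
      int iterations = 0;
      do {
        if (failInsideLoop) {
          throw new IllegalStateException("simulated failure escaping the loop");
        }
        iterations++;
      } while (iterations < 3);
    } catch (Throwable t) {
      System.err.println("Caught an exception in the main loop, exiting: " + t);
    } finally {
      executor.shutdownNow();
    }
    return executor;
  }
}
```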






[GitHub] [hive] deniskuzZ commented on a change in pull request #1275: HIVE-23324: Parallelise compaction directory cleaning process

Posted by GitBox <gi...@apache.org>.
deniskuzZ commented on a change in pull request #1275:
URL: https://github.com/apache/hive/pull/1275#discussion_r458776747



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
##########
@@ -89,23 +93,28 @@ public void run() {
         handle = txnHandler.getMutexAPI().acquireLock(TxnStore.MUTEX_KEY.Cleaner.name());
         startedAt = System.currentTimeMillis();
         long minOpenTxnId = txnHandler.findMinOpenTxnIdForCleaner();
+        List<CompletableFuture> cleanerList = new ArrayList<>();
         for(CompactionInfo compactionInfo : txnHandler.findReadyToClean()) {
-          clean(compactionInfo, minOpenTxnId);
+          cleanerList.add(CompletableFuture.runAsync(CompactorUtil.ThrowingRunnable.unchecked(() ->
+                  clean(compactionInfo, minOpenTxnId)), cleanerExecutor));
         }
+        CompletableFuture.allOf(cleanerList.toArray(new CompletableFuture[0])).join();
       } catch (Throwable t) {
         LOG.error("Caught an exception in the main loop of compactor cleaner, " +
-            StringUtils.stringifyException(t));
-      }
-      finally {
+                StringUtils.stringifyException(t));
+        if (cleanerExecutor != null) {

Review comment:
       You are submitting tasks to the executor inside the loop, all at once, so if there is an exception in the main loop, there won't be any running threads in the executor.
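A sketch supporting this point, assuming a `CompletableFuture.runAsync` plus `allOf(...).join()` pattern like the one in the diff (the class name, task count, and 50 ms task duration are made up). `CompletableFuture.allOf` completes only when all input futures have completed, and completes exceptionally if any of them did, so by the time `join()` returns or throws, every submitted task has finished. The caveat is an exception thrown while tasks are still being submitted, before `join()` is reached: tasks already submitted may still be running at that point.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

public class AllOfDemo {
  // Returns how many non-failing tasks had finished by the time join() threw.
  public static int finishedWhenJoinReturns() {
    ExecutorService executor = Executors.newFixedThreadPool(2);
    AtomicInteger finished = new AtomicInteger();
    List<CompletableFuture<Void>> futures = new ArrayList<>();
    try {
      for (int i = 0; i < 6; i++) {
        final int id = i;
        futures.add(CompletableFuture.runAsync(() -> {
          if (id == 0) {
            throw new IllegalStateException("simulated clean() failure");
          }
          sleepQuietly(50);
          finished.incrementAndGet();
        }, executor));
      }
      try {
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
      } catch (CompletionException e) {
        // allOf completes exceptionally only after every input future has completed.
      }
      return finished.get();
    } finally {
      executor.shutdownNow();
    }
  }

  private static void sleepQuietly(long millis) {
    try {
      Thread.sleep(millis);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }
}
```

Even though the first task fails immediately, `join()` does not throw until the other five tasks have finished.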






[GitHub] [hive] belugabehr commented on pull request #1275: HIVE-23324: Parallelise compaction directory cleaning process

Posted by GitBox <gi...@apache.org>.
belugabehr commented on pull request #1275:
URL: https://github.com/apache/hive/pull/1275#issuecomment-662082780


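   ```
   import java.util.concurrent.CompletionService;
   import java.util.concurrent.ExecutionException;
   import java.util.concurrent.ExecutorCompletionService;
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.Executors;
   
   public class CompletionServiceExample {
     public static void main(String[] args) throws InterruptedException {
       ExecutorService service = Executors.newFixedThreadPool(2);
       CompletionService<Void> completionService = new ExecutorCompletionService<>(service);
       try {
         // Submit every task up front; at most 2 run at a time.
         for (int i = 0; i < 10; i++) {
           final int t = i;
           completionService.submit(() -> {
             clean("File" + t);
             return null;
           });
         }
   
         // Wait for all 10 tasks in completion order. A failed task is reported
         // but does not stop us from waiting for the rest.
         for (int i = 0; i < 10; i++) {
           try {
             completionService.take().get();
           } catch (ExecutionException e) {
             System.err.println("Task failed: " + e.getCause());
           }
         }
       } finally {
         // Pool threads are non-daemon, so without shutdown() the JVM would not exit.
         service.shutdown();
       }
     }
   
     public static String clean(String path) {
       return "Cleaned: " + path;
     }
   }
   ```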




[GitHub] [hive] adesh-rao commented on a change in pull request #1275: HIVE-23324: Parallelise compaction directory cleaning process

Posted by GitBox <gi...@apache.org>.
adesh-rao commented on a change in pull request #1275:
URL: https://github.com/apache/hive/pull/1275#discussion_r458705695



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
##########
@@ -89,23 +93,28 @@ public void run() {
         handle = txnHandler.getMutexAPI().acquireLock(TxnStore.MUTEX_KEY.Cleaner.name());
         startedAt = System.currentTimeMillis();
         long minOpenTxnId = txnHandler.findMinOpenTxnIdForCleaner();
+        List<CompletableFuture> cleanerList = new ArrayList<>();
         for(CompactionInfo compactionInfo : txnHandler.findReadyToClean()) {
-          clean(compactionInfo, minOpenTxnId);
+          cleanerList.add(CompletableFuture.runAsync(CompactorUtil.ThrowingRunnable.unchecked(() ->
+                  clean(compactionInfo, minOpenTxnId)), cleanerExecutor));
         }
+        CompletableFuture.allOf(cleanerList.toArray(new CompletableFuture[0])).join();
       } catch (Throwable t) {
         LOG.error("Caught an exception in the main loop of compactor cleaner, " +
-            StringUtils.stringifyException(t));
-      }
-      finally {
+                StringUtils.stringifyException(t));
+        if (cleanerExecutor != null) {

Review comment:
      This is to handle the case where threads in the executor are lost due to an exception.
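Whether a task failure can cost the pool its threads is easy to check in isolation. A minimal standalone sketch, assuming only the JDK (the class name and the simulated failure are illustrative, not Hive code): a task started with `CompletableFuture.runAsync` that throws has its exception captured in the returned future, and the same single-thread executor goes on to run the next task.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class PoolSurvivesTaskFailure {
  /** Runs a failing task, then a normal task, on the same single-thread pool. */
  static String demo() {
    ExecutorService pool = Executors.newFixedThreadPool(1);
    try {
      CompletableFuture<Void> failing = CompletableFuture.runAsync(() -> {
        throw new IllegalStateException("simulated clean() failure");
      }, pool);

      String failure = "none";
      try {
        failing.join();
      } catch (CompletionException e) {
        // The task's exception is stored in the future and rethrown here, wrapped.
        failure = e.getCause().getMessage();
      }

      // The only pool thread is still available, so the next task runs normally.
      String next = CompletableFuture.supplyAsync(() -> "next task ran", pool).join();
      return failure + " | " + next;
    } finally {
      pool.shutdown();
    }
  }

  public static void main(String[] args) {
    System.out.println(demo()); // prints "simulated clean() failure | next task ran"
  }
}
```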






[GitHub] [hive] adesh-rao commented on a change in pull request #1275: HIVE-23324: Parallelise compaction directory cleaning process

Posted by GitBox <gi...@apache.org>.
adesh-rao commented on a change in pull request #1275:
URL: https://github.com/apache/hive/pull/1275#discussion_r458013378



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
##########
@@ -64,13 +67,15 @@
   static final private String CLASS_NAME = Cleaner.class.getName();
   static final private Logger LOG = LoggerFactory.getLogger(CLASS_NAME);
   private long cleanerCheckInterval = 0;
+  private Executor cleanerExecutor;
 
   private ReplChangeManager replChangeManager;
 
   @Override
   public void init(AtomicBoolean stop) throws Exception {
     super.init(stop);
     replChangeManager = ReplChangeManager.getInstance(conf);
+    cleanerExecutor = Executors.newFixedThreadPool(conf.getIntVar(HiveConf.ConfVars.HIVE_COMPACTOR_CLEANER_REQUEST_QUEUE));

Review comment:
       Yeah, makes sense. Added it.






[GitHub] [hive] deniskuzZ commented on a change in pull request #1275: HIVE-23324: Parallelise compaction directory cleaning process

Posted by GitBox <gi...@apache.org>.
deniskuzZ commented on a change in pull request #1275:
URL: https://github.com/apache/hive/pull/1275#discussion_r458651196



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
##########
@@ -80,39 +98,58 @@ public void run() {
           HiveConf.ConfVars.HIVE_COMPACTOR_CLEANER_RUN_INTERVAL, TimeUnit.MILLISECONDS);
     }
 
-    do {
-      TxnStore.MutexAPI.LockHandle handle = null;
-      long startedAt = -1;
-      // Make sure nothing escapes this run method and kills the metastore at large,
-      // so wrap it in a big catch Throwable statement.
-      try {
-        handle = txnHandler.getMutexAPI().acquireLock(TxnStore.MUTEX_KEY.Cleaner.name());
-        startedAt = System.currentTimeMillis();
-        long minOpenTxnId = txnHandler.findMinOpenTxnIdForCleaner();
-        for(CompactionInfo compactionInfo : txnHandler.findReadyToClean()) {
-          clean(compactionInfo, minOpenTxnId);
+    try {
+      do {
+        TxnStore.MutexAPI.LockHandle handle = null;
+        long startedAt = -1;
+        // Make sure nothing escapes this run method and kills the metastore at large,
+        // so wrap it in a big catch Throwable statement.
+        try {
+          handle = txnHandler.getMutexAPI().acquireLock(TxnStore.MUTEX_KEY.Cleaner.name());
+          startedAt = System.currentTimeMillis();
+          long minOpenTxnId = txnHandler.findMinOpenTxnIdForCleaner();
+          int count = 0;
+          for(CompactionInfo compactionInfo : txnHandler.findReadyToClean()) {
+            completionService.submit(() -> {
+              ThrowingRunnable.unchecked(() -> clean(compactionInfo, minOpenTxnId));
+              return null;
+            });
+            count++;
+          }
+
+          for(int i=0; i<count; i++) {
+            try {
+              completionService.take().get();
+            } catch (InterruptedException| ExecutionException ignore) {
+              // What should we do here?
+            }
+          }
+        } catch (Throwable t) {
+          LOG.error("Caught an exception in the main loop of compactor cleaner, " +
+                  StringUtils.stringifyException(t));
         }
-      } catch (Throwable t) {
-        LOG.error("Caught an exception in the main loop of compactor cleaner, " +
-            StringUtils.stringifyException(t));
-      }
-      finally {
-        if (handle != null) {
-          handle.releaseLocks();
+        finally {
+          if (handle != null) {

Review comment:
      Shutdown should be called here; otherwise you can terminate threads from the 2nd iteration.
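A small JDK-only sketch of the risk being described (class and method names are hypothetical): if `shutdown()` runs in the per-iteration `finally`, the executor is already shut down when the next iteration submits work, and with the default abort policy that submission fails with `RejectedExecutionException`.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;

class ShutdownInsideLoop {
  /** Returns how many loop iterations could submit work before the executor rejected it. */
  static int iterationsBeforeRejection(int iterations) {
    ExecutorService pool = Executors.newFixedThreadPool(2);
    int accepted = 0;
    for (int i = 0; i < iterations; i++) {
      try {
        pool.submit(() -> {
          // stand-in for clean(compactionInfo, minOpenTxnId)
        });
        accepted++;
      } catch (RejectedExecutionException e) {
        // A shut-down executor rejects every later submission.
        break;
      } finally {
        // The pattern under discussion: shutdown() in the per-iteration finally.
        pool.shutdown();
      }
    }
    return accepted;
  }

  public static void main(String[] args) {
    System.out.println(iterationsBeforeRejection(3)); // prints 1
  }
}
```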






[GitHub] [hive] deniskuzZ commented on a change in pull request #1275: HIVE-23324: Parallelise compaction directory cleaning process

Posted by GitBox <gi...@apache.org>.
deniskuzZ commented on a change in pull request #1275:
URL: https://github.com/apache/hive/pull/1275#discussion_r458702723



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
##########
@@ -89,23 +93,28 @@ public void run() {
         handle = txnHandler.getMutexAPI().acquireLock(TxnStore.MUTEX_KEY.Cleaner.name());
         startedAt = System.currentTimeMillis();
         long minOpenTxnId = txnHandler.findMinOpenTxnIdForCleaner();
+        List<CompletableFuture> cleanerList = new ArrayList<>();
         for(CompactionInfo compactionInfo : txnHandler.findReadyToClean()) {
-          clean(compactionInfo, minOpenTxnId);
+          cleanerList.add(CompletableFuture.runAsync(CompactorUtil.ThrowingRunnable.unchecked(() ->
+                  clean(compactionInfo, minOpenTxnId)), cleanerExecutor));
         }
+        CompletableFuture.allOf(cleanerList.toArray(new CompletableFuture[0])).join();
       } catch (Throwable t) {
         LOG.error("Caught an exception in the main loop of compactor cleaner, " +
-            StringUtils.stringifyException(t));
-      }
-      finally {
+                StringUtils.stringifyException(t));
+        if (cleanerExecutor != null) {

Review comment:
      What's the purpose of the executor service shutdown? It would be needed only when the Cleaner thread terminates.
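A sketch of that lifecycle for a Cleaner-like service loop, assuming a hypothetical `LoopingWorker` class with an iteration cap standing in for the real `stop` handling (JDK only): the executor is created once in `init`, reused by every iteration, and shut down only in the outer `finally` that runs when `run()` itself exits. A failure inside one iteration is logged and the loop continues with the same executor.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for a long-running service thread such as the Cleaner. */
class LoopingWorker {
  private final AtomicBoolean stop = new AtomicBoolean(false);
  private final AtomicInteger cleaned = new AtomicInteger();
  private ExecutorService executor;

  void init(int threads) {
    // Created once, then reused by every iteration of run().
    executor = Executors.newFixedThreadPool(threads);
  }

  void run(int maxIterations, int tasksPerIteration) {
    try {
      int iteration = 0;
      do {
        try {
          List<CompletableFuture<Void>> futures = new ArrayList<>();
          for (int i = 0; i < tasksPerIteration; i++) {
            futures.add(CompletableFuture.runAsync(cleaned::incrementAndGet, executor));
          }
          CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
        } catch (Throwable t) {
          // Log and keep looping; the executor is still usable next iteration.
          System.err.println("Iteration failed: " + t);
        }
        if (++iteration >= maxIterations) {
          stop.set(true); // stand-in for the real stop signal
        }
      } while (!stop.get());
    } finally {
      // Shut down only when the service thread itself is exiting.
      executor.shutdown();
      try {
        executor.awaitTermination(10, TimeUnit.SECONDS);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    }
  }

  int cleanedCount() {
    return cleaned.get();
  }

  public static void main(String[] args) {
    LoopingWorker worker = new LoopingWorker();
    worker.init(2);
    worker.run(3, 4);
    System.out.println(worker.cleanedCount()); // prints 12
  }
}
```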






[GitHub] [hive] deniskuzZ commented on a change in pull request #1275: HIVE-23324: Parallelise compaction directory cleaning process

Posted by GitBox <gi...@apache.org>.
deniskuzZ commented on a change in pull request #1275:
URL: https://github.com/apache/hive/pull/1275#discussion_r458705459



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
##########
@@ -166,6 +167,11 @@ public void run() {
         } catch (Throwable t) {
           LOG.error("Initiator loop caught unexpected exception this time through the loop: " +
               StringUtils.stringifyException(t));
+          if (compactionExecutor != null) {

Review comment:
      Same as above: what's the purpose of the executor service shutdown? It would be needed only when the Initiator thread terminates.






[GitHub] [hive] pvary commented on a change in pull request #1275: HIVE-23324: Parallelise compaction directory cleaning process

Posted by GitBox <gi...@apache.org>.
pvary commented on a change in pull request #1275:
URL: https://github.com/apache/hive/pull/1275#discussion_r458609954



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
##########
@@ -80,39 +98,58 @@ public void run() {
           HiveConf.ConfVars.HIVE_COMPACTOR_CLEANER_RUN_INTERVAL, TimeUnit.MILLISECONDS);
     }
 
-    do {
-      TxnStore.MutexAPI.LockHandle handle = null;
-      long startedAt = -1;
-      // Make sure nothing escapes this run method and kills the metastore at large,
-      // so wrap it in a big catch Throwable statement.
-      try {
-        handle = txnHandler.getMutexAPI().acquireLock(TxnStore.MUTEX_KEY.Cleaner.name());
-        startedAt = System.currentTimeMillis();
-        long minOpenTxnId = txnHandler.findMinOpenTxnIdForCleaner();
-        for(CompactionInfo compactionInfo : txnHandler.findReadyToClean()) {
-          clean(compactionInfo, minOpenTxnId);
+    try {
+      do {
+        TxnStore.MutexAPI.LockHandle handle = null;
+        long startedAt = -1;
+        // Make sure nothing escapes this run method and kills the metastore at large,
+        // so wrap it in a big catch Throwable statement.
+        try {
+          handle = txnHandler.getMutexAPI().acquireLock(TxnStore.MUTEX_KEY.Cleaner.name());
+          startedAt = System.currentTimeMillis();
+          long minOpenTxnId = txnHandler.findMinOpenTxnIdForCleaner();
+          int count = 0;
+          for(CompactionInfo compactionInfo : txnHandler.findReadyToClean()) {
+            completionService.submit(() -> {
+              ThrowingRunnable.unchecked(() -> clean(compactionInfo, minOpenTxnId));
+              return null;
+            });
+            count++;
+          }
+
+          for(int i=0; i<count; i++) {

Review comment:
       nit of the nit: formatting:
   for (int i = 0; i < count; i++) {






[GitHub] [hive] deniskuzZ commented on a change in pull request #1275: HIVE-23324: Parallelise compaction directory cleaning process

Posted by GitBox <gi...@apache.org>.
deniskuzZ commented on a change in pull request #1275:
URL: https://github.com/apache/hive/pull/1275#discussion_r458651847



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
##########
@@ -80,39 +98,58 @@ public void run() {
           HiveConf.ConfVars.HIVE_COMPACTOR_CLEANER_RUN_INTERVAL, TimeUnit.MILLISECONDS);
     }
 
-    do {
-      TxnStore.MutexAPI.LockHandle handle = null;
-      long startedAt = -1;
-      // Make sure nothing escapes this run method and kills the metastore at large,
-      // so wrap it in a big catch Throwable statement.
-      try {
-        handle = txnHandler.getMutexAPI().acquireLock(TxnStore.MUTEX_KEY.Cleaner.name());
-        startedAt = System.currentTimeMillis();
-        long minOpenTxnId = txnHandler.findMinOpenTxnIdForCleaner();
-        for(CompactionInfo compactionInfo : txnHandler.findReadyToClean()) {
-          clean(compactionInfo, minOpenTxnId);
+    try {
+      do {
+        TxnStore.MutexAPI.LockHandle handle = null;
+        long startedAt = -1;
+        // Make sure nothing escapes this run method and kills the metastore at large,
+        // so wrap it in a big catch Throwable statement.
+        try {
+          handle = txnHandler.getMutexAPI().acquireLock(TxnStore.MUTEX_KEY.Cleaner.name());
+          startedAt = System.currentTimeMillis();
+          long minOpenTxnId = txnHandler.findMinOpenTxnIdForCleaner();
+          int count = 0;
+          for(CompactionInfo compactionInfo : txnHandler.findReadyToClean()) {
+            completionService.submit(() -> {
+              ThrowingRunnable.unchecked(() -> clean(compactionInfo, minOpenTxnId));
+              return null;
+            });
+            count++;
+          }
+
+          for(int i=0; i<count; i++) {
+            try {
+              completionService.take().get();
+            } catch (InterruptedException| ExecutionException ignore) {
+              // What should we do here?
+            }
+          }
+        } catch (Throwable t) {
+          LOG.error("Caught an exception in the main loop of compactor cleaner, " +
+                  StringUtils.stringifyException(t));
         }
-      } catch (Throwable t) {
-        LOG.error("Caught an exception in the main loop of compactor cleaner, " +
-            StringUtils.stringifyException(t));
-      }
-      finally {
-        if (handle != null) {
-          handle.releaseLocks();
+        finally {
+          if (handle != null) {

Review comment:
      Shutdown should be called here; otherwise you can terminate threads from the 2nd iteration.






[GitHub] [hive] adesh-rao commented on pull request #1275: HIVE-23324: Parallelise compaction directory cleaning process

Posted by GitBox <gi...@apache.org>.
adesh-rao commented on pull request #1275:
URL: https://github.com/apache/hive/pull/1275#issuecomment-665459298


   @pvary Can you please take one more look at this? There were a few modifications after your last comment.




[GitHub] [hive] adesh-rao commented on a change in pull request #1275: HIVE-23324: Parallelise compaction directory cleaning process

Posted by GitBox <gi...@apache.org>.
adesh-rao commented on a change in pull request #1275:
URL: https://github.com/apache/hive/pull/1275#discussion_r458675680



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
##########
@@ -149,17 +156,25 @@ public void run() {
               String runAs = resolveUserToRunAs(tblNameOwners, t, p);
               /* checkForCompaction includes many file metadata checks and may be expensive.
                * Therefore, using a thread pool here and running checkForCompactions in parallel */
-              compactionList.add(CompletableFuture.runAsync(ThrowingRunnable.unchecked(() ->
-                  scheduleCompactionIfRequired(ci, t, p, runAs)), compactionExecutor));
+              completionService.submit(() -> {
+                ThrowingRunnable.unchecked(() -> scheduleCompactionIfRequired(ci, t, p, runAs));
+                return null;
+              });
+              count++;
             } catch (Throwable t) {
               LOG.error("Caught exception while trying to determine if we should compact {}. " +
                   "Marking failed to avoid repeated failures, {}", ci, t);
               ci.errorMessage = t.getMessage();
               txnHandler.markFailed(ci);
             }
           }
-          CompletableFuture.allOf(compactionList.toArray(new CompletableFuture[0]))
-            .join();
+          for(int i=0; i<count; i++) {

Review comment:
      Earlier I missed the fact that we don't care how or when the tasks complete, and using join is better in terms of readability and usability too. So I have reverted this change.






[GitHub] [hive] adesh-rao commented on a change in pull request #1275: HIVE-23324: Parallelise compaction directory cleaning process

Posted by GitBox <gi...@apache.org>.
adesh-rao commented on a change in pull request #1275:
URL: https://github.com/apache/hive/pull/1275#discussion_r459435104



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
##########
@@ -89,23 +93,28 @@ public void run() {
         handle = txnHandler.getMutexAPI().acquireLock(TxnStore.MUTEX_KEY.Cleaner.name());
         startedAt = System.currentTimeMillis();
         long minOpenTxnId = txnHandler.findMinOpenTxnIdForCleaner();
+        List<CompletableFuture> cleanerList = new ArrayList<>();
         for(CompactionInfo compactionInfo : txnHandler.findReadyToClean()) {
-          clean(compactionInfo, minOpenTxnId);
+          cleanerList.add(CompletableFuture.runAsync(CompactorUtil.ThrowingRunnable.unchecked(() ->
+                  clean(compactionInfo, minOpenTxnId)), cleanerExecutor));
         }
+        CompletableFuture.allOf(cleanerList.toArray(new CompletableFuture[0])).join();
       } catch (Throwable t) {
         LOG.error("Caught an exception in the main loop of compactor cleaner, " +
-            StringUtils.stringifyException(t));
-      }
-      finally {
+                StringUtils.stringifyException(t));
+        if (cleanerExecutor != null) {

Review comment:
      Which `InterruptedException` are you pointing at? Also, I have now moved the shutdown to the end of the run method.






[GitHub] [hive] sankarh commented on a change in pull request #1275: HIVE-23324: Parallelise compaction directory cleaning process

Posted by GitBox <gi...@apache.org>.
sankarh commented on a change in pull request #1275:
URL: https://github.com/apache/hive/pull/1275#discussion_r461412352



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorUtil.java
##########
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.txn.compactor;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.hadoop.hive.conf.HiveConf;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+
+public class CompactorUtil {
+    public interface ThrowingRunnable<E extends Exception> {

Review comment:
      nit: Use 2-space indentation.
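The body of `CompactorUtil.ThrowingRunnable` is not shown in this thread, so the following is only an assumed shape (the class `ThrowingRunnableSketch` and `cleanThatFails` are hypothetical): a functional interface whose `run()` may throw a checked exception, plus an `unchecked(...)` adapter that returns a plain `Runnable` rethrowing such exceptions wrapped in a `RuntimeException`. It uses 2-space indentation, per the nit.

```java
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ThrowingRunnableSketch {
  /** A Runnable-like interface whose body may throw a checked exception. */
  @FunctionalInterface
  interface ThrowingRunnable<E extends Exception> {
    void run() throws E;

    /** Adapts to a plain Runnable; checked exceptions are rethrown wrapped in RuntimeException. */
    static Runnable unchecked(ThrowingRunnable<?> r) {
      return () -> {
        try {
          r.run();
        } catch (RuntimeException e) {
          throw e; // already unchecked, pass through unchanged
        } catch (Exception e) {
          throw new RuntimeException(e);
        }
      };
    }
  }

  /** Hypothetical stand-in for clean(...) that fails with a checked exception. */
  static void cleanThatFails() throws IOException {
    throw new IOException("cannot delete directory");
  }

  static String demo() {
    ExecutorService pool = Executors.newSingleThreadExecutor();
    try {
      CompletableFuture.runAsync(
          ThrowingRunnable.unchecked(ThrowingRunnableSketch::cleanThatFails), pool).join();
      return "no failure";
    } catch (CompletionException e) {
      // Cause chain: CompletionException -> RuntimeException -> IOException.
      return e.getCause().getCause().getMessage();
    } finally {
      pool.shutdown();
    }
  }

  public static void main(String[] args) {
    System.out.println(demo()); // prints "cannot delete directory"
  }
}
```

Note that `unchecked(...)` only builds a `Runnable`; the wrapped body executes only when that `Runnable` is run, here by `runAsync`.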

##########
File path: ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
##########
@@ -274,6 +285,55 @@ public void droppedPartition() throws Exception {
     ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
     Assert.assertEquals(0, rsp.getCompactsSize());
   }
+
+  @Test
+  public void processCompactionCandidatesInParallel() throws Exception {
+    Table t = newTable("default", "camipc", true);
+    List<Partition> partitions = new ArrayList<>();
+    Partition p = null;
+    for(int i=0; i<10; i++) {
+      p = newPartition(t, "today" + i);
+
+      addBaseFile(t, p, 20L, 20);
+      addDeltaFile(t, p, 21L, 22L, 2);
+      addDeltaFile(t, p, 23L, 24L, 2);
+      addDeltaFile(t, p, 21L, 24L, 4);
+      partitions.add(p);
+    }
+    burnThroughTransactions("default", "camipc", 25);

Review comment:
      nit: Need a blank line after the closing brace.

##########
File path: common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
##########
@@ -3028,6 +3028,9 @@ private static void populateLlapDaemonVarsSet(Set<String> llapDaemonVarsSetLocal
 
     HIVE_COMPACTOR_CLEANER_RUN_INTERVAL("hive.compactor.cleaner.run.interval", "5000ms",
         new TimeValidator(TimeUnit.MILLISECONDS), "Time between runs of the cleaner thread"),
+    HIVE_COMPACTOR_CLEANER_REQUEST_QUEUE("hive.compactor.cleaner.request.queue", 1,

Review comment:
      The config name could be more relevant. It actually represents how many threads we use to run the cleaner in parallel, but the name sounds like a queue name. Can we change it to "HIVE_COMPACTOR_CLEANER_THREADS_NUM"? 
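A sketch supporting the naming point, assuming a hypothetical `createExecutor(numThreads, nameFormat)` helper (the real `CompactorUtil.createExecutorWithThreadFactory` body is not shown here, and the name format and daemon flag are assumptions): the configured value becomes the size of a fixed pool, that is a thread count, while the work queue behind `Executors.newFixedThreadPool` is a separate, unbounded `LinkedBlockingQueue`.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

class NamedPoolSketch {
  /** Hypothetical thread factory: names threads from a format such as "cleaner-%d". */
  static ThreadFactory namedDaemonFactory(String nameFormat) {
    AtomicInteger counter = new AtomicInteger();
    return r -> {
      Thread t = new Thread(r, String.format(nameFormat, counter.getAndIncrement()));
      t.setDaemon(true); // assumption: pool threads should not keep the JVM alive
      return t;
    };
  }

  /** Hypothetical helper: a fixed pool whose size is the configured thread count. */
  static ExecutorService createExecutor(int numThreads, String nameFormat) {
    if (numThreads < 1) {
      throw new IllegalArgumentException("thread count must be >= 1, got " + numThreads);
    }
    // newFixedThreadPool: core size == max size == numThreads, unbounded work queue.
    return Executors.newFixedThreadPool(numThreads, namedDaemonFactory(nameFormat));
  }

  public static void main(String[] args) throws Exception {
    ExecutorService pool = createExecutor(3, "Cleaner-executor-thread-%d");
    try {
      // The first task runs on the first thread the factory created.
      System.out.println(pool.submit(() -> Thread.currentThread().getName()).get());
      // prints Cleaner-executor-thread-0
    } finally {
      pool.shutdown();
    }
  }
}
```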

##########
File path: ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
##########
@@ -274,6 +285,55 @@ public void droppedPartition() throws Exception {
     ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
     Assert.assertEquals(0, rsp.getCompactsSize());
   }
+
+  @Test
+  public void processCompactionCandidatesInParallel() throws Exception {
+    Table t = newTable("default", "camipc", true);
+    List<Partition> partitions = new ArrayList<>();
+    Partition p = null;
+    for(int i=0; i<10; i++) {

Review comment:
       nit: Make it "for (int i = 0; i < 10; i++)". Check other places in this patch.

##########
File path: ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
##########
@@ -274,6 +285,55 @@ public void droppedPartition() throws Exception {
     ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
     Assert.assertEquals(0, rsp.getCompactsSize());
   }
+
+  @Test
+  public void processCompactionCandidatesInParallel() throws Exception {
+    Table t = newTable("default", "camipc", true);
+    List<Partition> partitions = new ArrayList<>();
+    Partition p = null;
+    for(int i=0; i<10; i++) {
+      p = newPartition(t, "today" + i);
+
+      addBaseFile(t, p, 20L, 20);
+      addDeltaFile(t, p, 21L, 22L, 2);
+      addDeltaFile(t, p, 23L, 24L, 2);
+      addDeltaFile(t, p, 21L, 24L, 4);
+      partitions.add(p);
+    }
+    burnThroughTransactions("default", "camipc", 25);
+    for(int i=0; i<10; i++) {
+      CompactionRequest rqst = new CompactionRequest("default", "camipc", CompactionType.MINOR);
+      rqst.setPartitionname("ds=today"+i);
+      txnHandler.compact(rqst);
+      CompactionInfo ci = txnHandler.findNextToCompact("fred");
+      ci.runAs = System.getProperty("user.name");
+      txnHandler.updateCompactorState(ci, openTxn());
+      txnHandler.markCompacted(ci);
+    }
+
+    conf.setIntVar(HiveConf.ConfVars.HIVE_COMPACTOR_CLEANER_REQUEST_QUEUE, 3);
+    startCleaner();
+
+    // Check there are no compactions requests left.
+    ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
+    Assert.assertEquals(10, rsp.getCompactsSize());
+    Assert.assertTrue(TxnStore.SUCCEEDED_RESPONSE.equals(rsp.getCompacts().get(0).getState()));
+
+    // Check that the files are removed
+    for (Partition pa : partitions) {
+      List<Path> paths = getDirectories(conf, t, pa);
+      Assert.assertEquals(2, paths.size());
+      boolean sawBase = false, sawDelta = false;
+      for (Path path : paths) {
+        if (path.getName().equals("base_20")) sawBase = true;

Review comment:
      nit: Put the body of "if", "else if" and "else" on a new line, indented.






[GitHub] [hive] pvary commented on a change in pull request #1275: HIVE-23324: Parallelise compaction directory cleaning process

Posted by GitBox <gi...@apache.org>.
pvary commented on a change in pull request #1275:
URL: https://github.com/apache/hive/pull/1275#discussion_r457977709



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
##########
@@ -89,9 +94,12 @@ public void run() {
         handle = txnHandler.getMutexAPI().acquireLock(TxnStore.MUTEX_KEY.Cleaner.name());
         startedAt = System.currentTimeMillis();
         long minOpenTxnId = txnHandler.findMinOpenTxnIdForCleaner();
+        List<CompletableFuture> cleanerList = new ArrayList<>();
         for(CompactionInfo compactionInfo : txnHandler.findReadyToClean()) {
-          clean(compactionInfo, minOpenTxnId);
+          cleanerList.add(CompletableFuture.runAsync(ThrowingRunnable.unchecked(() ->
+            clean(compactionInfo, minOpenTxnId)), cleanerExecutor));
         }
+        CompletableFuture.allOf(cleanerList.toArray(new CompletableFuture[0])).join();

Review comment:
      What happens if one of the futures throws an exception?
   Do the others continue to execute? Will we wait until all of the tasks are finished one way or another?
   We do not want multiple Cleaning tasks running concurrently on the same partition.
   
   Seeing this, the same problem might arise with the Initiator too. What do you think, @deniskuzZ?
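The first two questions follow from the documented contract of `CompletableFuture.allOf`: the combined future completes only when every input future has completed, and if any of them completed exceptionally, `join()` throws a `CompletionException` with that failure as its cause. A JDK-only sketch (task count, pool size and sleep are illustrative) in which task 0 fails immediately while four slower tasks keep running; by the time `join()` throws, all four have finished. It says nothing about whether two tasks can target the same partition, which depends on what `findReadyToClean()` returns.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

class AllOfFailureSketch {
  /** Runs 5 tasks where task 0 fails at once; returns how many others had finished when join() threw. */
  static int finishedWhenJoinThrows() {
    ExecutorService pool = Executors.newFixedThreadPool(2);
    AtomicInteger finished = new AtomicInteger();
    List<CompletableFuture<Void>> futures = new ArrayList<>();
    for (int i = 0; i < 5; i++) {
      final int id = i;
      futures.add(CompletableFuture.runAsync(() -> {
        if (id == 0) {
          throw new IllegalStateException("task 0 failed");
        }
        sleepQuietly(50); // stand-in for slower clean() work
        finished.incrementAndGet();
      }, pool));
    }
    try {
      CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
      return -1; // not reached here, because task 0 failed
    } catch (CompletionException e) {
      // allOf completed only after all 5 futures completed, so the other 4 are done.
      return finished.get();
    } finally {
      pool.shutdown();
    }
  }

  private static void sleepQuietly(long millis) {
    try {
      Thread.sleep(millis);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }

  public static void main(String[] args) {
    System.out.println(finishedWhenJoinThrows()); // prints 4
  }
}
```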






[GitHub] [hive] deniskuzZ commented on a change in pull request #1275: HIVE-23324: Parallelise compaction directory cleaning process

Posted by GitBox <gi...@apache.org>.
deniskuzZ commented on a change in pull request #1275:
URL: https://github.com/apache/hive/pull/1275#discussion_r461329189



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
##########
@@ -66,53 +69,60 @@
   private long cleanerCheckInterval = 0;
 
   private ReplChangeManager replChangeManager;
+  private ExecutorService cleanerExecutor;
 
   @Override
   public void init(AtomicBoolean stop) throws Exception {
     super.init(stop);
     replChangeManager = ReplChangeManager.getInstance(conf);
+    cleanerCheckInterval = conf.getTimeVar(
+            HiveConf.ConfVars.HIVE_COMPACTOR_CLEANER_RUN_INTERVAL, TimeUnit.MILLISECONDS);
+    cleanerExecutor = CompactorUtil.createExecutorWithThreadFactory(
+            conf.getIntVar(HiveConf.ConfVars.HIVE_COMPACTOR_CLEANER_REQUEST_QUEUE),
+            COMPACTOR_CLEANER_THREAD_NAME_FORMAT);
   }
 
   @Override
   public void run() {
-    if (cleanerCheckInterval == 0) {
-      cleanerCheckInterval = conf.getTimeVar(
-          HiveConf.ConfVars.HIVE_COMPACTOR_CLEANER_RUN_INTERVAL, TimeUnit.MILLISECONDS);
-    }
-
-    do {
-      TxnStore.MutexAPI.LockHandle handle = null;
-      long startedAt = -1;
-      // Make sure nothing escapes this run method and kills the metastore at large,
-      // so wrap it in a big catch Throwable statement.
-      try {
-        handle = txnHandler.getMutexAPI().acquireLock(TxnStore.MUTEX_KEY.Cleaner.name());
-        startedAt = System.currentTimeMillis();
-        long minOpenTxnId = txnHandler.findMinOpenTxnIdForCleaner();
-        for(CompactionInfo compactionInfo : txnHandler.findReadyToClean()) {
-          clean(compactionInfo, minOpenTxnId);
-        }
-      } catch (Throwable t) {
-        LOG.error("Caught an exception in the main loop of compactor cleaner, " +
-            StringUtils.stringifyException(t));
-      }
-      finally {
-        if (handle != null) {
-          handle.releaseLocks();
-        }
-      }
-      // Now, go back to bed until it's time to do this again
-      long elapsedTime = System.currentTimeMillis() - startedAt;
-      if (elapsedTime >= cleanerCheckInterval || stop.get())  {
-        continue;
-      } else {
+    try {
+      do {
+        TxnStore.MutexAPI.LockHandle handle = null;
+        long startedAt = -1;
+        // Make sure nothing escapes this run method and kills the metastore at large,
+        // so wrap it in a big catch Throwable statement.
         try {
-          Thread.sleep(cleanerCheckInterval - elapsedTime);
-        } catch (InterruptedException ie) {
-          // What can I do about it?
+          handle = txnHandler.getMutexAPI().acquireLock(TxnStore.MUTEX_KEY.Cleaner.name());
+          startedAt = System.currentTimeMillis();
+          long minOpenTxnId = txnHandler.findMinOpenTxnIdForCleaner();
+          List<CompletableFuture> cleanerList = new ArrayList<>();
+          for(CompactionInfo compactionInfo : txnHandler.findReadyToClean()) {
+            cleanerList.add(CompletableFuture.runAsync(CompactorUtil.ThrowingRunnable.unchecked(() ->
+                    clean(compactionInfo, minOpenTxnId)), cleanerExecutor));
+          }
+          CompletableFuture.allOf(cleanerList.toArray(new CompletableFuture[0])).join();
+        } catch (Throwable t) {
+          LOG.error("Caught an exception in the main loop of compactor cleaner, " +
+                  StringUtils.stringifyException(t));
+        } finally {
+          if (handle != null) {
+            handle.releaseLocks();
+          }
         }
+        // Now, go back to bed until it's time to do this again
+        long elapsedTime = System.currentTimeMillis() - startedAt;
+        if (elapsedTime < cleanerCheckInterval && !stop.get()) {
+          try {
+            Thread.sleep(cleanerCheckInterval - elapsedTime);
+          } catch (InterruptedException ie) {

Review comment:
       Redundant as well.

Review comment:
       The InterruptedException catch is redundant as well (see Initiator).

Review comment:
       @adesh-rao, it should behave the same way as in the Initiator: if you interrupt the thread, it should be cleanly interrupted. To be honest, I don't see the difference. The try-catch in Cleaner covers the main do-while loop in Thread.run. Am I missing something here?
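A minimal sketch of the structure described here, assuming a thread whose per-round work is just a placeholder counter. `InterruptibleLoop` and its fields are hypothetical, not Hive code. Because a single `try`/`catch (InterruptedException)` wraps the whole `do`/`while`, an interrupt that arrives during `Thread.sleep` leaves the loop and ends `run()`, so an extra catch around the sleep itself adds nothing. Re-setting the interrupt flag in the catch is a common convention and an assumption here.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical compactor-style thread: one outer try/catch covers the whole do-while loop.
class InterruptibleLoop extends Thread {
  private final AtomicBoolean stop = new AtomicBoolean(false);
  private final long intervalMs;
  final AtomicInteger iterations = new AtomicInteger();
  volatile boolean exitedViaInterrupt = false;

  InterruptibleLoop(long intervalMs) {
    this.intervalMs = intervalMs;
  }

  @Override
  public void run() {
    try {
      do {
        long startedAt = System.currentTimeMillis();
        iterations.incrementAndGet(); // placeholder for one round of cleaning work
        long elapsed = System.currentTimeMillis() - startedAt;
        if (elapsed < intervalMs && !stop.get()) {
          // No local catch: an InterruptedException propagates to the outer catch and ends run().
          Thread.sleep(intervalMs - elapsed);
        }
      } while (!stop.get());
    } catch (InterruptedException ie) {
      exitedViaInterrupt = true;
      Thread.currentThread().interrupt(); // preserve the interrupt status
    }
  }

  /** Starts the loop, interrupts it while it sleeps, and reports whether it exited cleanly. */
  static boolean demo() {
    InterruptibleLoop loop = new InterruptibleLoop(10_000); // long sleep, so the interrupt lands in it
    loop.start();
    try {
      Thread.sleep(100);
      loop.interrupt();
      loop.join(5_000);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    }
    return !loop.isAlive() && loop.exitedViaInterrupt && loop.iterations.get() == 1;
  }
}
```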






[GitHub] [hive] deniskuzZ commented on a change in pull request #1275: HIVE-23324: Parallelise compaction directory cleaning process

Posted by GitBox <gi...@apache.org>.
deniskuzZ commented on a change in pull request #1275:
URL: https://github.com/apache/hive/pull/1275#discussion_r458776747



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
##########
@@ -89,23 +93,28 @@ public void run() {
         handle = txnHandler.getMutexAPI().acquireLock(TxnStore.MUTEX_KEY.Cleaner.name());
         startedAt = System.currentTimeMillis();
         long minOpenTxnId = txnHandler.findMinOpenTxnIdForCleaner();
+        List<CompletableFuture> cleanerList = new ArrayList<>();
         for(CompactionInfo compactionInfo : txnHandler.findReadyToClean()) {
-          clean(compactionInfo, minOpenTxnId);
+          cleanerList.add(CompletableFuture.runAsync(CompactorUtil.ThrowingRunnable.unchecked(() ->
+                  clean(compactionInfo, minOpenTxnId)), cleanerExecutor));
         }
+        CompletableFuture.allOf(cleanerList.toArray(new CompletableFuture[0])).join();
       } catch (Throwable t) {
         LOG.error("Caught an exception in the main loop of compactor cleaner, " +
-            StringUtils.stringifyException(t));
-      }
-      finally {
+                StringUtils.stringifyException(t));
+        if (cleanerExecutor != null) {

Review comment:
       You are submitting all tasks to the executor inside the loop and waiting for them at the end, so if there is an exception in the main loop, there won't be any running threads in the executor.






[GitHub] [hive] adesh-rao commented on a change in pull request #1275: HIVE-23324: Parallelise compaction directory cleaning process

Posted by GitBox <gi...@apache.org>.
adesh-rao commented on a change in pull request #1275:
URL: https://github.com/apache/hive/pull/1275#discussion_r458675823



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
##########
@@ -182,6 +197,10 @@ public void run() {
     } catch (Throwable t) {
       LOG.error("Caught an exception in the main loop of compactor initiator, exiting " +
           StringUtils.stringifyException(t));
+    } finally {
+      if (compactionExecutor != null) {
+        compactionExecutor.shutdownNow();

Review comment:
       Fixed this.






[GitHub] [hive] deniskuzZ commented on a change in pull request #1275: HIVE-23324: Parallelise compaction directory cleaning process

Posted by GitBox <gi...@apache.org>.
deniskuzZ commented on a change in pull request #1275:
URL: https://github.com/apache/hive/pull/1275#discussion_r458976358



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
##########
@@ -79,7 +82,9 @@ public void run() {
       cleanerCheckInterval = conf.getTimeVar(
           HiveConf.ConfVars.HIVE_COMPACTOR_CLEANER_RUN_INTERVAL, TimeUnit.MILLISECONDS);
     }
-
+    ExecutorService cleanerExecutor = CompactorUtil.createExecutorWithThreadFactory(

Review comment:
       Move this into init(), including the cleanerCheckInterval initialization.






[GitHub] [hive] deniskuzZ commented on a change in pull request #1275: HIVE-23324: Parallelise compaction directory cleaning process

Posted by GitBox <gi...@apache.org>.
deniskuzZ commented on a change in pull request #1275:
URL: https://github.com/apache/hive/pull/1275#discussion_r458978115



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
##########
@@ -87,6 +87,9 @@
   public void run() {
     // Make sure nothing escapes this run method and kills the metastore at large,
     // so wrap it in a big catch Throwable statement.
+    ExecutorService compactionExecutor = CompactorUtil.createExecutorWithThreadFactory(

Review comment:
       Move this into init().






[GitHub] [hive] deniskuzZ commented on a change in pull request #1275: HIVE-23324: Parallelise compaction directory cleaning process

Posted by GitBox <gi...@apache.org>.
deniskuzZ commented on a change in pull request #1275:
URL: https://github.com/apache/hive/pull/1275#discussion_r458706414



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorUtil.java
##########
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.txn.compactor;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.hadoop.hive.conf.HiveConf;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+
+public class CompactorUtil {

Review comment:
       Should it be ThreadUtil?
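Relevant to the naming question: judging from the Cleaner hunks in this PR, `CompactorUtil` holds not only the executor factory (`createExecutorWithThreadFactory`) but also `ThrowingRunnable`, used as `CompactorUtil.ThrowingRunnable.unchecked(...)`, which is not thread-specific. Its body is not shown in this thread. The sketch below is a hypothetical reconstruction of what such a helper typically looks like (a functional interface whose `run()` may throw checked exceptions, plus an adapter to `java.lang.Runnable`), not the actual Hive code.

```java
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// Hypothetical sketch of a ThrowingRunnable-style helper; names mirror the PR but the body is assumed.
class ThrowingRunnableSketch {

  @FunctionalInterface
  interface ThrowingRunnable<E extends Exception> {
    void run() throws E;

    /** Adapts the body to a plain Runnable, rethrowing checked exceptions as RuntimeException. */
    static Runnable unchecked(ThrowingRunnable<?> r) {
      return () -> {
        try {
          r.run();
        } catch (RuntimeException e) {
          throw e; // already unchecked, pass through unchanged
        } catch (Exception e) {
          throw new RuntimeException(e); // wrap checked exceptions
        }
      };
    }
  }

  /** Runs a failing body asynchronously and returns the simple name of the original checked exception. */
  static String causeOfFailure() {
    CompletableFuture<Void> f = CompletableFuture.runAsync(ThrowingRunnable.unchecked(() -> {
      throw new IOException("simulated filesystem error");
    }));
    try {
      f.join();
      return "none";
    } catch (CompletionException e) {
      // CompletionException -> RuntimeException (from unchecked) -> IOException
      return e.getCause().getCause().getClass().getSimpleName();
    }
  }
}
```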






[GitHub] [hive] pvary commented on a change in pull request #1275: HIVE-23324: Parallelise compaction directory cleaning process

Posted by GitBox <gi...@apache.org>.
pvary commented on a change in pull request #1275:
URL: https://github.com/apache/hive/pull/1275#discussion_r457970694



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
##########
@@ -64,13 +67,15 @@
   static final private String CLASS_NAME = Cleaner.class.getName();
   static final private Logger LOG = LoggerFactory.getLogger(CLASS_NAME);
   private long cleanerCheckInterval = 0;
+  private Executor cleanerExecutor;
 
   private ReplChangeManager replChangeManager;
 
   @Override
   public void init(AtomicBoolean stop) throws Exception {
     super.init(stop);
     replChangeManager = ReplChangeManager.getInstance(conf);
+    cleanerExecutor = Executors.newFixedThreadPool(conf.getIntVar(HiveConf.ConfVars.HIVE_COMPACTOR_CLEANER_REQUEST_QUEUE));

Review comment:
       I think it would be a good idea to shut down the executor when we finish the run loop.
   What do you think?
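One way to follow this suggestion, sketched under assumptions: the executor is created once in `init()` and shut down in a `finally` that runs when the `do`/`while` loop exits. `LoopOwnedExecutor`, the 10-second grace period, and the simulated stop flag are hypothetical. Per the JDK documentation, `shutdown()` stops accepting new work but lets already-submitted tasks finish, while `shutdownNow()` additionally attempts to interrupt running tasks and returns the queued tasks that never started; that difference matters when choosing between them in a `finally` block.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical skeleton: the pool lives as long as the run loop and is shut down in finally.
class LoopOwnedExecutor {
  private final AtomicBoolean stop = new AtomicBoolean(false);
  private ExecutorService executor;

  void init(int poolSize) {
    executor = Executors.newFixedThreadPool(poolSize);
  }

  void run(int rounds) {
    try {
      int round = 0;
      do {
        executor.submit(() -> { /* placeholder for one clean(...) call */ });
        if (++round >= rounds) {
          stop.set(true); // simulate the stop flag being raised externally
        }
      } while (!stop.get());
    } finally {
      executor.shutdown(); // no new tasks; already-submitted tasks may finish
      try {
        if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
          executor.shutdownNow(); // interrupt tasks that are still running
        }
      } catch (InterruptedException e) {
        executor.shutdownNow();
        Thread.currentThread().interrupt();
      }
    }
  }

  boolean isTerminated() {
    return executor.isTerminated();
  }
}
```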






[GitHub] [hive] deniskuzZ commented on a change in pull request #1275: HIVE-23324: Parallelise compaction directory cleaning process

Posted by GitBox <gi...@apache.org>.
deniskuzZ commented on a change in pull request #1275:
URL: https://github.com/apache/hive/pull/1275#discussion_r458624026



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
##########
@@ -182,6 +197,10 @@ public void run() {
     } catch (Throwable t) {
       LOG.error("Caught an exception in the main loop of compactor initiator, exiting " +
           StringUtils.stringifyException(t));
+    } finally {
+      if (compactionExecutor != null) {
+        compactionExecutor.shutdownNow();

Review comment:
       What's the reasoning behind this?

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
##########
@@ -149,17 +156,25 @@ public void run() {
               String runAs = resolveUserToRunAs(tblNameOwners, t, p);
               /* checkForCompaction includes many file metadata checks and may be expensive.
                * Therefore, using a thread pool here and running checkForCompactions in parallel */
-              compactionList.add(CompletableFuture.runAsync(ThrowingRunnable.unchecked(() ->
-                  scheduleCompactionIfRequired(ci, t, p, runAs)), compactionExecutor));
+              completionService.submit(() -> {
+                ThrowingRunnable.unchecked(() -> scheduleCompactionIfRequired(ci, t, p, runAs));
+                return null;
+              });
+              count++;
             } catch (Throwable t) {
               LOG.error("Caught exception while trying to determine if we should compact {}. " +
                   "Marking failed to avoid repeated failures, {}", ci, t);
               ci.errorMessage = t.getMessage();
               txnHandler.markFailed(ci);
             }
           }
-          CompletableFuture.allOf(compactionList.toArray(new CompletableFuture[0]))
-            .join();
+          for(int i=0; i<count; i++) {

Review comment:
       What's the reasoning behind this change?
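For context on the trade-off, a small sketch contrasting with `allOf(...).join()`: an `ExecutorCompletionService` hands back futures in completion order via `take()`, so each task's `ExecutionException` can be handled (logged, marked failed) individually. The sketch also illustrates a possible pitfall in the hunk under review: if `ThrowingRunnable.unchecked(...)` returns a `Runnable`, as its earlier use with `CompletableFuture.runAsync` suggests, then calling it inside the submitted lambda without `.run()` builds the wrapper but never executes `scheduleCompactionIfRequired`. That reading is an assumption; `CompletionServiceSketch` and `wrap` are hypothetical stand-ins.

```java
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical comparison: a wrapped body submitted via a CompletionService, with and without run().
class CompletionServiceSketch {

  /** Stand-in for ThrowingRunnable.unchecked(...): returns a Runnable, does not run it. */
  static Runnable wrap(Runnable body) {
    return () -> body.run();
  }

  /** Submits n tasks, waits for each via take().get(), and returns how many bodies actually ran. */
  static int submitAndWait(int n, boolean invokeWrapper) {
    ExecutorService pool = Executors.newFixedThreadPool(2);
    CompletionService<Void> cs = new ExecutorCompletionService<>(pool);
    AtomicInteger ran = new AtomicInteger();
    try {
      for (int i = 0; i < n; i++) {
        cs.submit(() -> {
          Runnable r = wrap(ran::incrementAndGet);
          if (invokeWrapper) {
            r.run(); // without this call the wrapped body never executes
          }
          return null;
        });
      }
      for (int i = 0; i < n; i++) {
        try {
          cs.take().get(); // blocks until the next task finishes, in completion order
        } catch (ExecutionException e) {
          System.err.println("task failed: " + e.getCause()); // per-task failure handling
        }
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    } finally {
      pool.shutdown();
    }
    return ran.get();
  }
}
```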






[GitHub] [hive] pvary commented on a change in pull request #1275: HIVE-23324: Parallelise compaction directory cleaning process

Posted by GitBox <gi...@apache.org>.
pvary commented on a change in pull request #1275:
URL: https://github.com/apache/hive/pull/1275#discussion_r458027913



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
##########
@@ -64,13 +67,15 @@
   static final private String CLASS_NAME = Cleaner.class.getName();
   static final private Logger LOG = LoggerFactory.getLogger(CLASS_NAME);
   private long cleanerCheckInterval = 0;
+  private Executor cleanerExecutor;
 
   private ReplChangeManager replChangeManager;
 
   @Override
   public void init(AtomicBoolean stop) throws Exception {
     super.init(stop);
     replChangeManager = ReplChangeManager.getInstance(conf);
+    cleanerExecutor = Executors.newFixedThreadPool(conf.getIntVar(HiveConf.ConfVars.HIVE_COMPACTOR_CLEANER_REQUEST_QUEUE));

Review comment:
       I prefer the way it was done in Worker.java:
   - Named threads
   - Priority down
   - Set daemon on/off
   - Stopped in finally
   Shall we do it here, or should we create a follow-up JIRA for creating and cleaning up the executor threads for Cleaner and Initiator as well?
   
   Thanks,
   Peter
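A JDK-only sketch of the first three points, assuming a custom `ThreadFactory`. The PR's `CompactorUtil` imports Guava's `ThreadFactoryBuilder`, which offers equivalent `setNameFormat`, `setDaemon`, and `setPriority` setters. `NamedThreadFactory`, the name format, and the choice of `Thread.MIN_PRIORITY` are illustrative assumptions; the values Worker.java actually uses are not shown here. The fourth point, stopping the pool in `finally`, is separate from the factory.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical factory producing named, lower-priority, optionally daemon threads.
class NamedThreadFactory implements ThreadFactory {
  private final String nameFormat; // e.g. "cleaner-executor-%d" (illustrative)
  private final boolean daemon;
  private final AtomicInteger counter = new AtomicInteger();

  NamedThreadFactory(String nameFormat, boolean daemon) {
    this.nameFormat = nameFormat;
    this.daemon = daemon;
  }

  @Override
  public Thread newThread(Runnable r) {
    Thread t = new Thread(r, String.format(nameFormat, counter.getAndIncrement()));
    t.setDaemon(daemon);                 // daemon threads do not keep the JVM alive
    t.setPriority(Thread.MIN_PRIORITY);  // "priority down" (assumed value)
    return t;
  }

  /** Hypothetical helper in the spirit of createExecutor(String name, int size). */
  static ExecutorService createExecutor(String nameFormat, int size) {
    return Executors.newFixedThreadPool(size, new NamedThreadFactory(nameFormat, true));
  }
}
```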







[GitHub] [hive] adesh-rao commented on a change in pull request #1275: HIVE-23324: Parallelise compaction directory cleaning process

Posted by GitBox <gi...@apache.org>.
adesh-rao commented on a change in pull request #1275:
URL: https://github.com/apache/hive/pull/1275#discussion_r458676222



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
##########
@@ -80,39 +98,58 @@ public void run() {
           HiveConf.ConfVars.HIVE_COMPACTOR_CLEANER_RUN_INTERVAL, TimeUnit.MILLISECONDS);
     }
 
-    do {
-      TxnStore.MutexAPI.LockHandle handle = null;
-      long startedAt = -1;
-      // Make sure nothing escapes this run method and kills the metastore at large,
-      // so wrap it in a big catch Throwable statement.
-      try {
-        handle = txnHandler.getMutexAPI().acquireLock(TxnStore.MUTEX_KEY.Cleaner.name());
-        startedAt = System.currentTimeMillis();
-        long minOpenTxnId = txnHandler.findMinOpenTxnIdForCleaner();
-        for(CompactionInfo compactionInfo : txnHandler.findReadyToClean()) {
-          clean(compactionInfo, minOpenTxnId);
+    try {
+      do {
+        TxnStore.MutexAPI.LockHandle handle = null;
+        long startedAt = -1;
+        // Make sure nothing escapes this run method and kills the metastore at large,
+        // so wrap it in a big catch Throwable statement.
+        try {
+          handle = txnHandler.getMutexAPI().acquireLock(TxnStore.MUTEX_KEY.Cleaner.name());
+          startedAt = System.currentTimeMillis();
+          long minOpenTxnId = txnHandler.findMinOpenTxnIdForCleaner();
+          int count = 0;
+          for(CompactionInfo compactionInfo : txnHandler.findReadyToClean()) {
+            completionService.submit(() -> {
+              ThrowingRunnable.unchecked(() -> clean(compactionInfo, minOpenTxnId));
+              return null;
+            });
+            count++;
+          }
+
+          for(int i=0; i<count; i++) {
+            try {
+              completionService.take().get();
+            } catch (InterruptedException| ExecutionException ignore) {
+              // What should we do here?
+            }
+          }
+        } catch (Throwable t) {
+          LOG.error("Caught an exception in the main loop of compactor cleaner, " +
+                  StringUtils.stringifyException(t));
         }
-      } catch (Throwable t) {
-        LOG.error("Caught an exception in the main loop of compactor cleaner, " +
-            StringUtils.stringifyException(t));
-      }
-      finally {
-        if (handle != null) {
-          handle.releaseLocks();
+        finally {
+          if (handle != null) {
+            handle.releaseLocks();
+          }
         }
-      }
-      // Now, go back to bed until it's time to do this again
-      long elapsedTime = System.currentTimeMillis() - startedAt;
-      if (elapsedTime >= cleanerCheckInterval || stop.get())  {
-        continue;
-      } else {
-        try {
-          Thread.sleep(cleanerCheckInterval - elapsedTime);
-        } catch (InterruptedException ie) {
-          // What can I do about it?
+        // Now, go back to bed until it's time to do this again
+        long elapsedTime = System.currentTimeMillis() - startedAt;
+        if (elapsedTime >= cleanerCheckInterval || stop.get())  {

Review comment:
       Done.






[GitHub] [hive] adesh-rao commented on a change in pull request #1275: HIVE-23324: Parallelise compaction directory cleaning process

Posted by GitBox <gi...@apache.org>.
adesh-rao commented on a change in pull request #1275:
URL: https://github.com/apache/hive/pull/1275#discussion_r458940334



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
##########
@@ -89,23 +93,28 @@ public void run() {
         handle = txnHandler.getMutexAPI().acquireLock(TxnStore.MUTEX_KEY.Cleaner.name());
         startedAt = System.currentTimeMillis();
         long minOpenTxnId = txnHandler.findMinOpenTxnIdForCleaner();
+        List<CompletableFuture> cleanerList = new ArrayList<>();
         for(CompactionInfo compactionInfo : txnHandler.findReadyToClean()) {
-          clean(compactionInfo, minOpenTxnId);
+          cleanerList.add(CompletableFuture.runAsync(CompactorUtil.ThrowingRunnable.unchecked(() ->
+                  clean(compactionInfo, minOpenTxnId)), cleanerExecutor));
         }
+        CompletableFuture.allOf(cleanerList.toArray(new CompletableFuture[0])).join();
       } catch (Throwable t) {
         LOG.error("Caught an exception in the main loop of compactor cleaner, " +
-            StringUtils.stringifyException(t));
-      }
-      finally {
+                StringUtils.stringifyException(t));
+        if (cleanerExecutor != null) {

Review comment:
       Agreed. I have removed the executor shutdown.
   
   But should we add an executor shutdown at all, and if so, where? Since the Cleaner is supposed to be running all the time, I think we can skip the executor shutdown?






[GitHub] [hive] pvary commented on a change in pull request #1275: HIVE-23324: Parallelise compaction directory cleaning process

Posted by GitBox <gi...@apache.org>.
pvary commented on a change in pull request #1275:
URL: https://github.com/apache/hive/pull/1275#discussion_r458611187



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
##########
@@ -64,13 +72,23 @@
   static final private String CLASS_NAME = Cleaner.class.getName();
   static final private Logger LOG = LoggerFactory.getLogger(CLASS_NAME);
   private long cleanerCheckInterval = 0;
+  private ExecutorService cleanerExecutor;
+  private CompletionService<Void> completionService;
 
   private ReplChangeManager replChangeManager;
 
   @Override
   public void init(AtomicBoolean stop) throws Exception {
     super.init(stop);
     replChangeManager = ReplChangeManager.getInstance(conf);
+    ThreadFactory threadFactory = new ThreadFactoryBuilder()

Review comment:
      Or maybe turn this into a utility method that creates the executor, and call it from the run method, something like:
   WhateverUtil.createExecutor(String name, int size) {}
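A minimal sketch of such a helper, assuming a fixed-size pool with named daemon threads. The name `createExecutorWithThreadFactory(int, String)` mirrors the call that appears later in this PR, but the body is an assumption: the diff builds its factory with Guava's `ThreadFactoryBuilder`, while this sketch uses only the JDK so it stays self-contained, and making the threads daemons is a choice made here, not something the PR states.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the compactor utility class discussed in this review.
final class CompactorUtilSketch {
  private CompactorUtilSketch() {}

  /**
   * Creates a fixed-size pool whose threads are named from nameFormat
   * (e.g. "Cleaner-executor-thread-%d") and marked as daemon threads.
   */
  static ExecutorService createExecutorWithThreadFactory(int poolSize, String nameFormat) {
    AtomicInteger counter = new AtomicInteger(0);
    ThreadFactory factory = runnable -> {
      // %d in nameFormat is replaced by a per-pool sequence number
      Thread thread = new Thread(runnable, String.format(nameFormat, counter.getAndIncrement()));
      thread.setDaemon(true); // assumption: pool threads should not block JVM exit
      return thread;
    };
    return Executors.newFixedThreadPool(poolSize, factory);
  }
}
```

With a helper like this, both the Cleaner and the Initiator could create their pools with one call, e.g. `createExecutorWithThreadFactory(poolSize, "Cleaner-executor-thread-%d")`.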






[GitHub] [hive] deniskuzZ commented on a change in pull request #1275: HIVE-23324: Parallelise compaction directory cleaning process

Posted by GitBox <gi...@apache.org>.
deniskuzZ commented on a change in pull request #1275:
URL: https://github.com/apache/hive/pull/1275#discussion_r458976358



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
##########
@@ -79,7 +82,9 @@ public void run() {
       cleanerCheckInterval = conf.getTimeVar(
           HiveConf.ConfVars.HIVE_COMPACTOR_CLEANER_RUN_INTERVAL, TimeUnit.MILLISECONDS);
     }
-
+    ExecutorService cleanerExecutor = CompactorUtil.createExecutorWithThreadFactory(

Review comment:
      Move this under init().






[GitHub] [hive] pvary commented on a change in pull request #1275: HIVE-23324: Parallelise compaction directory cleaning process

Posted by GitBox <gi...@apache.org>.
pvary commented on a change in pull request #1275:
URL: https://github.com/apache/hive/pull/1275#discussion_r458001001



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
##########
@@ -89,9 +94,12 @@ public void run() {
         handle = txnHandler.getMutexAPI().acquireLock(TxnStore.MUTEX_KEY.Cleaner.name());
         startedAt = System.currentTimeMillis();
         long minOpenTxnId = txnHandler.findMinOpenTxnIdForCleaner();
+        List<CompletableFuture> cleanerList = new ArrayList<>();
         for(CompactionInfo compactionInfo : txnHandler.findReadyToClean()) {
-          clean(compactionInfo, minOpenTxnId);
+          cleanerList.add(CompletableFuture.runAsync(ThrowingRunnable.unchecked(() ->
+            clean(compactionInfo, minOpenTxnId)), cleanerExecutor));
         }
+        CompletableFuture.allOf(cleanerList.toArray(new CompletableFuture[0])).join();

Review comment:
       Talked with @deniskuzZ and he convinced me that join() will wait for all of the tasks to finish :)
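To illustrate the point about `join()`: `CompletableFuture.allOf(...)` returns a future that completes only after every input future has completed, so `join()` on it blocks until all cleaning tasks are done, even if one of them fails early. The sketch below is a hypothetical standalone demo (class and method names are made up): the failing task finishes first, yet `join()` only throws after the remaining tasks have finished too.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

final class AllOfJoinDemo {
  /**
   * Runs `tasks` jobs on a pool, one of which fails, and returns how many
   * jobs had finished by the time join() returned or threw.
   */
  static int runAndCount(int tasks, int failingTask) {
    ExecutorService pool = Executors.newFixedThreadPool(4);
    AtomicInteger finished = new AtomicInteger();
    List<CompletableFuture<Void>> futures = new ArrayList<>();
    for (int i = 0; i < tasks; i++) {
      final int id = i;
      futures.add(CompletableFuture.runAsync(() -> {
        try {
          Thread.sleep(10L * id); // later tasks finish later
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
        finished.incrementAndGet();
        if (id == failingTask) {
          throw new IllegalStateException("clean failed for task " + id);
        }
      }, pool));
    }
    try {
      // allOf completes only when every future has completed, normally or not.
      CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
    } catch (CompletionException e) {
      // The failure surfaces here, but only after all tasks are done.
      System.out.println("join() threw: " + e.getCause().getMessage());
    } finally {
      pool.shutdown();
    }
    return finished.get();
  }
}
```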






[GitHub] [hive] deniskuzZ merged pull request #1275: HIVE-23324: Parallelise compaction directory cleaning process

Posted by GitBox <gi...@apache.org>.
deniskuzZ merged pull request #1275:
URL: https://github.com/apache/hive/pull/1275


   




[GitHub] [hive] adesh-rao commented on pull request #1275: HIVE-23324: Parallelise compaction directory cleaning process

Posted by GitBox <gi...@apache.org>.
adesh-rao commented on pull request #1275:
URL: https://github.com/apache/hive/pull/1275#issuecomment-661741825


   @pvary Can you please take a look at this PR?




[GitHub] [hive] adesh-rao commented on a change in pull request #1275: HIVE-23324: Parallelise compaction directory cleaning process

Posted by GitBox <gi...@apache.org>.
adesh-rao commented on a change in pull request #1275:
URL: https://github.com/apache/hive/pull/1275#discussion_r458745804



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorUtil.java
##########
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.txn.compactor;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.hadoop.hive.conf.HiveConf;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+
+public class CompactorUtil {

Review comment:
      Currently this only contains thread utility methods, but the intention of this class is to hold compactor utility methods that might be needed in the future.
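For context, the diff wraps `clean(...)`, which presumably throws checked exceptions, with `CompactorUtil.ThrowingRunnable.unchecked(...)` so that it can be passed to `CompletableFuture.runAsync`. The PR's implementation is not shown in this thread; the following is a hypothetical sketch of what such a helper could look like, assuming checked exceptions are simply wrapped in a `RuntimeException`.

```java
/**
 * Hypothetical sketch of a ThrowingRunnable helper like the one the diff calls
 * as CompactorUtil.ThrowingRunnable; the actual Hive implementation may differ.
 */
@FunctionalInterface
interface ThrowingRunnable {
  void run() throws Exception;

  /** Adapts a ThrowingRunnable to java.lang.Runnable so it can be passed to executors. */
  static Runnable unchecked(ThrowingRunnable task) {
    return () -> {
      try {
        task.run();
      } catch (RuntimeException e) {
        throw e; // already unchecked: rethrow unchanged
      } catch (Exception e) {
        throw new RuntimeException(e); // wrap checked exceptions
      }
    };
  }
}
```

A call site would then read `CompletableFuture.runAsync(ThrowingRunnable.unchecked(() -> clean(compactionInfo, minOpenTxnId)), cleanerExecutor)`, as in the diff.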
   
    






[GitHub] [hive] deniskuzZ commented on a change in pull request #1275: HIVE-23324: Parallelise compaction directory cleaning process

Posted by GitBox <gi...@apache.org>.
deniskuzZ commented on a change in pull request #1275:
URL: https://github.com/apache/hive/pull/1275#discussion_r458679273



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
##########
@@ -79,7 +81,9 @@ public void run() {
       cleanerCheckInterval = conf.getTimeVar(
           HiveConf.ConfVars.HIVE_COMPACTOR_CLEANER_RUN_INTERVAL, TimeUnit.MILLISECONDS);
     }
-
+    String threadNameFormat = "Cleaner-executor-thread-%d";

Review comment:
      Could you please move this to constants?






[GitHub] [hive] pvary commented on a change in pull request #1275: HIVE-23324: Parallelise compaction directory cleaning process

Posted by GitBox <gi...@apache.org>.
pvary commented on a change in pull request #1275:
URL: https://github.com/apache/hive/pull/1275#discussion_r458611187



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
##########
@@ -64,13 +72,23 @@
   static final private String CLASS_NAME = Cleaner.class.getName();
   static final private Logger LOG = LoggerFactory.getLogger(CLASS_NAME);
   private long cleanerCheckInterval = 0;
+  private ExecutorService cleanerExecutor;
+  private CompletionService<Void> completionService;
 
   private ReplChangeManager replChangeManager;
 
   @Override
   public void init(AtomicBoolean stop) throws Exception {
     super.init(stop);
     replChangeManager = ReplChangeManager.getInstance(conf);
+    ThreadFactory threadFactory = new ThreadFactoryBuilder()

Review comment:
      Or maybe make this a utility method?






[GitHub] [hive] deniskuzZ commented on a change in pull request #1275: HIVE-23324: Parallelise compaction directory cleaning process

Posted by GitBox <gi...@apache.org>.
deniskuzZ commented on a change in pull request #1275:
URL: https://github.com/apache/hive/pull/1275#discussion_r458967846



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
##########
@@ -89,23 +93,28 @@ public void run() {
         handle = txnHandler.getMutexAPI().acquireLock(TxnStore.MUTEX_KEY.Cleaner.name());
         startedAt = System.currentTimeMillis();
         long minOpenTxnId = txnHandler.findMinOpenTxnIdForCleaner();
+        List<CompletableFuture> cleanerList = new ArrayList<>();
         for(CompactionInfo compactionInfo : txnHandler.findReadyToClean()) {
-          clean(compactionInfo, minOpenTxnId);
+          cleanerList.add(CompletableFuture.runAsync(CompactorUtil.ThrowingRunnable.unchecked(() ->
+                  clean(compactionInfo, minOpenTxnId)), cleanerExecutor));
         }
+        CompletableFuture.allOf(cleanerList.toArray(new CompletableFuture[0])).join();
       } catch (Throwable t) {
         LOG.error("Caught an exception in the main loop of compactor cleaner, " +
-            StringUtils.stringifyException(t));
-      }
-      finally {
+                StringUtils.stringifyException(t));
+        if (cleanerExecutor != null) {

Review comment:
      @adesh-rao, sorry, I think you originally placed it in the right spot, at the end of the run() method. You'll probably have to wrap its internals (the do-while loop) in a try block and call shutdownNow() in the finally section. Note: remove the catch of InterruptedException.
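The structure suggested here can be sketched as follows: the executor is created once (in `init()`), each loop iteration catches and logs its own failures, and `shutdownNow()` runs in a single outer `finally` when `run()` exits. This is a simplified, hypothetical class; the real Cleaner's locking, `findReadyToClean()` call and sleep logic are left out, and `runOneCycle()` is a placeholder.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical, simplified worker illustrating the suggested executor lifecycle.
class CleanerLoopSketch implements Runnable {
  private final AtomicBoolean stop;
  private ExecutorService cleanerExecutor;
  private int cycles;

  CleanerLoopSketch(AtomicBoolean stop) {
    this.stop = stop;
  }

  // Create the pool once, as suggested ("move this under init").
  void init(int poolSize) {
    cleanerExecutor = Executors.newFixedThreadPool(poolSize);
  }

  @Override
  public void run() {
    try {
      do {
        try {
          runOneCycle();
        } catch (RuntimeException e) {
          // Log and keep looping so one failed cycle does not end the thread.
          System.err.println("Caught an exception in the cleaner loop: " + e);
        }
      } while (!stop.get());
    } finally {
      // Executed once, when run() exits, not after every failed cycle.
      cleanerExecutor.shutdownNow();
    }
  }

  // Placeholder for "find ready-to-clean entries, submit them, wait for them".
  private void runOneCycle() {
    CompletableFuture.runAsync(() -> { }, cleanerExecutor).join();
    cycles++;
  }

  int cycles() {
    return cycles;
  }

  boolean executorShutDown() {
    return cleanerExecutor.isShutdown();
  }
}
```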






[GitHub] [hive] deniskuzZ commented on a change in pull request #1275: HIVE-23324: Parallelise compaction directory cleaning process

Posted by GitBox <gi...@apache.org>.
deniskuzZ commented on a change in pull request #1275:
URL: https://github.com/apache/hive/pull/1275#discussion_r458969453



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
##########
@@ -89,23 +93,28 @@ public void run() {
         handle = txnHandler.getMutexAPI().acquireLock(TxnStore.MUTEX_KEY.Cleaner.name());
         startedAt = System.currentTimeMillis();
         long minOpenTxnId = txnHandler.findMinOpenTxnIdForCleaner();
+        List<CompletableFuture> cleanerList = new ArrayList<>();
         for(CompactionInfo compactionInfo : txnHandler.findReadyToClean()) {
-          clean(compactionInfo, minOpenTxnId);
+          cleanerList.add(CompletableFuture.runAsync(CompactorUtil.ThrowingRunnable.unchecked(() ->
+                  clean(compactionInfo, minOpenTxnId)), cleanerExecutor));
         }
+        CompletableFuture.allOf(cleanerList.toArray(new CompletableFuture[0])).join();
       } catch (Throwable t) {
         LOG.error("Caught an exception in the main loop of compactor cleaner, " +
-            StringUtils.stringifyException(t));
-      }
-      finally {
+                StringUtils.stringifyException(t));
+        if (cleanerExecutor != null) {
+          cleanerExecutor.shutdownNow();
+          cleanerExecutor = CompactorUtil.createExecutorWithThreadFactory(
+                  conf.getIntVar(HiveConf.ConfVars.HIVE_COMPACTOR_CLEANER_REQUEST_QUEUE), threadNameFormat);
+        }
+      } finally {
         if (handle != null) {
           handle.releaseLocks();
         }
       }
       // Now, go back to bed until it's time to do this again
       long elapsedTime = System.currentTimeMillis() - startedAt;
-      if (elapsedTime >= cleanerCheckInterval || stop.get())  {
-        continue;
-      } else {
+      if (!(elapsedTime >= cleanerCheckInterval || stop.get())) {

Review comment:
      I can't see the change.
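The rewritten condition in the diff, `!(elapsedTime >= cleanerCheckInterval || stop.get())`, is by De Morgan's law the same as `elapsedTime < cleanerCheckInterval && !stop.get()`, which may be easier to read. A small hypothetical sketch of the resulting sleep calculation, assuming (the diff is cut off at this point) that the thread sleeps for the remainder of the check interval:

```java
// Hypothetical helper isolating the "go back to bed" decision of the cleaner loop.
final class CleanerSleepSketch {
  private CleanerSleepSketch() {}

  /** Returns how long to sleep before the next cycle, or 0 to continue immediately. */
  static long sleepMillis(long elapsedTime, long cleanerCheckInterval, boolean stopRequested) {
    // Equivalent to !(elapsedTime >= cleanerCheckInterval || stopRequested).
    if (elapsedTime < cleanerCheckInterval && !stopRequested) {
      return cleanerCheckInterval - elapsedTime;
    }
    return 0L;
  }
}
```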






[GitHub] [hive] belugabehr commented on pull request #1275: HIVE-23324: Parallelise compaction directory cleaning process

Posted by GitBox <gi...@apache.org>.
belugabehr commented on pull request #1275:
URL: https://github.com/apache/hive/pull/1275#issuecomment-661904392


   Let us not re-invent the wheel here:
   
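   ```
   import java.util.concurrent.CompletionService;
   import java.util.concurrent.ExecutorCompletionService;
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.Executors;
   
   public class CompletionServiceExample {
     public static void main(String[] args)
     {
       ExecutorService service = Executors.newFixedThreadPool(2);
       CompletionService<String> completionService = new ExecutorCompletionService<>(service);
   
       // Submit one task per path; each returns a result string
       for (int i = 0; i < 10; i++)
       {
         final int t = i;
         completionService.submit(() -> {
           return clean("File" + t);
         });
       }
   
       // take() hands back futures in completion order, not submission order
       for (int i = 0; i < 10; ++i) {
         try {
             String path = completionService.take().get();
             System.out.println(path);
         } catch (Exception ignore) {}
       }
   
       service.shutdown();
     }
   
     public static String clean(String path)
     {
       return "Cleaned: " + path;
     }
   }
   ```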
   
   https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ExecutorCompletionService.html




[GitHub] [hive] pvary commented on a change in pull request #1275: HIVE-23324: Parallelise compaction directory cleaning process

Posted by GitBox <gi...@apache.org>.
pvary commented on a change in pull request #1275:
URL: https://github.com/apache/hive/pull/1275#discussion_r458609685



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
##########
@@ -80,39 +98,58 @@ public void run() {
           HiveConf.ConfVars.HIVE_COMPACTOR_CLEANER_RUN_INTERVAL, TimeUnit.MILLISECONDS);
     }
 
-    do {
-      TxnStore.MutexAPI.LockHandle handle = null;
-      long startedAt = -1;
-      // Make sure nothing escapes this run method and kills the metastore at large,
-      // so wrap it in a big catch Throwable statement.
-      try {
-        handle = txnHandler.getMutexAPI().acquireLock(TxnStore.MUTEX_KEY.Cleaner.name());
-        startedAt = System.currentTimeMillis();
-        long minOpenTxnId = txnHandler.findMinOpenTxnIdForCleaner();
-        for(CompactionInfo compactionInfo : txnHandler.findReadyToClean()) {
-          clean(compactionInfo, minOpenTxnId);
+    try {
+      do {
+        TxnStore.MutexAPI.LockHandle handle = null;
+        long startedAt = -1;
+        // Make sure nothing escapes this run method and kills the metastore at large,
+        // so wrap it in a big catch Throwable statement.
+        try {
+          handle = txnHandler.getMutexAPI().acquireLock(TxnStore.MUTEX_KEY.Cleaner.name());
+          startedAt = System.currentTimeMillis();
+          long minOpenTxnId = txnHandler.findMinOpenTxnIdForCleaner();
+          int count = 0;
+          for(CompactionInfo compactionInfo : txnHandler.findReadyToClean()) {
+            completionService.submit(() -> {
+              ThrowingRunnable.unchecked(() -> clean(compactionInfo, minOpenTxnId));
+              return null;
+            });
+            count++;
+          }
+
+          for(int i=0; i<count; i++) {
+            try {
+              completionService.take().get();
+            } catch (InterruptedException| ExecutionException ignore) {
+              // What should we do here?

Review comment:
      At a minimum, log the error at info level?
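One way to act on this, sketched under assumptions: an `ExecutionException` means the task itself failed, so its cause can be logged and counted, while `InterruptedException` is propagated rather than ignored. The sketch uses `java.util.logging` to stay self-contained (Hive itself logs via SLF4J), and the class and method names are hypothetical. Separately, note that in the diff above the `Callable` body calls `ThrowingRunnable.unchecked(...)` without invoking `run()` on the result; if `unchecked` returns a `Runnable` (as its use with `runAsync` elsewhere in this PR suggests), `clean` would never actually execute in that version.

```java
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.logging.Logger;

final class DrainWithLogging {
  private static final Logger LOG = Logger.getLogger(DrainWithLogging.class.getName());

  /** Runs every task on a fixed pool, waits for all of them, and returns the failure count. */
  static int runAll(int poolSize, List<Callable<Void>> tasks) throws InterruptedException {
    ExecutorService pool = Executors.newFixedThreadPool(poolSize);
    try {
      CompletionService<Void> completionService = new ExecutorCompletionService<>(pool);
      for (Callable<Void> task : tasks) {
        completionService.submit(task);
      }
      int failures = 0;
      for (int i = 0; i < tasks.size(); i++) {
        try {
          // take() blocks until the next task (in completion order) is done.
          completionService.take().get();
        } catch (ExecutionException e) {
          // The task itself threw: log the cause instead of ignoring it.
          LOG.info("Cleaning task failed: " + e.getCause());
          failures++;
        }
      }
      return failures;
    } finally {
      pool.shutdown();
    }
  }
}
```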






[GitHub] [hive] adesh-rao commented on pull request #1275: HIVE-23324: Parallelise compaction directory cleaning process

Posted by GitBox <gi...@apache.org>.
adesh-rao commented on pull request #1275:
URL: https://github.com/apache/hive/pull/1275#issuecomment-662004212


   @belugabehr 
   
   1) The `clean` method does not return anything (`void`). I couldn't find a way to create a `CompletionService` for a `void` return type, so using `CompletionService` would require changing the definition of `clean`.
   
   2) We don't care which executor threads complete when, and all the related logging about directory/file deletion is done in the `clean` method itself.
   
   Therefore, the following loop:
   ```
   for (int i = 0; i < 10; ++i) {
     try {
       String path = completionService.take().get();
       System.out.println(path);
     } catch (Exception ignore) {}
   }
   ```
   
   would just be extra lines of code when we can simply use `join()` instead (I think that might have been the initial intention too).
   
   So, if necessary, we can take up your suggestion in a separate JIRA (it would require similar changes in the Initiator thread too).
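On point 1, a `void` method does not by itself rule out `CompletionService`: `ExecutorCompletionService.submit(Runnable task, V result)` accepts a `Runnable` plus a fixed result, so a `CompletionService<Void>` can be fed with `null` as that result. A hypothetical sketch (`clean(String)` here is a stand-in, not Hive's `clean(CompactionInfo, long)`):

```java
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

final class VoidCompletionServiceDemo {
  private static final AtomicInteger cleaned = new AtomicInteger();

  // Hypothetical stand-in for a void method such as clean(...).
  static void clean(String path) {
    cleaned.incrementAndGet();
  }

  /** Cleans all paths in parallel and returns how many clean() calls ran. */
  static int cleanAll(String... paths) throws Exception {
    cleaned.set(0);
    ExecutorService pool = Executors.newFixedThreadPool(2);
    try {
      CompletionService<Void> completionService = new ExecutorCompletionService<>(pool);
      for (String path : paths) {
        // submit(Runnable, V) lets a void method run under CompletionService<Void>;
        // each resulting Future yields the supplied result (null) on success.
        completionService.submit(() -> clean(path), null);
      }
      for (int i = 0; i < paths.length; i++) {
        Future<Void> done = completionService.take();
        done.get(); // rethrows a task failure as ExecutionException
      }
      return cleaned.get();
    } finally {
      pool.shutdown();
    }
  }
}
```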
   
   




[GitHub] [hive] adesh-rao commented on a change in pull request #1275: HIVE-23324: Parallelise compaction directory cleaning process

Posted by GitBox <gi...@apache.org>.
adesh-rao commented on a change in pull request #1275:
URL: https://github.com/apache/hive/pull/1275#discussion_r458675972



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
##########
@@ -149,17 +156,25 @@ public void run() {
               String runAs = resolveUserToRunAs(tblNameOwners, t, p);
               /* checkForCompaction includes many file metadata checks and may be expensive.
                * Therefore, using a thread pool here and running checkForCompactions in parallel */
-              compactionList.add(CompletableFuture.runAsync(ThrowingRunnable.unchecked(() ->
-                  scheduleCompactionIfRequired(ci, t, p, runAs)), compactionExecutor));
+              completionService.submit(() -> {
+                ThrowingRunnable.unchecked(() -> scheduleCompactionIfRequired(ci, t, p, runAs));
+                return null;

Review comment:
       Same as above. Removed it.

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
##########
@@ -80,39 +98,58 @@ public void run() {
           HiveConf.ConfVars.HIVE_COMPACTOR_CLEANER_RUN_INTERVAL, TimeUnit.MILLISECONDS);
     }
 
-    do {
-      TxnStore.MutexAPI.LockHandle handle = null;
-      long startedAt = -1;
-      // Make sure nothing escapes this run method and kills the metastore at large,
-      // so wrap it in a big catch Throwable statement.
-      try {
-        handle = txnHandler.getMutexAPI().acquireLock(TxnStore.MUTEX_KEY.Cleaner.name());
-        startedAt = System.currentTimeMillis();
-        long minOpenTxnId = txnHandler.findMinOpenTxnIdForCleaner();
-        for(CompactionInfo compactionInfo : txnHandler.findReadyToClean()) {
-          clean(compactionInfo, minOpenTxnId);
+    try {
+      do {
+        TxnStore.MutexAPI.LockHandle handle = null;
+        long startedAt = -1;
+        // Make sure nothing escapes this run method and kills the metastore at large,
+        // so wrap it in a big catch Throwable statement.
+        try {
+          handle = txnHandler.getMutexAPI().acquireLock(TxnStore.MUTEX_KEY.Cleaner.name());
+          startedAt = System.currentTimeMillis();
+          long minOpenTxnId = txnHandler.findMinOpenTxnIdForCleaner();
+          int count = 0;
+          for(CompactionInfo compactionInfo : txnHandler.findReadyToClean()) {
+            completionService.submit(() -> {
+              ThrowingRunnable.unchecked(() -> clean(compactionInfo, minOpenTxnId));
+              return null;
+            });
+            count++;
+          }
+
+          for(int i=0; i<count; i++) {
+            try {
+              completionService.take().get();

Review comment:
      Agreed, changed it.

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
##########
@@ -80,39 +98,58 @@ public void run() {
           HiveConf.ConfVars.HIVE_COMPACTOR_CLEANER_RUN_INTERVAL, TimeUnit.MILLISECONDS);
     }
 
-    do {
-      TxnStore.MutexAPI.LockHandle handle = null;
-      long startedAt = -1;
-      // Make sure nothing escapes this run method and kills the metastore at large,
-      // so wrap it in a big catch Throwable statement.
-      try {
-        handle = txnHandler.getMutexAPI().acquireLock(TxnStore.MUTEX_KEY.Cleaner.name());
-        startedAt = System.currentTimeMillis();
-        long minOpenTxnId = txnHandler.findMinOpenTxnIdForCleaner();
-        for(CompactionInfo compactionInfo : txnHandler.findReadyToClean()) {
-          clean(compactionInfo, minOpenTxnId);
+    try {
+      do {
+        TxnStore.MutexAPI.LockHandle handle = null;
+        long startedAt = -1;
+        // Make sure nothing escapes this run method and kills the metastore at large,
+        // so wrap it in a big catch Throwable statement.
+        try {
+          handle = txnHandler.getMutexAPI().acquireLock(TxnStore.MUTEX_KEY.Cleaner.name());
+          startedAt = System.currentTimeMillis();
+          long minOpenTxnId = txnHandler.findMinOpenTxnIdForCleaner();
+          int count = 0;
+          for(CompactionInfo compactionInfo : txnHandler.findReadyToClean()) {
+            completionService.submit(() -> {
+              ThrowingRunnable.unchecked(() -> clean(compactionInfo, minOpenTxnId));
+              return null;
+            });
+            count++;
+          }
+
+          for(int i=0; i<count; i++) {
+            try {
+              completionService.take().get();
+            } catch (InterruptedException| ExecutionException ignore) {
+              // What should we do here?
+            }
+          }
+        } catch (Throwable t) {
+          LOG.error("Caught an exception in the main loop of compactor cleaner, " +
+                  StringUtils.stringifyException(t));
         }
-      } catch (Throwable t) {
-        LOG.error("Caught an exception in the main loop of compactor cleaner, " +
-            StringUtils.stringifyException(t));
-      }
-      finally {
-        if (handle != null) {
-          handle.releaseLocks();
+        finally {
+          if (handle != null) {

Review comment:
       Fixed this.






[GitHub] [hive] deniskuzZ commented on a change in pull request #1275: HIVE-23324: Parallelise compaction directory cleaning process

Posted by GitBox <gi...@apache.org>.
deniskuzZ commented on a change in pull request #1275:
URL: https://github.com/apache/hive/pull/1275#discussion_r458702723



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
##########
@@ -89,23 +93,28 @@ public void run() {
         handle = txnHandler.getMutexAPI().acquireLock(TxnStore.MUTEX_KEY.Cleaner.name());
         startedAt = System.currentTimeMillis();
         long minOpenTxnId = txnHandler.findMinOpenTxnIdForCleaner();
+        List<CompletableFuture> cleanerList = new ArrayList<>();
         for(CompactionInfo compactionInfo : txnHandler.findReadyToClean()) {
-          clean(compactionInfo, minOpenTxnId);
+          cleanerList.add(CompletableFuture.runAsync(CompactorUtil.ThrowingRunnable.unchecked(() ->
+                  clean(compactionInfo, minOpenTxnId)), cleanerExecutor));
         }
+        CompletableFuture.allOf(cleanerList.toArray(new CompletableFuture[0])).join();
       } catch (Throwable t) {
         LOG.error("Caught an exception in the main loop of compactor cleaner, " +
-            StringUtils.stringifyException(t));
-      }
-      finally {
+                StringUtils.stringifyException(t));
+        if (cleanerExecutor != null) {

Review comment:
      What's the purpose of the executor service shutdown here?






[GitHub] [hive] adesh-rao commented on pull request #1275: HIVE-23324: Parallelise compaction directory cleaning process

Posted by GitBox <gi...@apache.org>.
adesh-rao commented on pull request #1275:
URL: https://github.com/apache/hive/pull/1275#issuecomment-662264113


   @belugabehr, thanks for the suggestion. Implemented this.




[GitHub] [hive] deniskuzZ commented on a change in pull request #1275: HIVE-23324: Parallelise compaction directory cleaning process

Posted by GitBox <gi...@apache.org>.
deniskuzZ commented on a change in pull request #1275:
URL: https://github.com/apache/hive/pull/1275#discussion_r458649513



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
##########
@@ -80,39 +98,58 @@ public void run() {
           HiveConf.ConfVars.HIVE_COMPACTOR_CLEANER_RUN_INTERVAL, TimeUnit.MILLISECONDS);
     }
 
-    do {
-      TxnStore.MutexAPI.LockHandle handle = null;
-      long startedAt = -1;
-      // Make sure nothing escapes this run method and kills the metastore at large,
-      // so wrap it in a big catch Throwable statement.
-      try {
-        handle = txnHandler.getMutexAPI().acquireLock(TxnStore.MUTEX_KEY.Cleaner.name());
-        startedAt = System.currentTimeMillis();
-        long minOpenTxnId = txnHandler.findMinOpenTxnIdForCleaner();
-        for(CompactionInfo compactionInfo : txnHandler.findReadyToClean()) {
-          clean(compactionInfo, minOpenTxnId);
+    try {
+      do {
+        TxnStore.MutexAPI.LockHandle handle = null;
+        long startedAt = -1;
+        // Make sure nothing escapes this run method and kills the metastore at large,
+        // so wrap it in a big catch Throwable statement.
+        try {
+          handle = txnHandler.getMutexAPI().acquireLock(TxnStore.MUTEX_KEY.Cleaner.name());
+          startedAt = System.currentTimeMillis();
+          long minOpenTxnId = txnHandler.findMinOpenTxnIdForCleaner();
+          int count = 0;
+          for(CompactionInfo compactionInfo : txnHandler.findReadyToClean()) {
+            completionService.submit(() -> {
+              ThrowingRunnable.unchecked(() -> clean(compactionInfo, minOpenTxnId));
+              return null;
+            });
+            count++;
+          }
+
+          for(int i=0; i<count; i++) {
+            try {
+              completionService.take().get();

Review comment:
       CompletableFuture is a better choice here.
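       A minimal sketch of the CompletableFuture approach, written against hypothetical stand-ins (`CleanAction` in place of `clean(compactionInfo, minOpenTxnId)`, and a local `ThrowingRunnable` modeled on the helper used in the patch) rather than the real Hive classes. It replaces the submit/take() bookkeeping with `runAsync` plus `allOf(...).join()`, and `join()` surfaces any task failure as a `CompletionException`. It also sidesteps a likely problem in the hunk above: assuming `ThrowingRunnable.unchecked(...)` returns a `Runnable` (as its later use with `runAsync` suggests), the Callable only builds that Runnable and never invokes it, so `clean(...)` would not actually run.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;

// Sketch only: fan out one task per item and wait for all of them with CompletableFuture.
class ParallelCleanSketch {

  /** Hypothetical helper: a Runnable whose body may throw a checked exception. */
  @FunctionalInterface
  interface ThrowingRunnable {
    void run() throws Exception;

    /** Returns a Runnable that rethrows checked exceptions as RuntimeException. */
    static Runnable unchecked(ThrowingRunnable r) {
      return () -> {
        try {
          r.run();
        } catch (RuntimeException e) {
          throw e;
        } catch (Exception e) {
          throw new RuntimeException(e);
        }
      };
    }
  }

  /** Hypothetical stand-in for Cleaner.clean(compactionInfo, minOpenTxnId). */
  interface CleanAction<T> {
    void clean(T item) throws Exception;
  }

  /** Submits every item to the executor and blocks until all tasks have finished. */
  static <T> void cleanAll(List<T> items, CleanAction<T> action, ExecutorService executor) {
    List<CompletableFuture<Void>> futures = new ArrayList<>();
    for (T item : items) {
      // runAsync actually executes the wrapped Runnable on the executor.
      futures.add(CompletableFuture.runAsync(
          ThrowingRunnable.unchecked(() -> action.clean(item)), executor));
    }
    // join() throws CompletionException if any task completed exceptionally.
    CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0])).join();
  }
}
```

       Compared with a CompletionService, there is no separate counter to keep in sync with the number of submitted tasks, and failures propagate through `join()` instead of being collected one `take().get()` at a time.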





[GitHub] [hive] deniskuzZ commented on a change in pull request #1275: HIVE-23324: Parallelise compaction directory cleaning process

Posted by GitBox <gi...@apache.org>.
deniskuzZ commented on a change in pull request #1275:
URL: https://github.com/apache/hive/pull/1275#discussion_r458624026



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
##########
@@ -182,6 +197,10 @@ public void run() {
     } catch (Throwable t) {
       LOG.error("Caught an exception in the main loop of compactor initiator, exiting " +
           StringUtils.stringifyException(t));
+    } finally {
+      if (compactionExecutor != null) {
+        compactionExecutor.shutdownNow();

Review comment:
       What's the reasoning behind this? If you want to call it, do it before releasing the lock.
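       One way to read the suggestion, as a sketch only: tie the executor shutdown to the same `finally` that releases the mutex, and order it first. `LockHandle` below is a hypothetical stand-in for `TxnStore.MutexAPI.LockHandle`, and the bounded `awaitTermination` is an assumption added here, since `shutdownNow()` merely interrupts running tasks and does not wait for them to stop.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

// Sketch only: stop the executor before the lock is released, never after.
class ShutdownBeforeUnlockSketch {

  /** Hypothetical stand-in for TxnStore.MutexAPI.LockHandle. */
  interface LockHandle {
    void releaseLocks();
  }

  /** Runs one unit of work under the lock, then shuts down the executor and only then unlocks. */
  static void runCycle(LockHandle handle, ExecutorService executor, Runnable work,
      long awaitMillis) {
    try {
      work.run();
    } finally {
      try {
        executor.shutdownNow();
        // shutdownNow() only interrupts running tasks; wait (bounded) for them to finish.
        if (!executor.awaitTermination(awaitMillis, TimeUnit.MILLISECONDS)) {
          System.err.println("executor did not terminate within " + awaitMillis + " ms");
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      } finally {
        // Released last, so no task started under this lock outlives it.
        handle.releaseLocks();
      }
    }
  }
}
```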





[GitHub] [hive] pvary commented on a change in pull request #1275: HIVE-23324: Parallelise compaction directory cleaning process

Posted by GitBox <gi...@apache.org>.
pvary commented on a change in pull request #1275:
URL: https://github.com/apache/hive/pull/1275#discussion_r458610811



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
##########
@@ -64,13 +72,23 @@
   static final private String CLASS_NAME = Cleaner.class.getName();
   static final private Logger LOG = LoggerFactory.getLogger(CLASS_NAME);
   private long cleanerCheckInterval = 0;
+  private ExecutorService cleanerExecutor;
+  private CompletionService<Void> completionService;
 
   private ReplChangeManager replChangeManager;
 
   @Override
   public void init(AtomicBoolean stop) throws Exception {
     super.init(stop);
     replChangeManager = ReplChangeManager.getInstance(conf);
+    ThreadFactory threadFactory = new ThreadFactoryBuilder()

Review comment:
       Shall we move this to the run method too? IMHO it would make the code easier to understand.
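       A sketch of what moving the setup into `run()` could look like, assuming the executor is only needed for the lifetime of the loop: it is created on entry, handed to each cycle, and shut down in a `finally` on exit. The PR builds its factory with Guava's `ThreadFactoryBuilder`; the plain-JDK factory, the thread-name prefix, the `poolSize` parameter and the 10-second drain timeout below are illustrative assumptions.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Sketch only: the executor's whole lifecycle is visible inside run().
class RunScopedExecutorSketch {

  /** Plain-JDK stand-in for Guava's ThreadFactoryBuilder: named daemon threads. */
  static ThreadFactory namedDaemonFactory(String prefix) {
    AtomicInteger counter = new AtomicInteger();
    return r -> {
      Thread t = new Thread(r, prefix + counter.getAndIncrement());
      t.setDaemon(true);
      return t;
    };
  }

  /** Creates the executor, runs cycles until stop is set, then shuts the executor down. */
  static void run(AtomicBoolean stop, int poolSize, Consumer<ExecutorService> cycle) {
    ExecutorService executor =
        Executors.newFixedThreadPool(poolSize, namedDaemonFactory("cleaner-executor-thread-"));
    try {
      do {
        cycle.accept(executor);
      } while (!stop.get());
    } finally {
      executor.shutdown();
      try {
        executor.awaitTermination(10, TimeUnit.SECONDS);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    }
  }
}
```

       With this layout `init()` no longer holds executor state, and a reader can see creation, use and shutdown in one method.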





[GitHub] [hive] adesh-rao commented on a change in pull request #1275: HIVE-23324: Parallelise compaction directory cleaning process

Posted by GitBox <gi...@apache.org>.
adesh-rao commented on a change in pull request #1275:
URL: https://github.com/apache/hive/pull/1275#discussion_r459434134



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
##########
@@ -89,23 +93,28 @@ public void run() {
         handle = txnHandler.getMutexAPI().acquireLock(TxnStore.MUTEX_KEY.Cleaner.name());
         startedAt = System.currentTimeMillis();
         long minOpenTxnId = txnHandler.findMinOpenTxnIdForCleaner();
+        List<CompletableFuture> cleanerList = new ArrayList<>();
         for(CompactionInfo compactionInfo : txnHandler.findReadyToClean()) {
-          clean(compactionInfo, minOpenTxnId);
+          cleanerList.add(CompletableFuture.runAsync(CompactorUtil.ThrowingRunnable.unchecked(() ->
+                  clean(compactionInfo, minOpenTxnId)), cleanerExecutor));
         }
+        CompletableFuture.allOf(cleanerList.toArray(new CompletableFuture[0])).join();
       } catch (Throwable t) {
         LOG.error("Caught an exception in the main loop of compactor cleaner, " +
-            StringUtils.stringifyException(t));
-      }
-      finally {
+                StringUtils.stringifyException(t));
+        if (cleanerExecutor != null) {
+          cleanerExecutor.shutdownNow();
+          cleanerExecutor = CompactorUtil.createExecutorWithThreadFactory(
+                  conf.getIntVar(HiveConf.ConfVars.HIVE_COMPACTOR_CLEANER_REQUEST_QUEUE), threadNameFormat);
+        }
+      } finally {
         if (handle != null) {
           handle.releaseLocks();
         }
       }
       // Now, go back to bed until it's time to do this again
       long elapsedTime = System.currentTimeMillis() - startedAt;
-      if (elapsedTime >= cleanerCheckInterval || stop.get())  {
-        continue;
-      } else {
+      if (!(elapsedTime >= cleanerCheckInterval || stop.get())) {

Review comment:
       This is outdated. I have added a comment at the right place.



