You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by dk...@apache.org on 2023/01/27 16:56:18 UTC

[hive] branch master updated: HIVE-26947: Handle compaction.Worker sleep for err cases to avoid high frequency connection respawning to HMS (Akshat Mathur, reviewed by Denys Kuzmenko, Laszlo Vegh)

This is an automated email from the ASF dual-hosted git repository.

dkuzmenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new d8f9a977cf3 HIVE-26947: Handle compaction.Worker sleep for err cases to avoid high frequency connection respawning to HMS (Akshat Mathur, reviewed by Denys Kuzmenko, Laszlo Vegh)
d8f9a977cf3 is described below

commit d8f9a977cf37afcb2b5cdc7b1ea9e57e26908bc3
Author: Akshat <ak...@gmail.com>
AuthorDate: Fri Jan 27 22:25:54 2023 +0530

    HIVE-26947: Handle compaction.Worker sleep for err cases to avoid high frequency connection respawning to HMS (Akshat Mathur, reviewed by Denys Kuzmenko, Laszlo Vegh)
    
    Closes #3955
---
 .../java/org/apache/hadoop/hive/conf/HiveConf.java |  9 +++++++
 .../hadoop/hive/ql/txn/compactor/Worker.java       | 30 ++++++++++++++++------
 .../hadoop/hive/ql/txn/compactor/TestWorker.java   |  2 ++
 3 files changed, 33 insertions(+), 8 deletions(-)

diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index e2aa4cf845d..ea7c56d10f9 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -3215,6 +3215,15 @@ public class HiveConf extends Configuration {
         "Time in seconds after which a compaction job will be declared failed and the\n" +
         "compaction re-queued."),
 
+    HIVE_COMPACTOR_WORKER_SLEEP_TIME("hive.compactor.worker.sleep.time", "10800ms",
+        new TimeValidator(TimeUnit.MILLISECONDS),
+        "Time in milliseconds for which a worker threads goes into sleep before starting another iteration " +
+                "in case of no launched job or error"),
+
+    HIVE_COMPACTOR_WORKER_MAX_SLEEP_TIME("hive.compactor.worker.max.sleep.time", "320000ms",
+        new TimeValidator(TimeUnit.MILLISECONDS),
+        "Max time in milliseconds for which a worker threads goes into sleep before starting another iteration " +
+                "used for backoff in case of no launched job or error"),
     HIVE_COMPACTOR_CHECK_INTERVAL("hive.compactor.check.interval", "300s",
         new TimeValidator(TimeUnit.SECONDS),
         "Time in seconds between checks to see if any tables or partitions need to be\n" +
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
index b8e94509b81..b0bec164fce 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
@@ -79,7 +79,8 @@ import java.util.stream.Collectors;
 public class Worker extends RemoteCompactorThread implements MetaStoreThread {
   static final private String CLASS_NAME = Worker.class.getName();
   static final private Logger LOG = LoggerFactory.getLogger(CLASS_NAME);
-  static final private long SLEEP_TIME = 10000;
+  private static long SLEEP_TIME_MAX;
+  static private long SLEEP_TIME;
 
   private String workerName;
   private final CompactorFactory compactorFactory;
@@ -102,12 +103,14 @@ public class Worker extends RemoteCompactorThread implements MetaStoreThread {
     boolean genericStats = conf.getBoolVar(HiveConf.ConfVars.HIVE_COMPACTOR_GATHER_STATS);
     boolean mrStats = conf.getBoolVar(HiveConf.ConfVars.HIVE_MR_COMPACTOR_GATHER_STATS);
     long timeout = conf.getTimeVar(HiveConf.ConfVars.HIVE_COMPACTOR_WORKER_TIMEOUT, TimeUnit.MILLISECONDS);
+    long nextSleep = SLEEP_TIME;
     boolean launchedJob;
     ExecutorService executor = getTimeoutHandlingExecutor();
     try {
       do {
         long startedAt = System.currentTimeMillis();
-        launchedJob = true;
+        boolean err = false;
+        launchedJob = false;
         Future<Boolean> singleRun = executor.submit(() -> findNextCompactionAndExecute(genericStats, mrStats));
         try {
           launchedJob = singleRun.get(timeout, TimeUnit.MILLISECONDS);
@@ -118,24 +121,33 @@ public class Worker extends RemoteCompactorThread implements MetaStoreThread {
           singleRun.cancel(true);
           executor.shutdownNow();
           executor = getTimeoutHandlingExecutor();
+          err = true;
         } catch (ExecutionException e) {
           LOG.info("Exception during executing compaction", e);
+          err = true;
         } catch (InterruptedException ie) {
-          // do not ignore interruption requests
-          return;
+          Thread.currentThread().interrupt();
+        } catch (Throwable t) {
+          err = true;
         }
 
         doPostLoopActions(System.currentTimeMillis() - startedAt);
 
         // If we didn't try to launch a job it either means there was no work to do or we got
-        // here as the result of a communication failure with the DB.  Either way we want to wait
+        // here as the result of an error like communication failure with the DB, schema failures etc.  Either way we want to wait
         // a bit before, otherwise we can start over the loop immediately.
-        if (!launchedJob && !stop.get()) {
-          Thread.sleep(SLEEP_TIME);
+        if ((!launchedJob || err) && !stop.get()) {
+          Thread.sleep(nextSleep);
         }
+        //Backoff mechanism
+        //Increase sleep time if error persist
+        //Reset sleep time to default once error is resolved
+        nextSleep = (err) ? nextSleep * 2 : SLEEP_TIME;
+        if (nextSleep > SLEEP_TIME_MAX) nextSleep = SLEEP_TIME_MAX;
+
       } while (!stop.get());
     } catch (InterruptedException e) {
-      // do not ignore interruption requests
+      Thread.currentThread().interrupt();
     } catch (Throwable t) {
       LOG.error("Caught an exception in the main loop of compactor worker, exiting.", t);
     } finally {
@@ -152,6 +164,8 @@ public class Worker extends RemoteCompactorThread implements MetaStoreThread {
   @Override
   public void init(AtomicBoolean stop) throws Exception {
     super.init(stop);
+    SLEEP_TIME = conf.getTimeVar(HiveConf.ConfVars.HIVE_COMPACTOR_WORKER_SLEEP_TIME, TimeUnit.MILLISECONDS);
+    SLEEP_TIME_MAX = conf.getTimeVar(HiveConf.ConfVars.HIVE_COMPACTOR_WORKER_MAX_SLEEP_TIME, TimeUnit.MILLISECONDS);
     this.workerName = getWorkerId();
     setName(workerName);
   }
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java
index 72fad1c1efc..905d82a0659 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java
@@ -1201,6 +1201,8 @@ public class TestWorker extends CompactorTest {
     ExecutorService executor = Executors.newSingleThreadExecutor();
     HiveConf timeoutConf = new HiveConf(conf);
     timeoutConf.setTimeVar(HiveConf.ConfVars.HIVE_COMPACTOR_WORKER_TIMEOUT, timeout, TimeUnit.MILLISECONDS);
+    timeoutConf.setTimeVar(HiveConf.ConfVars.HIVE_COMPACTOR_WORKER_SLEEP_TIME, 20, TimeUnit.MILLISECONDS);
+    timeoutConf.setTimeVar(HiveConf.ConfVars.HIVE_COMPACTOR_WORKER_MAX_SLEEP_TIME, 20, TimeUnit.MILLISECONDS);
 
     TimeoutWorker timeoutWorker = getTimeoutWorker(timeoutConf, executor,
         runForever, swallowInterrupt, new CountDownLatch(2));