Posted to commits@hive.apache.org by pv...@apache.org on 2020/06/02 07:28:40 UTC

[hive] branch master updated: HIVE-23555: Cancel compaction jobs when hive.compactor.worker.timeout is reached (Peter Vary reviewed by Karen Coppage and Laszlo Pinter)

This is an automated email from the ASF dual-hosted git repository.

pvary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 4b67087  HIVE-23555: Cancel compaction jobs when hive.compactor.worker.timeout is reached (Peter Vary reviewed by Karen Coppage and Laszlo Pinter)
4b67087 is described below

commit 4b670877c280b37c5776046f66d766079489b2a8
Author: Peter Vary <pv...@cloudera.com>
AuthorDate: Tue Jun 2 09:27:47 2020 +0200

    HIVE-23555: Cancel compaction jobs when hive.compactor.worker.timeout is reached (Peter Vary reviewed by Karen Coppage and Laszlo Pinter)
---
 .../hive/hcatalog/streaming/TestStreaming.java     |   3 +-
 .../hive/ql/txn/compactor/CompactorTestUtil.java   |  57 +--
 .../hive/ql/txn/compactor/TestCompactor.java       |   4 +-
 .../hadoop/hive/ql/stats/StatsUpdaterThread.java   |   3 +-
 .../hadoop/hive/ql/txn/compactor/Cleaner.java      |  11 +-
 .../hive/ql/txn/compactor/CompactorThread.java     |  12 +-
 .../hadoop/hive/ql/txn/compactor/Initiator.java    |   4 +-
 .../ql/txn/compactor/MetaStoreCompactorThread.java |   4 +-
 .../ql/txn/compactor/RemoteCompactorThread.java    |   6 +-
 .../hadoop/hive/ql/txn/compactor/Worker.java       | 458 ++++++++++++---------
 .../apache/hadoop/hive/ql/TestTxnCommands2.java    |   2 +-
 .../hadoop/hive/ql/TxnCommandsBaseForTests.java    |   3 +-
 .../hive/ql/stats/TestStatsUpdaterThread.java      |   2 +-
 .../hive/ql/txn/compactor/CompactorTest.java       |  17 +-
 .../hadoop/hive/ql/txn/compactor/TestWorker.java   |  88 ++++
 .../hadoop/hive/metastore/HiveMetaStore.java       |   2 +-
 .../hadoop/hive/metastore/MetaStoreThread.java     |   9 +-
 .../org/apache/hive/streaming/TestStreaming.java   |   3 +-
 18 files changed, 377 insertions(+), 311 deletions(-)
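
For orientation before the diff: the heart of this change is Worker.run(), which now runs each
compaction cycle on a dedicated single-threaded executor and bounds it with
hive.compactor.worker.timeout. The stand-alone sketch below (class name, timings and log strings
are illustrative, not Hive code) shows the java.util.concurrent pattern the commit relies on:
Future.get(timeout), cancel(true) on timeout, and recreating the executor so a stuck task cannot
block later cycles.

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;

    public class TimeoutLoopSketch {
        public static void main(String[] args) throws Exception {
            long timeoutMs = 100;  // stands in for hive.compactor.worker.timeout
            ExecutorService executor = Executors.newSingleThreadExecutor();
            try {
                for (int cycle = 0; cycle < 3; cycle++) {
                    Future<Boolean> singleRun = executor.submit(() -> {
                        Thread.sleep(1000);  // simulated slow compaction cycle
                        return true;
                    });
                    try {
                        boolean launchedJob = singleRun.get(timeoutMs, TimeUnit.MILLISECONDS);
                        System.out.println("cycle finished, launchedJob=" + launchedJob);
                    } catch (TimeoutException te) {
                        // Cancel the stuck task and recreate the executor, so the next
                        // cycle gets a fresh thread even if the task ignores the interrupt.
                        singleRun.cancel(true);
                        executor.shutdownNow();
                        executor = Executors.newSingleThreadExecutor();
                        System.out.println("cycle timed out, executor recreated");
                    }
                }
            } finally {
                executor.shutdownNow();
            }
        }
    }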

diff --git a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
index 569de70..3db56a0 100644
--- a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
+++ b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
@@ -433,8 +433,7 @@ public class TestStreaming {
     Worker t = new Worker();
     t.setThreadId((int) t.getId());
     t.setConf(hiveConf);
-    AtomicBoolean looped = new AtomicBoolean();
-    t.init(stop, looped);
+    t.init(stop);
     t.run();
   }
 
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorTestUtil.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorTestUtil.java
index e70d878..999ac7e 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorTestUtil.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorTestUtil.java
@@ -118,7 +118,7 @@ class CompactorTestUtil {
     Worker t = new Worker();
     t.setThreadId((int) t.getId());
     t.setConf(hiveConf);
-    t.init(new AtomicBoolean(true), new AtomicBoolean());
+    t.init(new AtomicBoolean(true));
     if (partNames.length == 0) {
       txnHandler.compact(new CompactionRequest(dbName, tblName, compactionType));
       t.run();
@@ -139,67 +139,14 @@ class CompactorTestUtil {
    */
   static void runCleaner(HiveConf hConf) throws Exception {
     HiveConf hiveConf = new HiveConf(hConf);
-    AtomicBoolean stop = new AtomicBoolean(true);
     Cleaner t = new Cleaner();
     t.setThreadId((int) t.getId());
     t.setConf(hiveConf);
-    AtomicBoolean looped = new AtomicBoolean();
-    t.init(stop, looped);
+    t.init(new AtomicBoolean(true));
     t.run();
   }
 
   /**
-   * Trigger compaction initiator.
-   * @param hConf hive configuration
-   * @param isQueryBased run compaction as query based
-   * @throws Exception if initiator cannot be started.
-   */
-  static void runInitiator(HiveConf hConf, boolean isQueryBased) throws Exception {
-    HiveConf hiveConf = new HiveConf(hConf);
-    hiveConf.setBoolVar(HiveConf.ConfVars.COMPACTOR_CRUD_QUERY_BASED, isQueryBased);
-    AtomicBoolean stop = new AtomicBoolean(true);
-    Initiator t = new Initiator();
-    t.setThreadId((int) t.getId());
-    t.setConf(hiveConf);
-    AtomicBoolean looped = new AtomicBoolean();
-    t.init(stop, looped);
-    t.run();
-  }
-
-  /**
-   * Trigger compaction worker.
-   * @param hConf hive configuration
-   * @param isQueryBased run compaction as query based
-   * @throws Exception if worker cannot be started.
-   */
-  static void runWorker(HiveConf hConf, boolean isQueryBased) throws Exception {
-    HiveConf hiveConf = new HiveConf(hConf);
-    hiveConf.setBoolVar(HiveConf.ConfVars.COMPACTOR_CRUD_QUERY_BASED, isQueryBased);
-    AtomicBoolean stop = new AtomicBoolean(true);
-    Worker t = new Worker();
-    t.setThreadId((int) t.getId());
-    t.setConf(hiveConf);
-    AtomicBoolean looped = new AtomicBoolean();
-    t.init(stop, looped);
-    t.run();
-  }
-
-  /**
-   * Execute Hive CLI statement.
-   * @param cmd arbitrary statement to execute
-   * @param driver execution driver
-   * @throws Exception failed to execute statement
-   */
-  void executeStatementOnDriver(String cmd, IDriver driver) throws Exception {
-    LOG.debug("Executing: " + cmd);
-    try {
-      driver.run(cmd);
-    } catch (CommandProcessorException e) {
-      throw new IOException("Failed to execute \"" + cmd + "\". Driver returned: " + e);
-    }
-  }
-
-  /**
    * Execute Hive CLI statement and get back result.
    * @param cmd arbitrary statement to execute
    * @param driver execution driver
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
index 32fe535..9e19105 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
@@ -1120,7 +1120,7 @@ public class TestCompactor {
     Worker t = new Worker();
     t.setThreadId((int) t.getId());
     t.setConf(conf);
-    t.init(new AtomicBoolean(true), new AtomicBoolean());
+    t.init(new AtomicBoolean(true));
     if (partNames.length == 0) {
       txnHandler.compact(new CompactionRequest(dbName, tblName, CompactionType.MAJOR));
       t.run();
@@ -1372,7 +1372,7 @@ public class TestCompactor {
     Worker t = new Worker();
     t.setThreadId((int) t.getId());
     t.setConf(conf);
-    t.init(new AtomicBoolean(true), new AtomicBoolean());
+    t.init(new AtomicBoolean(true));
     CompactionRequest Cr = new CompactionRequest(dbName, tblName, CompactionType.MAJOR);
     Cr.setProperties(tblProperties);
     txnHandler.compact(Cr);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUpdaterThread.java b/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUpdaterThread.java
index ecaad50..7d6e122 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUpdaterThread.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUpdaterThread.java
@@ -130,9 +130,8 @@ public class StatsUpdaterThread extends Thread implements MetaStoreThread {
   }
 
   @Override
-  public void init(AtomicBoolean stop, AtomicBoolean looped) throws MetaException {
+  public void init(AtomicBoolean stop) throws MetaException {
     this.stop = stop;
-    this.looped = looped;
     setPriority(MIN_PRIORITY);
     setDaemon(true);
     String user = "anonymous";
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
index 5fa3d9a..2a15913 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
@@ -68,8 +68,8 @@ public class Cleaner extends MetaStoreCompactorThread {
   private ReplChangeManager replChangeManager;
 
   @Override
-  public void init(AtomicBoolean stop, AtomicBoolean looped) throws Exception {
-    super.init(stop, looped);
+  public void init(AtomicBoolean stop) throws Exception {
+    super.init(stop);
     replChangeManager = ReplChangeManager.getInstance(conf);
   }
 
@@ -81,10 +81,6 @@ public class Cleaner extends MetaStoreCompactorThread {
     }
 
     do {
-      // This is solely for testing.  It checks if the test has set the looped value to false,
-      // and if so remembers that and then sets it to true at the end.  We have to check here
-      // first to make sure we go through a complete iteration of the loop before resetting it.
-      boolean setLooped = !looped.get();
       TxnStore.MutexAPI.LockHandle handle = null;
       long startedAt = -1;
       // Make sure nothing escapes this run method and kills the metastore at large,
@@ -105,9 +101,6 @@ public class Cleaner extends MetaStoreCompactorThread {
           handle.releaseLocks();
         }
       }
-      if (setLooped) {
-        looped.set(true);
-      }
       // Now, go back to bed until it's time to do this again
       long elapsedTime = System.currentTimeMillis() - startedAt;
       if (elapsedTime >= cleanerCheckInterval || stop.get())  {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java
index b378d40..fb23c2f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java
@@ -40,6 +40,7 @@ import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
 import java.io.IOException;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -55,7 +56,6 @@ public abstract class CompactorThread extends Thread implements Configurable {
   protected HiveConf conf;
 
   protected AtomicBoolean stop;
-  protected AtomicBoolean looped;
 
   protected int threadId;
 
@@ -78,11 +78,10 @@ public abstract class CompactorThread extends Thread implements Configurable {
     return conf;
   }
 
-  public void init(AtomicBoolean stop, AtomicBoolean looped) throws Exception {
+  public void init(AtomicBoolean stop) throws Exception {
     setPriority(MIN_PRIORITY);
     setDaemon(true); // this means the process will exit without waiting for this thread
     this.stop = stop;
-    this.looped = looped;
   }
 
   /**
@@ -120,11 +119,12 @@ public abstract class CompactorThread extends Thread implements Configurable {
           return null;
         }
       } catch (Exception e) {
-        LOG.error("Unable to find partition " + ci.getFullPartitionName() + ", " + e.getMessage());
+        LOG.error("Unable to find partition " + ci.getFullPartitionName(), e);
         throw e;
       }
       if (parts.size() != 1) {
-        LOG.error(ci.getFullPartitionName() + " does not refer to a single partition. " + parts);
+        LOG.error(ci.getFullPartitionName() + " does not refer to a single partition. " +
+                      Arrays.toString(parts.toArray()));
         throw new MetaException("Too many partitions for : " + ci.getFullPartitionName());
       }
       return parts.get(0);
@@ -218,7 +218,7 @@ public abstract class CompactorThread extends Thread implements Configurable {
     LOG.info("Starting compactor thread of type " + thread.getClass().getName());
     thread.setConf(conf);
     thread.setThreadId(nextThreadId.incrementAndGet());
-    thread.init(new AtomicBoolean(), new AtomicBoolean());
+    thread.init(new AtomicBoolean());
     thread.start();
   }
 
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
index fa2ede3..7913295 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
@@ -229,8 +229,8 @@ public class Initiator extends MetaStoreCompactorThread {
   }
 
   @Override
-  public void init(AtomicBoolean stop, AtomicBoolean looped) throws Exception {
-    super.init(stop, looped);
+  public void init(AtomicBoolean stop) throws Exception {
+    super.init(stop);
     checkInterval = conf.getTimeVar(HiveConf.ConfVars.HIVE_COMPACTOR_CHECK_INTERVAL, TimeUnit.MILLISECONDS);
     compactionExecutor = Executors.newFixedThreadPool(conf.getIntVar(HiveConf.ConfVars.HIVE_COMPACTOR_REQUEST_QUEUE));
   }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MetaStoreCompactorThread.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MetaStoreCompactorThread.java
index aa258b3..5d7d810 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MetaStoreCompactorThread.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MetaStoreCompactorThread.java
@@ -51,8 +51,8 @@ public class MetaStoreCompactorThread extends CompactorThread implements MetaSto
   }
 
   @Override
-  public void init(AtomicBoolean stop, AtomicBoolean looped) throws Exception {
-    super.init(stop, looped);
+  public void init(AtomicBoolean stop) throws Exception {
+    super.init(stop);
 
     // Get our own instance of the transaction handler
     txnHandler = TxnUtils.getTxnStore(conf);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/RemoteCompactorThread.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/RemoteCompactorThread.java
index 4235184..a4dfdda 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/RemoteCompactorThread.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/RemoteCompactorThread.java
@@ -42,8 +42,8 @@ import static org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.getDefaultCa
 public class RemoteCompactorThread extends CompactorThread {
   protected IMetaStoreClient msc;
 
-  public void init(AtomicBoolean stop, AtomicBoolean looped) throws Exception {
-    super.init(stop, looped);
+  public void init(AtomicBoolean stop) throws Exception {
+    super.init(stop);
     this.msc = HiveMetaStoreUtils.getHiveMetastoreClient(conf);
   }
 
@@ -51,7 +51,7 @@ public class RemoteCompactorThread extends CompactorThread {
     try {
       return msc.getTable(getDefaultCatalog(conf), ci.dbname, ci.tableName);
     } catch (TException e) {
-      LOG.error("Unable to find table " + ci.getFullTableName() + ", " + e.getMessage());
+      LOG.error("Unable to find table " + ci.getFullTableName(), e);
       throw new MetaException(e.toString());
     }
   }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
index 8180adc..a67db8e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hive.ql.txn.compactor;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.JavaUtils;
@@ -32,7 +33,6 @@ import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.api.TxnType;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
-import org.apache.hadoop.mapred.JobConf;
 import org.apache.hive.common.util.Ref;
 import org.apache.thrift.TException;
 import org.slf4j.Logger;
@@ -47,7 +47,6 @@ import org.apache.hadoop.hive.ql.processors.CommandProcessorException;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.ql.stats.StatsUtils;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.util.StringUtils;
 
 import java.io.IOException;
 import java.net.InetAddress;
@@ -56,7 +55,12 @@ import java.security.PrivilegedExceptionAction;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
 
@@ -71,7 +75,6 @@ public class Worker extends RemoteCompactorThread implements MetaStoreThread {
   private static final int NOT_SET = -1;
 
   private String workerName;
-  private JobConf mrJob; // the MR job for compaction
 
   /**
    * Get the hostname that this worker is run on.  Made static and public so that other classes
@@ -86,207 +89,55 @@ public class Worker extends RemoteCompactorThread implements MetaStoreThread {
       throw new RuntimeException(e);
     }
   }
-//todo: this doesn;t check if compaction is already running (even though Initiator does but we
-// don't go  through Initiator for user initiated compactions)
+
+  // TODO: this doesn't check if compaction is already running (even though Initiator does but we
+  // don't go through Initiator for user initiated compactions)
   @Override
   public void run() {
     LOG.info("Starting Worker thread");
     boolean computeStats = conf.getBoolVar(HiveConf.ConfVars.HIVE_MR_COMPACTOR_GATHER_STATS);
-    do {
-      boolean launchedJob = false;
-      // Make sure nothing escapes this run method and kills the metastore at large,
-      // so wrap it in a big catch Throwable statement.
-      CompactionHeartbeater heartbeater = null;
-      CompactionInfo ci = null;
-      long compactorTxnId = NOT_SET;
-      try {
-        if (msc == null) {
-          msc = HiveMetaStoreUtils.getHiveMetastoreClient(conf);
+    long timeout = conf.getTimeVar(HiveConf.ConfVars.HIVE_COMPACTOR_WORKER_TIMEOUT, TimeUnit.MILLISECONDS);
+    boolean launchedJob;
+    ExecutorService executor = getTimeoutHandlingExecutor();
+    try {
+      do {
+        Future<Boolean> singleRun = executor.submit(() -> findNextCompactionAndExecute(computeStats));
+        try {
+          launchedJob = singleRun.get(timeout, TimeUnit.MILLISECONDS);
+        } catch (TimeoutException te) {
+          LOG.info("Timeout during executing compaction", te);
+          // Cancel the job, and recreate the Executor as well, so we can be sure that we have an available thread
+          // even if we can not interrupt the task somehow. (Trade possible resource hogging for compactor stability)
+          singleRun.cancel(true);
+          executor.shutdownNow();
+          executor = getTimeoutHandlingExecutor();
+          launchedJob = true;
+        } catch (ExecutionException e) {
+          LOG.info("Exception during executing compaction", e);
+          launchedJob = true;
+        } catch (InterruptedException ie) {
+          // Do not do anything - stop should be set anyway
+          launchedJob = true;
         }
-        ci = CompactionInfo.optionalCompactionInfoStructToInfo(msc.findNextCompact(workerName));
-        LOG.debug("Processing compaction request " + ci);
 
-        if (ci == null && !stop.get()) {
+        // If we didn't try to launch a job it either means there was no work to do or we got
+        // here as the result of a communication failure with the DB.  Either way we want to wait
+        // a bit before we restart the loop.
+        if (!launchedJob && !stop.get()) {
           try {
             Thread.sleep(SLEEP_TIME);
-            continue;
           } catch (InterruptedException e) {
-            LOG.warn("Worker thread sleep interrupted " + e.getMessage());
-            continue;
-          }
-        }
-
-        // Find the table we will be working with.
-        Table t1 = null;
-        try {
-          t1 = resolveTable(ci);
-          if (t1 == null) {
-            LOG.info("Unable to find table " + ci.getFullTableName() +
-                ", assuming it was dropped and moving on.");
-            msc.markCleaned(CompactionInfo.compactionInfoToStruct(ci));
-            continue;
-          }
-        } catch (MetaException e) {
-          msc.markCleaned(CompactionInfo.compactionInfoToStruct(ci));
-          continue;
-        }
-        // This chicanery is to get around the fact that the table needs to be final in order to
-        // go into the doAs below.
-        final Table t = t1;
-
-        // Find the partition we will be working with, if there is one.
-        Partition p = null;
-        try {
-          p = resolvePartition(ci);
-          if (p == null && ci.partName != null) {
-            LOG.info("Unable to find partition " + ci.getFullPartitionName() +
-                ", assuming it was dropped and moving on.");
-            msc.markCleaned(CompactionInfo.compactionInfoToStruct(ci));
-            continue;
-          }
-        } catch (Exception e) {
-          msc.markCleaned(CompactionInfo.compactionInfoToStruct(ci));
-          continue;
-        }
-
-        // Find the appropriate storage descriptor
-        final StorageDescriptor sd =  resolveStorageDescriptor(t, p);
-
-        // Check that the table or partition isn't sorted, as we don't yet support that.
-        if (sd.getSortCols() != null && !sd.getSortCols().isEmpty()) {
-          LOG.error("Attempt to compact sorted table "+ci.getFullTableName()+", which is not yet supported!");
-          msc.markCleaned(CompactionInfo.compactionInfoToStruct(ci));
-          continue;
-        }
-        String fullTableName = TxnUtils.getFullTableName(t.getDbName(), t.getTableName());
-        if (ci.runAs == null) {
-          ci.runAs = findUserToRunAs(sd.getLocation(), t);
-        }
-        /**
-         * we cannot have Worker use HiveTxnManager (which is on ThreadLocal) since
-         * then the Driver would already have the an open txn but then this txn would have
-         * multiple statements in it (for query based compactor) which is not supported (and since
-         * this case some of the statements are DDL, even in the future will not be allowed in a
-         * multi-stmt txn. {@link Driver#setCompactionWriteIds(ValidWriteIdList, long)} */
-        compactorTxnId = msc.openTxn(ci.runAs, TxnType.COMPACTION);
-
-        heartbeater = new CompactionHeartbeater(compactorTxnId, fullTableName, conf);
-        heartbeater.start();
-
-        ValidTxnList validTxnList = msc.getValidTxns(compactorTxnId);
-        //with this ValidWriteIdList is capped at whatever HWM validTxnList has
-        final ValidCompactorWriteIdList tblValidWriteIds =
-                TxnUtils.createValidCompactWriteIdList(msc.getValidWriteIds(
-                    Collections.singletonList(fullTableName), validTxnList.writeToString()).get(0));
-        LOG.debug("ValidCompactWriteIdList: " + tblValidWriteIds.writeToString());
-        conf.set(ValidTxnList.VALID_TXNS_KEY, validTxnList.writeToString());
-
-        ci.highestWriteId = tblValidWriteIds.getHighWatermark();
-        //this writes TXN_COMPONENTS to ensure that if compactorTxnId fails, we keep metadata about
-        //it until after any data written by it are physically removed
-        msc.updateCompactorState(CompactionInfo.compactionInfoToStruct(ci), compactorTxnId);
-        final StringBuilder jobName = new StringBuilder(workerName);
-        jobName.append("-compactor-");
-        jobName.append(ci.getFullPartitionName());
-
-        // Don't start compaction or cleaning if not necessary
-        AcidUtils.Directory dir = AcidUtils.getAcidState(null, new Path(sd.getLocation()), conf,
-            tblValidWriteIds, Ref.from(false), true, null, false);
-        if (!isEnoughToCompact(ci.isMajorCompaction(), dir, sd)) {
-          if (needsCleaning(dir, sd)) {
-            msc.markCompacted(CompactionInfo.compactionInfoToStruct(ci));
-          } else {
-            // do nothing
-            msc.markCleaned(CompactionInfo.compactionInfoToStruct(ci));
-          }
-          continue;
-        }
-
-        LOG.info("Starting " + ci.type.toString() + " compaction for " + ci.getFullPartitionName() + " in " +
-            JavaUtils.txnIdToString(compactorTxnId) + " with compute stats set to " + computeStats);
-        final StatsUpdater su = computeStats ? StatsUpdater.init(ci, msc.findColumnsWithStats(
-            CompactionInfo.compactionInfoToStruct(ci)), conf,
-          runJobAsSelf(ci.runAs) ? ci.runAs : t.getOwner()) : null;
-        final CompactorMR mr = new CompactorMR();
-        launchedJob = true;
-        try {
-          if (runJobAsSelf(ci.runAs)) {
-            mr.run(conf, jobName.toString(), t, p, sd, tblValidWriteIds, ci, su, msc, dir);
-          } else {
-            UserGroupInformation ugi = UserGroupInformation.createProxyUser(t.getOwner(),
-              UserGroupInformation.getLoginUser());
-            final Partition fp = p;
-            final CompactionInfo fci = ci;
-            ugi.doAs(new PrivilegedExceptionAction<Object>() {
-              @Override
-              public Object run() throws Exception {
-                mr.run(conf, jobName.toString(), t, fp, sd, tblValidWriteIds, fci, su, msc, dir);
-                return null;
-              }
-            });
-            try {
-              FileSystem.closeAllForUGI(ugi);
-            } catch (IOException exception) {
-              LOG.error("Could not clean up file-system handles for UGI: " + ugi + " for " +
-                  ci.getFullPartitionName(), exception);
-            }
-          }
-          heartbeater.cancel();
-          msc.markCompacted(CompactionInfo.compactionInfoToStruct(ci));
-          if (conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST)) {
-            mrJob = mr.getMrJob();
-          }
-        } catch (Throwable e) {
-          LOG.error("Caught exception while trying to compact " + ci +
-              ".  Marking failed to avoid repeated failures, " + StringUtils.stringifyException(e));
-          ci.errorMessage = e.getMessage();
-          msc.markFailed(CompactionInfo.compactionInfoToStruct(ci));
-          msc.abortTxns(Collections.singletonList(compactorTxnId));
-          compactorTxnId = NOT_SET;
-        }
-      } catch (TException | IOException t) {
-        LOG.error("Caught an exception in the main loop of compactor worker " + workerName + ", " +
-            StringUtils.stringifyException(t));
-        try {
-          if (msc != null && ci != null) {
-            ci.errorMessage = t.getMessage();
-            msc.markFailed(CompactionInfo.compactionInfoToStruct(ci));
-            compactorTxnId = NOT_SET;
-          }
-        } catch (TException e) {
-          LOG.error("Caught an exception while trying to mark compaction {} as failed: {}", ci, e);
-        } finally {
-          if (msc != null) {
-            msc.close();
-            msc = null;
           }
         }
-        try {
-          Thread.sleep(SLEEP_TIME);
-        } catch (InterruptedException e) {
-          LOG.error("Interrupted while sleeping to instantiate metastore client");
-        }
-      } catch (Throwable t) {
-        LOG.error("Caught an exception in the main loop of compactor worker " + workerName + ", " +
-            StringUtils.stringifyException(t));
-        compactorTxnId = NOT_SET;
-      } finally {
-        commitTxnIfSet(compactorTxnId);
-        if (heartbeater != null) {
-          heartbeater.cancel();
-        }
+      } while (!stop.get());
+    } finally {
+      if (executor != null) {
+        executor.shutdownNow();
       }
-
-      // If we didn't try to launch a job it either means there was no work to do or we got
-      // here as the result of a communication failure with the DB.  Either way we want to wait
-      // a bit before we restart the loop.
-      if (!launchedJob && !stop.get()) {
-        try {
-          Thread.sleep(SLEEP_TIME);
-        } catch (InterruptedException e) {
-        }
+      if (msc != null) {
+        msc.close();
       }
-    } while (!stop.get());
+    }
   }
 
   private void commitTxnIfSet(long compactorTxnId) {
@@ -297,15 +148,14 @@ public class Worker extends RemoteCompactorThread implements MetaStoreThread {
         }
       } catch (TException e) {
         LOG.error(
-            "Caught an exception while committing compaction in worker " + workerName + ", "
-                + StringUtils.stringifyException(e));
+            "Caught an exception while committing compaction in worker " + workerName, e);
       }
     }
   }
 
   @Override
-  public void init(AtomicBoolean stop, AtomicBoolean looped) throws Exception {
-    super.init(stop, looped);
+  public void init(AtomicBoolean stop) throws Exception {
+    super.init(stop);
 
     StringBuilder name = new StringBuilder(hostname());
     name.append("-");
@@ -314,10 +164,6 @@ public class Worker extends RemoteCompactorThread implements MetaStoreThread {
     setName(name.toString());
   }
 
-  public JobConf getMrJob() {
-    return mrJob;
-  }
-
   static final class StatsUpdater {
     static final private Logger LOG = LoggerFactory.getLogger(StatsUpdater.class);
 
@@ -516,4 +362,218 @@ public class Worker extends RemoteCompactorThread implements MetaStoreThread {
     }
     return needsJustCleaning;
   }
+
+  /**
+   * Creates a single-threaded executor used for handling timeouts.
+   * The thread settings are inherited from the current thread.
+   * @return Single-threaded executor service to be used for timeout handling
+   */
+  private ExecutorService getTimeoutHandlingExecutor() {
+    return Executors.newSingleThreadExecutor((r) -> {
+      Thread masterThread = Thread.currentThread();
+      Thread t = new Thread(masterThread.getThreadGroup(), r, masterThread.getName() + "_timeout_executor");
+      t.setDaemon(masterThread.isDaemon());
+      t.setPriority(masterThread.getPriority());
+      return t;
+    });
+  }
+
+  /**
+   * Finds the next compaction and executes it. The main thread might interrupt the execution of this method
+   * in case of a timeout.
+   * @param computeStats If true, then for MR compaction the stats are regenerated
+   * @return true if there was a compaction in the queue and we started working on it
+   * @throws InterruptedException when the process is interrupted, for example because of a timeout
+   */
+  @VisibleForTesting
+  protected Boolean findNextCompactionAndExecute(boolean computeStats) throws InterruptedException {
+    // Make sure nothing escapes this run method and kills the metastore at large,
+    // so wrap it in a big catch Throwable statement.
+    CompactionHeartbeater heartbeater = null;
+    CompactionInfo ci = null;
+    long compactorTxnId = NOT_SET;
+    try {
+      if (msc == null) {
+        msc = HiveMetaStoreUtils.getHiveMetastoreClient(conf);
+      }
+      ci = CompactionInfo.optionalCompactionInfoStructToInfo(msc.findNextCompact(workerName));
+      LOG.debug("Processing compaction request " + ci);
+
+      if (ci == null) {
+        return false;
+      }
+
+      checkInterrupt();
+
+      // Find the table we will be working with.
+      Table t1;
+      try {
+        t1 = resolveTable(ci);
+        if (t1 == null) {
+          LOG.info("Unable to find table " + ci.getFullTableName() +
+                       ", assuming it was dropped and moving on.");
+          msc.markCleaned(CompactionInfo.compactionInfoToStruct(ci));
+          return false;
+        }
+      } catch (MetaException e) {
+        msc.markCleaned(CompactionInfo.compactionInfoToStruct(ci));
+        return false;
+      }
+
+      checkInterrupt();
+
+      // This chicanery is to get around the fact that the table needs to be final in order to
+      // go into the doAs below.
+      final Table t = t1;
+      String fullTableName = TxnUtils.getFullTableName(t.getDbName(), t.getTableName());
+
+      // Find the partition we will be working with, if there is one.
+      Partition p;
+      try {
+        p = resolvePartition(ci);
+        if (p == null && ci.partName != null) {
+          LOG.info("Unable to find partition " + ci.getFullPartitionName() +
+                       ", assuming it was dropped and moving on.");
+          msc.markCleaned(CompactionInfo.compactionInfoToStruct(ci));
+          return false;
+        }
+      } catch (Exception e) {
+        msc.markCleaned(CompactionInfo.compactionInfoToStruct(ci));
+        return false;
+      }
+
+      checkInterrupt();
+
+      // Find the appropriate storage descriptor
+      final StorageDescriptor sd =  resolveStorageDescriptor(t, p);
+
+      // Check that the table or partition isn't sorted, as we don't yet support that.
+      if (sd.getSortCols() != null && !sd.getSortCols().isEmpty()) {
+        LOG.error("Attempt to compact sorted table " + ci.getFullTableName() + ", which is not yet supported!");
+        msc.markCleaned(CompactionInfo.compactionInfoToStruct(ci));
+        return false;
+      }
+
+      if (ci.runAs == null) {
+        ci.runAs = findUserToRunAs(sd.getLocation(), t);
+      }
+
+      checkInterrupt();
+
+      /**
+       * We cannot have the Worker use HiveTxnManager (which is on a ThreadLocal) since
+       * then the Driver would already have an open txn, but this txn would have
+       * multiple statements in it (for the query based compactor), which is not supported (and since
+       * in this case some of the statements are DDL, they will not be allowed in a
+       * multi-stmt txn even in the future). {@link Driver#setCompactionWriteIds(ValidWriteIdList, long)} */
+      compactorTxnId = msc.openTxn(ci.runAs, TxnType.COMPACTION);
+
+      heartbeater = new CompactionHeartbeater(compactorTxnId, fullTableName, conf);
+      heartbeater.start();
+
+      ValidTxnList validTxnList = msc.getValidTxns(compactorTxnId);
+      //with this ValidWriteIdList is capped at whatever HWM validTxnList has
+      final ValidCompactorWriteIdList tblValidWriteIds =
+          TxnUtils.createValidCompactWriteIdList(msc.getValidWriteIds(
+              Collections.singletonList(fullTableName), validTxnList.writeToString()).get(0));
+      LOG.debug("ValidCompactWriteIdList: " + tblValidWriteIds.writeToString());
+      conf.set(ValidTxnList.VALID_TXNS_KEY, validTxnList.writeToString());
+
+      ci.highestWriteId = tblValidWriteIds.getHighWatermark();
+      //this writes TXN_COMPONENTS to ensure that if compactorTxnId fails, we keep metadata about
+      //it until after any data written by it are physically removed
+      msc.updateCompactorState(CompactionInfo.compactionInfoToStruct(ci), compactorTxnId);
+
+      checkInterrupt();
+
+      final StringBuilder jobName = new StringBuilder(workerName);
+      jobName.append("-compactor-");
+      jobName.append(ci.getFullPartitionName());
+
+      // Don't start compaction or cleaning if not necessary
+      AcidUtils.Directory dir = AcidUtils.getAcidState(null, new Path(sd.getLocation()), conf,
+          tblValidWriteIds, Ref.from(false), true, null, false);
+      if (!isEnoughToCompact(ci.isMajorCompaction(), dir, sd)) {
+        if (needsCleaning(dir, sd)) {
+          msc.markCompacted(CompactionInfo.compactionInfoToStruct(ci));
+        } else {
+          // do nothing
+          msc.markCleaned(CompactionInfo.compactionInfoToStruct(ci));
+        }
+        return false;
+      }
+
+      checkInterrupt();
+
+      LOG.info("Starting " + ci.type.toString() + " compaction for " + ci.getFullPartitionName() + " in " +
+                   JavaUtils.txnIdToString(compactorTxnId) + " with compute stats set to " + computeStats);
+      final StatsUpdater su = computeStats ? StatsUpdater.init(ci, msc.findColumnsWithStats(
+          CompactionInfo.compactionInfoToStruct(ci)), conf,
+          runJobAsSelf(ci.runAs) ? ci.runAs : t.getOwner()) : null;
+      final CompactorMR mr = new CompactorMR();
+      try {
+        if (runJobAsSelf(ci.runAs)) {
+          mr.run(conf, jobName.toString(), t, p, sd, tblValidWriteIds, ci, su, msc, dir);
+        } else {
+          UserGroupInformation ugi = UserGroupInformation.createProxyUser(t.getOwner(),
+              UserGroupInformation.getLoginUser());
+          final Partition fp = p;
+          final CompactionInfo fci = ci;
+          ugi.doAs(new PrivilegedExceptionAction<Object>() {
+            @Override
+            public Object run() throws Exception {
+              mr.run(conf, jobName.toString(), t, fp, sd, tblValidWriteIds, fci, su, msc, dir);
+              return null;
+            }
+          });
+          try {
+            FileSystem.closeAllForUGI(ugi);
+          } catch (IOException exception) {
+            LOG.error("Could not clean up file-system handles for UGI: " + ugi + " for " +
+                          ci.getFullPartitionName(), exception);
+          }
+        }
+        heartbeater.cancel();
+        msc.markCompacted(CompactionInfo.compactionInfoToStruct(ci));
+      } catch (Throwable e) {
+        LOG.error("Caught exception while trying to compact " + ci +
+                      ".  Marking failed to avoid repeated failures", e);
+        ci.errorMessage = e.getMessage();
+        msc.markFailed(CompactionInfo.compactionInfoToStruct(ci));
+        msc.abortTxns(Collections.singletonList(compactorTxnId));
+        compactorTxnId = NOT_SET;
+      }
+    } catch (TException | IOException t) {
+      LOG.error("Caught an exception in the main loop of compactor worker " + workerName, t);
+      try {
+        if (msc != null && ci != null) {
+          ci.errorMessage = t.getMessage();
+          msc.markFailed(CompactionInfo.compactionInfoToStruct(ci));
+          compactorTxnId = NOT_SET;
+        }
+      } catch (TException e) {
+        LOG.error("Caught an exception while trying to mark compaction {} as failed: {}", ci, e);
+      } finally {
+        if (msc != null) {
+          msc.close();
+          msc = null;
+        }
+      }
+    } catch (Throwable t) {
+      LOG.error("Caught an exception in the main loop of compactor worker " + workerName, t);
+      compactorTxnId = NOT_SET;
+    } finally {
+      commitTxnIfSet(compactorTxnId);
+      if (heartbeater != null) {
+        heartbeater.cancel();
+      }
+    }
+    return true;
+  }
+
+  private void checkInterrupt() throws InterruptedException {
+    if (Thread.interrupted()) {
+      throw new InterruptedException("Compaction execution is interrupted");
+    }
+  }
 }
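
A note on the checkInterrupt() calls sprinkled through findNextCompactionAndExecute() above:
most of the steps between metastore RPCs are not interruptible blocking calls, so cancellation
has to be cooperative. The minimal sketch below (illustrative names, not Hive code) shows the
idiom: poll Thread.interrupted() between steps and convert the flag into an InterruptedException
that unwinds the task through its normal error handling.

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    import java.util.concurrent.TimeUnit;

    public class CheckInterruptSketch {
        private static void checkInterrupt() throws InterruptedException {
            // Thread.interrupted() clears the flag; turning it into an exception
            // lets the task stop cleanly between non-blocking steps.
            if (Thread.interrupted()) {
                throw new InterruptedException("task execution is interrupted");
            }
        }

        public static void main(String[] args) throws Exception {
            ExecutorService pool = Executors.newSingleThreadExecutor();
            Future<?> task = pool.submit(() -> {
                try {
                    while (true) {
                        // a CPU-bound step that never calls a blocking method...
                        checkInterrupt();  // ...so the interrupt flag is polled instead
                    }
                } catch (InterruptedException ie) {
                    System.out.println("task observed the cancel and stopped");
                }
            });
            Thread.sleep(50);
            task.cancel(true);  // delivers the interrupt to the polling task
            pool.shutdown();
            pool.awaitTermination(1, TimeUnit.SECONDS);
        }
    }
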
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
index 366282a..337f469 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
@@ -1041,7 +1041,7 @@ public class TestTxnCommands2 {
     Initiator init = new Initiator();
     init.setThreadId((int)init.getId());
     init.setConf(hiveConf);
-    init.init(stop, new AtomicBoolean());
+    init.init(stop);
     init.run();
     int numAttemptedCompactions = 1;
     checkCompactionState(new CompactionsByState(numAttemptedCompactions,numFailedCompactions,0,0,0,0,numFailedCompactions + numAttemptedCompactions), countCompacts(txnHandler));
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java b/ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java
index 3ff68a3..91752e8 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java
@@ -197,8 +197,7 @@ public abstract class TxnCommandsBaseForTests {
     }
     t.setThreadId((int) t.getId());
     t.setConf(hiveConf);
-    AtomicBoolean looped = new AtomicBoolean();
-    t.init(stop, looped);
+    t.init(stop);
     t.run();
   }
 
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/stats/TestStatsUpdaterThread.java b/ql/src/test/org/apache/hadoop/hive/ql/stats/TestStatsUpdaterThread.java
index 84827d1..afe6070 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/stats/TestStatsUpdaterThread.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/stats/TestStatsUpdaterThread.java
@@ -722,7 +722,7 @@ public class TestStatsUpdaterThread {
   private StatsUpdaterThread createUpdater() throws MetaException {
     StatsUpdaterThread su = new StatsUpdaterThread();
     su.setConf(hiveConf);
-    su.init(new AtomicBoolean(false), null);
+    su.init(new AtomicBoolean(false));
     return su;
   }
 }
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
index 9a9ab53..84a23b2 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
@@ -124,10 +124,6 @@ public abstract class CompactorTest {
     startThread('c', true);
   }
 
-  protected void startCleaner(AtomicBoolean looped) throws Exception {
-    startThread('c', false, looped);
-  }
-
   protected Table newTable(String dbName, String tableName, boolean partitioned) throws TException {
     return newTable(dbName, tableName, partitioned, new HashMap<String, String>(), null, false);
   }
@@ -138,10 +134,6 @@ public abstract class CompactorTest {
 
   }
 
-  protected Table newTempTable(String tableName) throws TException {
-    return newTable("default", tableName, false, null, null, true);
-  }
-
   protected Table newTable(String dbName, String tableName, boolean partitioned,
                            Map<String, String> parameters, List<Order> sortCols,
                            boolean  isTemporary)
@@ -298,13 +290,8 @@ public abstract class CompactorTest {
 
   // I can't do this with @Before because I want to be able to control when the thread starts
   private void startThread(char type, boolean stopAfterOne) throws Exception {
-    startThread(type, stopAfterOne, new AtomicBoolean());
-  }
-
-  private void startThread(char type, boolean stopAfterOne, AtomicBoolean looped)
-    throws Exception {
     TxnDbUtil.setConfValues(conf);
-    CompactorThread t = null;
+    CompactorThread t;
     switch (type) {
       case 'i': t = new Initiator(); break;
       case 'w': t = new Worker(); break;
@@ -314,7 +301,7 @@ public abstract class CompactorTest {
     t.setThreadId((int) t.getId());
     t.setConf(conf);
     stop.set(stopAfterOne);
-    t.init(stop, looped);
+    t.init(stop);
     if (stopAfterOne) t.run();
     else t.start();
   }
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java
index 443f982..73eb341 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hive.ql.txn.compactor;
 
+import it.unimi.dsi.fastutil.booleans.AbstractBooleanBidirectionalIterator;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -59,6 +60,11 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * Tests for the worker thread and its MR jobs.
@@ -1067,8 +1073,90 @@ public class TestWorker extends CompactorTest {
     Assert.assertEquals(TxnState.ABORTED, openTxns.get(0).getState());
   }
 
+  // With a high timeout and a fast run, the job should finish without a problem
+  @Test(timeout=1000)
+  public void testNormalRun() throws Exception {
+    runTimeoutTest(10000, false, true);
+  }
+
+  // With a low timeout and a slow run, the job is cancelled via interrupt and the loop still makes progress
+  @Test(timeout=1000)
+  public void testTimeoutWithInterrupt() throws Exception {
+    runTimeoutTest(1, true, false);
+  }
+
+  // With a low timeout and a slow run, the loop still makes progress even if the interrupt is swallowed
+  @Test(timeout=1000)
+  public void testTimeoutWithoutInterrupt() throws Exception {
+    runTimeoutTest(1, true, true);
+  }
+
+  private void runTimeoutTest(long timeout, boolean runForever, boolean swallowInterrupt) throws Exception {
+    ExecutorService executor = Executors.newSingleThreadExecutor();
+    HiveConf timeoutConf = new HiveConf(conf);
+    TimeoutWorker timeoutWorker;
+
+    timeoutConf.setTimeVar(HiveConf.ConfVars.HIVE_COMPACTOR_WORKER_TIMEOUT, timeout, TimeUnit.MILLISECONDS);
+    timeoutWorker = getTimeoutWorker(timeoutConf, executor, runForever, swallowInterrupt);
+
+    // Wait until at least 1st loop is finished
+    while (!timeoutWorker.looped.get()) {
+      Thread.sleep(10L);
+    }
+
+    timeoutWorker.looped.set(false);
+
+    // Wait until the 2nd loop is finished
+    while (!timeoutWorker.looped.get()) {
+      Thread.sleep(10L);
+    }
+
+    timeoutWorker.stop.set(true);
+    executor.shutdownNow();
+  }
+
+  private TimeoutWorker getTimeoutWorker(HiveConf conf, ExecutorService executor, boolean runForever,
+      boolean swallowInterrupt) throws Exception {
+    TimeoutWorker timeoutWorker = new TimeoutWorker(runForever, swallowInterrupt);
+    timeoutWorker.setThreadId((int)timeoutWorker.getId());
+    timeoutWorker.setConf(conf);
+    timeoutWorker.init(new AtomicBoolean(false));
+    executor.submit(() -> timeoutWorker.run());
+    return timeoutWorker;
+  }
+
   @After
   public void tearDown() throws Exception {
     compactorTestCleanup();
   }
+
+  private static final class TimeoutWorker extends Worker {
+    private boolean runForever;
+    private boolean swallowInterrupt;
+    private AtomicBoolean looped;
+
+    private TimeoutWorker(boolean runForever, boolean swallowInterrupt) {
+      this.runForever = runForever;
+      this.swallowInterrupt = swallowInterrupt;
+      this.looped = new AtomicBoolean(false);
+    }
+
+    protected Boolean findNextCompactionAndExecute(boolean computeStats) throws InterruptedException {
+      looped.set(true);
+      if (runForever) {
+        while (!stop.get()) {
+          try {
+            looped.set(true);
+            Thread.sleep(Long.MAX_VALUE);
+          } catch (InterruptedException ie) {
+            if (!swallowInterrupt) {
+              throw ie;
+            }
+            Thread.sleep(Long.MAX_VALUE);
+          }
+        }
+      }
+      return true;
+    }
+  }
 }
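
The swallowInterrupt tests above exist because Future.cancel(true) only interrupts the worker
thread; a task that catches and ignores the interrupt keeps occupying it. The stand-alone sketch
below (illustrative, not Hive code) demonstrates that failure mode and why Worker.run() recreates
the executor after shutdownNow().

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    import java.util.concurrent.TimeUnit;

    public class SwallowedInterruptSketch {
        public static void main(String[] args) throws Exception {
            ExecutorService executor = Executors.newSingleThreadExecutor();
            Future<?> stuck = executor.submit(() -> {
                while (true) {
                    try {
                        Thread.sleep(Long.MAX_VALUE);
                    } catch (InterruptedException ignored) {
                        // the interrupt is swallowed; the task never exits
                    }
                }
            });
            Thread.sleep(50);          // let the task start sleeping
            stuck.cancel(true);        // only sets the interrupt flag; no lasting effect
            executor.shutdownNow();    // interrupts again; also swallowed
            boolean terminated = executor.awaitTermination(200, TimeUnit.MILLISECONDS);
            System.out.println("old executor terminated: " + terminated);  // prints false
            // A fresh executor is the only way to guarantee an available thread:
            ExecutorService replacement = Executors.newSingleThreadExecutor();
            System.out.println("replacement ready: " + !replacement.isShutdown());
            replacement.shutdown();
            // The stuck non-daemon thread would keep this demo JVM alive; in the real
            // Worker the timeout-handling threads inherit the daemon flag, so exit here:
            System.exit(0);
        }
    }
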
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
index e20fdaf..6d68035 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
@@ -10548,7 +10548,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
     LOG.info("Starting metastore thread of type " + thread.getClass().getName());
     thread.setConf(conf);
     thread.setThreadId(nextThreadId++);
-    thread.init(new AtomicBoolean(), new AtomicBoolean());
+    thread.init(new AtomicBoolean());
     thread.start();
   }
 
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreThread.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreThread.java
index ea61552..57bf36b 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreThread.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreThread.java
@@ -39,18 +39,13 @@ public interface MetaStoreThread extends Configurable {
    * have been called.
    * @param stop a flag to watch for when to stop.  If this value is set to true,
    *             the thread will terminate the next time through its main loop.
-   * @param looped a flag that is set to true everytime a thread goes through it's main loop.
-   *               This is purely for testing so that tests can assure themselves that the thread
-   *               has run through it's loop once.  The test can set this value to false.  The
-   *               thread should then assure that the loop has been gone completely through at
-   *               least once.
    */
   // TODO: move these test parameters to more specific places... there's no need to have them here
-  void init(AtomicBoolean stop, AtomicBoolean looped) throws Exception;
+  void init(AtomicBoolean stop) throws Exception;
 
   /**
    * Run the thread in the background.  This must not be called until
-   * {@link MetaStoreThread#init(java.util.concurrent.atomic.AtomicBoolean,java.util.concurrent.atomic.AtomicBoolean)} has
+   * {@link MetaStoreThread#init(java.util.concurrent.atomic.AtomicBoolean)} has
    * been called.
    */
   void start();
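
With the looped flag gone, the lifecycle contract is just the stop flag documented above: set it
to true before run() and the main loop executes exactly once, which is how the tests in this
commit drive the compactor threads synchronously. A minimal self-contained sketch of that
contract (illustrative class, not Hive code):

    import java.util.concurrent.atomic.AtomicBoolean;

    public class StopFlagSketch extends Thread {
        private AtomicBoolean stop;

        public void init(AtomicBoolean stop) {
            this.stop = stop;
            setDaemon(true);  // mirrors CompactorThread.init: process exits without waiting
        }

        @Override
        public void run() {
            do {
                System.out.println("one pass through the main loop");
            } while (!stop.get());
        }

        public static void main(String[] args) throws Exception {
            StopFlagSketch t = new StopFlagSketch();
            t.init(new AtomicBoolean(true));  // pre-set stop: exactly one iteration
            t.run();                          // run synchronously, as the tests do
        }
    }
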
diff --git a/streaming/src/test/org/apache/hive/streaming/TestStreaming.java b/streaming/src/test/org/apache/hive/streaming/TestStreaming.java
index 6101caa..3a3b267 100644
--- a/streaming/src/test/org/apache/hive/streaming/TestStreaming.java
+++ b/streaming/src/test/org/apache/hive/streaming/TestStreaming.java
@@ -786,8 +786,7 @@ public class TestStreaming {
     Worker t = new Worker();
     t.setThreadId((int) t.getId());
     t.setConf(hiveConf);
-    AtomicBoolean looped = new AtomicBoolean();
-    t.init(stop, looped);
+    t.init(stop);
     t.run();
   }