You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by dk...@apache.org on 2022/06/29 11:17:10 UTC

[hive] branch master updated: HIVE-26242: Compaction heartbeater improvements (Laszlo Vegh, reviewed by Karen Coppage, Denys Kuzmenko)

This is an automated email from the ASF dual-hosted git repository.

dkuzmenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new bc35507757 HIVE-26242: Compaction heartbeater improvements (Laszlo Vegh, reviewed by Karen Coppage, Denys Kuzmenko)
bc35507757 is described below

commit bc35507757c23a6da612a2dc4b840105aed2515c
Author: veghlaci05 <90...@users.noreply.github.com>
AuthorDate: Wed Jun 29 13:16:59 2022 +0200

    HIVE-26242: Compaction heartbeater improvements (Laszlo Vegh, reviewed by Karen Coppage, Denys Kuzmenko)
    
    Closes #3303
---
 .../txn/compactor/CompactionHeartbeatService.java  | 217 +++++++++++++++++++++
 .../ql/txn/compactor/IMetaStoreClientFactory.java  |  64 ++++++
 .../hadoop/hive/ql/txn/compactor/Worker.java       |  84 ++------
 .../compactor/TestCompactionHeartbeatService.java  | 154 +++++++++++++++
 4 files changed, 446 insertions(+), 73 deletions(-)

diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactionHeartbeatService.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactionHeartbeatService.java
new file mode 100644
index 0000000000..788955e35c
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactionHeartbeatService.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.txn.compactor;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.commons.pool2.ObjectPool;
+import org.apache.commons.pool2.impl.GenericObjectPool;
+import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hive.common.util.ShutdownHookManager;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.NoSuchElementException;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.hive.common.util.HiveStringUtils.SHUTDOWN_HOOK_PRIORITY;
+
+/**
+ * Singleton service responsible for heartbeating the compaction transactions.
+ */
+class CompactionHeartbeatService {
+
+
+  private static final Logger LOG = LoggerFactory.getLogger(CompactionHeartbeatService.class);
+
+  private static volatile CompactionHeartbeatService instance;
+
+  /**
+   * Return the singleton instance of this class.
+   * @param conf The {@link HiveConf} used to create the service. Used only during the firsst call
+   * @return Returns the singleton {@link CompactionHeartbeatService}
+   * @throws IllegalStateException Thrown when the service has already been destroyed.
+   */
+  static CompactionHeartbeatService getInstance(HiveConf conf) {
+    if (instance == null) {
+      synchronized (CompactionHeartbeatService.class) {
+        if (instance == null) {
+          LOG.debug("Initializing compaction txn heartbeater service.");
+          instance = new CompactionHeartbeatService(conf);
+          ShutdownHookManager.addShutdownHook(() -> instance.shutdown(), SHUTDOWN_HOOK_PRIORITY);
+        }
+      }
+    }
+    if (instance.shuttingDown) {
+      throw new IllegalStateException("CompactionHeartbeatService is already destroyed!");
+    }
+    return instance;
+  }
+
+  private final ObjectPool<IMetaStoreClient> clientPool;
+  private volatile boolean shuttingDown = false;
+  private final long initialDelay;
+  private final long period;
+  private final ConcurrentHashMap<Long, CompactionHeartbeater> tasks = new ConcurrentHashMap<>(30);
+
+  /**
+   * Starts the heartbeat for the given transaction
+   * @param txnId The id of the compaction txn
+   * @param lockId The id of the lock associated with the txn
+   * @param tableName Required for logging only
+   * @throws IllegalStateException Thrown when the heartbeat for the given txn has already been started.
+   */
+  void startHeartbeat(long txnId, long lockId, String tableName) {
+    if (shuttingDown) {
+      throw new IllegalStateException("Service is shutting down, starting new heartbeats is not possible!");
+    }
+    if (tasks.containsKey(txnId)) {
+      throw new IllegalStateException("Heartbeat was already started for TXN " + txnId);
+    }
+    LOG.info("Submitting heartbeat task for TXN {}", txnId);
+    CompactionHeartbeater heartbeater = new CompactionHeartbeater(txnId, lockId, tableName);
+    heartbeater.start();
+    tasks.put(txnId, heartbeater);
+  }
+
+  /**
+   * Stops the heartbeat for the given transaction
+   * @param txnId The id of the compaction txn
+   * @throws IllegalStateException Thrown when there is no {@link CompactionHeartbeater} task associated with the
+   * given txnId.
+   */
+  void stopHeartbeat(long txnId) throws InterruptedException {
+    LOG.info("Stopping heartbeat task for TXN {}", txnId);
+    CompactionHeartbeater heartbeater = tasks.get(txnId);
+    if (heartbeater == null) {
+      throw new IllegalStateException("No registered heartbeat found for TXN " + txnId);
+    }
+    try {
+      heartbeater.stop();
+    } finally {
+      tasks.remove(txnId);
+    }
+  }
+
+  /**
+   * Shuts down the service, by closing its underlying resources. Be aware that after shutdown this service is no
+   * longer usable, there is no way to re-initialize it.
+   */
+  void shutdown() {
+    shuttingDown = true;
+    LOG.info("Shutting down compaction txn heartbeater service.");
+    for (CompactionHeartbeater heartbeater : tasks.values()) {
+      try {
+        heartbeater.stop();
+      } catch (InterruptedException e) {
+        LOG.warn("Shutdownhook thread was interrupted during shutting down the CompactionHeartbeatService.");
+      }
+    }
+    tasks.clear();
+    clientPool.close();
+    LOG.info("Compaction txn heartbeater service is successfully stopped.");
+  }
+
+  private CompactionHeartbeatService(HiveConf conf) {
+    int numberOfWorkers = MetastoreConf.getIntVar(conf, MetastoreConf.ConfVars.COMPACTOR_WORKER_THREADS);
+    GenericObjectPoolConfig<IMetaStoreClient> config = new GenericObjectPoolConfig<>();
+    config.setMinIdle(1);
+    config.setMaxIdle(2);
+    config.setMaxTotal(numberOfWorkers);
+    config.setMaxWaitMillis(2000);
+    clientPool = new GenericObjectPool<>(new IMetaStoreClientFactory(conf), config);
+    long txnTimeout = MetastoreConf.getTimeVar(conf, MetastoreConf.ConfVars.TXN_TIMEOUT, TimeUnit.MILLISECONDS);
+    initialDelay = txnTimeout / 4;
+    period = txnTimeout / 2;
+  }
+
+  private final class CompactionHeartbeater {
+    private final Logger LOG = LoggerFactory.getLogger(CompactionHeartbeater.class);
+    private final long txnId;
+    private final long lockId;
+    private final String tableName;
+    private final ScheduledThreadPoolExecutor heartbeatExecutor;
+
+    private CompactionHeartbeater(long txnId, long lockId, String tableName) {
+      heartbeatExecutor = new ScheduledThreadPoolExecutor(1);
+      heartbeatExecutor.setThreadFactory(new ThreadFactoryBuilder()
+          .setPriority(Thread.MIN_PRIORITY)
+          .setDaemon(true)
+          .setNameFormat("CompactionTxnHeartbeater-" + txnId)
+          .build());
+      this.tableName = Objects.requireNonNull(tableName);
+      this.txnId = txnId;
+      this.lockId = lockId;
+    }
+
+    void start() {
+      heartbeatExecutor.scheduleAtFixedRate(() -> {
+        IMetaStoreClient msc = null;
+        try {
+          LOG.debug("Heartbeating compaction transaction id {} for table: {}", txnId, tableName);
+          // Create a metastore client for each thread since it is not thread safe
+          msc = clientPool.borrowObject();
+          msc.heartbeat(txnId, lockId);
+        } catch (NoSuchElementException e) {
+          LOG.error("Compaction transaction heartbeater pool exhausted, unable to heartbeat", e);
+          // This heartbeat attempt failed, and there is no client to return to the pool.
+          return;
+        } catch (TException e) {
+          LOG.error("Error while heartbeating compaction transaction id {} for table: {}", txnId, tableName, e);
+          // Heartbeat failed, but the client is not broken, we can return it to the pool.
+        } catch (Exception e) {
+          LOG.error("Error while heartbeating compaction transaction id {} for table: {}", txnId, tableName, e);
+          // Unknown error, invalidate the client, maybe it is broken.
+          if (msc != null) {
+            try {
+              clientPool.invalidateObject(msc);
+            } catch (Exception ex) {
+              LOG.error("Error while invalidating a broken MetaStoreClient instance", e);
+            }
+          }
+          return;
+        }
+        try {
+          if (msc != null) {
+            clientPool.returnObject(msc);
+          }
+        } catch (Exception e) {
+          LOG.error("Error while returning back to the pool a MetaStoreClient instance", e);
+        }
+      }, initialDelay, period, TimeUnit.MILLISECONDS);
+    }
+
+    public void stop() throws InterruptedException {
+      LOG.info("Shutting down compaction txn heartbeater instance.");
+      heartbeatExecutor.shutdownNow();
+      if (!heartbeatExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
+        LOG.warn("Heartbeating for transaction {} did not stop in 5 seconds, do not wait any longer.", txnId);
+        return;
+      }
+      LOG.info("Compaction txn heartbeater instance is successfully stopped.");
+    }
+
+  }
+
+}
\ No newline at end of file
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/IMetaStoreClientFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/IMetaStoreClientFactory.java
new file mode 100644
index 0000000000..1fd74ca029
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/IMetaStoreClientFactory.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.txn.compactor;
+
+import org.apache.commons.pool2.BasePooledObjectFactory;
+import org.apache.commons.pool2.PooledObject;
+import org.apache.commons.pool2.impl.DefaultPooledObject;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreUtils;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+
+import java.util.Objects;
+
+/**
+ * Factory class responsible for managing (creating/wrapping/destroying) the {@link IMetaStoreClient} instances.
+ * Used by the {@link org.apache.commons.pool2.ObjectPool} in {@link CompactionHeartbeatService} to allow pooling
+ * of {@link IMetaStoreClient}s.
+ */
+final class IMetaStoreClientFactory extends BasePooledObjectFactory<IMetaStoreClient> {
+
+  private final HiveConf conf;
+
+  @Override
+  public IMetaStoreClient create() throws Exception {
+    return HiveMetaStoreUtils.getHiveMetastoreClient(conf);
+  }
+
+  @Override
+  public PooledObject<IMetaStoreClient> wrap(IMetaStoreClient msc) {
+    return new DefaultPooledObject<>(msc);
+  }
+
+  @Override
+  public void destroyObject(PooledObject<IMetaStoreClient> msc) {
+    msc.getObject().close();
+  }
+
+  @Override
+  public boolean validateObject(PooledObject<IMetaStoreClient> msc) {
+    //Not in use currently, would be good to validate the client at borrowing/returning, but this needs support from
+    //MetaStoreClient side
+    return super.validateObject(msc);
+  }
+
+  public IMetaStoreClientFactory(HiveConf conf) {
+    this.conf = Objects.requireNonNull(conf);
+  }
+
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
index 69bff17d0a..707ebff393 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
@@ -18,9 +18,7 @@
 package org.apache.hadoop.hive.ql.txn.compactor;
 
 import com.google.common.annotations.VisibleForTesting;
-import java.util.concurrent.ScheduledExecutorService;
 
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -28,7 +26,6 @@ import org.apache.hadoop.hive.common.ValidCompactorWriteIdList;
 import org.apache.hadoop.hive.common.ValidTxnList;
 import org.apache.hadoop.hive.common.ValidWriteIdList;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreUtils;
-import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.metastore.MetaStoreThread;
 import org.apache.hadoop.hive.metastore.api.CompactionType;
 import org.apache.hadoop.hive.metastore.api.DataOperationType;
@@ -69,7 +66,6 @@ import java.io.IOException;
 import java.security.PrivilegedExceptionAction;
 import java.util.Collections;
 import java.util.Map;
-import java.util.Objects;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -205,36 +201,6 @@ public class Worker extends RemoteCompactorThread implements MetaStoreThread {
     }
   }
 
-  static final class CompactionHeartbeater implements Runnable {
-    static final private Logger LOG = LoggerFactory.getLogger(CompactionHeartbeater.class);
-    private final CompactionTxn compactionTxn;
-    private final String tableName;
-    private final HiveConf conf;
-
-    public CompactionHeartbeater(CompactionTxn compactionTxn, String tableName, HiveConf conf) {
-      this.tableName = Objects.requireNonNull(tableName);
-      this.compactionTxn = Objects.requireNonNull(compactionTxn);
-      this.conf = Objects.requireNonNull(conf);
-    }
-
-    @Override
-    public void run() {
-      LOG.debug("Heartbeating compaction transaction id {} for table: {}", compactionTxn, tableName);
-      IMetaStoreClient msc = null;
-      try {
-        // Create a metastore client for each thread since it is not thread safe
-        msc = HiveMetaStoreUtils.getHiveMetastoreClient(conf);
-        msc.heartbeat(compactionTxn.getTxnId(), compactionTxn.getLockId());
-      } catch (Exception e) {
-          LOG.error("Error while heartbeating transaction id {} for table: {}", compactionTxn, tableName, e);
-      } finally {
-        if (msc != null) {
-          msc.close();
-        }
-      }
-    }
-  }
-
   /**
    * Determine if compaction can run in a specified directory.
    * @param isMajorCompaction type of compaction.
@@ -669,13 +635,12 @@ public class Worker extends RemoteCompactorThread implements MetaStoreThread {
   /**
    * Keep track of the compaction's transaction and its operations.
    */
-  private class CompactionTxn implements AutoCloseable {
+  class CompactionTxn implements AutoCloseable {
     private long txnId = 0;
     private long lockId = 0;
 
     private TxnStatus status = TxnStatus.UNKNOWN;
     private boolean succeessfulCompaction = false;
-    private ScheduledExecutorService heartbeatExecutor;
 
     /**
      * Try to open a new txn.
@@ -692,18 +657,7 @@ public class Worker extends RemoteCompactorThread implements MetaStoreThread {
             + "}, status {" + res.getState() + "}, reason {" + res.getErrorMessage() + "}");
       }
       lockId = res.getLockid();
-      heartbeatExecutor = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder()
-                .setPriority(Thread.MIN_PRIORITY)
-                .setDaemon(true)
-                .setNameFormat("CompactionTxnHeartbeater-" + txnId)
-                .build());
-      long txnTimeout = MetastoreConf.getTimeVar(conf, MetastoreConf.ConfVars.TXN_TIMEOUT, TimeUnit.MILLISECONDS);
-      heartbeatExecutor.scheduleAtFixedRate(
-              new CompactionHeartbeater(this, TxnUtils.getFullTableName(ci.dbname, ci.tableName), conf),
-              txnTimeout / 4,
-              txnTimeout / 2,
-              TimeUnit.MILLISECONDS
-      );
+      CompactionHeartbeatService.getInstance(conf).startHeartbeat(txnId, lockId, TxnUtils.getFullTableName(ci.dbname, ci.tableName));
     }
 
     /**
@@ -718,28 +672,16 @@ public class Worker extends RemoteCompactorThread implements MetaStoreThread {
      * @throws Exception
      */
     @Override public void close() throws Exception {
-      //the transaction is about to close, we can stop heartbeating regardless of it's state
-      shutdownHeartbeater();
-      if (status != TxnStatus.UNKNOWN) {
-        if (succeessfulCompaction) {
-          commit();
-        } else {
-          abort();
-        }
-      }
-    }
-
-    private void shutdownHeartbeater() {
-      if (heartbeatExecutor != null) {
-        heartbeatExecutor.shutdownNow();
-        try {
-          if (!heartbeatExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
-            LOG.warn("Heartbeating for transaction {} did not stop in 5 seconds, do not wait any longer.", this);
+      try {
+        //the transaction is about to close, we can stop heartbeating regardless of it's state
+        CompactionHeartbeatService.getInstance(conf).stopHeartbeat(txnId);
+      } finally {
+        if (status != TxnStatus.UNKNOWN) {
+          if (succeessfulCompaction) {
+            commit();
+          } else {
+            abort();
           }
-        } catch (InterruptedException ex) {
-          //Caller thread was interrupted while waiting for heartbeater to terminate.
-          //Nothing to do, just restore the interrupted state.
-          Thread.currentThread().interrupt();
         }
       }
     }
@@ -748,10 +690,6 @@ public class Worker extends RemoteCompactorThread implements MetaStoreThread {
       return txnId;
     }
 
-    long getLockId() {
-      return lockId;
-    }
-
     @Override public String toString() {
       return "txnId=" + txnId + ", lockId=" + lockId + " (TxnStatus: " + status + ")";
     }
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCompactionHeartbeatService.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCompactionHeartbeatService.java
new file mode 100644
index 0000000000..99455783da
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCompactionHeartbeatService.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.txn.compactor;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreUtils;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.stubbing.Answer;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.lang.reflect.Field;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.atLeast;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({HiveMetaStoreUtils.class})
+@PowerMockIgnore("javax.management.*")
+public class TestCompactionHeartbeatService {
+
+  private static Field HEARTBEAT_SINGLETON;
+  @Mock
+  private HiveConf conf;
+  @Mock
+  private IMetaStoreClient client;
+
+  @BeforeClass
+  public static void setupClass() throws NoSuchFieldException {
+    HEARTBEAT_SINGLETON = CompactionHeartbeatService.class.getDeclaredField("instance");
+    HEARTBEAT_SINGLETON.setAccessible(true);
+  }
+
+  @Before
+  public void setup() throws Exception {
+    Mockito.when(conf.get(MetastoreConf.ConfVars.TXN_TIMEOUT.getVarname())).thenReturn("100ms");
+    Mockito.when(conf.get(MetastoreConf.ConfVars.COMPACTOR_WORKER_THREADS.getVarname())).thenReturn("4");
+    PowerMockito.mockStatic(HiveMetaStoreUtils.class);
+    PowerMockito.when(HiveMetaStoreUtils.getHiveMetastoreClient(any())).thenReturn(client);
+    HEARTBEAT_SINGLETON.set(null,null);
+  }
+
+  @After
+  public void tearDown() throws InterruptedException {
+    CompactionHeartbeatService.getInstance(conf).shutdown();
+  }
+
+  @Test
+  public void testHeartbeat() throws Exception {
+    CompactionHeartbeatService.getInstance(conf).startHeartbeat(0, 0,"table");
+    Thread.sleep(300);
+    CompactionHeartbeatService.getInstance(conf).stopHeartbeat(0);
+    verify(client, atLeast(1)).heartbeat(0,0);
+  }
+
+  @Test(expected = IllegalStateException.class)
+  public void testStopHeartbeatForNonExistentTxn() throws InterruptedException {
+    CompactionHeartbeatService.getInstance(conf).stopHeartbeat(0);
+  }
+
+  @Test
+  public void testNoHeartbeatAfterStop() throws Exception {
+    AtomicBoolean stopped = new AtomicBoolean(false);
+    doAnswer((Answer<Void>) invocationOnMock -> {
+      if (stopped.get()) {
+        Assert.fail("Heartbeat after stopHeartbeat call");
+      }
+      return null;
+    }).when(client).heartbeat(0,0);
+    CompactionHeartbeatService.getInstance(conf).startHeartbeat(0, 0,"table");
+    Thread.sleep(200);
+    CompactionHeartbeatService.getInstance(conf).stopHeartbeat(0);
+    stopped.set(true);
+    verify(client, atLeast(1)).heartbeat(0,0);
+  }
+
+  @Test(expected = IllegalStateException.class)
+  public void testStartHeartbeatTwice() {
+    CompactionHeartbeatService.getInstance(conf).startHeartbeat(0, 0,"table");
+    CompactionHeartbeatService.getInstance(conf).startHeartbeat(0, 0,"table");
+  }
+
+  @Test
+  public void testStopHeartbeatAbortedTheThread() throws Exception {
+    CountDownLatch countDownLatch = new CountDownLatch(1);
+    AtomicBoolean heartbeated = new AtomicBoolean(false);
+    doAnswer((Answer<Void>) invocationOnMock -> {
+      //make sure we call stopHeartbeat when we are in the middle of the hearbeat call
+      countDownLatch.countDown();
+      Thread.sleep(500);
+      heartbeated.set(true);
+      return null;
+    }).when(client).heartbeat(0,0);
+    CompactionHeartbeatService.getInstance(conf).startHeartbeat(0, 0,"table");
+    //We try to stop heartbeating while it's in the middle of a heartbeat
+    countDownLatch.await();
+    CompactionHeartbeatService.getInstance(conf).stopHeartbeat(0);
+    Assert.assertFalse(heartbeated.get());
+    // Check if heartbeat was done only once despite the timing is 100ms and the first took 500ms
+    verify(client, times(1)).heartbeat(0,0);
+  }
+
+  @Test
+  public void testBadClientInvalidated() throws Exception {
+    CountDownLatch countDownLatch = new CountDownLatch(3);
+    doAnswer((Answer<Void>) invocationOnMock -> {
+      countDownLatch.countDown();
+      if (countDownLatch.getCount() == 0) {
+        Thread.sleep(100);
+      }
+      throw new RuntimeException();
+    }).when(client).heartbeat(0,0);
+    CompactionHeartbeatService.getInstance(conf).startHeartbeat(0, 0,"table");
+    //We stop only after 3 heartbeats
+    countDownLatch.await();
+    CompactionHeartbeatService.getInstance(conf).stopHeartbeat(0);
+    // Check if bad clients were closed and new ones were requested
+    verify(client, times(3)).heartbeat(0,0);
+    verify(client, times(3)).close();
+    PowerMockito.verifyStatic(HiveMetaStoreUtils.class, times(3));
+    HiveMetaStoreUtils.getHiveMetastoreClient(conf);
+  }
+}