You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by dk...@apache.org on 2022/06/29 11:17:10 UTC
[hive] branch master updated: HIVE-26242: Compaction heartbeater improvements (Laszlo Vegh, reviewed by Karen Coppage, Denys Kuzmenko)
This is an automated email from the ASF dual-hosted git repository.
dkuzmenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new bc35507757 HIVE-26242: Compaction heartbeater improvements (Laszlo Vegh, reviewed by Karen Coppage, Denys Kuzmenko)
bc35507757 is described below
commit bc35507757c23a6da612a2dc4b840105aed2515c
Author: veghlaci05 <90...@users.noreply.github.com>
AuthorDate: Wed Jun 29 13:16:59 2022 +0200
HIVE-26242: Compaction heartbeater improvements (Laszlo Vegh, reviewed by Karen Coppage, Denys Kuzmenko)
Closes #3303
---
.../txn/compactor/CompactionHeartbeatService.java | 217 +++++++++++++++++++++
.../ql/txn/compactor/IMetaStoreClientFactory.java | 64 ++++++
.../hadoop/hive/ql/txn/compactor/Worker.java | 84 ++------
.../compactor/TestCompactionHeartbeatService.java | 154 +++++++++++++++
4 files changed, 446 insertions(+), 73 deletions(-)
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactionHeartbeatService.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactionHeartbeatService.java
new file mode 100644
index 0000000000..788955e35c
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactionHeartbeatService.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.txn.compactor;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.commons.pool2.ObjectPool;
+import org.apache.commons.pool2.impl.GenericObjectPool;
+import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hive.common.util.ShutdownHookManager;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.NoSuchElementException;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.hive.common.util.HiveStringUtils.SHUTDOWN_HOOK_PRIORITY;
+
+/**
+ * Singleton service responsible for heartbeating the compaction transactions.
+ */
+class CompactionHeartbeatService {
+
+
+ private static final Logger LOG = LoggerFactory.getLogger(CompactionHeartbeatService.class);
+
+ private static volatile CompactionHeartbeatService instance;
+
+ /**
+ * Return the singleton instance of this class.
+ * @param conf The {@link HiveConf} used to create the service. Used only during the firsst call
+ * @return Returns the singleton {@link CompactionHeartbeatService}
+ * @throws IllegalStateException Thrown when the service has already been destroyed.
+ */
+ static CompactionHeartbeatService getInstance(HiveConf conf) {
+ if (instance == null) {
+ synchronized (CompactionHeartbeatService.class) {
+ if (instance == null) {
+ LOG.debug("Initializing compaction txn heartbeater service.");
+ instance = new CompactionHeartbeatService(conf);
+ ShutdownHookManager.addShutdownHook(() -> instance.shutdown(), SHUTDOWN_HOOK_PRIORITY);
+ }
+ }
+ }
+ if (instance.shuttingDown) {
+ throw new IllegalStateException("CompactionHeartbeatService is already destroyed!");
+ }
+ return instance;
+ }
+
+ private final ObjectPool<IMetaStoreClient> clientPool;
+ private volatile boolean shuttingDown = false;
+ private final long initialDelay;
+ private final long period;
+ private final ConcurrentHashMap<Long, CompactionHeartbeater> tasks = new ConcurrentHashMap<>(30);
+
+ /**
+ * Starts the heartbeat for the given transaction
+ * @param txnId The id of the compaction txn
+ * @param lockId The id of the lock associated with the txn
+ * @param tableName Required for logging only
+ * @throws IllegalStateException Thrown when the heartbeat for the given txn has already been started.
+ */
+ void startHeartbeat(long txnId, long lockId, String tableName) {
+ if (shuttingDown) {
+ throw new IllegalStateException("Service is shutting down, starting new heartbeats is not possible!");
+ }
+ if (tasks.containsKey(txnId)) {
+ throw new IllegalStateException("Heartbeat was already started for TXN " + txnId);
+ }
+ LOG.info("Submitting heartbeat task for TXN {}", txnId);
+ CompactionHeartbeater heartbeater = new CompactionHeartbeater(txnId, lockId, tableName);
+ heartbeater.start();
+ tasks.put(txnId, heartbeater);
+ }
+
+ /**
+ * Stops the heartbeat for the given transaction
+ * @param txnId The id of the compaction txn
+ * @throws IllegalStateException Thrown when there is no {@link CompactionHeartbeater} task associated with the
+ * given txnId.
+ */
+ void stopHeartbeat(long txnId) throws InterruptedException {
+ LOG.info("Stopping heartbeat task for TXN {}", txnId);
+ CompactionHeartbeater heartbeater = tasks.get(txnId);
+ if (heartbeater == null) {
+ throw new IllegalStateException("No registered heartbeat found for TXN " + txnId);
+ }
+ try {
+ heartbeater.stop();
+ } finally {
+ tasks.remove(txnId);
+ }
+ }
+
+ /**
+ * Shuts down the service, by closing its underlying resources. Be aware that after shutdown this service is no
+ * longer usable, there is no way to re-initialize it.
+ */
+ void shutdown() {
+ shuttingDown = true;
+ LOG.info("Shutting down compaction txn heartbeater service.");
+ for (CompactionHeartbeater heartbeater : tasks.values()) {
+ try {
+ heartbeater.stop();
+ } catch (InterruptedException e) {
+ LOG.warn("Shutdownhook thread was interrupted during shutting down the CompactionHeartbeatService.");
+ }
+ }
+ tasks.clear();
+ clientPool.close();
+ LOG.info("Compaction txn heartbeater service is successfully stopped.");
+ }
+
+ private CompactionHeartbeatService(HiveConf conf) {
+ int numberOfWorkers = MetastoreConf.getIntVar(conf, MetastoreConf.ConfVars.COMPACTOR_WORKER_THREADS);
+ GenericObjectPoolConfig<IMetaStoreClient> config = new GenericObjectPoolConfig<>();
+ config.setMinIdle(1);
+ config.setMaxIdle(2);
+ config.setMaxTotal(numberOfWorkers);
+ config.setMaxWaitMillis(2000);
+ clientPool = new GenericObjectPool<>(new IMetaStoreClientFactory(conf), config);
+ long txnTimeout = MetastoreConf.getTimeVar(conf, MetastoreConf.ConfVars.TXN_TIMEOUT, TimeUnit.MILLISECONDS);
+ initialDelay = txnTimeout / 4;
+ period = txnTimeout / 2;
+ }
+
+ private final class CompactionHeartbeater {
+ private final Logger LOG = LoggerFactory.getLogger(CompactionHeartbeater.class);
+ private final long txnId;
+ private final long lockId;
+ private final String tableName;
+ private final ScheduledThreadPoolExecutor heartbeatExecutor;
+
+ private CompactionHeartbeater(long txnId, long lockId, String tableName) {
+ heartbeatExecutor = new ScheduledThreadPoolExecutor(1);
+ heartbeatExecutor.setThreadFactory(new ThreadFactoryBuilder()
+ .setPriority(Thread.MIN_PRIORITY)
+ .setDaemon(true)
+ .setNameFormat("CompactionTxnHeartbeater-" + txnId)
+ .build());
+ this.tableName = Objects.requireNonNull(tableName);
+ this.txnId = txnId;
+ this.lockId = lockId;
+ }
+
+ void start() {
+ heartbeatExecutor.scheduleAtFixedRate(() -> {
+ IMetaStoreClient msc = null;
+ try {
+ LOG.debug("Heartbeating compaction transaction id {} for table: {}", txnId, tableName);
+ // Create a metastore client for each thread since it is not thread safe
+ msc = clientPool.borrowObject();
+ msc.heartbeat(txnId, lockId);
+ } catch (NoSuchElementException e) {
+ LOG.error("Compaction transaction heartbeater pool exhausted, unable to heartbeat", e);
+ // This heartbeat attempt failed, and there is no client to return to the pool.
+ return;
+ } catch (TException e) {
+ LOG.error("Error while heartbeating compaction transaction id {} for table: {}", txnId, tableName, e);
+ // Heartbeat failed, but the client is not broken, we can return it to the pool.
+ } catch (Exception e) {
+ LOG.error("Error while heartbeating compaction transaction id {} for table: {}", txnId, tableName, e);
+ // Unknown error, invalidate the client, maybe it is broken.
+ if (msc != null) {
+ try {
+ clientPool.invalidateObject(msc);
+ } catch (Exception ex) {
+ LOG.error("Error while invalidating a broken MetaStoreClient instance", e);
+ }
+ }
+ return;
+ }
+ try {
+ if (msc != null) {
+ clientPool.returnObject(msc);
+ }
+ } catch (Exception e) {
+ LOG.error("Error while returning back to the pool a MetaStoreClient instance", e);
+ }
+ }, initialDelay, period, TimeUnit.MILLISECONDS);
+ }
+
+ public void stop() throws InterruptedException {
+ LOG.info("Shutting down compaction txn heartbeater instance.");
+ heartbeatExecutor.shutdownNow();
+ if (!heartbeatExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
+ LOG.warn("Heartbeating for transaction {} did not stop in 5 seconds, do not wait any longer.", txnId);
+ return;
+ }
+ LOG.info("Compaction txn heartbeater instance is successfully stopped.");
+ }
+
+ }
+
+}
\ No newline at end of file
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/IMetaStoreClientFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/IMetaStoreClientFactory.java
new file mode 100644
index 0000000000..1fd74ca029
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/IMetaStoreClientFactory.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.txn.compactor;
+
+import org.apache.commons.pool2.BasePooledObjectFactory;
+import org.apache.commons.pool2.PooledObject;
+import org.apache.commons.pool2.impl.DefaultPooledObject;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreUtils;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+
+import java.util.Objects;
+
+/**
+ * Pooled-object factory that knows how to create, wrap and destroy {@link IMetaStoreClient} instances.
+ * The {@link org.apache.commons.pool2.ObjectPool} inside {@link CompactionHeartbeatService} relies on it to
+ * pool {@link IMetaStoreClient}s. Every client is created from the {@link HiveConf} handed over at
+ * construction time.
+ */
+final class IMetaStoreClientFactory extends BasePooledObjectFactory<IMetaStoreClient> {
+
+  private final HiveConf conf;
+
+  public IMetaStoreClientFactory(HiveConf conf) {
+    this.conf = Objects.requireNonNull(conf);
+  }
+
+  @Override
+  public IMetaStoreClient create() throws Exception {
+    return HiveMetaStoreUtils.getHiveMetastoreClient(conf);
+  }
+
+  @Override
+  public PooledObject<IMetaStoreClient> wrap(IMetaStoreClient client) {
+    return new DefaultPooledObject<>(client);
+  }
+
+  @Override
+  public void destroyObject(PooledObject<IMetaStoreClient> pooledClient) {
+    pooledClient.getObject().close();
+  }
+
+  @Override
+  public boolean validateObject(PooledObject<IMetaStoreClient> pooledClient) {
+    // Validation is not wired in yet: checking a client on borrow/return would be useful, but it first needs
+    // support from the MetaStoreClient side, so the call simply defers to the base implementation.
+    return super.validateObject(pooledClient);
+  }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
index 69bff17d0a..707ebff393 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
@@ -18,9 +18,7 @@
package org.apache.hadoop.hive.ql.txn.compactor;
import com.google.common.annotations.VisibleForTesting;
-import java.util.concurrent.ScheduledExecutorService;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -28,7 +26,6 @@ import org.apache.hadoop.hive.common.ValidCompactorWriteIdList;
import org.apache.hadoop.hive.common.ValidTxnList;
import org.apache.hadoop.hive.common.ValidWriteIdList;
import org.apache.hadoop.hive.metastore.HiveMetaStoreUtils;
-import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.MetaStoreThread;
import org.apache.hadoop.hive.metastore.api.CompactionType;
import org.apache.hadoop.hive.metastore.api.DataOperationType;
@@ -69,7 +66,6 @@ import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.Collections;
import java.util.Map;
-import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -205,36 +201,6 @@ public class Worker extends RemoteCompactorThread implements MetaStoreThread {
}
}
- static final class CompactionHeartbeater implements Runnable {
- static final private Logger LOG = LoggerFactory.getLogger(CompactionHeartbeater.class);
- private final CompactionTxn compactionTxn;
- private final String tableName;
- private final HiveConf conf;
-
- public CompactionHeartbeater(CompactionTxn compactionTxn, String tableName, HiveConf conf) {
- this.tableName = Objects.requireNonNull(tableName);
- this.compactionTxn = Objects.requireNonNull(compactionTxn);
- this.conf = Objects.requireNonNull(conf);
- }
-
- @Override
- public void run() {
- LOG.debug("Heartbeating compaction transaction id {} for table: {}", compactionTxn, tableName);
- IMetaStoreClient msc = null;
- try {
- // Create a metastore client for each thread since it is not thread safe
- msc = HiveMetaStoreUtils.getHiveMetastoreClient(conf);
- msc.heartbeat(compactionTxn.getTxnId(), compactionTxn.getLockId());
- } catch (Exception e) {
- LOG.error("Error while heartbeating transaction id {} for table: {}", compactionTxn, tableName, e);
- } finally {
- if (msc != null) {
- msc.close();
- }
- }
- }
- }
-
/**
* Determine if compaction can run in a specified directory.
* @param isMajorCompaction type of compaction.
@@ -669,13 +635,12 @@ public class Worker extends RemoteCompactorThread implements MetaStoreThread {
/**
* Keep track of the compaction's transaction and its operations.
*/
- private class CompactionTxn implements AutoCloseable {
+ class CompactionTxn implements AutoCloseable {
private long txnId = 0;
private long lockId = 0;
private TxnStatus status = TxnStatus.UNKNOWN;
private boolean succeessfulCompaction = false;
- private ScheduledExecutorService heartbeatExecutor;
/**
* Try to open a new txn.
@@ -692,18 +657,7 @@ public class Worker extends RemoteCompactorThread implements MetaStoreThread {
+ "}, status {" + res.getState() + "}, reason {" + res.getErrorMessage() + "}");
}
lockId = res.getLockid();
- heartbeatExecutor = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder()
- .setPriority(Thread.MIN_PRIORITY)
- .setDaemon(true)
- .setNameFormat("CompactionTxnHeartbeater-" + txnId)
- .build());
- long txnTimeout = MetastoreConf.getTimeVar(conf, MetastoreConf.ConfVars.TXN_TIMEOUT, TimeUnit.MILLISECONDS);
- heartbeatExecutor.scheduleAtFixedRate(
- new CompactionHeartbeater(this, TxnUtils.getFullTableName(ci.dbname, ci.tableName), conf),
- txnTimeout / 4,
- txnTimeout / 2,
- TimeUnit.MILLISECONDS
- );
+ CompactionHeartbeatService.getInstance(conf).startHeartbeat(txnId, lockId, TxnUtils.getFullTableName(ci.dbname, ci.tableName));
}
/**
@@ -718,28 +672,16 @@ public class Worker extends RemoteCompactorThread implements MetaStoreThread {
* @throws Exception
*/
@Override public void close() throws Exception {
- //the transaction is about to close, we can stop heartbeating regardless of it's state
- shutdownHeartbeater();
- if (status != TxnStatus.UNKNOWN) {
- if (succeessfulCompaction) {
- commit();
- } else {
- abort();
- }
- }
- }
-
- private void shutdownHeartbeater() {
- if (heartbeatExecutor != null) {
- heartbeatExecutor.shutdownNow();
- try {
- if (!heartbeatExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
- LOG.warn("Heartbeating for transaction {} did not stop in 5 seconds, do not wait any longer.", this);
+ try {
+ //the transaction is about to close, we can stop heartbeating regardless of it's state
+ CompactionHeartbeatService.getInstance(conf).stopHeartbeat(txnId);
+ } finally {
+ if (status != TxnStatus.UNKNOWN) {
+ if (succeessfulCompaction) {
+ commit();
+ } else {
+ abort();
}
- } catch (InterruptedException ex) {
- //Caller thread was interrupted while waiting for heartbeater to terminate.
- //Nothing to do, just restore the interrupted state.
- Thread.currentThread().interrupt();
}
}
}
@@ -748,10 +690,6 @@ public class Worker extends RemoteCompactorThread implements MetaStoreThread {
return txnId;
}
- long getLockId() {
- return lockId;
- }
-
@Override public String toString() {
return "txnId=" + txnId + ", lockId=" + lockId + " (TxnStatus: " + status + ")";
}
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCompactionHeartbeatService.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCompactionHeartbeatService.java
new file mode 100644
index 0000000000..99455783da
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCompactionHeartbeatService.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.txn.compactor;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreUtils;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.stubbing.Answer;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.lang.reflect.Field;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.atLeast;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({HiveMetaStoreUtils.class})
+@PowerMockIgnore("javax.management.*")
+public class TestCompactionHeartbeatService {
+
+ private static Field HEARTBEAT_SINGLETON;
+ @Mock
+ private HiveConf conf;
+ @Mock
+ private IMetaStoreClient client;
+
+ @BeforeClass
+ public static void setupClass() throws NoSuchFieldException {
+ HEARTBEAT_SINGLETON = CompactionHeartbeatService.class.getDeclaredField("instance");
+ HEARTBEAT_SINGLETON.setAccessible(true);
+ }
+
+ @Before
+ public void setup() throws Exception {
+ Mockito.when(conf.get(MetastoreConf.ConfVars.TXN_TIMEOUT.getVarname())).thenReturn("100ms");
+ Mockito.when(conf.get(MetastoreConf.ConfVars.COMPACTOR_WORKER_THREADS.getVarname())).thenReturn("4");
+ PowerMockito.mockStatic(HiveMetaStoreUtils.class);
+ PowerMockito.when(HiveMetaStoreUtils.getHiveMetastoreClient(any())).thenReturn(client);
+ HEARTBEAT_SINGLETON.set(null,null); // clear the static singleton so each test creates a fresh service
+ }
+
+ @After
+ public void tearDown() throws InterruptedException {
+ CompactionHeartbeatService.getInstance(conf).shutdown();
+ }
+
+ @Test
+ public void testHeartbeat() throws Exception {
+ CompactionHeartbeatService.getInstance(conf).startHeartbeat(0, 0,"table");
+ Thread.sleep(300);
+ CompactionHeartbeatService.getInstance(conf).stopHeartbeat(0);
+ verify(client, atLeast(1)).heartbeat(0,0);
+ }
+
+ @Test(expected = IllegalStateException.class)
+ public void testStopHeartbeatForNonExistentTxn() throws InterruptedException {
+ CompactionHeartbeatService.getInstance(conf).stopHeartbeat(0);
+ }
+
+ @Test
+ public void testNoHeartbeatAfterStop() throws Exception {
+ AtomicBoolean stopped = new AtomicBoolean(false);
+ doAnswer((Answer<Void>) invocationOnMock -> {
+ if (stopped.get()) {
+ Assert.fail("Heartbeat after stopHeartbeat call"); // NOTE(review): runs on the heartbeater thread - confirm it can fail the test
+ }
+ return null;
+ }).when(client).heartbeat(0,0);
+ CompactionHeartbeatService.getInstance(conf).startHeartbeat(0, 0,"table");
+ Thread.sleep(200);
+ CompactionHeartbeatService.getInstance(conf).stopHeartbeat(0);
+ stopped.set(true);
+ verify(client, atLeast(1)).heartbeat(0,0);
+ }
+
+ @Test(expected = IllegalStateException.class)
+ public void testStartHeartbeatTwice() {
+ CompactionHeartbeatService.getInstance(conf).startHeartbeat(0, 0,"table");
+ CompactionHeartbeatService.getInstance(conf).startHeartbeat(0, 0,"table");
+ }
+
+ @Test
+ public void testStopHeartbeatAbortedTheThread() throws Exception {
+ CountDownLatch countDownLatch = new CountDownLatch(1);
+ AtomicBoolean heartbeated = new AtomicBoolean(false);
+ doAnswer((Answer<Void>) invocationOnMock -> {
+ //make sure we call stopHeartbeat when we are in the middle of the heartbeat call
+ countDownLatch.countDown();
+ Thread.sleep(500);
+ heartbeated.set(true);
+ return null;
+ }).when(client).heartbeat(0,0);
+ CompactionHeartbeatService.getInstance(conf).startHeartbeat(0, 0,"table");
+ //We try to stop heartbeating while it's in the middle of a heartbeat
+ countDownLatch.await();
+ CompactionHeartbeatService.getInstance(conf).stopHeartbeat(0);
+ Assert.assertFalse(heartbeated.get());
+ // Check that the heartbeat was done only once although the txn timeout is 100ms and the first call took 500ms
+ verify(client, times(1)).heartbeat(0,0);
+ }
+
+ @Test
+ public void testBadClientInvalidated() throws Exception {
+ CountDownLatch countDownLatch = new CountDownLatch(3);
+ doAnswer((Answer<Void>) invocationOnMock -> {
+ countDownLatch.countDown();
+ if (countDownLatch.getCount() == 0) {
+ Thread.sleep(100);
+ }
+ throw new RuntimeException();
+ }).when(client).heartbeat(0,0);
+ CompactionHeartbeatService.getInstance(conf).startHeartbeat(0, 0,"table");
+ //We stop only after 3 heartbeats
+ countDownLatch.await();
+ CompactionHeartbeatService.getInstance(conf).stopHeartbeat(0);
+ // Check if bad clients were closed and new ones were requested
+ verify(client, times(3)).heartbeat(0,0);
+ verify(client, times(3)).close();
+ PowerMockito.verifyStatic(HiveMetaStoreUtils.class, times(3));
+ HiveMetaStoreUtils.getHiveMetastoreClient(conf);
+ }
+}