You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by pv...@apache.org on 2022/08/04 07:46:35 UTC
[iceberg] branch master updated: Hive: Fix concurrent transactions overwriting commits by adding hive lock heartbeats. (#5036)
This is an automated email from the ASF dual-hosted git repository.
pvary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 28b5d0ec81 Hive: Fix concurrent transactions overwriting commits by adding hive lock heartbeats. (#5036)
28b5d0ec81 is described below
commit 28b5d0ec81ea615d1d644c2d1c0d0667c4323639
Author: Ashish Singh <as...@pinterest.com>
AuthorDate: Thu Aug 4 00:46:29 2022 -0700
Hive: Fix concurrent transactions overwriting commits by adding hive lock heartbeats. (#5036)
---
.../exceptions/CommitStateUnknownException.java | 4 +
.../apache/iceberg/hive/HiveTableOperations.java | 90 +++++++++++++++++++++-
.../org/apache/iceberg/hive/HiveMetastoreTest.java | 19 ++++-
.../apache/iceberg/hive/TestHiveCommitLocks.java | 49 +++++++++++-
4 files changed, 155 insertions(+), 7 deletions(-)
diff --git a/api/src/main/java/org/apache/iceberg/exceptions/CommitStateUnknownException.java b/api/src/main/java/org/apache/iceberg/exceptions/CommitStateUnknownException.java
index 2ecb97fe12..7bfe9e1853 100644
--- a/api/src/main/java/org/apache/iceberg/exceptions/CommitStateUnknownException.java
+++ b/api/src/main/java/org/apache/iceberg/exceptions/CommitStateUnknownException.java
@@ -35,4 +35,8 @@ public class CommitStateUnknownException extends RuntimeException {
public CommitStateUnknownException(Throwable cause) {
super(cause.getMessage() + "\n" + COMMON_INFO, cause);
}
+
+  /**
+   * Creates an exception whose message is the given context message, followed by the message of
+   * the cause (when the cause has one) and the common guidance text.
+   *
+   * @param message description of the situation in which the commit state became unknown
+   * @param cause underlying failure; may be null, in which case only the context message and the
+   *     common guidance text are used
+   */
+  public CommitStateUnknownException(String message, Throwable cause) {
+    super(buildMessage(message, cause), cause);
+  }
+
+  // Joins the context message, the cause's message (skipped when absent) and COMMON_INFO.
+  // Kept static so it can be evaluated before the superclass constructor runs.
+  private static String buildMessage(String message, Throwable cause) {
+    StringBuilder text = new StringBuilder(message + "\n");
+    if (cause != null && cause.getMessage() != null) {
+      text.append(cause.getMessage()).append("\n");
+    }
+
+    return text.append(COMMON_INFO).toString();
+  }
}
diff --git a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java
index 294605a551..1acbfab920 100644
--- a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java
+++ b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java
@@ -31,6 +31,9 @@ import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
@@ -74,6 +77,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableBiMap;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.iceberg.util.JsonUtil;
import org.apache.iceberg.util.Tasks;
import org.apache.thrift.TException;
@@ -90,6 +94,8 @@ public class HiveTableOperations extends BaseMetastoreTableOperations {
private static final String HIVE_ACQUIRE_LOCK_TIMEOUT_MS = "iceberg.hive.lock-timeout-ms";
private static final String HIVE_LOCK_CHECK_MIN_WAIT_MS = "iceberg.hive.lock-check-min-wait-ms";
private static final String HIVE_LOCK_CHECK_MAX_WAIT_MS = "iceberg.hive.lock-check-max-wait-ms";
+ private static final String HIVE_LOCK_HEARTBEAT_INTERVAL_MS =
+ "iceberg.hive.lock-heartbeat-interval-ms";
private static final String HIVE_ICEBERG_METADATA_REFRESH_MAX_RETRIES =
"iceberg.hive.metadata-refresh-max-retries";
private static final String HIVE_TABLE_LEVEL_LOCK_EVICT_MS =
@@ -104,6 +110,7 @@ public class HiveTableOperations extends BaseMetastoreTableOperations {
private static final long HIVE_ACQUIRE_LOCK_TIMEOUT_MS_DEFAULT = 3 * 60 * 1000; // 3 minutes
private static final long HIVE_LOCK_CHECK_MIN_WAIT_MS_DEFAULT = 50; // 50 milliseconds
private static final long HIVE_LOCK_CHECK_MAX_WAIT_MS_DEFAULT = 5 * 1000; // 5 seconds
+ private static final long HIVE_LOCK_HEARTBEAT_INTERVAL_MS_DEFAULT = 4 * 60 * 1000; // 4 minutes
private static final int HIVE_ICEBERG_METADATA_REFRESH_MAX_RETRIES_DEFAULT = 2;
private static final long HIVE_TABLE_LEVEL_LOCK_EVICT_MS_DEFAULT = TimeUnit.MINUTES.toMillis(10);
private static final BiMap<String, String> ICEBERG_TO_HMS_TRANSLATION =
@@ -153,10 +160,12 @@ public class HiveTableOperations extends BaseMetastoreTableOperations {
private final long lockAcquireTimeout;
private final long lockCheckMinWaitTime;
private final long lockCheckMaxWaitTime;
+ private final long lockHeartbeatIntervalTime;
private final long maxHiveTablePropertySize;
private final int metadataRefreshMaxRetries;
private final FileIO fileIO;
private final ClientPool<IMetaStoreClient, TException> metaClients;
+ private final ScheduledExecutorService exitingScheduledExecutorService;
protected HiveTableOperations(
Configuration conf,
@@ -177,6 +186,8 @@ public class HiveTableOperations extends BaseMetastoreTableOperations {
conf.getLong(HIVE_LOCK_CHECK_MIN_WAIT_MS, HIVE_LOCK_CHECK_MIN_WAIT_MS_DEFAULT);
this.lockCheckMaxWaitTime =
conf.getLong(HIVE_LOCK_CHECK_MAX_WAIT_MS, HIVE_LOCK_CHECK_MAX_WAIT_MS_DEFAULT);
+ this.lockHeartbeatIntervalTime =
+ conf.getLong(HIVE_LOCK_HEARTBEAT_INTERVAL_MS, HIVE_LOCK_HEARTBEAT_INTERVAL_MS_DEFAULT);
this.metadataRefreshMaxRetries =
conf.getInt(
HIVE_ICEBERG_METADATA_REFRESH_MAX_RETRIES,
@@ -185,6 +196,12 @@ public class HiveTableOperations extends BaseMetastoreTableOperations {
conf.getLong(HIVE_TABLE_PROPERTY_MAX_SIZE, HIVE_TABLE_PROPERTY_MAX_SIZE_DEFAULT);
long tableLevelLockCacheEvictionTimeout =
conf.getLong(HIVE_TABLE_LEVEL_LOCK_EVICT_MS, HIVE_TABLE_LEVEL_LOCK_EVICT_MS_DEFAULT);
+ this.exitingScheduledExecutorService =
+ Executors.newSingleThreadScheduledExecutor(
+ new ThreadFactoryBuilder()
+ .setDaemon(true)
+ .setNameFormat("iceberg-hive-lock-heartbeat-%d")
+ .build());
initTableLevelLockCache(tableLevelLockCacheEvictionTimeout);
}
@@ -243,9 +260,12 @@ public class HiveTableOperations extends BaseMetastoreTableOperations {
// JVM process, which would result in unnecessary and costly HMS lock acquisition requests
ReentrantLock tableLevelMutex = commitLockCache.get(fullName, t -> new ReentrantLock(true));
tableLevelMutex.lock();
+ HiveLockHeartbeat hiveLockHeartbeat = null;
try {
lockId = Optional.of(acquireLock());
- // TODO add lock heart beating for cases where default lock timeout is too low.
+ hiveLockHeartbeat =
+ new HiveLockHeartbeat(metaClients, lockId.get(), lockHeartbeatIntervalTime);
+ hiveLockHeartbeat.schedule(exitingScheduledExecutorService);
Table tbl = loadHmsTable();
@@ -296,7 +316,24 @@ public class HiveTableOperations extends BaseMetastoreTableOperations {
}
try {
+ if (hiveLockHeartbeat.future.isCancelled()
+ || hiveLockHeartbeat.encounteredException != null) {
+ throw new CommitFailedException(
+ "Failed to heartbeat for hive lock. %s",
+ hiveLockHeartbeat.encounteredException.getMessage());
+ }
+
persistTable(tbl, updateHiveTable);
+ if (hiveLockHeartbeat.future.isCancelled()
+ || hiveLockHeartbeat.encounteredException != null) {
+ throw new CommitStateUnknownException(
+ "Failed to heartbeat for hive lock while "
+ + "committing changes. This can lead to a concurrent commit attempt be able to overwrite this commit. "
+ + "Please check the commit history. If you are running into this issue, try reducing "
+ + "iceberg.hive.lock-heartbeat-interval-ms.",
+ hiveLockHeartbeat.encounteredException);
+ }
+
commitStatus = CommitStatus.SUCCESS;
} catch (org.apache.hadoop.hive.metastore.api.AlreadyExistsException e) {
throw new AlreadyExistsException(e, "Table already exists: %s.%s", database, tableName);
@@ -304,6 +341,9 @@ public class HiveTableOperations extends BaseMetastoreTableOperations {
} catch (InvalidObjectException e) {
throw new ValidationException(e, "Invalid Hive object for %s.%s", database, tableName);
+ } catch (CommitFailedException | CommitStateUnknownException e) {
+ throw e;
+
} catch (Throwable e) {
if (e.getMessage() != null
&& e.getMessage().contains("Table/View 'HIVE_LOCKS' does not exist")) {
@@ -338,6 +378,10 @@ public class HiveTableOperations extends BaseMetastoreTableOperations {
throw new RuntimeException("Interrupted during commit", e);
} finally {
+ if (hiveLockHeartbeat != null) {
+ hiveLockHeartbeat.cancel();
+ }
+
cleanupMetadataAndUnlock(commitStatus, newMetadataLocation, lockId, tableLevelMutex);
}
@@ -363,7 +407,8 @@ public class HiveTableOperations extends BaseMetastoreTableOperations {
}
}
- private Table loadHmsTable() throws TException, InterruptedException {
+ @VisibleForTesting
+ Table loadHmsTable() throws TException, InterruptedException {
try {
return metaClients.run(client -> client.getTable(database, tableName));
} catch (NoSuchObjectException nte) {
@@ -710,4 +755,45 @@ public class HiveTableOperations extends BaseMetastoreTableOperations {
return conf.getBoolean(
ConfigProperties.ENGINE_HIVE_ENABLED, TableProperties.ENGINE_HIVE_ENABLED_DEFAULT);
}
+
+  /**
+   * Periodic task that heartbeats a single Hive metastore lock while a commit is in progress.
+   *
+   * <p>Any failure of a heartbeat attempt is stored in {@code encounteredException} so that the
+   * committing thread can detect it, and is then rethrown. Per the contract of {@link
+   * ScheduledExecutorService#scheduleAtFixedRate}, a task that throws is not executed again.
+   */
+  private static class HiveLockHeartbeat implements Runnable {
+    private final ClientPool<IMetaStoreClient, TException> hmsClients;
+    private final long lockId;
+    private final long intervalMs;
+    // Handle of the scheduled task; null until schedule() has been called.
+    private ScheduledFuture<?> future;
+    // Written by the heartbeat thread, read by the committing thread, hence volatile.
+    private volatile Exception encounteredException = null;
+
+    /**
+     * @param hmsClients pool used to obtain a metastore client for every heartbeat
+     * @param lockId id of the lock to keep alive
+     * @param intervalMs period between two heartbeats, in milliseconds
+     */
+    HiveLockHeartbeat(
+        ClientPool<IMetaStoreClient, TException> hmsClients, long lockId, long intervalMs) {
+      this.hmsClients = hmsClients;
+      this.lockId = lockId;
+      this.intervalMs = intervalMs;
+      this.future = null;
+    }
+
+    /**
+     * Sends one heartbeat for the lock.
+     *
+     * @throws CommitFailedException if the heartbeat could not be sent; the original failure is
+     *     also stored in {@code encounteredException}
+     */
+    @Override
+    public void run() {
+      try {
+        hmsClients.run(
+            client -> {
+              // NOTE(review): 0 is presumably "no transaction" for the txn id argument - confirm
+              // against IMetaStoreClient.heartbeat.
+              client.heartbeat(0, lockId);
+              return null;
+            });
+      } catch (InterruptedException e) {
+        // Restore the interrupt flag so the executor thread can still observe the interruption.
+        Thread.currentThread().interrupt();
+        this.encounteredException = e;
+        throw new CommitFailedException(e, "Failed to heartbeat for lock: %d", lockId);
+      } catch (TException | RuntimeException e) {
+        // Unchecked failures must be recorded too: otherwise the periodic task would end
+        // silently and the committing thread would never learn that the lock is no longer
+        // being kept alive.
+        this.encounteredException = e;
+        throw new CommitFailedException(e, "Failed to heartbeat for lock: %d", lockId);
+      }
+    }
+
+    /**
+     * Starts heartbeating on the given scheduler. The first heartbeat is sent after half an
+     * interval, subsequent ones every {@code intervalMs} milliseconds.
+     *
+     * @param scheduler executor that runs the heartbeats
+     */
+    public void schedule(ScheduledExecutorService scheduler) {
+      future =
+          scheduler.scheduleAtFixedRate(this, intervalMs / 2, intervalMs, TimeUnit.MILLISECONDS);
+    }
+
+    /** Stops future heartbeats without interrupting one that is currently running. */
+    public void cancel() {
+      if (future != null) {
+        future.cancel(false);
+      }
+    }
+  }
}
diff --git a/hive-metastore/src/test/java/org/apache/iceberg/hive/HiveMetastoreTest.java b/hive-metastore/src/test/java/org/apache/iceberg/hive/HiveMetastoreTest.java
index b3c31351c3..8c7adbc1f6 100644
--- a/hive-metastore/src/test/java/org/apache/iceberg/hive/HiveMetastoreTest.java
+++ b/hive-metastore/src/test/java/org/apache/iceberg/hive/HiveMetastoreTest.java
@@ -18,6 +18,8 @@
*/
package org.apache.iceberg.hive;
+import java.util.Collections;
+import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
@@ -41,10 +43,21 @@ public abstract class HiveMetastoreTest {
@BeforeClass
public static void startMetastore() throws Exception {
+ startMetastore(Collections.emptyMap());
+ }
+
+ public static void startMetastore(Map<String, String> hiveConfOverride) throws Exception {
HiveMetastoreTest.metastore = new TestHiveMetastore();
- metastore.start();
+ HiveConf hiveConfWithOverrides = new HiveConf(TestHiveMetastore.class);
+ if (hiveConfOverride != null) {
+ for (Map.Entry<String, String> kv : hiveConfOverride.entrySet()) {
+ hiveConfWithOverrides.set(kv.getKey(), kv.getValue());
+ }
+ }
+
+ metastore.start(hiveConfWithOverrides);
HiveMetastoreTest.hiveConf = metastore.hiveConf();
- HiveMetastoreTest.metastoreClient = new HiveMetaStoreClient(hiveConf);
+ HiveMetastoreTest.metastoreClient = new HiveMetaStoreClient(hiveConfWithOverrides);
String dbPath = metastore.getDatabasePath(DB_NAME);
Database db = new Database(DB_NAME, "description", dbPath, Maps.newHashMap());
metastoreClient.createDatabase(db);
@@ -56,7 +69,7 @@ public abstract class HiveMetastoreTest {
ImmutableMap.of(
CatalogProperties.CLIENT_POOL_CACHE_EVICTION_INTERVAL_MS,
String.valueOf(EVICTION_INTERVAL)),
- hiveConf);
+ hiveConfWithOverrides);
}
@AfterClass
diff --git a/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCommitLocks.java b/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCommitLocks.java
index 37fa9b519c..8b98eb897a 100644
--- a/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCommitLocks.java
+++ b/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCommitLocks.java
@@ -19,8 +19,11 @@
package org.apache.iceberg.hive;
import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.reset;
@@ -36,6 +39,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.IntStream;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.LockRequest;
@@ -46,6 +50,7 @@ import org.apache.iceberg.HasTableOperations;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.exceptions.CommitFailedException;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.types.Types;
import org.apache.thrift.TException;
import org.junit.AfterClass;
@@ -53,12 +58,14 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
+import org.mockito.AdditionalAnswers;
+import org.mockito.invocation.InvocationOnMock;
public class TestHiveCommitLocks extends HiveTableBaseTest {
private static HiveTableOperations spyOps = null;
private static HiveClientPool spyClientPool = null;
private static CachedClientPool spyCachedClientPool = null;
- private static Configuration overriddenHiveConf = new Configuration(hiveConf);
+ private static Configuration overriddenHiveConf;
private static AtomicReference<IMetaStoreClient> spyClientRef = new AtomicReference<>();
private static IMetaStoreClient spyClient = null;
HiveTableOperations ops = null;
@@ -71,10 +78,16 @@ public class TestHiveCommitLocks extends HiveTableBaseTest {
LockResponse notAcquiredLockResponse = new LockResponse(dummyLockId, LockState.NOT_ACQUIRED);
@BeforeClass
- public static void initializeSpies() throws Exception {
+ public static void startMetastore() throws Exception {
+ HiveMetastoreTest.startMetastore(
+ ImmutableMap.of(HiveConf.ConfVars.HIVE_TXN_TIMEOUT.varname, "1s"));
+
+ // start spies
+ overriddenHiveConf = new Configuration(hiveConf);
overriddenHiveConf.setLong("iceberg.hive.lock-timeout-ms", 6 * 1000);
overriddenHiveConf.setLong("iceberg.hive.lock-check-min-wait-ms", 50);
overriddenHiveConf.setLong("iceberg.hive.lock-check-max-wait-ms", 5 * 1000);
+ overriddenHiveConf.setLong("iceberg.hive.lock-heartbeat-interval-ms", 100);
// Set up the spy clients as static variables instead of before every test.
// The spy clients are reused between methods and closed at the end of all tests in this class.
@@ -139,6 +152,7 @@ public class TestHiveCommitLocks extends HiveTableBaseTest {
public void testLockAcquisitionAtFirstTime() throws TException, InterruptedException {
doReturn(acquiredLockResponse).when(spyClient).lock(any());
doNothing().when(spyOps).doUnlock(eq(dummyLockId));
+ doNothing().when(spyClient).heartbeat(eq(0L), eq(dummyLockId));
spyOps.doCommit(metadataV2, metadataV1);
@@ -157,6 +171,7 @@ public class TestHiveCommitLocks extends HiveTableBaseTest {
.when(spyClient)
.checkLock(eq(dummyLockId));
doNothing().when(spyOps).doUnlock(eq(dummyLockId));
+ doNothing().when(spyClient).heartbeat(eq(0L), eq(dummyLockId));
spyOps.doCommit(metadataV2, metadataV1);
@@ -269,4 +284,34 @@ public class TestHiveCommitLocks extends HiveTableBaseTest {
// all threads eventually got their turn
verify(spyClient, times(numConcurrentCommits)).lock(any(LockRequest.class));
}
+
+ @Test
+ public void testLockHeartbeat() throws TException {
+ doReturn(acquiredLockResponse).when(spyClient).lock(any());
+ doAnswer(AdditionalAnswers.answersWithDelay(2000, InvocationOnMock::callRealMethod))
+ .when(spyClient)
+ .getTable(any(), any());
+ doNothing().when(spyClient).heartbeat(eq(0L), eq(dummyLockId));
+
+ spyOps.doCommit(metadataV2, metadataV1);
+
+ verify(spyClient, atLeastOnce()).heartbeat(eq(0L), eq(dummyLockId));
+ }
+
+ @Test
+ public void testLockHeartbeatFailureDuringCommit() throws TException, InterruptedException {
+ doReturn(acquiredLockResponse).when(spyClient).lock(any());
+ doAnswer(AdditionalAnswers.answersWithDelay(2000, InvocationOnMock::callRealMethod))
+ .when(spyOps)
+ .loadHmsTable();
+ doThrow(new TException("Failed to heart beat."))
+ .when(spyClient)
+ .heartbeat(eq(0L), eq(dummyLockId));
+
+ AssertHelpers.assertThrows(
+ "Expected commit failure due to failure in heartbeat.",
+ CommitFailedException.class,
+ "Failed to heartbeat for hive lock. Failed to heart beat.",
+ () -> spyOps.doCommit(metadataV2, metadataV1));
+ }
}