You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by pv...@apache.org on 2023/02/03 13:12:27 UTC
[iceberg] branch master updated: Hive: Refactor commit lock mechanism from HiveTableOperations (#6648)
This is an automated email from the ASF dual-hosted git repository.
pvary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 333227fbd1 Hive: Refactor commit lock mechanism from HiveTableOperations (#6648)
333227fbd1 is described below
commit 333227fbd13821365cec1bdbfcb9314a239bea0f
Author: pvary <pe...@gmail.com>
AuthorDate: Fri Feb 3 14:12:18 2023 +0100
Hive: Refactor commit lock mechanism from HiveTableOperations (#6648)
Co-authored-by: Adam Szita <40...@users.noreply.github.com>
Co-authored-by: Peter Vary <pe...@apple.com>
---
.../iceberg/exceptions/CommitFailedException.java | 4 +
.../java/org/apache/iceberg/hive/HiveLock.java | 17 +-
.../apache/iceberg/hive/HiveTableOperations.java | 445 +----------------
.../org/apache/iceberg/hive/LockException.java | 9 +-
.../org/apache/iceberg/hive/MetastoreLock.java | 540 +++++++++++++++++++++
.../apache/iceberg/hive/TestHiveCommitLocks.java | 8 +-
.../org/apache/iceberg/hive/TestHiveCommits.java | 34 +-
7 files changed, 602 insertions(+), 455 deletions(-)
diff --git a/api/src/main/java/org/apache/iceberg/exceptions/CommitFailedException.java b/api/src/main/java/org/apache/iceberg/exceptions/CommitFailedException.java
index ca9d1e4a5b..b9efe55d33 100644
--- a/api/src/main/java/org/apache/iceberg/exceptions/CommitFailedException.java
+++ b/api/src/main/java/org/apache/iceberg/exceptions/CommitFailedException.java
@@ -31,4 +31,8 @@ public class CommitFailedException extends RuntimeException {
public CommitFailedException(Throwable cause, String message, Object... args) {
super(String.format(message, args), cause);
}
+
+ public CommitFailedException(Throwable cause) {
+ super(cause);
+ }
}
diff --git a/api/src/main/java/org/apache/iceberg/exceptions/CommitFailedException.java b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveLock.java
similarity index 61%
copy from api/src/main/java/org/apache/iceberg/exceptions/CommitFailedException.java
copy to hive-metastore/src/main/java/org/apache/iceberg/hive/HiveLock.java
index ca9d1e4a5b..d77f990d17 100644
--- a/api/src/main/java/org/apache/iceberg/exceptions/CommitFailedException.java
+++ b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveLock.java
@@ -16,19 +16,12 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iceberg.exceptions;
+package org.apache.iceberg.hive;
-import com.google.errorprone.annotations.FormatMethod;
+interface HiveLock {
+ void lock() throws LockException;
-/** Exception raised when a commit fails because of out of date metadata. */
-public class CommitFailedException extends RuntimeException {
- @FormatMethod
- public CommitFailedException(String message, Object... args) {
- super(String.format(message, args));
- }
+ void ensureActive() throws LockException;
- @FormatMethod
- public CommitFailedException(Throwable cause, String message, Object... args) {
- super(String.format(message, args), cause);
- }
+ void unlock();
}
diff --git a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java
index 1781640ca0..5c51af2f61 100644
--- a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java
+++ b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java
@@ -21,39 +21,20 @@ package org.apache.iceberg.hive;
import static org.apache.iceberg.TableProperties.GC_ENABLED;
import com.fasterxml.jackson.core.JsonProcessingException;
-import com.github.benmanes.caffeine.cache.Cache;
-import com.github.benmanes.caffeine.cache.Caffeine;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
import java.util.Collections;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.common.StatsSetupConst;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.TableType;
import org.apache.hadoop.hive.metastore.api.InvalidObjectException;
-import org.apache.hadoop.hive.metastore.api.LockComponent;
-import org.apache.hadoop.hive.metastore.api.LockLevel;
-import org.apache.hadoop.hive.metastore.api.LockRequest;
-import org.apache.hadoop.hive.metastore.api.LockResponse;
-import org.apache.hadoop.hive.metastore.api.LockState;
-import org.apache.hadoop.hive.metastore.api.LockType;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
import org.apache.hadoop.hive.metastore.api.SerDeInfo;
-import org.apache.hadoop.hive.metastore.api.ShowLocksRequest;
-import org.apache.hadoop.hive.metastore.api.ShowLocksResponse;
-import org.apache.hadoop.hive.metastore.api.ShowLocksResponseElement;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
@@ -75,16 +56,12 @@ import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.hadoop.ConfigProperties;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
-import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.BiMap;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableBiMap;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
-import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
-import org.apache.iceberg.relocated.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.iceberg.util.JsonUtil;
-import org.apache.iceberg.util.Tasks;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -96,52 +73,21 @@ import org.slf4j.LoggerFactory;
public class HiveTableOperations extends BaseMetastoreTableOperations {
private static final Logger LOG = LoggerFactory.getLogger(HiveTableOperations.class);
- private static final String HIVE_ACQUIRE_LOCK_TIMEOUT_MS = "iceberg.hive.lock-timeout-ms";
- private static final String HIVE_LOCK_CHECK_MIN_WAIT_MS = "iceberg.hive.lock-check-min-wait-ms";
- private static final String HIVE_LOCK_CHECK_MAX_WAIT_MS = "iceberg.hive.lock-check-max-wait-ms";
- private static final String HIVE_LOCK_CREATION_TIMEOUT_MS =
- "iceberg.hive.lock-creation-timeout-ms";
- private static final String HIVE_LOCK_CREATION_MIN_WAIT_MS =
- "iceberg.hive.lock-creation-min-wait-ms";
- private static final String HIVE_LOCK_CREATION_MAX_WAIT_MS =
- "iceberg.hive.lock-creation-max-wait-ms";
- private static final String HIVE_LOCK_HEARTBEAT_INTERVAL_MS =
- "iceberg.hive.lock-heartbeat-interval-ms";
private static final String HIVE_ICEBERG_METADATA_REFRESH_MAX_RETRIES =
"iceberg.hive.metadata-refresh-max-retries";
- private static final String HIVE_TABLE_LEVEL_LOCK_EVICT_MS =
- "iceberg.hive.table-level-lock-evict-ms";
-
// the max size is based on HMS backend database. For Hive versions below 2.3, the max table
// parameter size is 4000
// characters, see https://issues.apache.org/jira/browse/HIVE-12274
// set to 0 to not expose Iceberg metadata in HMS Table properties.
private static final String HIVE_TABLE_PROPERTY_MAX_SIZE = "iceberg.hive.table-property-max-size";
private static final long HIVE_TABLE_PROPERTY_MAX_SIZE_DEFAULT = 32672;
- private static final long HIVE_ACQUIRE_LOCK_TIMEOUT_MS_DEFAULT = 3 * 60 * 1000; // 3 minutes
- private static final long HIVE_LOCK_CHECK_MIN_WAIT_MS_DEFAULT = 50; // 50 milliseconds
- private static final long HIVE_LOCK_CHECK_MAX_WAIT_MS_DEFAULT = 5 * 1000; // 5 seconds
- private static final long HIVE_LOCK_CREATION_TIMEOUT_MS_DEFAULT = 3 * 60 * 1000; // 3 minutes
- private static final long HIVE_LOCK_CREATION_MIN_WAIT_MS_DEFAULT = 50; // 50 milliseconds
- private static final long HIVE_LOCK_CREATION_MAX_WAIT_MS_DEFAULT = 5 * 1000; // 5 seconds
- private static final long HIVE_LOCK_HEARTBEAT_INTERVAL_MS_DEFAULT = 4 * 60 * 1000; // 4 minutes
private static final int HIVE_ICEBERG_METADATA_REFRESH_MAX_RETRIES_DEFAULT = 2;
- private static final long HIVE_TABLE_LEVEL_LOCK_EVICT_MS_DEFAULT = TimeUnit.MINUTES.toMillis(10);
private static final BiMap<String, String> ICEBERG_TO_HMS_TRANSLATION =
ImmutableBiMap.of(
// gc.enabled in Iceberg and external.table.purge in Hive are meant to do the same things
// but with different names
GC_ENABLED, "external.table.purge");
- private static Cache<String, ReentrantLock> commitLockCache;
-
- private static synchronized void initTableLevelLockCache(long evictionTimeout) {
- if (commitLockCache == null) {
- commitLockCache =
- Caffeine.newBuilder().expireAfterAccess(evictionTimeout, TimeUnit.MILLISECONDS).build();
- }
- }
-
/**
* Provides key translation where necessary between Iceberg and HMS props. This translation is
* needed because some properties control the same behaviour but are named differently in Iceberg
@@ -161,28 +107,15 @@ public class HiveTableOperations extends BaseMetastoreTableOperations {
return ICEBERG_TO_HMS_TRANSLATION.inverse().getOrDefault(hmsProp, hmsProp);
}
- private static class WaitingForLockException extends RuntimeException {
- WaitingForLockException(String message) {
- super(message);
- }
- }
-
private final String fullName;
+ private final String catalogName;
private final String database;
private final String tableName;
private final Configuration conf;
- private final long lockAcquireTimeout;
- private final long lockCheckMinWaitTime;
- private final long lockCheckMaxWaitTime;
- private final long lockCreationTimeout;
- private final long lockCreationMinWaitTime;
- private final long lockCreationMaxWaitTime;
- private final long lockHeartbeatIntervalTime;
private final long maxHiveTablePropertySize;
private final int metadataRefreshMaxRetries;
private final FileIO fileIO;
private final ClientPool<IMetaStoreClient, TException> metaClients;
- private final ScheduledExecutorService exitingScheduledExecutorService;
protected HiveTableOperations(
Configuration conf,
@@ -195,37 +128,15 @@ public class HiveTableOperations extends BaseMetastoreTableOperations {
this.metaClients = metaClients;
this.fileIO = fileIO;
this.fullName = catalogName + "." + database + "." + table;
+ this.catalogName = catalogName;
this.database = database;
this.tableName = table;
- this.lockAcquireTimeout =
- conf.getLong(HIVE_ACQUIRE_LOCK_TIMEOUT_MS, HIVE_ACQUIRE_LOCK_TIMEOUT_MS_DEFAULT);
- this.lockCheckMinWaitTime =
- conf.getLong(HIVE_LOCK_CHECK_MIN_WAIT_MS, HIVE_LOCK_CHECK_MIN_WAIT_MS_DEFAULT);
- this.lockCheckMaxWaitTime =
- conf.getLong(HIVE_LOCK_CHECK_MAX_WAIT_MS, HIVE_LOCK_CHECK_MAX_WAIT_MS_DEFAULT);
- this.lockCreationTimeout =
- conf.getLong(HIVE_LOCK_CREATION_TIMEOUT_MS, HIVE_LOCK_CREATION_TIMEOUT_MS_DEFAULT);
- this.lockCreationMinWaitTime =
- conf.getLong(HIVE_LOCK_CREATION_MIN_WAIT_MS, HIVE_LOCK_CREATION_MIN_WAIT_MS_DEFAULT);
- this.lockCreationMaxWaitTime =
- conf.getLong(HIVE_LOCK_CREATION_MAX_WAIT_MS, HIVE_LOCK_CREATION_MAX_WAIT_MS_DEFAULT);
- this.lockHeartbeatIntervalTime =
- conf.getLong(HIVE_LOCK_HEARTBEAT_INTERVAL_MS, HIVE_LOCK_HEARTBEAT_INTERVAL_MS_DEFAULT);
this.metadataRefreshMaxRetries =
conf.getInt(
HIVE_ICEBERG_METADATA_REFRESH_MAX_RETRIES,
HIVE_ICEBERG_METADATA_REFRESH_MAX_RETRIES_DEFAULT);
this.maxHiveTablePropertySize =
conf.getLong(HIVE_TABLE_PROPERTY_MAX_SIZE, HIVE_TABLE_PROPERTY_MAX_SIZE_DEFAULT);
- long tableLevelLockCacheEvictionTimeout =
- conf.getLong(HIVE_TABLE_LEVEL_LOCK_EVICT_MS, HIVE_TABLE_LEVEL_LOCK_EVICT_MS_DEFAULT);
- this.exitingScheduledExecutorService =
- Executors.newSingleThreadScheduledExecutor(
- new ThreadFactoryBuilder()
- .setDaemon(true)
- .setNameFormat("iceberg-hive-lock-heartbeat-%d")
- .build());
- initTableLevelLockCache(tableLevelLockCacheEvictionTimeout);
}
@Override
@@ -275,19 +186,10 @@ public class HiveTableOperations extends BaseMetastoreTableOperations {
CommitStatus commitStatus = CommitStatus.FAILURE;
boolean updateHiveTable = false;
- String agentInfo = "Iceberg-" + UUID.randomUUID();
- Optional<Long> lockId = Optional.empty();
- // getting a process-level lock per table to avoid concurrent commit attempts to the same table
- // from the same
- // JVM process, which would result in unnecessary and costly HMS lock acquisition requests
- ReentrantLock tableLevelMutex = commitLockCache.get(fullName, t -> new ReentrantLock(true));
- tableLevelMutex.lock();
- HiveLockHeartbeat hiveLockHeartbeat = null;
+
+ HiveLock lock = lockObject();
try {
- lockId = Optional.of(acquireLock(agentInfo));
- hiveLockHeartbeat =
- new HiveLockHeartbeat(metaClients, lockId.get(), lockHeartbeatIntervalTime);
- hiveLockHeartbeat.schedule(exitingScheduledExecutorService);
+ lock.lock();
Table tbl = loadHmsTable();
@@ -337,26 +239,20 @@ public class HiveTableOperations extends BaseMetastoreTableOperations {
tbl.getParameters().remove(StatsSetupConst.COLUMN_STATS_ACCURATE);
}
- try {
- if (hiveLockHeartbeat.future.isCancelled()
- || hiveLockHeartbeat.encounteredException != null) {
- throw new CommitFailedException(
- "Failed to heartbeat for hive lock. %s",
- hiveLockHeartbeat.encounteredException.getMessage());
- }
+ lock.ensureActive();
+ try {
persistTable(tbl, updateHiveTable);
- if (hiveLockHeartbeat.future.isCancelled()
- || hiveLockHeartbeat.encounteredException != null) {
- throw new CommitStateUnknownException(
- "Failed to heartbeat for hive lock while "
- + "committing changes. This can lead to a concurrent commit attempt be able to overwrite this commit. "
- + "Please check the commit history. If you are running into this issue, try reducing "
- + "iceberg.hive.lock-heartbeat-interval-ms.",
- hiveLockHeartbeat.encounteredException);
- }
+ lock.ensureActive();
commitStatus = CommitStatus.SUCCESS;
+ } catch (LockException le) {
+ throw new CommitStateUnknownException(
+ "Failed to heartbeat for hive lock while "
+ + "committing changes. This can lead to a concurrent commit attempt be able to overwrite this commit. "
+ + "Please check the commit history. If you are running into this issue, try reducing "
+ + "iceberg.hive.lock-heartbeat-interval-ms.",
+ le);
} catch (org.apache.hadoop.hive.metastore.api.AlreadyExistsException e) {
throw new AlreadyExistsException(e, "Table already exists: %s.%s", database, tableName);
@@ -391,7 +287,7 @@ public class HiveTableOperations extends BaseMetastoreTableOperations {
throw new CommitStateUnknownException(e);
}
}
- } catch (TException | UnknownHostException e) {
+ } catch (TException e) {
throw new RuntimeException(
String.format("Metastore operation failed for %s.%s", database, tableName), e);
@@ -399,13 +295,11 @@ public class HiveTableOperations extends BaseMetastoreTableOperations {
Thread.currentThread().interrupt();
throw new RuntimeException("Interrupted during commit", e);
- } finally {
- if (hiveLockHeartbeat != null) {
- hiveLockHeartbeat.cancel();
- }
+ } catch (LockException e) {
+ throw new CommitFailedException(e);
- cleanupMetadataAndUnlock(
- commitStatus, newMetadataLocation, lockId, tableLevelMutex, agentInfo);
+ } finally {
+ cleanupMetadataAndUnlock(commitStatus, newMetadataLocation, lock);
}
LOG.info(
@@ -628,82 +522,8 @@ public class HiveTableOperations extends BaseMetastoreTableOperations {
return storageDescriptor;
}
- @VisibleForTesting
- long acquireLock(String agentInfo) throws UnknownHostException, TException, InterruptedException {
- LockInfo lockInfo = tryLock(agentInfo);
-
- final long start = System.currentTimeMillis();
- long duration = 0;
- boolean timeout = false;
-
- try {
- if (lockInfo.lockState.equals(LockState.WAITING)) {
- // Retry count is the typical "upper bound of retries" for Tasks.run() function. In fact,
- // the maximum number of
- // attempts the Tasks.run() would try is `retries + 1`. Here, for checking locks, we use
- // timeout as the
- // upper bound of retries. So it is just reasonable to set a large retry count. However, if
- // we set
- // Integer.MAX_VALUE, the above logic of `retries + 1` would overflow into
- // Integer.MIN_VALUE. Hence,
- // the retry is set conservatively as `Integer.MAX_VALUE - 100` so it doesn't hit any
- // boundary issues.
- Tasks.foreach(lockInfo.lockId)
- .retry(Integer.MAX_VALUE - 100)
- .exponentialBackoff(lockCheckMinWaitTime, lockCheckMaxWaitTime, lockAcquireTimeout, 1.5)
- .throwFailureWhenFinished()
- .onlyRetryOn(WaitingForLockException.class)
- .run(
- id -> {
- try {
- LockResponse response = metaClients.run(client -> client.checkLock(id));
- LockState newState = response.getState();
- lockInfo.lockState = newState;
- if (newState.equals(LockState.WAITING)) {
- throw new WaitingForLockException(
- String.format("Waiting for lock on table %s.%s", database, tableName));
- }
- } catch (InterruptedException e) {
- Thread.interrupted(); // Clear the interrupt status flag
- LOG.warn(
- "Interrupted while waiting for lock on table {}.{}",
- database,
- tableName,
- e);
- }
- },
- TException.class);
- }
- } catch (WaitingForLockException waitingForLockException) {
- timeout = true;
- duration = System.currentTimeMillis() - start;
- } finally {
- if (!lockInfo.lockState.equals(LockState.ACQUIRED)) {
- unlock(Optional.of(lockInfo.lockId), agentInfo);
- }
- }
-
- // timeout and do not have lock acquired
- if (timeout && !lockInfo.lockState.equals(LockState.ACQUIRED)) {
- throw new CommitFailedException(
- "Timed out after %s ms waiting for lock on %s.%s", duration, database, tableName);
- }
-
- if (!lockInfo.lockState.equals(LockState.ACQUIRED)) {
- throw new CommitFailedException(
- "Could not acquire the lock on %s.%s, lock request ended in state %s",
- database, tableName, lockInfo.lockState);
- }
-
- return lockInfo.lockId;
- }
-
private void cleanupMetadataAndUnlock(
- CommitStatus commitStatus,
- String metadataLocation,
- Optional<Long> lockId,
- ReentrantLock tableLevelMutex,
- String agentInfo) {
+ CommitStatus commitStatus, String metadataLocation, HiveLock lock) {
try {
if (commitStatus == CommitStatus.FAILURE) {
// If we are sure the commit failed, clean up the uncommitted metadata file
@@ -712,64 +532,10 @@ public class HiveTableOperations extends BaseMetastoreTableOperations {
} catch (RuntimeException e) {
LOG.error("Failed to cleanup metadata file at {}", metadataLocation, e);
} finally {
- unlock(lockId, agentInfo);
- tableLevelMutex.unlock();
+ lock.unlock();
}
}
- private void unlock(Optional<Long> lockId, String agentInfo) {
- Long id = null;
- try {
- if (!lockId.isPresent()) {
- // Try to find the lock based on agentInfo. Only works with Hive 2 or later.
- if (HiveVersion.min(HiveVersion.HIVE_2)) {
- LockInfo lockInfo = findLock(agentInfo);
- if (lockInfo == null) {
- // No lock found
- LOG.info("No lock found with {} agentInfo", agentInfo);
- return;
- }
-
- id = lockInfo.lockId;
- } else {
- LOG.warn("Could not find lock with HMSClient {}", HiveVersion.current());
- return;
- }
- } else {
- id = lockId.get();
- }
-
- doUnlock(id);
- } catch (InterruptedException ie) {
- if (id != null) {
- // Interrupted unlock. We try to unlock one more time if we have a lockId
- try {
- Thread.interrupted(); // Clear the interrupt status flag for now, so we can retry unlock
- LOG.warn("Interrupted unlock we try one more time {}.{}", database, tableName, ie);
- doUnlock(id);
- } catch (Exception e) {
- LOG.warn("Failed to unlock even on 2nd attempt {}.{}", database, tableName, e);
- } finally {
- Thread.currentThread().interrupt(); // Set back the interrupt status
- }
- } else {
- Thread.currentThread().interrupt(); // Set back the interrupt status
- LOG.warn("Interrupted finding locks to unlock {}.{}", database, tableName, ie);
- }
- } catch (Exception e) {
- LOG.warn("Failed to unlock {}.{}", database, tableName, e);
- }
- }
-
- @VisibleForTesting
- void doUnlock(long lockId) throws TException, InterruptedException {
- metaClients.run(
- client -> {
- client.unlock(lockId);
- return null;
- });
- }
-
static void validateTableIsIceberg(Table table, String fullName) {
String tableType = table.getParameters().get(TABLE_TYPE_PROP);
NoSuchIcebergTableException.check(
@@ -806,169 +572,8 @@ public class HiveTableOperations extends BaseMetastoreTableOperations {
ConfigProperties.ENGINE_HIVE_ENABLED, TableProperties.ENGINE_HIVE_ENABLED_DEFAULT);
}
- /**
- * Tries to create a lock. If the lock creation fails, and it is possible then retries the lock
- * creation a few times. If the lock creation is successful then a {@link LockInfo} is returned,
- * otherwise an appropriate exception is thrown.
- *
- * @param agentInfo The agentInfo which should be used during lock creation
- * @return The created lock
- * @throws UnknownHostException When we are not able to fill the hostname for lock creation
- * @throws TException When there is an error during lock creation
- */
- @SuppressWarnings("ReverseDnsLookup")
- private LockInfo tryLock(String agentInfo) throws UnknownHostException, TException {
- LockInfo lockInfo = new LockInfo();
-
- final LockComponent lockComponent =
- new LockComponent(LockType.EXCLUSIVE, LockLevel.TABLE, database);
- lockComponent.setTablename(tableName);
- final LockRequest lockRequest =
- new LockRequest(
- Lists.newArrayList(lockComponent),
- System.getProperty("user.name"),
- InetAddress.getLocalHost().getHostName());
-
- // Only works in Hive 2 or later.
- if (HiveVersion.min(HiveVersion.HIVE_2)) {
- lockRequest.setAgentInfo(agentInfo);
- }
-
- Tasks.foreach(lockRequest)
- .retry(Integer.MAX_VALUE - 100)
- .exponentialBackoff(
- lockCreationMinWaitTime, lockCreationMaxWaitTime, lockCreationTimeout, 2.0)
- .shouldRetryTest(e -> e instanceof TException && HiveVersion.min(HiveVersion.HIVE_2))
- .throwFailureWhenFinished()
- .run(
- request -> {
- try {
- LockResponse lockResponse = metaClients.run(client -> client.lock(request));
- lockInfo.lockId = lockResponse.getLockid();
- lockInfo.lockState = lockResponse.getState();
- } catch (TException te) {
- LOG.warn("Failed to acquire lock {}", request, te);
- try {
- // If we can not check for lock, or we do not find it, then rethrow the exception
- // Otherwise we are happy as the findLock sets the lockId and the state correctly
- if (!HiveVersion.min(HiveVersion.HIVE_2)) {
- LockInfo lockFound = findLock(agentInfo);
- if (lockFound != null) {
- lockInfo.lockId = lockFound.lockId;
- lockInfo.lockState = lockFound.lockState;
- LOG.info("Found lock {} by agentInfo {}", lockInfo, agentInfo);
- return;
- }
- }
-
- throw te;
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- LOG.warn(
- "Interrupted while checking for lock on table {}.{}", database, tableName, e);
- throw new RuntimeException("Interrupted while checking for lock", e);
- }
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- LOG.warn("Interrupted while acquiring lock on table {}.{}", database, tableName, e);
- throw new RuntimeException("Interrupted while acquiring lock", e);
- }
- },
- TException.class);
-
- // This should be initialized always, or exception should be thrown.
- LOG.debug("Lock {} created for table {}.{}", lockInfo, database, tableName);
- return lockInfo;
- }
-
- /**
- * Search for the locks using HMSClient.showLocks identified by the agentInfo. If the lock is
- * there, then a {@link LockInfo} object is returned. If the lock is not found <code>null</code>
- * is returned.
- *
- * @param agentInfo The key for searching the locks
- * @return The {@link LockInfo} for the found lock, or <code>null</code> if nothing found
- */
- private LockInfo findLock(String agentInfo) throws TException, InterruptedException {
- Preconditions.checkArgument(
- HiveVersion.min(HiveVersion.HIVE_2),
- "Minimally Hive 2 HMS client is needed to find the Lock using the showLocks API call");
- ShowLocksRequest showLocksRequest = new ShowLocksRequest();
- showLocksRequest.setDbname(database);
- showLocksRequest.setTablename(tableName);
- ShowLocksResponse response = metaClients.run(client -> client.showLocks(showLocksRequest));
- for (ShowLocksResponseElement lock : response.getLocks()) {
- if (lock.getAgentInfo().equals(agentInfo)) {
- // We found our lock
- return new LockInfo(lock.getLockid(), lock.getState());
- }
- }
-
- // Not found anything
- return null;
- }
-
- private static class HiveLockHeartbeat implements Runnable {
- private final ClientPool<IMetaStoreClient, TException> hmsClients;
- private final long lockId;
- private final long intervalMs;
- private ScheduledFuture<?> future;
- private volatile Exception encounteredException = null;
-
- HiveLockHeartbeat(
- ClientPool<IMetaStoreClient, TException> hmsClients, long lockId, long intervalMs) {
- this.hmsClients = hmsClients;
- this.lockId = lockId;
- this.intervalMs = intervalMs;
- this.future = null;
- }
-
- @Override
- public void run() {
- try {
- hmsClients.run(
- client -> {
- client.heartbeat(0, lockId);
- return null;
- });
- } catch (TException | InterruptedException e) {
- this.encounteredException = e;
- throw new CommitFailedException(e, "Failed to heartbeat for lock: %d", lockId);
- }
- }
-
- public void schedule(ScheduledExecutorService scheduler) {
- future =
- scheduler.scheduleAtFixedRate(this, intervalMs / 2, intervalMs, TimeUnit.MILLISECONDS);
- }
-
- public void cancel() {
- if (future != null) {
- future.cancel(false);
- }
- }
- }
-
- private static class LockInfo {
- private long lockId;
- private LockState lockState;
-
- private LockInfo() {
- this.lockId = -1;
- this.lockState = null;
- }
-
- private LockInfo(long lockId, LockState lockState) {
- this.lockId = lockId;
- this.lockState = lockState;
- }
-
- @Override
- public String toString() {
- return MoreObjects.toStringHelper(this)
- .add("lockId", lockId)
- .add("lockState", lockState)
- .toString();
- }
+ @VisibleForTesting
+ HiveLock lockObject() {
+ return new MetastoreLock(conf, metaClients, catalogName, database, tableName);
}
}
diff --git a/api/src/main/java/org/apache/iceberg/exceptions/CommitFailedException.java b/hive-metastore/src/main/java/org/apache/iceberg/hive/LockException.java
similarity index 75%
copy from api/src/main/java/org/apache/iceberg/exceptions/CommitFailedException.java
copy to hive-metastore/src/main/java/org/apache/iceberg/hive/LockException.java
index ca9d1e4a5b..0673ff4f06 100644
--- a/api/src/main/java/org/apache/iceberg/exceptions/CommitFailedException.java
+++ b/hive-metastore/src/main/java/org/apache/iceberg/hive/LockException.java
@@ -16,19 +16,18 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iceberg.exceptions;
+package org.apache.iceberg.hive;
import com.google.errorprone.annotations.FormatMethod;
-/** Exception raised when a commit fails because of out of date metadata. */
-public class CommitFailedException extends RuntimeException {
+class LockException extends RuntimeException {
@FormatMethod
- public CommitFailedException(String message, Object... args) {
+ LockException(String message, Object... args) {
super(String.format(message, args));
}
@FormatMethod
- public CommitFailedException(Throwable cause, String message, Object... args) {
+ LockException(Throwable cause, String message, Object... args) {
super(String.format(message, args), cause);
}
}
diff --git a/hive-metastore/src/main/java/org/apache/iceberg/hive/MetastoreLock.java b/hive-metastore/src/main/java/org/apache/iceberg/hive/MetastoreLock.java
new file mode 100644
index 0000000000..5820365339
--- /dev/null
+++ b/hive-metastore/src/main/java/org/apache/iceberg/hive/MetastoreLock.java
@@ -0,0 +1,540 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.hive;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReentrantLock;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.LockComponent;
+import org.apache.hadoop.hive.metastore.api.LockLevel;
+import org.apache.hadoop.hive.metastore.api.LockRequest;
+import org.apache.hadoop.hive.metastore.api.LockResponse;
+import org.apache.hadoop.hive.metastore.api.LockState;
+import org.apache.hadoop.hive.metastore.api.LockType;
+import org.apache.hadoop.hive.metastore.api.ShowLocksRequest;
+import org.apache.hadoop.hive.metastore.api.ShowLocksResponse;
+import org.apache.hadoop.hive.metastore.api.ShowLocksResponseElement;
+import org.apache.iceberg.ClientPool;
+import org.apache.iceberg.exceptions.CommitFailedException;
+import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.iceberg.util.Tasks;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class MetastoreLock implements HiveLock {
+ private static final Logger LOG = LoggerFactory.getLogger(MetastoreLock.class);
+ private static final String HIVE_ACQUIRE_LOCK_TIMEOUT_MS = "iceberg.hive.lock-timeout-ms";
+ private static final String HIVE_LOCK_CHECK_MIN_WAIT_MS = "iceberg.hive.lock-check-min-wait-ms";
+ private static final String HIVE_LOCK_CHECK_MAX_WAIT_MS = "iceberg.hive.lock-check-max-wait-ms";
+ private static final String HIVE_LOCK_CREATION_TIMEOUT_MS =
+ "iceberg.hive.lock-creation-timeout-ms";
+ private static final String HIVE_LOCK_CREATION_MIN_WAIT_MS =
+ "iceberg.hive.lock-creation-min-wait-ms";
+ private static final String HIVE_LOCK_CREATION_MAX_WAIT_MS =
+ "iceberg.hive.lock-creation-max-wait-ms";
+ private static final String HIVE_LOCK_HEARTBEAT_INTERVAL_MS =
+ "iceberg.hive.lock-heartbeat-interval-ms";
+ private static final String HIVE_TABLE_LEVEL_LOCK_EVICT_MS =
+ "iceberg.hive.table-level-lock-evict-ms";
+
+ private static final long HIVE_ACQUIRE_LOCK_TIMEOUT_MS_DEFAULT = 3 * 60 * 1000; // 3 minutes
+ private static final long HIVE_LOCK_CHECK_MIN_WAIT_MS_DEFAULT = 50; // 50 milliseconds
+ private static final long HIVE_LOCK_CHECK_MAX_WAIT_MS_DEFAULT = 5 * 1000; // 5 seconds
+ private static final long HIVE_LOCK_CREATION_TIMEOUT_MS_DEFAULT = 3 * 60 * 1000; // 3 minutes
+ private static final long HIVE_LOCK_CREATION_MIN_WAIT_MS_DEFAULT = 50; // 50 milliseconds
+ private static final long HIVE_LOCK_CREATION_MAX_WAIT_MS_DEFAULT = 5 * 1000; // 5 seconds
+ private static final long HIVE_LOCK_HEARTBEAT_INTERVAL_MS_DEFAULT = 4 * 60 * 1000; // 4 minutes
+ private static final long HIVE_TABLE_LEVEL_LOCK_EVICT_MS_DEFAULT = TimeUnit.MINUTES.toMillis(10);
+ private static volatile Cache<String, ReentrantLock> commitLockCache;
+
+ private final ClientPool<IMetaStoreClient, TException> metaClients;
+ private final String databaseName;
+ private final String tableName;
+ private final String fullName;
+ private final long lockAcquireTimeout;
+ private final long lockCheckMinWaitTime;
+ private final long lockCheckMaxWaitTime;
+ private final long lockCreationTimeout;
+ private final long lockCreationMinWaitTime;
+ private final long lockCreationMaxWaitTime;
+ private final long lockHeartbeatIntervalTime;
+ private final ScheduledExecutorService exitingScheduledExecutorService;
+ private final String agentInfo;
+
+ private Optional<Long> hmsLockId = Optional.empty();
+ private ReentrantLock jvmLock = null;
+ private Heartbeat heartbeat = null;
+
+ /**
+ * Creates a lock object for a single table. No lock is taken here; this only reads the timing
+ * configuration, generates the agentInfo used to identify this lock, creates the executor used
+ * for heartbeating and makes sure the static table-level lock cache exists.
+ *
+ * @param conf configuration the lock timeouts, wait times and heartbeat interval are read from
+ * @param metaClients client pool used for every metastore call
+ * @param catalogName catalog name, only used as part of the full table name
+ * @param databaseName database of the table to lock
+ * @param tableName name of the table to lock
+ */
+ MetastoreLock(
+ Configuration conf,
+ ClientPool<IMetaStoreClient, TException> metaClients,
+ String catalogName,
+ String databaseName,
+ String tableName) {
+ this.metaClients = metaClients;
+ // Key of the in-process lock cache and part of the heartbeat thread name
+ this.fullName = catalogName + "." + databaseName + "." + tableName;
+ this.databaseName = databaseName;
+ this.tableName = tableName;
+
+ this.lockAcquireTimeout =
+ conf.getLong(HIVE_ACQUIRE_LOCK_TIMEOUT_MS, HIVE_ACQUIRE_LOCK_TIMEOUT_MS_DEFAULT);
+ this.lockCheckMinWaitTime =
+ conf.getLong(HIVE_LOCK_CHECK_MIN_WAIT_MS, HIVE_LOCK_CHECK_MIN_WAIT_MS_DEFAULT);
+ this.lockCheckMaxWaitTime =
+ conf.getLong(HIVE_LOCK_CHECK_MAX_WAIT_MS, HIVE_LOCK_CHECK_MAX_WAIT_MS_DEFAULT);
+ this.lockCreationTimeout =
+ conf.getLong(HIVE_LOCK_CREATION_TIMEOUT_MS, HIVE_LOCK_CREATION_TIMEOUT_MS_DEFAULT);
+ this.lockCreationMinWaitTime =
+ conf.getLong(HIVE_LOCK_CREATION_MIN_WAIT_MS, HIVE_LOCK_CREATION_MIN_WAIT_MS_DEFAULT);
+ this.lockCreationMaxWaitTime =
+ conf.getLong(HIVE_LOCK_CREATION_MAX_WAIT_MS, HIVE_LOCK_CREATION_MAX_WAIT_MS_DEFAULT);
+ this.lockHeartbeatIntervalTime =
+ conf.getLong(HIVE_LOCK_HEARTBEAT_INTERVAL_MS, HIVE_LOCK_HEARTBEAT_INTERVAL_MS_DEFAULT);
+ long tableLevelLockCacheEvictionTimeout =
+ conf.getLong(HIVE_TABLE_LEVEL_LOCK_EVICT_MS, HIVE_TABLE_LEVEL_LOCK_EVICT_MS_DEFAULT);
+
+ // Unique per lock object; findLock() matches on it to recognize locks created by this instance
+ this.agentInfo = "Iceberg-" + UUID.randomUUID();
+
+ // One single-threaded daemon scheduler per lock object; unlock() shuts it down, but only when
+ // a heartbeat has been created
+ this.exitingScheduledExecutorService =
+ Executors.newSingleThreadScheduledExecutor(
+ new ThreadFactoryBuilder()
+ .setDaemon(true)
+ .setNameFormat("iceberg-hive-lock-heartbeat-" + fullName + "-%d")
+ .build());
+
+ // The cache is static and created only once, so the eviction timeout passed here is only
+ // effective for the first MetastoreLock created in the JVM
+ initTableLevelLockCache(tableLevelLockCacheEvictionTimeout);
+ }
+
+ /**
+ * Takes the in-process lock for the table, then the HMS lock, and finally starts the heartbeat
+ * which keeps the HMS lock alive.
+ *
+ * @throws LockException if the HMS lock can not be created or acquired
+ */
+ @Override
+ public void lock() throws LockException {
+ // Serialize commits to the same table inside this JVM first, so concurrent commit attempts
+ // from the same process do not result in unnecessary HMS lock acquisition requests
+ acquireJvmLock();
+
+ // Obtain the HMS lock and remember its id for unlock()
+ long acquiredLockId = acquireLock();
+ hmsLockId = Optional.of(acquiredLockId);
+
+ // Keep the HMS lock alive for as long as it is held
+ heartbeat = new Heartbeat(metaClients, acquiredLockId, lockHeartbeatIntervalTime);
+ heartbeat.schedule(exitingScheduledExecutorService);
+ }
+
+ /**
+ * Verifies that the lock is still being kept alive.
+ *
+ * @throws LockException if no heartbeat was started, the heartbeat recorded a failure, or the
+ * heartbeat task was cancelled
+ */
+ @Override
+ public void ensureActive() throws LockException {
+ Heartbeat current = heartbeat;
+ if (current == null) {
+ throw new LockException("Lock is not active");
+ }
+
+ Exception failure = current.encounteredException;
+ if (failure != null) {
+ throw new LockException(
+ failure, "Failed to heartbeat for hive lock. %s", failure.getMessage());
+ }
+
+ boolean stillScheduled = current.active();
+ if (!stillScheduled) {
+ throw new LockException("Hive lock heartbeat thread not active");
+ }
+ }
+
+ /**
+ * Releases everything taken by {@link #lock()}: stops the heartbeat, releases the HMS lock and
+ * finally releases the in-process lock.
+ */
+ @Override
+ public void unlock() {
+ // Stop heartbeating first; the scheduler is only shut down when a heartbeat was started
+ if (heartbeat != null) {
+ heartbeat.cancel();
+ exitingScheduledExecutorService.shutdown();
+ }
+
+ try {
+ // hmsLockId may be empty when lock() failed; unlock(Optional) then tries to find the lock by
+ // agentInfo
+ unlock(hmsLockId);
+ } finally {
+ // Always release the in-process lock, whatever happened with the HMS unlock
+ releaseJvmLock();
+ }
+ }
+
+ /**
+ * Creates the HMS lock and, if it is not granted immediately, polls the metastore until the lock
+ * leaves the WAITING state or the acquire timeout is reached. When the lock does not end up in
+ * the ACQUIRED state it is released again before the failure is reported.
+ *
+ * @return the id of the acquired HMS lock
+ * @throws LockException if lock creation fails, waiting times out, a metastore call fails, or
+ * the lock request ends in any state other than ACQUIRED
+ */
+ private long acquireLock() throws LockException {
+ LockInfo lockInfo = createLock();
+
+ final long start = System.currentTimeMillis();
+ long duration = 0;
+ boolean timeout = false;
+ TException thriftError = null;
+
+ try {
+ if (lockInfo.lockState.equals(LockState.WAITING)) {
+ // Retry count is the typical "upper bound of retries" for Tasks.run() function. In fact,
+ // the maximum number of attempts the Tasks.run() would try is `retries + 1`. Here, for
+ // checking locks, we use timeout as the upper bound of retries. So it is just reasonable
+ // to set a large retry count. However, if we set Integer.MAX_VALUE, the above logic of
+ // `retries + 1` would overflow into Integer.MIN_VALUE. Hence, the retry is set
+ // conservatively as `Integer.MAX_VALUE - 100` so it doesn't hit any boundary issues.
+ Tasks.foreach(lockInfo.lockId)
+ .retry(Integer.MAX_VALUE - 100)
+ .exponentialBackoff(lockCheckMinWaitTime, lockCheckMaxWaitTime, lockAcquireTimeout, 1.5)
+ .throwFailureWhenFinished()
+ .onlyRetryOn(WaitingForLockException.class)
+ .run(
+ id -> {
+ try {
+ LockResponse response = metaClients.run(client -> client.checkLock(id));
+ LockState newState = response.getState();
+ lockInfo.lockState = newState;
+ // Still queued: signal the retry loop to check again after the backoff
+ if (newState.equals(LockState.WAITING)) {
+ throw new WaitingForLockException(
+ String.format(
+ "Waiting for lock on table %s.%s", databaseName, tableName));
+ }
+ } catch (InterruptedException e) {
+ // NOTE(review): the interrupt flag is cleared and not restored here. The task then
+ // returns normally with the last known state, so a WAITING lock is released in the
+ // finally block below and the generic "Could not acquire" failure is thrown.
+ Thread.interrupted(); // Clear the interrupt status flag
+ LOG.warn(
+ "Interrupted while waiting for lock on table {}.{}",
+ databaseName,
+ tableName,
+ e);
+ }
+ },
+ TException.class);
+ }
+ } catch (WaitingForLockException e) {
+ // The lock was still WAITING when the retries ended
+ timeout = true;
+ duration = System.currentTimeMillis() - start;
+ } catch (TException e) {
+ thriftError = e;
+ } finally {
+ // Do not leave a lock request behind in the metastore if we did not get the lock
+ if (!lockInfo.lockState.equals(LockState.ACQUIRED)) {
+ unlock(Optional.of(lockInfo.lockId));
+ }
+ }
+
+ if (!lockInfo.lockState.equals(LockState.ACQUIRED)) {
+ if (timeout) {
+ throw new LockException(
+ "Timed out after %s ms waiting for lock on %s.%s", duration, databaseName, tableName);
+ }
+
+ if (thriftError != null) {
+ throw new LockException(
+ thriftError, "Metastore operation failed for %s.%s", databaseName, tableName);
+ }
+
+ // Just for safety. We should not get here.
+ throw new LockException(
+ "Could not acquire the lock on %s.%s, lock request ended in state %s",
+ databaseName, tableName, lockInfo.lockState);
+ } else {
+ return lockInfo.lockId;
+ }
+ }
+
+ /**
+ * Creates a lock, retrying if possible on failure.
+ *
+ * <p>When the lock request fails with a {@link TException} the lock might still have been
+ * created on the metastore side. With a Hive 2 or later client the lock is then searched for by
+ * agentInfo using {@link #findLock()}; if it is found, its id and state are used as the result
+ * instead of failing. If it is not found (or the client is older), a {@link LockException} is
+ * thrown, which is retried with exponential backoff only for Hive 2 or later clients and only
+ * while the thread has not been interrupted.
+ *
+ * @return The {@link LockInfo} object for the successfully created lock
+ * @throws LockException When we are not able to fill the hostname for lock creation, or there is
+ * an error during lock creation
+ */
+ @SuppressWarnings("ReverseDnsLookup")
+ private LockInfo createLock() throws LockException {
+ LockInfo lockInfo = new LockInfo();
+
+ String hostName;
+ try {
+ hostName = InetAddress.getLocalHost().getHostName();
+ } catch (UnknownHostException uhe) {
+ throw new LockException(uhe, "Error generating host name");
+ }
+
+ LockComponent lockComponent =
+ new LockComponent(LockType.EXCLUSIVE, LockLevel.TABLE, databaseName);
+ lockComponent.setTablename(tableName);
+ LockRequest lockRequest =
+ new LockRequest(Lists.newArrayList(lockComponent), HiveHadoopUtil.currentUser(), hostName);
+
+ // Only works in Hive 2 or later.
+ if (HiveVersion.min(HiveVersion.HIVE_2)) {
+ lockRequest.setAgentInfo(agentInfo);
+ }
+
+ // Set by the task when it is interrupted, so that the retry test below stops retrying
+ AtomicBoolean interrupted = new AtomicBoolean(false);
+ Tasks.foreach(lockRequest)
+ .retry(Integer.MAX_VALUE - 100)
+ .exponentialBackoff(
+ lockCreationMinWaitTime, lockCreationMaxWaitTime, lockCreationTimeout, 2.0)
+ .shouldRetryTest(
+ e ->
+ !interrupted.get()
+ && e instanceof LockException
+ && HiveVersion.min(HiveVersion.HIVE_2))
+ .throwFailureWhenFinished()
+ .run(
+ request -> {
+ try {
+ LockResponse lockResponse = metaClients.run(client -> client.lock(request));
+ lockInfo.lockId = lockResponse.getLockid();
+ lockInfo.lockState = lockResponse.getState();
+ } catch (TException te) {
+ LOG.warn("Failed to create lock {}", request, te);
+ try {
+ // If we can not check for lock, or we do not find it, then rethrow the exception
+ // Otherwise we are happy as the findLock sets the lockId and the state correctly
+ // findLock() requires a Hive 2 or later client, so only look for the lock then
+ if (HiveVersion.min(HiveVersion.HIVE_2)) {
+ LockInfo lockFound = findLock();
+ if (lockFound != null) {
+ lockInfo.lockId = lockFound.lockId;
+ lockInfo.lockState = lockFound.lockState;
+ LOG.info("Found lock {} by agentInfo {}", lockInfo, agentInfo);
+ return;
+ }
+ }
+
+ throw new LockException(
+ "Failed to find lock for table %s.%s", databaseName, tableName);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ interrupted.set(true);
+ LOG.warn(
+ "Interrupted while trying to find lock for table {}.{}",
+ databaseName,
+ tableName,
+ e);
+ throw new LockException(
+ e,
+ "Interrupted while trying to find lock for table %s.%s",
+ databaseName,
+ tableName);
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ interrupted.set(true);
+ LOG.warn(
+ "Interrupted while creating lock on table {}.{}", databaseName, tableName, e);
+ throw new LockException(
+ e, "Interrupted while creating lock on table %s.%s", databaseName, tableName);
+ }
+ },
+ LockException.class);
+
+ // This should be initialized always, or exception should be thrown.
+ LOG.debug("Lock {} created for table {}.{}", lockInfo, databaseName, tableName);
+ return lockInfo;
+ }
+
+ /**
+ * Search for the locks using HMSClient.showLocks identified by the agentInfo. If the lock is
+ * there, then a {@link LockInfo} object is returned. If the lock is not found <code>null</code>
+ * is returned.
+ *
+ * <p>Locks on the table which carry no agentInfo at all are skipped.
+ *
+ * @return The {@link LockInfo} for the found lock, or <code>null</code> if nothing found
+ * @throws LockException if the showLocks metastore call fails
+ * @throws InterruptedException if the thread is interrupted during the metastore call
+ * @throws IllegalArgumentException if the HMS client is older than Hive 2
+ */
+ private LockInfo findLock() throws LockException, InterruptedException {
+ Preconditions.checkArgument(
+ HiveVersion.min(HiveVersion.HIVE_2),
+ "Minimally Hive 2 HMS client is needed to find the Lock using the showLocks API call");
+ ShowLocksRequest showLocksRequest = new ShowLocksRequest();
+ showLocksRequest.setDbname(databaseName);
+ showLocksRequest.setTablename(tableName);
+ ShowLocksResponse response;
+ try {
+ response = metaClients.run(client -> client.showLocks(showLocksRequest));
+ } catch (TException e) {
+ throw new LockException(e, "Failed to find lock for table %s.%s", databaseName, tableName);
+ }
+ for (ShowLocksResponseElement lock : response.getLocks()) {
+ // Compare starting from our own agentInfo, which is never null, so a lock on the same table
+ // without agentInfo does not cause a NullPointerException
+ if (agentInfo.equals(lock.getAgentInfo())) {
+ // We found our lock
+ return new LockInfo(lock.getLockid(), lock.getState());
+ }
+ }
+
+ // Not found anything
+ return null;
+ }
+
+ /**
+ * Releases the HMS lock on a best effort basis: failures are logged and never propagated.
+ *
+ * <p>When no lock id is given, the lock is searched for by agentInfo, which is only possible
+ * with a Hive 2 or later client. If the unlock call is interrupted and the lock id is known, the
+ * unlock is attempted one more time before the interrupt status is restored.
+ *
+ * @param lockId the id of the HMS lock to release, or empty if the id is not known
+ */
+ private void unlock(Optional<Long> lockId) {
+ // Stays null until the lock id is known; the interrupt handling below depends on this
+ Long id = null;
+ try {
+ if (!lockId.isPresent()) {
+ // Try to find the lock based on agentInfo. Only works with Hive 2 or later.
+ if (HiveVersion.min(HiveVersion.HIVE_2)) {
+ LockInfo lockInfo = findLock();
+ if (lockInfo == null) {
+ // No lock found
+ LOG.info("No lock found with {} agentInfo", agentInfo);
+ return;
+ }
+
+ id = lockInfo.lockId;
+ } else {
+ LOG.warn("Could not find lock with HMSClient {}", HiveVersion.current());
+ return;
+ }
+ } else {
+ id = lockId.get();
+ }
+
+ doUnlock(id);
+ } catch (InterruptedException ie) {
+ if (id != null) {
+ // Interrupted unlock. We try to unlock one more time if we have a lockId
+ try {
+ Thread.interrupted(); // Clear the interrupt status flag for now, so we can retry unlock
+ LOG.warn("Interrupted unlock we try one more time {}.{}", databaseName, tableName, ie);
+ doUnlock(id);
+ } catch (Exception e) {
+ LOG.warn("Failed to unlock even on 2nd attempt {}.{}", databaseName, tableName, e);
+ } finally {
+ Thread.currentThread().interrupt(); // Set back the interrupt status
+ }
+ } else {
+ // Interrupted while still searching for the lock, so there is no id to retry with
+ Thread.currentThread().interrupt(); // Set back the interrupt status
+ LOG.warn("Interrupted finding locks to unlock {}.{}", databaseName, tableName, ie);
+ }
+ } catch (Exception e) {
+ // Best effort only: a failed unlock is logged, the caller is not notified
+ LOG.warn("Failed to unlock {}.{}", databaseName, tableName, e);
+ }
+ }
+
+ /**
+ * Sends the unlock request for the given lock to the metastore.
+ *
+ * @param lockId id of the HMS lock to release
+ * @throws TException if the metastore call fails
+ * @throws InterruptedException if the thread is interrupted during the call
+ */
+ private void doUnlock(long lockId) throws TException, InterruptedException {
+ metaClients.run(
+ hmsClient -> {
+ hmsClient.unlock(lockId);
+ // The client action has to return a value; there is nothing meaningful to return here
+ return null;
+ });
+ }
+
+ /**
+ * Takes the in-process lock for this table, blocking until it is available.
+ *
+ * @throws IllegalStateException if this object already holds the in-process lock
+ */
+ private void acquireJvmLock() {
+ Preconditions.checkState(jvmLock == null, "Cannot call acquireLock twice for %s", fullName);
+
+ // One fair lock per table name, shared by every MetastoreLock in this JVM through the cache
+ ReentrantLock tableLock = commitLockCache.get(fullName, key -> new ReentrantLock(true));
+ jvmLock = tableLock;
+ tableLock.lock();
+ }
+
+ /** Releases the in-process lock if this object holds one; does nothing otherwise. */
+ private void releaseJvmLock() {
+ if (jvmLock == null) {
+ return;
+ }
+
+ jvmLock.unlock();
+ // Forget the lock so the object does not try to release it a second time
+ jvmLock = null;
+ }
+
+ /**
+ * Lazily creates the static cache of per-table in-process locks. The cache is created at most
+ * once; later calls return without touching it, so only the eviction timeout of the first call
+ * is ever applied.
+ *
+ * @param evictionTimeout time in milliseconds after the last access when an entry is evicted
+ */
+ private static void initTableLevelLockCache(long evictionTimeout) {
+ // Unsynchronized check first, so the class monitor is only taken while the cache is missing
+ if (commitLockCache == null) {
+ synchronized (MetastoreLock.class) {
+ // Check again under the monitor: another thread may have created the cache meanwhile
+ if (commitLockCache == null) {
+ commitLockCache =
+ Caffeine.newBuilder()
+ .expireAfterAccess(evictionTimeout, TimeUnit.MILLISECONDS)
+ .build();
+ }
+ }
+ }
+ }
+
+ /**
+ * Periodic task which sends heartbeats to the metastore for a single HMS lock. The first failure
+ * is kept in {@code encounteredException}, which is read by the enclosing class.
+ */
+ private static class Heartbeat implements Runnable {
+ private final ClientPool<IMetaStoreClient, TException> hmsClients;
+ private final long lockId;
+ private final long intervalMs;
+ // NOTE(review): not volatile, although it is written by schedule() and read by active() and
+ // cancel() - confirm that all of these are only called from the committing thread
+ private ScheduledFuture<?> future;
+ // Written by the heartbeat thread, hence volatile
+ private volatile Exception encounteredException = null;
+
+ Heartbeat(ClientPool<IMetaStoreClient, TException> hmsClients, long lockId, long intervalMs) {
+ this.hmsClients = hmsClients;
+ this.lockId = lockId;
+ this.intervalMs = intervalMs;
+ this.future = null;
+ }
+
+ /**
+ * Sends one heartbeat for the lock. A failure is recorded and then rethrown as a {@link
+ * CommitFailedException}; per the ScheduledExecutorService contract an execution that throws
+ * suppresses the subsequent executions of a fixed-rate task.
+ */
+ @Override
+ public void run() {
+ try {
+ hmsClients.run(
+ client -> {
+ client.heartbeat(0, lockId);
+ return null;
+ });
+ } catch (TException | InterruptedException e) {
+ this.encounteredException = e;
+ throw new CommitFailedException(e, "Failed to heartbeat for lock: %d", lockId);
+ }
+ }
+
+ /**
+ * Starts the periodic heartbeat. The first heartbeat is sent after half of the interval, the
+ * following ones at the full interval.
+ *
+ * @param scheduler the executor which runs the heartbeat task
+ */
+ public void schedule(ScheduledExecutorService scheduler) {
+ future =
+ scheduler.scheduleAtFixedRate(this, intervalMs / 2, intervalMs, TimeUnit.MILLISECONDS);
+ }
+
+ /**
+ * @return true if the heartbeat has been scheduled and not cancelled. This does not report a
+ * task which stopped because of a failure; check {@code encounteredException} for that.
+ */
+ boolean active() {
+ return future != null && !future.isCancelled();
+ }
+
+ /** Cancels the scheduled heartbeat, if any, without interrupting a heartbeat in progress. */
+ public void cancel() {
+ if (future != null) {
+ future.cancel(false);
+ }
+ }
+ }
+
+ /** Mutable holder for the id and the last known state of an HMS lock. */
+ private static class LockInfo {
+ private long lockId;
+ private LockState lockState;
+
+ /** Creates a holder for a lock which is not created yet: id -1 and no state. */
+ private LockInfo() {
+ this(-1, null);
+ }
+
+ private LockInfo(long lockId, LockState lockState) {
+ this.lockId = lockId;
+ this.lockState = lockState;
+ }
+
+ @Override
+ public String toString() {
+ MoreObjects.ToStringHelper description = MoreObjects.toStringHelper(this);
+ description.add("lockId", lockId);
+ description.add("lockState", lockState);
+ return description.toString();
+ }
+ }
+
+ /**
+ * Thrown by the lock check task while the HMS lock is still in the WAITING state. This is the
+ * only exception type the lock check retries on.
+ */
+ private static class WaitingForLockException extends RuntimeException {
+ WaitingForLockException(String message) {
+ super(message);
+ }
+ }
+}
diff --git a/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCommitLocks.java b/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCommitLocks.java
index 8b439047ca..d64653301e 100644
--- a/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCommitLocks.java
+++ b/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCommitLocks.java
@@ -157,7 +157,7 @@ public class TestHiveCommitLocks extends HiveTableBaseTest {
@Test
public void testLockAcquisitionAtFirstTime() throws TException, InterruptedException {
doReturn(acquiredLockResponse).when(spyClient).lock(any());
- doNothing().when(spyOps).doUnlock(eq(dummyLockId));
+ doNothing().when(spyClient).unlock(eq(dummyLockId));
doNothing().when(spyClient).heartbeat(eq(0L), eq(dummyLockId));
spyOps.doCommit(metadataV2, metadataV1);
@@ -176,7 +176,7 @@ public class TestHiveCommitLocks extends HiveTableBaseTest {
.doReturn(acquiredLockResponse)
.when(spyClient)
.checkLock(eq(dummyLockId));
- doNothing().when(spyOps).doUnlock(eq(dummyLockId));
+ doNothing().when(spyClient).unlock(eq(dummyLockId));
doNothing().when(spyClient).heartbeat(eq(0L), eq(dummyLockId));
spyOps.doCommit(metadataV2, metadataV1);
@@ -195,7 +195,6 @@ public class TestHiveCommitLocks extends HiveTableBaseTest {
.doReturn(acquiredLockResponse)
.when(spyClient)
.checkLock(eq(dummyLockId));
- doNothing().when(spyOps).doUnlock(eq(dummyLockId));
doNothing().when(spyClient).heartbeat(eq(0L), eq(dummyLockId));
spyOps.doCommit(metadataV2, metadataV1);
@@ -220,7 +219,6 @@ public class TestHiveCommitLocks extends HiveTableBaseTest {
doReturn(showLocksResponse).when(spyClient).showLocks(any());
doReturn(acquiredLockResponse).when(spyClient).checkLock(eq(dummyLockId));
- doNothing().when(spyOps).doUnlock(eq(dummyLockId));
doNothing().when(spyClient).heartbeat(eq(0L), eq(dummyLockId));
spyOps.doCommit(metadataV2, metadataV1);
@@ -282,7 +280,7 @@ public class TestHiveCommitLocks extends HiveTableBaseTest {
AssertHelpers.assertThrows(
"Expected an exception",
RuntimeException.class,
- "Interrupted while acquiring lock",
+ "Interrupted while creating lock",
() -> spyOps.doCommit(metadataV2, metadataV1));
verify(spyClient, times(1)).unlock(eq(dummyLockId));
diff --git a/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCommits.java b/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCommits.java
index 869cb9fe44..b9799cefe6 100644
--- a/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCommits.java
+++ b/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCommits.java
@@ -27,7 +27,7 @@ import static org.mockito.Mockito.when;
import java.io.File;
import java.net.UnknownHostException;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.HasTableOperations;
import org.apache.iceberg.PartitionSpec;
@@ -41,7 +41,6 @@ import org.apache.iceberg.types.Types;
import org.apache.thrift.TException;
import org.junit.Assert;
import org.junit.Test;
-import org.mockito.ArgumentCaptor;
public class TestHiveCommits extends HiveTableBaseTest {
@@ -62,13 +61,22 @@ public class TestHiveCommits extends HiveTableBaseTest {
HiveTableOperations spyOps = spy(ops);
- ArgumentCaptor<Long> lockId = ArgumentCaptor.forClass(Long.class);
- doThrow(new RuntimeException()).when(spyOps).doUnlock(lockId.capture());
+ AtomicReference<HiveLock> lockRef = new AtomicReference<>();
+
+ when(spyOps.lockObject())
+ .thenAnswer(
+ i -> {
+ HiveLock lock = (HiveLock) i.callRealMethod();
+ lockRef.set(lock);
+ return lock;
+ });
try {
spyOps.commit(metadataV2, metadataV1);
+ HiveLock spyLock = spy(lockRef.get());
+ doThrow(new RuntimeException()).when(spyLock).unlock();
} finally {
- ops.doUnlock(lockId.getValue());
+ lockRef.get().unlock();
}
ops.refresh();
@@ -268,16 +276,16 @@ public class TestHiveCommits extends HiveTableBaseTest {
HiveTableOperations spyOps = spy(ops);
- AtomicLong lockId = new AtomicLong();
+ AtomicReference<HiveLock> lock = new AtomicReference<>();
doAnswer(
- i -> {
- lockId.set(ops.acquireLock("agentInfo"));
- return lockId.get();
+ l -> {
+ lock.set(ops.lockObject());
+ return lock.get();
})
.when(spyOps)
- .acquireLock(any());
+ .lockObject();
- concurrentCommitAndThrowException(ops, spyOps, table, lockId);
+ concurrentCommitAndThrowException(ops, spyOps, table, lock);
/*
This commit and our concurrent commit should succeed even though this commit throws an exception
@@ -331,7 +339,7 @@ public class TestHiveCommits extends HiveTableBaseTest {
HiveTableOperations realOperations,
HiveTableOperations spyOperations,
Table table,
- AtomicLong lockId)
+ AtomicReference<HiveLock> lock)
throws TException, InterruptedException {
// Simulate a communication error after a successful commit
doAnswer(
@@ -340,7 +348,7 @@ public class TestHiveCommits extends HiveTableBaseTest {
i.getArgument(0, org.apache.hadoop.hive.metastore.api.Table.class);
realOperations.persistTable(tbl, true);
// Simulate lock expiration or removal
- realOperations.doUnlock(lockId.get());
+ lock.get().unlock();
table.refresh();
table.updateSchema().addColumn("newCol", Types.IntegerType.get()).commit();
throw new TException("Datacenter on fire");