You are viewing a plain-text version of this content. The link to the canonical version is not preserved in this plain-text copy.
Posted to commits@iceberg.apache.org by pv...@apache.org on 2023/02/03 13:12:27 UTC

[iceberg] branch master updated: Hive: Refactor commit lock mechanism from HiveTableOperations (#6648)

This is an automated email from the ASF dual-hosted git repository.

pvary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 333227fbd1 Hive: Refactor commit lock mechanism from HiveTableOperations (#6648)
333227fbd1 is described below

commit 333227fbd13821365cec1bdbfcb9314a239bea0f
Author: pvary <pe...@gmail.com>
AuthorDate: Fri Feb 3 14:12:18 2023 +0100

    Hive: Refactor commit lock mechanism from HiveTableOperations (#6648)
    
    Co-authored-by: Adam Szita <40...@users.noreply.github.com>
    Co-authored-by: Peter Vary <pe...@apple.com>
---
 .../iceberg/exceptions/CommitFailedException.java  |   4 +
 .../java/org/apache/iceberg/hive/HiveLock.java     |  17 +-
 .../apache/iceberg/hive/HiveTableOperations.java   | 445 +----------------
 .../org/apache/iceberg/hive/LockException.java     |   9 +-
 .../org/apache/iceberg/hive/MetastoreLock.java     | 540 +++++++++++++++++++++
 .../apache/iceberg/hive/TestHiveCommitLocks.java   |   8 +-
 .../org/apache/iceberg/hive/TestHiveCommits.java   |  34 +-
 7 files changed, 602 insertions(+), 455 deletions(-)

diff --git a/api/src/main/java/org/apache/iceberg/exceptions/CommitFailedException.java b/api/src/main/java/org/apache/iceberg/exceptions/CommitFailedException.java
index ca9d1e4a5b..b9efe55d33 100644
--- a/api/src/main/java/org/apache/iceberg/exceptions/CommitFailedException.java
+++ b/api/src/main/java/org/apache/iceberg/exceptions/CommitFailedException.java
@@ -31,4 +31,8 @@ public class CommitFailedException extends RuntimeException {
   public CommitFailedException(Throwable cause, String message, Object... args) {
     super(String.format(message, args), cause);
   }
+
+  public CommitFailedException(Throwable cause) {
+    super(cause);
+  }
 }
diff --git a/api/src/main/java/org/apache/iceberg/exceptions/CommitFailedException.java b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveLock.java
similarity index 61%
copy from api/src/main/java/org/apache/iceberg/exceptions/CommitFailedException.java
copy to hive-metastore/src/main/java/org/apache/iceberg/hive/HiveLock.java
index ca9d1e4a5b..d77f990d17 100644
--- a/api/src/main/java/org/apache/iceberg/exceptions/CommitFailedException.java
+++ b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveLock.java
@@ -16,19 +16,12 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iceberg.exceptions;
+package org.apache.iceberg.hive;
 
-import com.google.errorprone.annotations.FormatMethod;
+interface HiveLock {
+  void lock() throws LockException;
 
-/** Exception raised when a commit fails because of out of date metadata. */
-public class CommitFailedException extends RuntimeException {
-  @FormatMethod
-  public CommitFailedException(String message, Object... args) {
-    super(String.format(message, args));
-  }
+  void ensureActive() throws LockException;
 
-  @FormatMethod
-  public CommitFailedException(Throwable cause, String message, Object... args) {
-    super(String.format(message, args), cause);
-  }
+  void unlock();
 }
diff --git a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java
index 1781640ca0..5c51af2f61 100644
--- a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java
+++ b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java
@@ -21,39 +21,20 @@ package org.apache.iceberg.hive;
 import static org.apache.iceberg.TableProperties.GC_ENABLED;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
-import com.github.benmanes.caffeine.cache.Cache;
-import com.github.benmanes.caffeine.cache.Caffeine;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
 import java.util.Collections;
 import java.util.Locale;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.ReentrantLock;
 import java.util.stream.Collectors;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.common.StatsSetupConst;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.metastore.api.InvalidObjectException;
-import org.apache.hadoop.hive.metastore.api.LockComponent;
-import org.apache.hadoop.hive.metastore.api.LockLevel;
-import org.apache.hadoop.hive.metastore.api.LockRequest;
-import org.apache.hadoop.hive.metastore.api.LockResponse;
-import org.apache.hadoop.hive.metastore.api.LockState;
-import org.apache.hadoop.hive.metastore.api.LockType;
 import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
 import org.apache.hadoop.hive.metastore.api.SerDeInfo;
-import org.apache.hadoop.hive.metastore.api.ShowLocksRequest;
-import org.apache.hadoop.hive.metastore.api.ShowLocksResponse;
-import org.apache.hadoop.hive.metastore.api.ShowLocksResponseElement;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
@@ -75,16 +56,12 @@ import org.apache.iceberg.exceptions.ValidationException;
 import org.apache.iceberg.hadoop.ConfigProperties;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
-import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.BiMap;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableBiMap;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
-import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
-import org.apache.iceberg.relocated.com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.iceberg.util.JsonUtil;
-import org.apache.iceberg.util.Tasks;
 import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -96,52 +73,21 @@ import org.slf4j.LoggerFactory;
 public class HiveTableOperations extends BaseMetastoreTableOperations {
   private static final Logger LOG = LoggerFactory.getLogger(HiveTableOperations.class);
 
-  private static final String HIVE_ACQUIRE_LOCK_TIMEOUT_MS = "iceberg.hive.lock-timeout-ms";
-  private static final String HIVE_LOCK_CHECK_MIN_WAIT_MS = "iceberg.hive.lock-check-min-wait-ms";
-  private static final String HIVE_LOCK_CHECK_MAX_WAIT_MS = "iceberg.hive.lock-check-max-wait-ms";
-  private static final String HIVE_LOCK_CREATION_TIMEOUT_MS =
-      "iceberg.hive.lock-creation-timeout-ms";
-  private static final String HIVE_LOCK_CREATION_MIN_WAIT_MS =
-      "iceberg.hive.lock-creation-min-wait-ms";
-  private static final String HIVE_LOCK_CREATION_MAX_WAIT_MS =
-      "iceberg.hive.lock-creation-max-wait-ms";
-  private static final String HIVE_LOCK_HEARTBEAT_INTERVAL_MS =
-      "iceberg.hive.lock-heartbeat-interval-ms";
   private static final String HIVE_ICEBERG_METADATA_REFRESH_MAX_RETRIES =
       "iceberg.hive.metadata-refresh-max-retries";
-  private static final String HIVE_TABLE_LEVEL_LOCK_EVICT_MS =
-      "iceberg.hive.table-level-lock-evict-ms";
-
   // the max size is based on HMS backend database. For Hive versions below 2.3, the max table
   // parameter size is 4000
   // characters, see https://issues.apache.org/jira/browse/HIVE-12274
   // set to 0 to not expose Iceberg metadata in HMS Table properties.
   private static final String HIVE_TABLE_PROPERTY_MAX_SIZE = "iceberg.hive.table-property-max-size";
   private static final long HIVE_TABLE_PROPERTY_MAX_SIZE_DEFAULT = 32672;
-  private static final long HIVE_ACQUIRE_LOCK_TIMEOUT_MS_DEFAULT = 3 * 60 * 1000; // 3 minutes
-  private static final long HIVE_LOCK_CHECK_MIN_WAIT_MS_DEFAULT = 50; // 50 milliseconds
-  private static final long HIVE_LOCK_CHECK_MAX_WAIT_MS_DEFAULT = 5 * 1000; // 5 seconds
-  private static final long HIVE_LOCK_CREATION_TIMEOUT_MS_DEFAULT = 3 * 60 * 1000; // 3 minutes
-  private static final long HIVE_LOCK_CREATION_MIN_WAIT_MS_DEFAULT = 50; // 50 milliseconds
-  private static final long HIVE_LOCK_CREATION_MAX_WAIT_MS_DEFAULT = 5 * 1000; // 5 seconds
-  private static final long HIVE_LOCK_HEARTBEAT_INTERVAL_MS_DEFAULT = 4 * 60 * 1000; // 4 minutes
   private static final int HIVE_ICEBERG_METADATA_REFRESH_MAX_RETRIES_DEFAULT = 2;
-  private static final long HIVE_TABLE_LEVEL_LOCK_EVICT_MS_DEFAULT = TimeUnit.MINUTES.toMillis(10);
   private static final BiMap<String, String> ICEBERG_TO_HMS_TRANSLATION =
       ImmutableBiMap.of(
           // gc.enabled in Iceberg and external.table.purge in Hive are meant to do the same things
           // but with different names
           GC_ENABLED, "external.table.purge");
 
-  private static Cache<String, ReentrantLock> commitLockCache;
-
-  private static synchronized void initTableLevelLockCache(long evictionTimeout) {
-    if (commitLockCache == null) {
-      commitLockCache =
-          Caffeine.newBuilder().expireAfterAccess(evictionTimeout, TimeUnit.MILLISECONDS).build();
-    }
-  }
-
   /**
    * Provides key translation where necessary between Iceberg and HMS props. This translation is
    * needed because some properties control the same behaviour but are named differently in Iceberg
@@ -161,28 +107,15 @@ public class HiveTableOperations extends BaseMetastoreTableOperations {
     return ICEBERG_TO_HMS_TRANSLATION.inverse().getOrDefault(hmsProp, hmsProp);
   }
 
-  private static class WaitingForLockException extends RuntimeException {
-    WaitingForLockException(String message) {
-      super(message);
-    }
-  }
-
   private final String fullName;
+  private final String catalogName;
   private final String database;
   private final String tableName;
   private final Configuration conf;
-  private final long lockAcquireTimeout;
-  private final long lockCheckMinWaitTime;
-  private final long lockCheckMaxWaitTime;
-  private final long lockCreationTimeout;
-  private final long lockCreationMinWaitTime;
-  private final long lockCreationMaxWaitTime;
-  private final long lockHeartbeatIntervalTime;
   private final long maxHiveTablePropertySize;
   private final int metadataRefreshMaxRetries;
   private final FileIO fileIO;
   private final ClientPool<IMetaStoreClient, TException> metaClients;
-  private final ScheduledExecutorService exitingScheduledExecutorService;
 
   protected HiveTableOperations(
       Configuration conf,
@@ -195,37 +128,15 @@ public class HiveTableOperations extends BaseMetastoreTableOperations {
     this.metaClients = metaClients;
     this.fileIO = fileIO;
     this.fullName = catalogName + "." + database + "." + table;
+    this.catalogName = catalogName;
     this.database = database;
     this.tableName = table;
-    this.lockAcquireTimeout =
-        conf.getLong(HIVE_ACQUIRE_LOCK_TIMEOUT_MS, HIVE_ACQUIRE_LOCK_TIMEOUT_MS_DEFAULT);
-    this.lockCheckMinWaitTime =
-        conf.getLong(HIVE_LOCK_CHECK_MIN_WAIT_MS, HIVE_LOCK_CHECK_MIN_WAIT_MS_DEFAULT);
-    this.lockCheckMaxWaitTime =
-        conf.getLong(HIVE_LOCK_CHECK_MAX_WAIT_MS, HIVE_LOCK_CHECK_MAX_WAIT_MS_DEFAULT);
-    this.lockCreationTimeout =
-        conf.getLong(HIVE_LOCK_CREATION_TIMEOUT_MS, HIVE_LOCK_CREATION_TIMEOUT_MS_DEFAULT);
-    this.lockCreationMinWaitTime =
-        conf.getLong(HIVE_LOCK_CREATION_MIN_WAIT_MS, HIVE_LOCK_CREATION_MIN_WAIT_MS_DEFAULT);
-    this.lockCreationMaxWaitTime =
-        conf.getLong(HIVE_LOCK_CREATION_MAX_WAIT_MS, HIVE_LOCK_CREATION_MAX_WAIT_MS_DEFAULT);
-    this.lockHeartbeatIntervalTime =
-        conf.getLong(HIVE_LOCK_HEARTBEAT_INTERVAL_MS, HIVE_LOCK_HEARTBEAT_INTERVAL_MS_DEFAULT);
     this.metadataRefreshMaxRetries =
         conf.getInt(
             HIVE_ICEBERG_METADATA_REFRESH_MAX_RETRIES,
             HIVE_ICEBERG_METADATA_REFRESH_MAX_RETRIES_DEFAULT);
     this.maxHiveTablePropertySize =
         conf.getLong(HIVE_TABLE_PROPERTY_MAX_SIZE, HIVE_TABLE_PROPERTY_MAX_SIZE_DEFAULT);
-    long tableLevelLockCacheEvictionTimeout =
-        conf.getLong(HIVE_TABLE_LEVEL_LOCK_EVICT_MS, HIVE_TABLE_LEVEL_LOCK_EVICT_MS_DEFAULT);
-    this.exitingScheduledExecutorService =
-        Executors.newSingleThreadScheduledExecutor(
-            new ThreadFactoryBuilder()
-                .setDaemon(true)
-                .setNameFormat("iceberg-hive-lock-heartbeat-%d")
-                .build());
-    initTableLevelLockCache(tableLevelLockCacheEvictionTimeout);
   }
 
   @Override
@@ -275,19 +186,10 @@ public class HiveTableOperations extends BaseMetastoreTableOperations {
 
     CommitStatus commitStatus = CommitStatus.FAILURE;
     boolean updateHiveTable = false;
-    String agentInfo = "Iceberg-" + UUID.randomUUID();
-    Optional<Long> lockId = Optional.empty();
-    // getting a process-level lock per table to avoid concurrent commit attempts to the same table
-    // from the same
-    // JVM process, which would result in unnecessary and costly HMS lock acquisition requests
-    ReentrantLock tableLevelMutex = commitLockCache.get(fullName, t -> new ReentrantLock(true));
-    tableLevelMutex.lock();
-    HiveLockHeartbeat hiveLockHeartbeat = null;
+
+    HiveLock lock = lockObject();
     try {
-      lockId = Optional.of(acquireLock(agentInfo));
-      hiveLockHeartbeat =
-          new HiveLockHeartbeat(metaClients, lockId.get(), lockHeartbeatIntervalTime);
-      hiveLockHeartbeat.schedule(exitingScheduledExecutorService);
+      lock.lock();
 
       Table tbl = loadHmsTable();
 
@@ -337,26 +239,20 @@ public class HiveTableOperations extends BaseMetastoreTableOperations {
         tbl.getParameters().remove(StatsSetupConst.COLUMN_STATS_ACCURATE);
       }
 
-      try {
-        if (hiveLockHeartbeat.future.isCancelled()
-            || hiveLockHeartbeat.encounteredException != null) {
-          throw new CommitFailedException(
-              "Failed to heartbeat for hive lock. %s",
-              hiveLockHeartbeat.encounteredException.getMessage());
-        }
+      lock.ensureActive();
 
+      try {
         persistTable(tbl, updateHiveTable);
-        if (hiveLockHeartbeat.future.isCancelled()
-            || hiveLockHeartbeat.encounteredException != null) {
-          throw new CommitStateUnknownException(
-              "Failed to heartbeat for hive lock while "
-                  + "committing changes. This can lead to a concurrent commit attempt be able to overwrite this commit. "
-                  + "Please check the commit history. If you are running into this issue, try reducing "
-                  + "iceberg.hive.lock-heartbeat-interval-ms.",
-              hiveLockHeartbeat.encounteredException);
-        }
+        lock.ensureActive();
 
         commitStatus = CommitStatus.SUCCESS;
+      } catch (LockException le) {
+        throw new CommitStateUnknownException(
+            "Failed to heartbeat for hive lock while "
+                + "committing changes. This can lead to a concurrent commit attempt be able to overwrite this commit. "
+                + "Please check the commit history. If you are running into this issue, try reducing "
+                + "iceberg.hive.lock-heartbeat-interval-ms.",
+            le);
       } catch (org.apache.hadoop.hive.metastore.api.AlreadyExistsException e) {
         throw new AlreadyExistsException(e, "Table already exists: %s.%s", database, tableName);
 
@@ -391,7 +287,7 @@ public class HiveTableOperations extends BaseMetastoreTableOperations {
             throw new CommitStateUnknownException(e);
         }
       }
-    } catch (TException | UnknownHostException e) {
+    } catch (TException e) {
       throw new RuntimeException(
           String.format("Metastore operation failed for %s.%s", database, tableName), e);
 
@@ -399,13 +295,11 @@ public class HiveTableOperations extends BaseMetastoreTableOperations {
       Thread.currentThread().interrupt();
       throw new RuntimeException("Interrupted during commit", e);
 
-    } finally {
-      if (hiveLockHeartbeat != null) {
-        hiveLockHeartbeat.cancel();
-      }
+    } catch (LockException e) {
+      throw new CommitFailedException(e);
 
-      cleanupMetadataAndUnlock(
-          commitStatus, newMetadataLocation, lockId, tableLevelMutex, agentInfo);
+    } finally {
+      cleanupMetadataAndUnlock(commitStatus, newMetadataLocation, lock);
     }
 
     LOG.info(
@@ -628,82 +522,8 @@ public class HiveTableOperations extends BaseMetastoreTableOperations {
     return storageDescriptor;
   }
 
-  @VisibleForTesting
-  long acquireLock(String agentInfo) throws UnknownHostException, TException, InterruptedException {
-    LockInfo lockInfo = tryLock(agentInfo);
-
-    final long start = System.currentTimeMillis();
-    long duration = 0;
-    boolean timeout = false;
-
-    try {
-      if (lockInfo.lockState.equals(LockState.WAITING)) {
-        // Retry count is the typical "upper bound of retries" for Tasks.run() function. In fact,
-        // the maximum number of
-        // attempts the Tasks.run() would try is `retries + 1`. Here, for checking locks, we use
-        // timeout as the
-        // upper bound of retries. So it is just reasonable to set a large retry count. However, if
-        // we set
-        // Integer.MAX_VALUE, the above logic of `retries + 1` would overflow into
-        // Integer.MIN_VALUE. Hence,
-        // the retry is set conservatively as `Integer.MAX_VALUE - 100` so it doesn't hit any
-        // boundary issues.
-        Tasks.foreach(lockInfo.lockId)
-            .retry(Integer.MAX_VALUE - 100)
-            .exponentialBackoff(lockCheckMinWaitTime, lockCheckMaxWaitTime, lockAcquireTimeout, 1.5)
-            .throwFailureWhenFinished()
-            .onlyRetryOn(WaitingForLockException.class)
-            .run(
-                id -> {
-                  try {
-                    LockResponse response = metaClients.run(client -> client.checkLock(id));
-                    LockState newState = response.getState();
-                    lockInfo.lockState = newState;
-                    if (newState.equals(LockState.WAITING)) {
-                      throw new WaitingForLockException(
-                          String.format("Waiting for lock on table %s.%s", database, tableName));
-                    }
-                  } catch (InterruptedException e) {
-                    Thread.interrupted(); // Clear the interrupt status flag
-                    LOG.warn(
-                        "Interrupted while waiting for lock on table {}.{}",
-                        database,
-                        tableName,
-                        e);
-                  }
-                },
-                TException.class);
-      }
-    } catch (WaitingForLockException waitingForLockException) {
-      timeout = true;
-      duration = System.currentTimeMillis() - start;
-    } finally {
-      if (!lockInfo.lockState.equals(LockState.ACQUIRED)) {
-        unlock(Optional.of(lockInfo.lockId), agentInfo);
-      }
-    }
-
-    // timeout and do not have lock acquired
-    if (timeout && !lockInfo.lockState.equals(LockState.ACQUIRED)) {
-      throw new CommitFailedException(
-          "Timed out after %s ms waiting for lock on %s.%s", duration, database, tableName);
-    }
-
-    if (!lockInfo.lockState.equals(LockState.ACQUIRED)) {
-      throw new CommitFailedException(
-          "Could not acquire the lock on %s.%s, lock request ended in state %s",
-          database, tableName, lockInfo.lockState);
-    }
-
-    return lockInfo.lockId;
-  }
-
   private void cleanupMetadataAndUnlock(
-      CommitStatus commitStatus,
-      String metadataLocation,
-      Optional<Long> lockId,
-      ReentrantLock tableLevelMutex,
-      String agentInfo) {
+      CommitStatus commitStatus, String metadataLocation, HiveLock lock) {
     try {
       if (commitStatus == CommitStatus.FAILURE) {
         // If we are sure the commit failed, clean up the uncommitted metadata file
@@ -712,64 +532,10 @@ public class HiveTableOperations extends BaseMetastoreTableOperations {
     } catch (RuntimeException e) {
       LOG.error("Failed to cleanup metadata file at {}", metadataLocation, e);
     } finally {
-      unlock(lockId, agentInfo);
-      tableLevelMutex.unlock();
+      lock.unlock();
     }
   }
 
-  private void unlock(Optional<Long> lockId, String agentInfo) {
-    Long id = null;
-    try {
-      if (!lockId.isPresent()) {
-        // Try to find the lock based on agentInfo. Only works with Hive 2 or later.
-        if (HiveVersion.min(HiveVersion.HIVE_2)) {
-          LockInfo lockInfo = findLock(agentInfo);
-          if (lockInfo == null) {
-            // No lock found
-            LOG.info("No lock found with {} agentInfo", agentInfo);
-            return;
-          }
-
-          id = lockInfo.lockId;
-        } else {
-          LOG.warn("Could not find lock with HMSClient {}", HiveVersion.current());
-          return;
-        }
-      } else {
-        id = lockId.get();
-      }
-
-      doUnlock(id);
-    } catch (InterruptedException ie) {
-      if (id != null) {
-        // Interrupted unlock. We try to unlock one more time if we have a lockId
-        try {
-          Thread.interrupted(); // Clear the interrupt status flag for now, so we can retry unlock
-          LOG.warn("Interrupted unlock we try one more time {}.{}", database, tableName, ie);
-          doUnlock(id);
-        } catch (Exception e) {
-          LOG.warn("Failed to unlock even on 2nd attempt {}.{}", database, tableName, e);
-        } finally {
-          Thread.currentThread().interrupt(); // Set back the interrupt status
-        }
-      } else {
-        Thread.currentThread().interrupt(); // Set back the interrupt status
-        LOG.warn("Interrupted finding locks to unlock {}.{}", database, tableName, ie);
-      }
-    } catch (Exception e) {
-      LOG.warn("Failed to unlock {}.{}", database, tableName, e);
-    }
-  }
-
-  @VisibleForTesting
-  void doUnlock(long lockId) throws TException, InterruptedException {
-    metaClients.run(
-        client -> {
-          client.unlock(lockId);
-          return null;
-        });
-  }
-
   static void validateTableIsIceberg(Table table, String fullName) {
     String tableType = table.getParameters().get(TABLE_TYPE_PROP);
     NoSuchIcebergTableException.check(
@@ -806,169 +572,8 @@ public class HiveTableOperations extends BaseMetastoreTableOperations {
         ConfigProperties.ENGINE_HIVE_ENABLED, TableProperties.ENGINE_HIVE_ENABLED_DEFAULT);
   }
 
-  /**
-   * Tries to create a lock. If the lock creation fails, and it is possible then retries the lock
-   * creation a few times. If the lock creation is successful then a {@link LockInfo} is returned,
-   * otherwise an appropriate exception is thrown.
-   *
-   * @param agentInfo The agentInfo which should be used during lock creation
-   * @return The created lock
-   * @throws UnknownHostException When we are not able to fill the hostname for lock creation
-   * @throws TException When there is an error during lock creation
-   */
-  @SuppressWarnings("ReverseDnsLookup")
-  private LockInfo tryLock(String agentInfo) throws UnknownHostException, TException {
-    LockInfo lockInfo = new LockInfo();
-
-    final LockComponent lockComponent =
-        new LockComponent(LockType.EXCLUSIVE, LockLevel.TABLE, database);
-    lockComponent.setTablename(tableName);
-    final LockRequest lockRequest =
-        new LockRequest(
-            Lists.newArrayList(lockComponent),
-            System.getProperty("user.name"),
-            InetAddress.getLocalHost().getHostName());
-
-    // Only works in Hive 2 or later.
-    if (HiveVersion.min(HiveVersion.HIVE_2)) {
-      lockRequest.setAgentInfo(agentInfo);
-    }
-
-    Tasks.foreach(lockRequest)
-        .retry(Integer.MAX_VALUE - 100)
-        .exponentialBackoff(
-            lockCreationMinWaitTime, lockCreationMaxWaitTime, lockCreationTimeout, 2.0)
-        .shouldRetryTest(e -> e instanceof TException && HiveVersion.min(HiveVersion.HIVE_2))
-        .throwFailureWhenFinished()
-        .run(
-            request -> {
-              try {
-                LockResponse lockResponse = metaClients.run(client -> client.lock(request));
-                lockInfo.lockId = lockResponse.getLockid();
-                lockInfo.lockState = lockResponse.getState();
-              } catch (TException te) {
-                LOG.warn("Failed to acquire lock {}", request, te);
-                try {
-                  // If we can not check for lock, or we do not find it, then rethrow the exception
-                  // Otherwise we are happy as the findLock sets the lockId and the state correctly
-                  if (!HiveVersion.min(HiveVersion.HIVE_2)) {
-                    LockInfo lockFound = findLock(agentInfo);
-                    if (lockFound != null) {
-                      lockInfo.lockId = lockFound.lockId;
-                      lockInfo.lockState = lockFound.lockState;
-                      LOG.info("Found lock {} by agentInfo {}", lockInfo, agentInfo);
-                      return;
-                    }
-                  }
-
-                  throw te;
-                } catch (InterruptedException e) {
-                  Thread.currentThread().interrupt();
-                  LOG.warn(
-                      "Interrupted while checking for lock on table {}.{}", database, tableName, e);
-                  throw new RuntimeException("Interrupted while checking for lock", e);
-                }
-              } catch (InterruptedException e) {
-                Thread.currentThread().interrupt();
-                LOG.warn("Interrupted while acquiring lock on table {}.{}", database, tableName, e);
-                throw new RuntimeException("Interrupted while acquiring lock", e);
-              }
-            },
-            TException.class);
-
-    // This should be initialized always, or exception should be thrown.
-    LOG.debug("Lock {} created for table {}.{}", lockInfo, database, tableName);
-    return lockInfo;
-  }
-
-  /**
-   * Search for the locks using HMSClient.showLocks identified by the agentInfo. If the lock is
-   * there, then a {@link LockInfo} object is returned. If the lock is not found <code>null</code>
-   * is returned.
-   *
-   * @param agentInfo The key for searching the locks
-   * @return The {@link LockInfo} for the found lock, or <code>null</code> if nothing found
-   */
-  private LockInfo findLock(String agentInfo) throws TException, InterruptedException {
-    Preconditions.checkArgument(
-        HiveVersion.min(HiveVersion.HIVE_2),
-        "Minimally Hive 2 HMS client is needed to find the Lock using the showLocks API call");
-    ShowLocksRequest showLocksRequest = new ShowLocksRequest();
-    showLocksRequest.setDbname(database);
-    showLocksRequest.setTablename(tableName);
-    ShowLocksResponse response = metaClients.run(client -> client.showLocks(showLocksRequest));
-    for (ShowLocksResponseElement lock : response.getLocks()) {
-      if (lock.getAgentInfo().equals(agentInfo)) {
-        // We found our lock
-        return new LockInfo(lock.getLockid(), lock.getState());
-      }
-    }
-
-    // Not found anything
-    return null;
-  }
-
-  private static class HiveLockHeartbeat implements Runnable {
-    private final ClientPool<IMetaStoreClient, TException> hmsClients;
-    private final long lockId;
-    private final long intervalMs;
-    private ScheduledFuture<?> future;
-    private volatile Exception encounteredException = null;
-
-    HiveLockHeartbeat(
-        ClientPool<IMetaStoreClient, TException> hmsClients, long lockId, long intervalMs) {
-      this.hmsClients = hmsClients;
-      this.lockId = lockId;
-      this.intervalMs = intervalMs;
-      this.future = null;
-    }
-
-    @Override
-    public void run() {
-      try {
-        hmsClients.run(
-            client -> {
-              client.heartbeat(0, lockId);
-              return null;
-            });
-      } catch (TException | InterruptedException e) {
-        this.encounteredException = e;
-        throw new CommitFailedException(e, "Failed to heartbeat for lock: %d", lockId);
-      }
-    }
-
-    public void schedule(ScheduledExecutorService scheduler) {
-      future =
-          scheduler.scheduleAtFixedRate(this, intervalMs / 2, intervalMs, TimeUnit.MILLISECONDS);
-    }
-
-    public void cancel() {
-      if (future != null) {
-        future.cancel(false);
-      }
-    }
-  }
-
-  private static class LockInfo {
-    private long lockId;
-    private LockState lockState;
-
-    private LockInfo() {
-      this.lockId = -1;
-      this.lockState = null;
-    }
-
-    private LockInfo(long lockId, LockState lockState) {
-      this.lockId = lockId;
-      this.lockState = lockState;
-    }
-
-    @Override
-    public String toString() {
-      return MoreObjects.toStringHelper(this)
-          .add("lockId", lockId)
-          .add("lockState", lockState)
-          .toString();
-    }
+  @VisibleForTesting
+  HiveLock lockObject() {
+    return new MetastoreLock(conf, metaClients, catalogName, database, tableName);
   }
 }
diff --git a/api/src/main/java/org/apache/iceberg/exceptions/CommitFailedException.java b/hive-metastore/src/main/java/org/apache/iceberg/hive/LockException.java
similarity index 75%
copy from api/src/main/java/org/apache/iceberg/exceptions/CommitFailedException.java
copy to hive-metastore/src/main/java/org/apache/iceberg/hive/LockException.java
index ca9d1e4a5b..0673ff4f06 100644
--- a/api/src/main/java/org/apache/iceberg/exceptions/CommitFailedException.java
+++ b/hive-metastore/src/main/java/org/apache/iceberg/hive/LockException.java
@@ -16,19 +16,18 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iceberg.exceptions;
+package org.apache.iceberg.hive;
 
 import com.google.errorprone.annotations.FormatMethod;
 
-/** Exception raised when a commit fails because of out of date metadata. */
-public class CommitFailedException extends RuntimeException {
+class LockException extends RuntimeException {
   @FormatMethod
-  public CommitFailedException(String message, Object... args) {
+  LockException(String message, Object... args) {
     super(String.format(message, args));
   }
 
   @FormatMethod
-  public CommitFailedException(Throwable cause, String message, Object... args) {
+  LockException(Throwable cause, String message, Object... args) {
     super(String.format(message, args), cause);
   }
 }
diff --git a/hive-metastore/src/main/java/org/apache/iceberg/hive/MetastoreLock.java b/hive-metastore/src/main/java/org/apache/iceberg/hive/MetastoreLock.java
new file mode 100644
index 0000000000..5820365339
--- /dev/null
+++ b/hive-metastore/src/main/java/org/apache/iceberg/hive/MetastoreLock.java
@@ -0,0 +1,563 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.hive;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReentrantLock;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.LockComponent;
+import org.apache.hadoop.hive.metastore.api.LockLevel;
+import org.apache.hadoop.hive.metastore.api.LockRequest;
+import org.apache.hadoop.hive.metastore.api.LockResponse;
+import org.apache.hadoop.hive.metastore.api.LockState;
+import org.apache.hadoop.hive.metastore.api.LockType;
+import org.apache.hadoop.hive.metastore.api.ShowLocksRequest;
+import org.apache.hadoop.hive.metastore.api.ShowLocksResponse;
+import org.apache.hadoop.hive.metastore.api.ShowLocksResponseElement;
+import org.apache.iceberg.ClientPool;
+import org.apache.iceberg.exceptions.CommitFailedException;
+import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.iceberg.util.Tasks;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link HiveLock} implementation backed by a Hive Metastore (HMS) table-level exclusive lock.
+ *
+ * <p>Concurrent commits to the same table from the same JVM are first serialized by a fair,
+ * per-table {@link ReentrantLock} shared through a static cache. The HMS lock is then created,
+ * polled until it is acquired, and heartbeated periodically until {@link #unlock()} is called.
+ */
+class MetastoreLock implements HiveLock {
+  private static final Logger LOG = LoggerFactory.getLogger(MetastoreLock.class);
+  private static final String HIVE_ACQUIRE_LOCK_TIMEOUT_MS = "iceberg.hive.lock-timeout-ms";
+  private static final String HIVE_LOCK_CHECK_MIN_WAIT_MS = "iceberg.hive.lock-check-min-wait-ms";
+  private static final String HIVE_LOCK_CHECK_MAX_WAIT_MS = "iceberg.hive.lock-check-max-wait-ms";
+  private static final String HIVE_LOCK_CREATION_TIMEOUT_MS =
+      "iceberg.hive.lock-creation-timeout-ms";
+  private static final String HIVE_LOCK_CREATION_MIN_WAIT_MS =
+      "iceberg.hive.lock-creation-min-wait-ms";
+  private static final String HIVE_LOCK_CREATION_MAX_WAIT_MS =
+      "iceberg.hive.lock-creation-max-wait-ms";
+  private static final String HIVE_LOCK_HEARTBEAT_INTERVAL_MS =
+      "iceberg.hive.lock-heartbeat-interval-ms";
+  private static final String HIVE_TABLE_LEVEL_LOCK_EVICT_MS =
+      "iceberg.hive.table-level-lock-evict-ms";
+
+  private static final long HIVE_ACQUIRE_LOCK_TIMEOUT_MS_DEFAULT = 3 * 60 * 1000; // 3 minutes
+  private static final long HIVE_LOCK_CHECK_MIN_WAIT_MS_DEFAULT = 50; // 50 milliseconds
+  private static final long HIVE_LOCK_CHECK_MAX_WAIT_MS_DEFAULT = 5 * 1000; // 5 seconds
+  private static final long HIVE_LOCK_CREATION_TIMEOUT_MS_DEFAULT = 3 * 60 * 1000; // 3 minutes
+  private static final long HIVE_LOCK_CREATION_MIN_WAIT_MS_DEFAULT = 50; // 50 milliseconds
+  private static final long HIVE_LOCK_CREATION_MAX_WAIT_MS_DEFAULT = 5 * 1000; // 5 seconds
+  private static final long HIVE_LOCK_HEARTBEAT_INTERVAL_MS_DEFAULT = 4 * 60 * 1000; // 4 minutes
+  private static final long HIVE_TABLE_LEVEL_LOCK_EVICT_MS_DEFAULT = TimeUnit.MINUTES.toMillis(10);
+  private static volatile Cache<String, ReentrantLock> commitLockCache;
+
+  private final ClientPool<IMetaStoreClient, TException> metaClients;
+  private final String databaseName;
+  private final String tableName;
+  private final String fullName;
+  private final long lockAcquireTimeout;
+  private final long lockCheckMinWaitTime;
+  private final long lockCheckMaxWaitTime;
+  private final long lockCreationTimeout;
+  private final long lockCreationMinWaitTime;
+  private final long lockCreationMaxWaitTime;
+  private final long lockHeartbeatIntervalTime;
+  private final ScheduledExecutorService exitingScheduledExecutorService;
+  private final String agentInfo;
+
+  private Optional<Long> hmsLockId = Optional.empty();
+  private ReentrantLock jvmLock = null;
+  private Heartbeat heartbeat = null;
+
+  MetastoreLock(
+      Configuration conf,
+      ClientPool<IMetaStoreClient, TException> metaClients,
+      String catalogName,
+      String databaseName,
+      String tableName) {
+    this.metaClients = metaClients;
+    this.fullName = catalogName + "." + databaseName + "." + tableName;
+    this.databaseName = databaseName;
+    this.tableName = tableName;
+
+    this.lockAcquireTimeout =
+        conf.getLong(HIVE_ACQUIRE_LOCK_TIMEOUT_MS, HIVE_ACQUIRE_LOCK_TIMEOUT_MS_DEFAULT);
+    this.lockCheckMinWaitTime =
+        conf.getLong(HIVE_LOCK_CHECK_MIN_WAIT_MS, HIVE_LOCK_CHECK_MIN_WAIT_MS_DEFAULT);
+    this.lockCheckMaxWaitTime =
+        conf.getLong(HIVE_LOCK_CHECK_MAX_WAIT_MS, HIVE_LOCK_CHECK_MAX_WAIT_MS_DEFAULT);
+    this.lockCreationTimeout =
+        conf.getLong(HIVE_LOCK_CREATION_TIMEOUT_MS, HIVE_LOCK_CREATION_TIMEOUT_MS_DEFAULT);
+    this.lockCreationMinWaitTime =
+        conf.getLong(HIVE_LOCK_CREATION_MIN_WAIT_MS, HIVE_LOCK_CREATION_MIN_WAIT_MS_DEFAULT);
+    this.lockCreationMaxWaitTime =
+        conf.getLong(HIVE_LOCK_CREATION_MAX_WAIT_MS, HIVE_LOCK_CREATION_MAX_WAIT_MS_DEFAULT);
+    this.lockHeartbeatIntervalTime =
+        conf.getLong(HIVE_LOCK_HEARTBEAT_INTERVAL_MS, HIVE_LOCK_HEARTBEAT_INTERVAL_MS_DEFAULT);
+    long tableLevelLockCacheEvictionTimeout =
+        conf.getLong(HIVE_TABLE_LEVEL_LOCK_EVICT_MS, HIVE_TABLE_LEVEL_LOCK_EVICT_MS_DEFAULT);
+
+    this.agentInfo = "Iceberg-" + UUID.randomUUID();
+
+    this.exitingScheduledExecutorService =
+        Executors.newSingleThreadScheduledExecutor(
+            new ThreadFactoryBuilder()
+                .setDaemon(true)
+                .setNameFormat("iceberg-hive-lock-heartbeat-" + fullName + "-%d")
+                .build());
+
+    initTableLevelLockCache(tableLevelLockCacheEvictionTimeout);
+  }
+
+  @Override
+  public void lock() throws LockException {
+    // getting a process-level lock per table to avoid concurrent commit attempts to the same table
+    // from the same JVM process, which would result in unnecessary HMS lock acquisition requests
+    acquireJvmLock();
+
+    // Getting HMS lock
+    hmsLockId = Optional.of(acquireLock());
+
+    // Starting heartbeat for the HMS lock
+    heartbeat = new Heartbeat(metaClients, hmsLockId.get(), lockHeartbeatIntervalTime);
+    heartbeat.schedule(exitingScheduledExecutorService);
+  }
+
+  @Override
+  public void ensureActive() throws LockException {
+    if (heartbeat == null) {
+      throw new LockException("Lock is not active");
+    }
+
+    if (heartbeat.encounteredException != null) {
+      throw new LockException(
+          heartbeat.encounteredException,
+          "Failed to heartbeat for hive lock. %s",
+          heartbeat.encounteredException.getMessage());
+    }
+    if (!heartbeat.active()) {
+      throw new LockException("Hive lock heartbeat thread not active");
+    }
+  }
+
+  @Override
+  public void unlock() {
+    if (heartbeat != null) {
+      heartbeat.cancel();
+      exitingScheduledExecutorService.shutdown();
+    }
+
+    try {
+      unlock(hmsLockId);
+    } finally {
+      releaseJvmLock();
+    }
+  }
+
+  /**
+   * Creates the HMS lock and polls it until it is acquired or the acquire timeout elapses.
+   *
+   * @return the id of the acquired HMS lock
+   * @throws LockException if the lock is not acquired; a lock that was created is released first
+   */
+  private long acquireLock() throws LockException {
+    LockInfo lockInfo = createLock();
+
+    final long start = System.currentTimeMillis();
+    long duration = 0;
+    boolean timeout = false;
+    TException thriftError = null;
+
+    try {
+      if (lockInfo.lockState.equals(LockState.WAITING)) {
+        // Tasks.run() treats the retry count as an upper bound on retries; it actually makes up to
+        // `retries + 1` attempts. For lock checks the timeout is the real bound, so a large retry
+        // count is used. Integer.MAX_VALUE would overflow `retries + 1` into Integer.MIN_VALUE,
+        // hence the conservative `Integer.MAX_VALUE - 100`.
+        Tasks.foreach(lockInfo.lockId)
+            .retry(Integer.MAX_VALUE - 100)
+            .exponentialBackoff(lockCheckMinWaitTime, lockCheckMaxWaitTime, lockAcquireTimeout, 1.5)
+            .throwFailureWhenFinished()
+            .onlyRetryOn(WaitingForLockException.class)
+            .run(
+                id -> {
+                  try {
+                    LockResponse response = metaClients.run(client -> client.checkLock(id));
+                    LockState newState = response.getState();
+                    lockInfo.lockState = newState;
+                    if (newState.equals(LockState.WAITING)) {
+                      throw new WaitingForLockException(
+                          String.format(
+                              "Waiting for lock on table %s.%s", databaseName, tableName));
+                    }
+                  } catch (InterruptedException e) {
+                    // The interrupt flag is cleared and not restored. Returning normally ends the
+                    // polling loop with the lock still WAITING, so this method throws below.
+                    Thread.interrupted(); // Clear the interrupt status flag
+                    LOG.warn(
+                        "Interrupted while waiting for lock on table {}.{}",
+                        databaseName,
+                        tableName,
+                        e);
+                  }
+                },
+                TException.class);
+      }
+    } catch (WaitingForLockException e) {
+      timeout = true;
+      duration = System.currentTimeMillis() - start;
+    } catch (TException e) {
+      thriftError = e;
+    } finally {
+      if (!lockInfo.lockState.equals(LockState.ACQUIRED)) {
+        unlock(Optional.of(lockInfo.lockId));
+      }
+    }
+
+    if (!lockInfo.lockState.equals(LockState.ACQUIRED)) {
+      if (timeout) {
+        throw new LockException(
+            "Timed out after %s ms waiting for lock on %s.%s", duration, databaseName, tableName);
+      }
+
+      if (thriftError != null) {
+        throw new LockException(
+            thriftError, "Metastore operation failed for %s.%s", databaseName, tableName);
+      }
+
+      // Reached when the state check was interrupted (the lock is still WAITING) or when the lock
+      // ended in some other non-ACQUIRED state.
+      throw new LockException(
+          "Could not acquire the lock on %s.%s, lock request ended in state %s",
+          databaseName, tableName, lockInfo.lockState);
+    } else {
+      return lockInfo.lockId;
+    }
+  }
+
+  /**
+   * Creates a lock, retrying if possible on failure.
+   *
+   * @return The {@link LockInfo} object for the successfully created lock
+   * @throws LockException When we are not able to fill the hostname for lock creation, or there is
+   *     an error during lock creation
+   */
+  @SuppressWarnings("ReverseDnsLookup")
+  private LockInfo createLock() throws LockException {
+    LockInfo lockInfo = new LockInfo();
+
+    String hostName;
+    try {
+      hostName = InetAddress.getLocalHost().getHostName();
+    } catch (UnknownHostException uhe) {
+      throw new LockException(uhe, "Error generating host name");
+    }
+
+    LockComponent lockComponent =
+        new LockComponent(LockType.EXCLUSIVE, LockLevel.TABLE, databaseName);
+    lockComponent.setTablename(tableName);
+    LockRequest lockRequest =
+        new LockRequest(Lists.newArrayList(lockComponent), HiveHadoopUtil.currentUser(), hostName);
+
+    // Only works in Hive 2 or later.
+    if (HiveVersion.min(HiveVersion.HIVE_2)) {
+      lockRequest.setAgentInfo(agentInfo);
+    }
+
+    AtomicBoolean interrupted = new AtomicBoolean(false);
+    Tasks.foreach(lockRequest)
+        .retry(Integer.MAX_VALUE - 100)
+        .exponentialBackoff(
+            lockCreationMinWaitTime, lockCreationMaxWaitTime, lockCreationTimeout, 2.0)
+        .shouldRetryTest(
+            e ->
+                !interrupted.get()
+                    && e instanceof LockException
+                    && HiveVersion.min(HiveVersion.HIVE_2))
+        .throwFailureWhenFinished()
+        .run(
+            request -> {
+              try {
+                LockResponse lockResponse = metaClients.run(client -> client.lock(request));
+                lockInfo.lockId = lockResponse.getLockid();
+                lockInfo.lockState = lockResponse.getState();
+              } catch (TException te) {
+                LOG.warn("Failed to create lock {}", request, te);
+                try {
+                  // findLock() needs Hive 2 or later. If we can not check for the lock, or we do
+                  // not find it, then rethrow. Otherwise we are happy as the findLock sets the
+                  // lockId and the state correctly.
+                  if (HiveVersion.min(HiveVersion.HIVE_2)) {
+                    LockInfo lockFound = findLock();
+                    if (lockFound != null) {
+                      lockInfo.lockId = lockFound.lockId;
+                      lockInfo.lockState = lockFound.lockState;
+                      LOG.info("Found lock {} by agentInfo {}", lockInfo, agentInfo);
+                      return;
+                    }
+                  }
+
+                  throw new LockException(
+                      te, "Failed to find lock for table %s.%s", databaseName, tableName);
+                } catch (InterruptedException e) {
+                  Thread.currentThread().interrupt();
+                  interrupted.set(true);
+                  LOG.warn(
+                      "Interrupted while trying to find lock for table {}.{}",
+                      databaseName,
+                      tableName,
+                      e);
+                  throw new LockException(
+                      e,
+                      "Interrupted while trying to find lock for table %s.%s",
+                      databaseName,
+                      tableName);
+                }
+              } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                interrupted.set(true);
+                LOG.warn(
+                    "Interrupted while creating lock on table {}.{}", databaseName, tableName, e);
+                throw new LockException(
+                    e, "Interrupted while creating lock on table %s.%s", databaseName, tableName);
+              }
+            },
+            LockException.class);
+
+    // This should be initialized always, or exception should be thrown.
+    LOG.debug("Lock {} created for table {}.{}", lockInfo, databaseName, tableName);
+    return lockInfo;
+  }
+
+  /**
+   * Search for the locks using HMSClient.showLocks identified by the agentInfo. If the lock is
+   * there, then a {@link LockInfo} object is returned. If the lock is not found <code>null</code>
+   * is returned.
+   *
+   * @return The {@link LockInfo} for the found lock, or <code>null</code> if nothing found
+   */
+  private LockInfo findLock() throws LockException, InterruptedException {
+    Preconditions.checkArgument(
+        HiveVersion.min(HiveVersion.HIVE_2),
+        "Minimally Hive 2 HMS client is needed to find the Lock using the showLocks API call");
+    ShowLocksRequest showLocksRequest = new ShowLocksRequest();
+    showLocksRequest.setDbname(databaseName);
+    showLocksRequest.setTablename(tableName);
+    ShowLocksResponse response;
+    try {
+      response = metaClients.run(client -> client.showLocks(showLocksRequest));
+    } catch (TException e) {
+      throw new LockException(e, "Failed to find lock for table %s.%s", databaseName, tableName);
+    }
+    for (ShowLocksResponseElement lock : response.getLocks()) {
+      // Null-safe: a lock reported without agentInfo must not cause an NPE
+      if (agentInfo.equals(lock.getAgentInfo())) {
+        // We found our lock
+        return new LockInfo(lock.getLockid(), lock.getState());
+      }
+    }
+
+    // Not found anything
+    return null;
+  }
+
+  /**
+   * Best-effort release of an HMS lock: failures are logged rather than thrown.
+   *
+   * @param lockId the lock to release; if empty, the lock is looked up by agentInfo (Hive 2+)
+   */
+  private void unlock(Optional<Long> lockId) {
+    Long id = null;
+    try {
+      if (!lockId.isPresent()) {
+        // Try to find the lock based on agentInfo. Only works with Hive 2 or later.
+        if (HiveVersion.min(HiveVersion.HIVE_2)) {
+          LockInfo lockInfo = findLock();
+          if (lockInfo == null) {
+            // No lock found
+            LOG.info("No lock found with {} agentInfo", agentInfo);
+            return;
+          }
+
+          id = lockInfo.lockId;
+        } else {
+          LOG.warn("Could not find lock with HMSClient {}", HiveVersion.current());
+          return;
+        }
+      } else {
+        id = lockId.get();
+      }
+
+      doUnlock(id);
+    } catch (InterruptedException ie) {
+      if (id != null) {
+        // Interrupted unlock. We try to unlock one more time if we have a lockId
+        try {
+          Thread.interrupted(); // Clear the interrupt status flag for now, so we can retry unlock
+          LOG.warn("Interrupted unlock we try one more time {}.{}", databaseName, tableName, ie);
+          doUnlock(id);
+        } catch (Exception e) {
+          LOG.warn("Failed to unlock even on 2nd attempt {}.{}", databaseName, tableName, e);
+        } finally {
+          Thread.currentThread().interrupt(); // Set back the interrupt status
+        }
+      } else {
+        Thread.currentThread().interrupt(); // Set back the interrupt status
+        LOG.warn("Interrupted finding locks to unlock {}.{}", databaseName, tableName, ie);
+      }
+    } catch (Exception e) {
+      LOG.warn("Failed to unlock {}.{}", databaseName, tableName, e);
+    }
+  }
+
+  private void doUnlock(long lockId) throws TException, InterruptedException {
+    metaClients.run(
+        client -> {
+          client.unlock(lockId);
+          return null;
+        });
+  }
+
+  private void acquireJvmLock() {
+    if (jvmLock != null) {
+      throw new IllegalStateException(
+          String.format("Cannot call acquireLock twice for %s", fullName));
+    }
+
+    jvmLock = commitLockCache.get(fullName, t -> new ReentrantLock(true));
+    jvmLock.lock();
+  }
+
+  private void releaseJvmLock() {
+    if (jvmLock != null) {
+      jvmLock.unlock();
+      jvmLock = null;
+    }
+  }
+
+  private static void initTableLevelLockCache(long evictionTimeout) {
+    if (commitLockCache == null) {
+      synchronized (MetastoreLock.class) {
+        if (commitLockCache == null) {
+          commitLockCache =
+              Caffeine.newBuilder()
+                  .expireAfterAccess(evictionTimeout, TimeUnit.MILLISECONDS)
+                  .build();
+        }
+      }
+    }
+  }
+
+  /**
+   * Periodically heartbeats the HMS lock, first after half an interval and then once per
+   * interval, and records any failure so that {@link MetastoreLock#ensureActive()} can report it.
+   */
+  private static class Heartbeat implements Runnable {
+    private final ClientPool<IMetaStoreClient, TException> hmsClients;
+    private final long lockId;
+    private final long intervalMs;
+    private ScheduledFuture<?> future;
+    private volatile Exception encounteredException = null;
+
+    Heartbeat(ClientPool<IMetaStoreClient, TException> hmsClients, long lockId, long intervalMs) {
+      this.hmsClients = hmsClients;
+      this.lockId = lockId;
+      this.intervalMs = intervalMs;
+      this.future = null;
+    }
+
+    @Override
+    public void run() {
+      try {
+        hmsClients.run(
+            client -> {
+              client.heartbeat(0, lockId);
+              return null;
+            });
+      } catch (TException | InterruptedException e) {
+        this.encounteredException = e;
+        throw new CommitFailedException(e, "Failed to heartbeat for lock: %d", lockId);
+      }
+    }
+
+    public void schedule(ScheduledExecutorService scheduler) {
+      future =
+          scheduler.scheduleAtFixedRate(this, intervalMs / 2, intervalMs, TimeUnit.MILLISECONDS);
+    }
+
+    boolean active() {
+      // A periodic task's future only completes when it is cancelled or a run throws, so isDone()
+      // (unlike isCancelled()) also reports a heartbeat that stopped because of an exception.
+      return future != null && !future.isDone();
+    }
+
+    public void cancel() {
+      if (future != null) {
+        future.cancel(false);
+      }
+    }
+  }
+
+  private static class LockInfo {
+    private long lockId;
+    private LockState lockState;
+
+    private LockInfo() {
+      this.lockId = -1;
+      this.lockState = null;
+    }
+
+    private LockInfo(long lockId, LockState lockState) {
+      this.lockId = lockId;
+      this.lockState = lockState;
+    }
+
+    @Override
+    public String toString() {
+      return MoreObjects.toStringHelper(this)
+          .add("lockId", lockId)
+          .add("lockState", lockState)
+          .toString();
+    }
+  }
+
+  private static class WaitingForLockException extends RuntimeException {
+    WaitingForLockException(String message) {
+      super(message);
+    }
+  }
+}
diff --git a/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCommitLocks.java b/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCommitLocks.java
index 8b439047ca..d64653301e 100644
--- a/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCommitLocks.java
+++ b/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCommitLocks.java
@@ -157,7 +157,7 @@ public class TestHiveCommitLocks extends HiveTableBaseTest {
   @Test
   public void testLockAcquisitionAtFirstTime() throws TException, InterruptedException {
     doReturn(acquiredLockResponse).when(spyClient).lock(any());
-    doNothing().when(spyOps).doUnlock(eq(dummyLockId));
+    doNothing().when(spyClient).unlock(eq(dummyLockId));
     doNothing().when(spyClient).heartbeat(eq(0L), eq(dummyLockId));
 
     spyOps.doCommit(metadataV2, metadataV1);
@@ -176,7 +176,7 @@ public class TestHiveCommitLocks extends HiveTableBaseTest {
         .doReturn(acquiredLockResponse)
         .when(spyClient)
         .checkLock(eq(dummyLockId));
-    doNothing().when(spyOps).doUnlock(eq(dummyLockId));
+    doNothing().when(spyClient).unlock(eq(dummyLockId));
     doNothing().when(spyClient).heartbeat(eq(0L), eq(dummyLockId));
 
     spyOps.doCommit(metadataV2, metadataV1);
@@ -195,7 +195,6 @@ public class TestHiveCommitLocks extends HiveTableBaseTest {
         .doReturn(acquiredLockResponse)
         .when(spyClient)
         .checkLock(eq(dummyLockId));
-    doNothing().when(spyOps).doUnlock(eq(dummyLockId));
     doNothing().when(spyClient).heartbeat(eq(0L), eq(dummyLockId));
 
     spyOps.doCommit(metadataV2, metadataV1);
@@ -220,7 +219,6 @@ public class TestHiveCommitLocks extends HiveTableBaseTest {
 
     doReturn(showLocksResponse).when(spyClient).showLocks(any());
     doReturn(acquiredLockResponse).when(spyClient).checkLock(eq(dummyLockId));
-    doNothing().when(spyOps).doUnlock(eq(dummyLockId));
     doNothing().when(spyClient).heartbeat(eq(0L), eq(dummyLockId));
 
     spyOps.doCommit(metadataV2, metadataV1);
@@ -282,7 +280,7 @@ public class TestHiveCommitLocks extends HiveTableBaseTest {
     AssertHelpers.assertThrows(
         "Expected an exception",
         RuntimeException.class,
-        "Interrupted while acquiring lock",
+        "Interrupted while creating lock",
         () -> spyOps.doCommit(metadataV2, metadataV1));
 
     verify(spyClient, times(1)).unlock(eq(dummyLockId));
diff --git a/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCommits.java b/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCommits.java
index 869cb9fe44..b9799cefe6 100644
--- a/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCommits.java
+++ b/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCommits.java
@@ -27,7 +27,7 @@ import static org.mockito.Mockito.when;
 
 import java.io.File;
 import java.net.UnknownHostException;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
 import org.apache.iceberg.AssertHelpers;
 import org.apache.iceberg.HasTableOperations;
 import org.apache.iceberg.PartitionSpec;
@@ -41,7 +41,6 @@ import org.apache.iceberg.types.Types;
 import org.apache.thrift.TException;
 import org.junit.Assert;
 import org.junit.Test;
-import org.mockito.ArgumentCaptor;
 
 public class TestHiveCommits extends HiveTableBaseTest {
 
@@ -62,13 +61,22 @@ public class TestHiveCommits extends HiveTableBaseTest {
 
     HiveTableOperations spyOps = spy(ops);
 
-    ArgumentCaptor<Long> lockId = ArgumentCaptor.forClass(Long.class);
-    doThrow(new RuntimeException()).when(spyOps).doUnlock(lockId.capture());
+    AtomicReference<HiveLock> lockRef = new AtomicReference<>();
+
+    when(spyOps.lockObject())
+        .thenAnswer(
+            i -> {
+              HiveLock lock = (HiveLock) i.callRealMethod();
+              lockRef.set(lock);
+              return lock;
+            });
 
     try {
       spyOps.commit(metadataV2, metadataV1);
+      HiveLock spyLock = spy(lockRef.get());
+      doThrow(new RuntimeException()).when(spyLock).unlock();
     } finally {
-      ops.doUnlock(lockId.getValue());
+      lockRef.get().unlock();
     }
 
     ops.refresh();
@@ -268,16 +276,16 @@ public class TestHiveCommits extends HiveTableBaseTest {
 
     HiveTableOperations spyOps = spy(ops);
 
-    AtomicLong lockId = new AtomicLong();
+    AtomicReference<HiveLock> lock = new AtomicReference<>();
     doAnswer(
-            i -> {
-              lockId.set(ops.acquireLock("agentInfo"));
-              return lockId.get();
+            l -> {
+              lock.set(ops.lockObject());
+              return lock.get();
             })
         .when(spyOps)
-        .acquireLock(any());
+        .lockObject();
 
-    concurrentCommitAndThrowException(ops, spyOps, table, lockId);
+    concurrentCommitAndThrowException(ops, spyOps, table, lock);
 
     /*
     This commit and our concurrent commit should succeed even though this commit throws an exception
@@ -331,7 +339,7 @@ public class TestHiveCommits extends HiveTableBaseTest {
       HiveTableOperations realOperations,
       HiveTableOperations spyOperations,
       Table table,
-      AtomicLong lockId)
+      AtomicReference<HiveLock> lock)
       throws TException, InterruptedException {
     // Simulate a communication error after a successful commit
     doAnswer(
@@ -340,7 +348,7 @@ public class TestHiveCommits extends HiveTableBaseTest {
                   i.getArgument(0, org.apache.hadoop.hive.metastore.api.Table.class);
               realOperations.persistTable(tbl, true);
               // Simulate lock expiration or removal
-              realOperations.doUnlock(lockId.get());
+              lock.get().unlock();
               table.refresh();
               table.updateSchema().addColumn("newCol", Types.IntegerType.get()).commit();
               throw new TException("Datacenter on fire");