You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2022/10/11 01:06:54 UTC

[GitHub] [iceberg] szehon-ho commented on a diff in pull request #5877: Refactor commit lock mechanism from HiveTableOperations

szehon-ho commented on code in PR #5877:
URL: https://github.com/apache/iceberg/pull/5877#discussion_r991716721


##########
hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCommitLock.java:
##########
@@ -0,0 +1,306 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.hive;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.Optional;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReentrantLock;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.LockComponent;
+import org.apache.hadoop.hive.metastore.api.LockLevel;
+import org.apache.hadoop.hive.metastore.api.LockRequest;
+import org.apache.hadoop.hive.metastore.api.LockResponse;
+import org.apache.hadoop.hive.metastore.api.LockState;
+import org.apache.hadoop.hive.metastore.api.LockType;
+import org.apache.iceberg.ClientPool;
+import org.apache.iceberg.exceptions.CommitFailedException;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.iceberg.util.Tasks;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class HiveCommitLock {
+  private static final Logger LOG = LoggerFactory.getLogger(HiveCommitLock.class);
+
+  private static final String HIVE_ACQUIRE_LOCK_TIMEOUT_MS = "iceberg.hive.lock-timeout-ms";
+  private static final String HIVE_LOCK_CHECK_MIN_WAIT_MS = "iceberg.hive.lock-check-min-wait-ms";
+  private static final String HIVE_LOCK_CHECK_MAX_WAIT_MS = "iceberg.hive.lock-check-max-wait-ms";
+  private static final String HIVE_LOCK_HEARTBEAT_INTERVAL_MS =
+      "iceberg.hive.lock-heartbeat-interval-ms";
+  private static final String HIVE_TABLE_LEVEL_LOCK_EVICT_MS =
+      "iceberg.hive.table-level-lock-evict-ms";
+
+  private static final long HIVE_ACQUIRE_LOCK_TIMEOUT_MS_DEFAULT = 3 * 60 * 1000; // 3 minutes
+  private static final long HIVE_LOCK_CHECK_MIN_WAIT_MS_DEFAULT = 50; // 50 milliseconds
+  private static final long HIVE_LOCK_CHECK_MAX_WAIT_MS_DEFAULT = 5 * 1000; // 5 seconds
+  private static final long HIVE_LOCK_HEARTBEAT_INTERVAL_MS_DEFAULT = 4 * 60 * 1000; // 4 minutes
+  private static final long HIVE_TABLE_LEVEL_LOCK_EVICT_MS_DEFAULT = TimeUnit.MINUTES.toMillis(10);
+  private static Cache<String, ReentrantLock> commitLockCache;
+
+  private static synchronized void initTableLevelLockCache(long evictionTimeout) {
+    if (commitLockCache == null) {
+      commitLockCache =
+          Caffeine.newBuilder().expireAfterAccess(evictionTimeout, TimeUnit.MILLISECONDS).build();
+    }
+  }
+
+  private static class WaitingForLockException extends RuntimeException {
+    WaitingForLockException(String message) {
+      super(message);
+    }
+  }
+
+  private final String fullName;
+  private final String databaseName;
+  private final String tableName;
+  private final long lockAcquireTimeout;
+  private final long lockCheckMinWaitTime;
+  private final long lockCheckMaxWaitTime;
+  private final long lockHeartbeatIntervalTime;
+
+  private final ClientPool<IMetaStoreClient, TException> metaClients;
+  private final ScheduledExecutorService exitingScheduledExecutorService;
+
+  private Optional<Long> hmsLockId = Optional.empty();
+  private Optional<ReentrantLock> jvmLock = Optional.empty();
+  private HiveLockHeartbeat hiveLockHeartbeat = null;
+
+  public HiveCommitLock(
+      Configuration conf,
+      ClientPool<IMetaStoreClient, TException> metaClients,
+      String catalogName,
+      String databaseName,
+      String tableName) {
+    this.metaClients = metaClients;
+    this.fullName = catalogName + "." + databaseName + "." + tableName;
+    this.databaseName = databaseName;
+    this.tableName = tableName;
+    this.lockAcquireTimeout =
+        conf.getLong(HIVE_ACQUIRE_LOCK_TIMEOUT_MS, HIVE_ACQUIRE_LOCK_TIMEOUT_MS_DEFAULT);
+    this.lockCheckMinWaitTime =
+        conf.getLong(HIVE_LOCK_CHECK_MIN_WAIT_MS, HIVE_LOCK_CHECK_MIN_WAIT_MS_DEFAULT);
+    this.lockCheckMaxWaitTime =
+        conf.getLong(HIVE_LOCK_CHECK_MAX_WAIT_MS, HIVE_LOCK_CHECK_MAX_WAIT_MS_DEFAULT);
+    this.lockHeartbeatIntervalTime =
+        conf.getLong(HIVE_LOCK_HEARTBEAT_INTERVAL_MS, HIVE_LOCK_HEARTBEAT_INTERVAL_MS_DEFAULT);
+    long tableLevelLockCacheEvictionTimeout =
+        conf.getLong(HIVE_TABLE_LEVEL_LOCK_EVICT_MS, HIVE_TABLE_LEVEL_LOCK_EVICT_MS_DEFAULT);
+    this.exitingScheduledExecutorService =
+        Executors.newSingleThreadScheduledExecutor(
+            new ThreadFactoryBuilder()
+                .setDaemon(true)
+                .setNameFormat("iceberg-hive-lock-heartbeat-%d")
+                .build());
+    initTableLevelLockCache(tableLevelLockCacheEvictionTimeout);
+  }
+
+  public void acquire() throws UnknownHostException, TException, InterruptedException {
+    // getting a process-level lock per table to avoid concurrent commit attempts to the same table
+    // from the same JVM process, which would result in unnecessary and costly HMS lock acquisition
+    // requests
+    acquireJvmLock();
+    acquireLockFromHms();
+  }
+
+  public void release() {
+    releaseHmsLock();
+    releaseJvmLock();
+  }
+
+  public boolean isHeartbeatInProgress() {
+    return hiveLockHeartbeat != null
+        && hiveLockHeartbeat.future != null
+        && !hiveLockHeartbeat.future.isCancelled()
+        && hiveLockHeartbeat.encounteredException == null;
+  }
+
+  private void acquireLockFromHms() throws UnknownHostException, TException, InterruptedException {

Review Comment:
   Nit: should this method be named `acquireHmsLock`, to be consistent with `releaseHmsLock`?



##########
hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCommitLock.java:
##########
@@ -0,0 +1,306 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.hive;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.Optional;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReentrantLock;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.LockComponent;
+import org.apache.hadoop.hive.metastore.api.LockLevel;
+import org.apache.hadoop.hive.metastore.api.LockRequest;
+import org.apache.hadoop.hive.metastore.api.LockResponse;
+import org.apache.hadoop.hive.metastore.api.LockState;
+import org.apache.hadoop.hive.metastore.api.LockType;
+import org.apache.iceberg.ClientPool;
+import org.apache.iceberg.exceptions.CommitFailedException;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.iceberg.util.Tasks;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class HiveCommitLock {

Review Comment:
   If we do, we may also want to make it clear in the doc that, unlike a standard Lock, this is not thread-safe (i.e., it is not safe to call acquire from multiple threads). If my observation is right, I guess it's worth documenting in any case.



##########
hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCommitLock.java:
##########
@@ -0,0 +1,306 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.hive;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.Optional;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReentrantLock;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.LockComponent;
+import org.apache.hadoop.hive.metastore.api.LockLevel;
+import org.apache.hadoop.hive.metastore.api.LockRequest;
+import org.apache.hadoop.hive.metastore.api.LockResponse;
+import org.apache.hadoop.hive.metastore.api.LockState;
+import org.apache.hadoop.hive.metastore.api.LockType;
+import org.apache.iceberg.ClientPool;
+import org.apache.iceberg.exceptions.CommitFailedException;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.iceberg.util.Tasks;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class HiveCommitLock {
+  private static final Logger LOG = LoggerFactory.getLogger(HiveCommitLock.class);
+
+  private static final String HIVE_ACQUIRE_LOCK_TIMEOUT_MS = "iceberg.hive.lock-timeout-ms";
+  private static final String HIVE_LOCK_CHECK_MIN_WAIT_MS = "iceberg.hive.lock-check-min-wait-ms";
+  private static final String HIVE_LOCK_CHECK_MAX_WAIT_MS = "iceberg.hive.lock-check-max-wait-ms";
+  private static final String HIVE_LOCK_HEARTBEAT_INTERVAL_MS =
+      "iceberg.hive.lock-heartbeat-interval-ms";
+  private static final String HIVE_TABLE_LEVEL_LOCK_EVICT_MS =
+      "iceberg.hive.table-level-lock-evict-ms";
+
+  private static final long HIVE_ACQUIRE_LOCK_TIMEOUT_MS_DEFAULT = 3 * 60 * 1000; // 3 minutes
+  private static final long HIVE_LOCK_CHECK_MIN_WAIT_MS_DEFAULT = 50; // 50 milliseconds
+  private static final long HIVE_LOCK_CHECK_MAX_WAIT_MS_DEFAULT = 5 * 1000; // 5 seconds
+  private static final long HIVE_LOCK_HEARTBEAT_INTERVAL_MS_DEFAULT = 4 * 60 * 1000; // 4 minutes
+  private static final long HIVE_TABLE_LEVEL_LOCK_EVICT_MS_DEFAULT = TimeUnit.MINUTES.toMillis(10);
+  private static Cache<String, ReentrantLock> commitLockCache;
+
+  private static synchronized void initTableLevelLockCache(long evictionTimeout) {
+    if (commitLockCache == null) {
+      commitLockCache =
+          Caffeine.newBuilder().expireAfterAccess(evictionTimeout, TimeUnit.MILLISECONDS).build();
+    }
+  }
+
+  private static class WaitingForLockException extends RuntimeException {
+    WaitingForLockException(String message) {
+      super(message);
+    }
+  }
+
+  private final String fullName;
+  private final String databaseName;
+  private final String tableName;
+  private final long lockAcquireTimeout;
+  private final long lockCheckMinWaitTime;
+  private final long lockCheckMaxWaitTime;
+  private final long lockHeartbeatIntervalTime;
+
+  private final ClientPool<IMetaStoreClient, TException> metaClients;
+  private final ScheduledExecutorService exitingScheduledExecutorService;
+
+  private Optional<Long> hmsLockId = Optional.empty();
+  private Optional<ReentrantLock> jvmLock = Optional.empty();
+  private HiveLockHeartbeat hiveLockHeartbeat = null;
+
+  public HiveCommitLock(
+      Configuration conf,
+      ClientPool<IMetaStoreClient, TException> metaClients,
+      String catalogName,
+      String databaseName,
+      String tableName) {
+    this.metaClients = metaClients;
+    this.fullName = catalogName + "." + databaseName + "." + tableName;
+    this.databaseName = databaseName;
+    this.tableName = tableName;
+    this.lockAcquireTimeout =
+        conf.getLong(HIVE_ACQUIRE_LOCK_TIMEOUT_MS, HIVE_ACQUIRE_LOCK_TIMEOUT_MS_DEFAULT);
+    this.lockCheckMinWaitTime =
+        conf.getLong(HIVE_LOCK_CHECK_MIN_WAIT_MS, HIVE_LOCK_CHECK_MIN_WAIT_MS_DEFAULT);
+    this.lockCheckMaxWaitTime =
+        conf.getLong(HIVE_LOCK_CHECK_MAX_WAIT_MS, HIVE_LOCK_CHECK_MAX_WAIT_MS_DEFAULT);
+    this.lockHeartbeatIntervalTime =
+        conf.getLong(HIVE_LOCK_HEARTBEAT_INTERVAL_MS, HIVE_LOCK_HEARTBEAT_INTERVAL_MS_DEFAULT);
+    long tableLevelLockCacheEvictionTimeout =
+        conf.getLong(HIVE_TABLE_LEVEL_LOCK_EVICT_MS, HIVE_TABLE_LEVEL_LOCK_EVICT_MS_DEFAULT);
+    this.exitingScheduledExecutorService =
+        Executors.newSingleThreadScheduledExecutor(
+            new ThreadFactoryBuilder()
+                .setDaemon(true)
+                .setNameFormat("iceberg-hive-lock-heartbeat-%d")
+                .build());
+    initTableLevelLockCache(tableLevelLockCacheEvictionTimeout);
+  }
+
+  public void acquire() throws UnknownHostException, TException, InterruptedException {
+    // getting a process-level lock per table to avoid concurrent commit attempts to the same table
+    // from the same JVM process, which would result in unnecessary and costly HMS lock acquisition
+    // requests
+    acquireJvmLock();
+    acquireLockFromHms();
+  }
+
+  public void release() {
+    releaseHmsLock();
+    releaseJvmLock();
+  }
+
+  public boolean isHeartbeatInProgress() {
+    return hiveLockHeartbeat != null
+        && hiveLockHeartbeat.future != null
+        && !hiveLockHeartbeat.future.isCancelled()
+        && hiveLockHeartbeat.encounteredException == null;
+  }
+
+  private void acquireLockFromHms() throws UnknownHostException, TException, InterruptedException {
+    if (hmsLockId.isPresent()) {
+      throw new IllegalArgumentException(
+          String.format(
+              "HMS lock ID=%s already acquired for table %s.%s",
+              hmsLockId.get(), databaseName, tableName));
+    }
+    final LockComponent lockComponent =
+        new LockComponent(LockType.EXCLUSIVE, LockLevel.TABLE, databaseName);
+    lockComponent.setTablename(tableName);
+    final LockRequest lockRequest =
+        new LockRequest(
+            Lists.newArrayList(lockComponent),
+            System.getProperty("user.name"),
+            InetAddress.getLocalHost().getHostName());
+    LockResponse lockResponse = metaClients.run(client -> client.lock(lockRequest));
+    AtomicReference<LockState> state = new AtomicReference<>(lockResponse.getState());
+    long lockId = lockResponse.getLockid();
+    this.hmsLockId = Optional.of(lockId);
+
+    final long start = System.currentTimeMillis();
+    long duration = 0;
+    boolean timeout = false;
+
+    try {
+      if (state.get().equals(LockState.WAITING)) {
+        // Retry count is the typical "upper bound of retries" for Tasks.run() function. In fact,
+        // the maximum number of
+        // attempts the Tasks.run() would try is `retries + 1`. Here, for checking locks, we use
+        // timeout as the
+        // upper bound of retries. So it is just reasonable to set a large retry count. However, if
+        // we set
+        // Integer.MAX_VALUE, the above logic of `retries + 1` would overflow into
+        // Integer.MIN_VALUE. Hence,
+        // the retry is set conservatively as `Integer.MAX_VALUE - 100` so it doesn't hit any
+        // boundary issues.
+        Tasks.foreach(lockId)
+            .retry(Integer.MAX_VALUE - 100)
+            .exponentialBackoff(lockCheckMinWaitTime, lockCheckMaxWaitTime, lockAcquireTimeout, 1.5)
+            .throwFailureWhenFinished()
+            .onlyRetryOn(WaitingForLockException.class)
+            .run(
+                id -> {
+                  try {
+                    LockResponse response = metaClients.run(client -> client.checkLock(id));
+                    LockState newState = response.getState();
+                    state.set(newState);
+                    if (newState.equals(LockState.WAITING)) {
+                      throw new WaitingForLockException(
+                          String.format(
+                              "Waiting for lock on table %s.%s", databaseName, tableName));
+                    }
+                  } catch (InterruptedException e) {
+                    Thread.interrupted(); // Clear the interrupt status flag

Review Comment:
   Again, I see it's copy and paste, but it's weird: it seems this is checking a boolean and ignoring it.



##########
hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCommitLock.java:
##########
@@ -0,0 +1,306 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.hive;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.Optional;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReentrantLock;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.LockComponent;
+import org.apache.hadoop.hive.metastore.api.LockLevel;
+import org.apache.hadoop.hive.metastore.api.LockRequest;
+import org.apache.hadoop.hive.metastore.api.LockResponse;
+import org.apache.hadoop.hive.metastore.api.LockState;
+import org.apache.hadoop.hive.metastore.api.LockType;
+import org.apache.iceberg.ClientPool;
+import org.apache.iceberg.exceptions.CommitFailedException;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.iceberg.util.Tasks;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class HiveCommitLock {
+  private static final Logger LOG = LoggerFactory.getLogger(HiveCommitLock.class);
+
+  private static final String HIVE_ACQUIRE_LOCK_TIMEOUT_MS = "iceberg.hive.lock-timeout-ms";
+  private static final String HIVE_LOCK_CHECK_MIN_WAIT_MS = "iceberg.hive.lock-check-min-wait-ms";
+  private static final String HIVE_LOCK_CHECK_MAX_WAIT_MS = "iceberg.hive.lock-check-max-wait-ms";
+  private static final String HIVE_LOCK_HEARTBEAT_INTERVAL_MS =
+      "iceberg.hive.lock-heartbeat-interval-ms";
+  private static final String HIVE_TABLE_LEVEL_LOCK_EVICT_MS =
+      "iceberg.hive.table-level-lock-evict-ms";
+
+  private static final long HIVE_ACQUIRE_LOCK_TIMEOUT_MS_DEFAULT = 3 * 60 * 1000; // 3 minutes
+  private static final long HIVE_LOCK_CHECK_MIN_WAIT_MS_DEFAULT = 50; // 50 milliseconds
+  private static final long HIVE_LOCK_CHECK_MAX_WAIT_MS_DEFAULT = 5 * 1000; // 5 seconds
+  private static final long HIVE_LOCK_HEARTBEAT_INTERVAL_MS_DEFAULT = 4 * 60 * 1000; // 4 minutes
+  private static final long HIVE_TABLE_LEVEL_LOCK_EVICT_MS_DEFAULT = TimeUnit.MINUTES.toMillis(10);
+  private static Cache<String, ReentrantLock> commitLockCache;
+
+  private static synchronized void initTableLevelLockCache(long evictionTimeout) {
+    if (commitLockCache == null) {
+      commitLockCache =
+          Caffeine.newBuilder().expireAfterAccess(evictionTimeout, TimeUnit.MILLISECONDS).build();
+    }
+  }
+
+  private static class WaitingForLockException extends RuntimeException {
+    WaitingForLockException(String message) {
+      super(message);
+    }
+  }
+
+  private final String fullName;
+  private final String databaseName;
+  private final String tableName;
+  private final long lockAcquireTimeout;
+  private final long lockCheckMinWaitTime;
+  private final long lockCheckMaxWaitTime;
+  private final long lockHeartbeatIntervalTime;
+
+  private final ClientPool<IMetaStoreClient, TException> metaClients;
+  private final ScheduledExecutorService exitingScheduledExecutorService;
+
+  private Optional<Long> hmsLockId = Optional.empty();

Review Comment:
   I am wondering, is there a justification for making these Optional rather than just using null checks?
   
   There are some IntelliJ warnings because we are using this. Looking into it, some people say that Java's Optional is not meant to be a general-purpose Maybe type: https://stackoverflow.com/questions/26327957/should-java-8-getters-return-optional-type/26328555#26328555 — hence the IntelliJ warnings. It seems it's meant more for method return types, to indicate that something may be absent. Of course it's more of a debate, but I'm just noticing that we don't use a whole lot of Optional elsewhere, and it may be worth keeping it simple.
   
   I do realize that lockId was originally an Optional, but this adds another Optional (jvmLock). What do you think?
   
   



##########
hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java:
##########
@@ -316,22 +243,16 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) {
       }
 
       try {
-        if (hiveLockHeartbeat.future.isCancelled()
-            || hiveLockHeartbeat.encounteredException != null) {
-          throw new CommitFailedException(
-              "Failed to heartbeat for hive lock. %s",
-              hiveLockHeartbeat.encounteredException.getMessage());
+        if (!commitLock.isHeartbeatInProgress()) {
+          throw new CommitFailedException("Failed to heartbeat for hive lock.");

Review Comment:
   I feel we may be losing some information from the stack here, in the case that encounteredException is not null?



##########
hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCommitLock.java:
##########
@@ -0,0 +1,306 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.hive;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.Optional;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReentrantLock;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.LockComponent;
+import org.apache.hadoop.hive.metastore.api.LockLevel;
+import org.apache.hadoop.hive.metastore.api.LockRequest;
+import org.apache.hadoop.hive.metastore.api.LockResponse;
+import org.apache.hadoop.hive.metastore.api.LockState;
+import org.apache.hadoop.hive.metastore.api.LockType;
+import org.apache.iceberg.ClientPool;
+import org.apache.iceberg.exceptions.CommitFailedException;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.iceberg.util.Tasks;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class HiveCommitLock {
+  private static final Logger LOG = LoggerFactory.getLogger(HiveCommitLock.class);
+
+  private static final String HIVE_ACQUIRE_LOCK_TIMEOUT_MS = "iceberg.hive.lock-timeout-ms";
+  private static final String HIVE_LOCK_CHECK_MIN_WAIT_MS = "iceberg.hive.lock-check-min-wait-ms";
+  private static final String HIVE_LOCK_CHECK_MAX_WAIT_MS = "iceberg.hive.lock-check-max-wait-ms";
+  private static final String HIVE_LOCK_HEARTBEAT_INTERVAL_MS =
+      "iceberg.hive.lock-heartbeat-interval-ms";
+  private static final String HIVE_TABLE_LEVEL_LOCK_EVICT_MS =
+      "iceberg.hive.table-level-lock-evict-ms";
+
+  private static final long HIVE_ACQUIRE_LOCK_TIMEOUT_MS_DEFAULT = 3 * 60 * 1000; // 3 minutes
+  private static final long HIVE_LOCK_CHECK_MIN_WAIT_MS_DEFAULT = 50; // 50 milliseconds
+  private static final long HIVE_LOCK_CHECK_MAX_WAIT_MS_DEFAULT = 5 * 1000; // 5 seconds
+  private static final long HIVE_LOCK_HEARTBEAT_INTERVAL_MS_DEFAULT = 4 * 60 * 1000; // 4 minutes
+  private static final long HIVE_TABLE_LEVEL_LOCK_EVICT_MS_DEFAULT = TimeUnit.MINUTES.toMillis(10);
+  private static Cache<String, ReentrantLock> commitLockCache;
+
+  private static synchronized void initTableLevelLockCache(long evictionTimeout) {
+    if (commitLockCache == null) {
+      commitLockCache =
+          Caffeine.newBuilder().expireAfterAccess(evictionTimeout, TimeUnit.MILLISECONDS).build();
+    }
+  }
+
+  private static class WaitingForLockException extends RuntimeException {
+    WaitingForLockException(String message) {
+      super(message);
+    }
+  }
+
+  private final String fullName;
+  private final String databaseName;
+  private final String tableName;
+  private final long lockAcquireTimeout;
+  private final long lockCheckMinWaitTime;
+  private final long lockCheckMaxWaitTime;
+  private final long lockHeartbeatIntervalTime;
+
+  private final ClientPool<IMetaStoreClient, TException> metaClients;
+  private final ScheduledExecutorService exitingScheduledExecutorService;
+
+  private Optional<Long> hmsLockId = Optional.empty();
+  private Optional<ReentrantLock> jvmLock = Optional.empty();
+  private HiveLockHeartbeat hiveLockHeartbeat = null;
+
+  public HiveCommitLock(
+      Configuration conf,
+      ClientPool<IMetaStoreClient, TException> metaClients,
+      String catalogName,
+      String databaseName,
+      String tableName) {
+    this.metaClients = metaClients;
+    this.fullName = catalogName + "." + databaseName + "." + tableName;
+    this.databaseName = databaseName;
+    this.tableName = tableName;
+    this.lockAcquireTimeout =
+        conf.getLong(HIVE_ACQUIRE_LOCK_TIMEOUT_MS, HIVE_ACQUIRE_LOCK_TIMEOUT_MS_DEFAULT);
+    this.lockCheckMinWaitTime =
+        conf.getLong(HIVE_LOCK_CHECK_MIN_WAIT_MS, HIVE_LOCK_CHECK_MIN_WAIT_MS_DEFAULT);
+    this.lockCheckMaxWaitTime =
+        conf.getLong(HIVE_LOCK_CHECK_MAX_WAIT_MS, HIVE_LOCK_CHECK_MAX_WAIT_MS_DEFAULT);
+    this.lockHeartbeatIntervalTime =
+        conf.getLong(HIVE_LOCK_HEARTBEAT_INTERVAL_MS, HIVE_LOCK_HEARTBEAT_INTERVAL_MS_DEFAULT);
+    long tableLevelLockCacheEvictionTimeout =
+        conf.getLong(HIVE_TABLE_LEVEL_LOCK_EVICT_MS, HIVE_TABLE_LEVEL_LOCK_EVICT_MS_DEFAULT);
+    this.exitingScheduledExecutorService =
+        Executors.newSingleThreadScheduledExecutor(
+            new ThreadFactoryBuilder()
+                .setDaemon(true)
+                .setNameFormat("iceberg-hive-lock-heartbeat-%d")
+                .build());
+    initTableLevelLockCache(tableLevelLockCacheEvictionTimeout);
+  }
+
+  public void acquire() throws UnknownHostException, TException, InterruptedException {
+    // getting a process-level lock per table to avoid concurrent commit attempts to the same table
+    // from the same JVM process, which would result in unnecessary and costly HMS lock acquisition
+    // requests
+    acquireJvmLock();
+    acquireLockFromHms();
+  }
+
+  public void release() {
+    releaseHmsLock();
+    releaseJvmLock();
+  }
+
+  public boolean isHeartbeatInProgress() {
+    return hiveLockHeartbeat != null
+        && hiveLockHeartbeat.future != null
+        && !hiveLockHeartbeat.future.isCancelled()
+        && hiveLockHeartbeat.encounteredException == null;
+  }
+
+  /**
+   * Requests an exclusive, table-level lock from the Hive Metastore and waits for it to be granted.
+   *
+   * <p>If the initial response is WAITING, the lock is re-checked with exponential backoff
+   * (configured with the min/max check wait times and the lock acquire timeout) until it leaves
+   * the WAITING state or the check loop gives up. Whenever the final state is not ACQUIRED the HMS
+   * lock is released again. On success a heartbeat for the lock is scheduled.
+   *
+   * @throws IllegalArgumentException if an HMS lock ID is already recorded on this instance
+   * @throws CommitFailedException if the check loop ended while still waiting, or the lock request
+   *     ended in any state other than ACQUIRED
+   * @throws UnknownHostException if the local host name cannot be resolved for the lock request
+   * @throws TException declared for the metastore lock calls
+   * @throws InterruptedException declared for the metastore lock call
+   */
+  private void acquireLockFromHms() throws UnknownHostException, TException, InterruptedException {
+    // Refuse to request a second HMS lock while a lock ID is still recorded.
+    if (hmsLockId.isPresent()) {
+      throw new IllegalArgumentException(
+          String.format(
+              "HMS lock ID=%s already acquired for table %s.%s",
+              hmsLockId.get(), databaseName, tableName));
+    }
+    // Build an exclusive, table-level lock request for databaseName.tableName.
+    final LockComponent lockComponent =
+        new LockComponent(LockType.EXCLUSIVE, LockLevel.TABLE, databaseName);
+    lockComponent.setTablename(tableName);
+    final LockRequest lockRequest =
+        new LockRequest(
+            Lists.newArrayList(lockComponent),
+            System.getProperty("user.name"),
+            InetAddress.getLocalHost().getHostName());
+    LockResponse lockResponse = metaClients.run(client -> client.lock(lockRequest));
+    AtomicReference<LockState> state = new AtomicReference<>(lockResponse.getState());
+    long lockId = lockResponse.getLockid();
+    // Record the lock ID right away so releaseHmsLock() can unlock it on any failure below.
+    this.hmsLockId = Optional.of(lockId);
+
+    final long start = System.currentTimeMillis();
+    long duration = 0;
+    boolean timeout = false;
+
+    try {
+      if (state.get().equals(LockState.WAITING)) {
+        // Retry count is the typical "upper bound of retries" for the Tasks.run() function. In
+        // fact, the maximum number of attempts Tasks.run() would try is `retries + 1`. Here, for
+        // checking locks, we use the timeout as the upper bound of retries, so it is reasonable
+        // to set a large retry count. However, if we set Integer.MAX_VALUE, the above logic of
+        // `retries + 1` would overflow into Integer.MIN_VALUE. Hence, the retry count is set
+        // conservatively to `Integer.MAX_VALUE - 100` so it doesn't hit any boundary issues.
+        Tasks.foreach(lockId)
+            .retry(Integer.MAX_VALUE - 100)
+            .exponentialBackoff(lockCheckMinWaitTime, lockCheckMaxWaitTime, lockAcquireTimeout, 1.5)
+            .throwFailureWhenFinished()
+            .onlyRetryOn(WaitingForLockException.class)
+            .run(
+                id -> {
+                  try {
+                    LockResponse response = metaClients.run(client -> client.checkLock(id));
+                    LockState newState = response.getState();
+                    state.set(newState);
+                    // Still WAITING: throw the one exception type the loop retries on.
+                    if (newState.equals(LockState.WAITING)) {
+                      throw new WaitingForLockException(
+                          String.format(
+                              "Waiting for lock on table %s.%s", databaseName, tableName));
+                    }
+                  } catch (InterruptedException e) {
+                    // NOTE(review): the interrupt flag is cleared and the interruption is only
+                    // logged, so this task returns normally with `state` left as it was; confirm
+                    // that not propagating the interrupt is intended.
+                    Thread.interrupted(); // Clear the interrupt status flag
+                    LOG.warn(
+                        "Interrupted while waiting for lock on table {}.{}",
+                        databaseName,
+                        tableName,
+                        e);
+                  }
+                },
+                TException.class);
+      }
+    } catch (WaitingForLockException waitingForLockException) {
+      // The check loop ended while the lock was still WAITING; this is treated as a timeout.
+      timeout = true;
+      duration = System.currentTimeMillis() - start;
+    } finally {
+      // Any outcome other than ACQUIRED (including an exception from the check loop) unlocks.
+      if (!state.get().equals(LockState.ACQUIRED)) {
+        releaseHmsLock();
+      }
+    }
+
+    // timeout and do not have lock acquired
+    if (timeout && !state.get().equals(LockState.ACQUIRED)) {
+      throw new CommitFailedException(
+          "Timed out after %s ms waiting for lock on %s.%s", duration, databaseName, tableName);
+    }
+
+    // No timeout, but the request still ended in some state other than ACQUIRED.
+    if (!state.get().equals(LockState.ACQUIRED)) {
+      throw new CommitFailedException(
+          "Could not acquire the lock on %s.%s, " + "lock request ended in state %s",
+          databaseName, tableName, state);
+    }
+
+    // Lock is held: schedule the heartbeat for it on this instance's executor.
+    hiveLockHeartbeat = new HiveLockHeartbeat(metaClients, lockId, lockHeartbeatIntervalTime);
+    hiveLockHeartbeat.schedule(exitingScheduledExecutorService);
+  }
+
+  private void releaseHmsLock() {
+    if (hmsLockId.isPresent()) {

Review Comment:
   If we have a simple boolean `acquired` (as mentioned in my other comment), we can just replace these two null checks with a single check in release(). As these are both private methods, checking both seems excessive.



##########
hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCommitLock.java:
##########
@@ -0,0 +1,306 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.hive;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.Optional;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReentrantLock;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.LockComponent;
+import org.apache.hadoop.hive.metastore.api.LockLevel;
+import org.apache.hadoop.hive.metastore.api.LockRequest;
+import org.apache.hadoop.hive.metastore.api.LockResponse;
+import org.apache.hadoop.hive.metastore.api.LockState;
+import org.apache.hadoop.hive.metastore.api.LockType;
+import org.apache.iceberg.ClientPool;
+import org.apache.iceberg.exceptions.CommitFailedException;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.iceberg.util.Tasks;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class HiveCommitLock {
+  private static final Logger LOG = LoggerFactory.getLogger(HiveCommitLock.class);
+
+  private static final String HIVE_ACQUIRE_LOCK_TIMEOUT_MS = "iceberg.hive.lock-timeout-ms";
+  private static final String HIVE_LOCK_CHECK_MIN_WAIT_MS = "iceberg.hive.lock-check-min-wait-ms";
+  private static final String HIVE_LOCK_CHECK_MAX_WAIT_MS = "iceberg.hive.lock-check-max-wait-ms";
+  private static final String HIVE_LOCK_HEARTBEAT_INTERVAL_MS =
+      "iceberg.hive.lock-heartbeat-interval-ms";
+  private static final String HIVE_TABLE_LEVEL_LOCK_EVICT_MS =
+      "iceberg.hive.table-level-lock-evict-ms";
+
+  private static final long HIVE_ACQUIRE_LOCK_TIMEOUT_MS_DEFAULT = 3 * 60 * 1000; // 3 minutes
+  private static final long HIVE_LOCK_CHECK_MIN_WAIT_MS_DEFAULT = 50; // 50 milliseconds
+  private static final long HIVE_LOCK_CHECK_MAX_WAIT_MS_DEFAULT = 5 * 1000; // 5 seconds
+  private static final long HIVE_LOCK_HEARTBEAT_INTERVAL_MS_DEFAULT = 4 * 60 * 1000; // 4 minutes
+  private static final long HIVE_TABLE_LEVEL_LOCK_EVICT_MS_DEFAULT = TimeUnit.MINUTES.toMillis(10);
+  private static Cache<String, ReentrantLock> commitLockCache;
+
+  /**
+   * Creates the shared per-table commit lock cache the first time it is called. Later calls leave
+   * the existing cache in place, so the eviction timeout of the first call is the one that applies.
+   *
+   * @param evictionTimeout expire-after-access timeout for cache entries, in milliseconds
+   */
+  private static synchronized void initTableLevelLockCache(long evictionTimeout) {
+    if (commitLockCache != null) {
+      return;
+    }
+    commitLockCache =
+        Caffeine.newBuilder().expireAfterAccess(evictionTimeout, TimeUnit.MILLISECONDS).build();
+  }
+
+  /**
+   * Unchecked exception carrying a "still waiting for the lock" message; used as a marker type
+   * within this class.
+   */
+  private static class WaitingForLockException extends RuntimeException {
+    WaitingForLockException(String message) {
+      super(message);
+    }
+  }
+
+  private final String fullName;
+  private final String databaseName;
+  private final String tableName;
+  private final long lockAcquireTimeout;
+  private final long lockCheckMinWaitTime;
+  private final long lockCheckMaxWaitTime;
+  private final long lockHeartbeatIntervalTime;
+
+  private final ClientPool<IMetaStoreClient, TException> metaClients;
+  private final ScheduledExecutorService exitingScheduledExecutorService;
+
+  private Optional<Long> hmsLockId = Optional.empty();
+  private Optional<ReentrantLock> jvmLock = Optional.empty();
+  private HiveLockHeartbeat hiveLockHeartbeat = null;
+
+  /**
+   * Creates a commit lock for one table, reading the lock timing settings from the configuration.
+   *
+   * @param conf configuration consulted for the lock timeout, lock check wait times, heartbeat
+   *     interval and lock cache eviction timeout; each falls back to its default constant
+   * @param metaClients client pool used for the metastore calls
+   * @param catalogName catalog name, the first part of the full table name
+   * @param databaseName database of the table to lock
+   * @param tableName name of the table to lock
+   */
+  public HiveCommitLock(
+      Configuration conf,
+      ClientPool<IMetaStoreClient, TException> metaClients,
+      String catalogName,
+      String databaseName,
+      String tableName) {
+    // Table identity.
+    this.databaseName = databaseName;
+    this.tableName = tableName;
+    this.fullName = catalogName + "." + databaseName + "." + tableName;
+    this.metaClients = metaClients;
+
+    // Lock timing settings, all in milliseconds.
+    this.lockAcquireTimeout =
+        conf.getLong(HIVE_ACQUIRE_LOCK_TIMEOUT_MS, HIVE_ACQUIRE_LOCK_TIMEOUT_MS_DEFAULT);
+    this.lockCheckMinWaitTime =
+        conf.getLong(HIVE_LOCK_CHECK_MIN_WAIT_MS, HIVE_LOCK_CHECK_MIN_WAIT_MS_DEFAULT);
+    this.lockCheckMaxWaitTime =
+        conf.getLong(HIVE_LOCK_CHECK_MAX_WAIT_MS, HIVE_LOCK_CHECK_MAX_WAIT_MS_DEFAULT);
+    this.lockHeartbeatIntervalTime =
+        conf.getLong(HIVE_LOCK_HEARTBEAT_INTERVAL_MS, HIVE_LOCK_HEARTBEAT_INTERVAL_MS_DEFAULT);
+    long lockCacheEvictMs =
+        conf.getLong(HIVE_TABLE_LEVEL_LOCK_EVICT_MS, HIVE_TABLE_LEVEL_LOCK_EVICT_MS_DEFAULT);
+
+    // Single daemon thread that runs the lock heartbeat for this instance.
+    this.exitingScheduledExecutorService =
+        Executors.newSingleThreadScheduledExecutor(
+            new ThreadFactoryBuilder()
+                .setDaemon(true)
+                .setNameFormat("iceberg-hive-lock-heartbeat-%d")
+                .build());
+    initTableLevelLockCache(lockCacheEvictMs);
+  }
+
+  /**
+   * Acquires the commit lock in two steps: the JVM-level lock first, then the HMS lock.
+   *
+   * <p>Taking the process-level, per-table lock first avoids concurrent commit attempts to the same
+   * table from the same JVM process, which would result in unnecessary and costly HMS lock
+   * acquisition requests.
+   *
+   * @throws UnknownHostException declared by the HMS lock step, which looks up the local host
+   * @throws TException declared by the HMS lock step
+   * @throws InterruptedException declared by the HMS lock step
+   */
+  public void acquire() throws UnknownHostException, TException, InterruptedException {
+    acquireJvmLock();
+    acquireLockFromHms();
+  }
+
+  /**
+   * Releases the commit lock in the reverse order of {@link #acquire()}: the HMS lock first, then
+   * the JVM-level lock.
+   */
+  public void release() {
+    releaseHmsLock();
+    releaseJvmLock();
+  }
+
+  /**
+   * Reports whether the lock heartbeat is currently considered active.
+   *
+   * @return {@code true} only when a heartbeat exists, its future is non-null and not cancelled,
+   *     and it has not recorded an exception
+   */
+  public boolean isHeartbeatInProgress() {
+    // No heartbeat object, or one without a future, cannot be in progress.
+    if (hiveLockHeartbeat == null || hiveLockHeartbeat.future == null) {
+      return false;
+    }
+    return !hiveLockHeartbeat.future.isCancelled()
+        && hiveLockHeartbeat.encounteredException == null;
+  }
+
+  /**
+   * Requests an exclusive, table-level lock from the Hive Metastore and waits for it to be granted.
+   *
+   * <p>If the initial response is WAITING, the lock is re-checked with exponential backoff
+   * (configured with the min/max check wait times and the lock acquire timeout) until it leaves
+   * the WAITING state or the check loop gives up. Whenever the final state is not ACQUIRED the HMS
+   * lock is released again. On success a heartbeat for the lock is scheduled.
+   *
+   * @throws IllegalArgumentException if an HMS lock ID is already recorded on this instance
+   * @throws CommitFailedException if the check loop ended while still waiting, or the lock request
+   *     ended in any state other than ACQUIRED
+   * @throws UnknownHostException if the local host name cannot be resolved for the lock request
+   * @throws TException declared for the metastore lock calls
+   * @throws InterruptedException declared for the metastore lock call
+   */
+  private void acquireLockFromHms() throws UnknownHostException, TException, InterruptedException {
+    // Refuse to request a second HMS lock while a lock ID is still recorded.
+    if (hmsLockId.isPresent()) {
+      throw new IllegalArgumentException(
+          String.format(
+              "HMS lock ID=%s already acquired for table %s.%s",
+              hmsLockId.get(), databaseName, tableName));
+    }
+    // Build an exclusive, table-level lock request for databaseName.tableName.
+    final LockComponent lockComponent =
+        new LockComponent(LockType.EXCLUSIVE, LockLevel.TABLE, databaseName);
+    lockComponent.setTablename(tableName);
+    final LockRequest lockRequest =
+        new LockRequest(
+            Lists.newArrayList(lockComponent),
+            System.getProperty("user.name"),
+            InetAddress.getLocalHost().getHostName());
+    LockResponse lockResponse = metaClients.run(client -> client.lock(lockRequest));
+    AtomicReference<LockState> state = new AtomicReference<>(lockResponse.getState());
+    long lockId = lockResponse.getLockid();
+    // Record the lock ID right away so releaseHmsLock() can unlock it on any failure below.
+    this.hmsLockId = Optional.of(lockId);
+
+    final long start = System.currentTimeMillis();
+    long duration = 0;
+    boolean timeout = false;
+
+    try {
+      if (state.get().equals(LockState.WAITING)) {
+        // Retry count is the typical "upper bound of retries" for the Tasks.run() function. In
+        // fact, the maximum number of attempts Tasks.run() would try is `retries + 1`. Here, for
+        // checking locks, we use the timeout as the upper bound of retries, so it is reasonable
+        // to set a large retry count. However, if we set Integer.MAX_VALUE, the above logic of
+        // `retries + 1` would overflow into Integer.MIN_VALUE. Hence, the retry count is set
+        // conservatively to `Integer.MAX_VALUE - 100` so it doesn't hit any boundary issues.
+        Tasks.foreach(lockId)
+            .retry(Integer.MAX_VALUE - 100)
+            .exponentialBackoff(lockCheckMinWaitTime, lockCheckMaxWaitTime, lockAcquireTimeout, 1.5)
+            .throwFailureWhenFinished()
+            .onlyRetryOn(WaitingForLockException.class)
+            .run(
+                id -> {
+                  try {
+                    LockResponse response = metaClients.run(client -> client.checkLock(id));
+                    LockState newState = response.getState();
+                    state.set(newState);
+                    // Still WAITING: throw the one exception type the loop retries on.
+                    if (newState.equals(LockState.WAITING)) {
+                      throw new WaitingForLockException(
+                          String.format(
+                              "Waiting for lock on table %s.%s", databaseName, tableName));
+                    }
+                  } catch (InterruptedException e) {
+                    // NOTE(review): the interrupt flag is cleared and the interruption is only
+                    // logged, so this task returns normally with `state` left as it was; confirm
+                    // that not propagating the interrupt is intended.
+                    Thread.interrupted(); // Clear the interrupt status flag
+                    LOG.warn(
+                        "Interrupted while waiting for lock on table {}.{}",
+                        databaseName,
+                        tableName,
+                        e);
+                  }
+                },
+                TException.class);
+      }
+    } catch (WaitingForLockException waitingForLockException) {
+      // The check loop ended while the lock was still WAITING; this is treated as a timeout.
+      timeout = true;
+      duration = System.currentTimeMillis() - start;
+    } finally {
+      // Any outcome other than ACQUIRED (including an exception from the check loop) unlocks.
+      if (!state.get().equals(LockState.ACQUIRED)) {
+        releaseHmsLock();
+      }
+    }
+
+    // timeout and do not have lock acquired
+    if (timeout && !state.get().equals(LockState.ACQUIRED)) {
+      throw new CommitFailedException(
+          "Timed out after %s ms waiting for lock on %s.%s", duration, databaseName, tableName);
+    }
+
+    // No timeout, but the request still ended in some state other than ACQUIRED.
+    if (!state.get().equals(LockState.ACQUIRED)) {
+      throw new CommitFailedException(
+          "Could not acquire the lock on %s.%s, " + "lock request ended in state %s",
+          databaseName, tableName, state);
+    }
+
+    // Lock is held: schedule the heartbeat for it on this instance's executor.
+    hiveLockHeartbeat = new HiveLockHeartbeat(metaClients, lockId, lockHeartbeatIntervalTime);
+    hiveLockHeartbeat.schedule(exitingScheduledExecutorService);
+  }
+
+  /**
+   * Stops the lock heartbeat, if any, and then releases the HMS lock, if one is recorded.
+   *
+   * <p>The heartbeat is cancelled before the unlock call so that it cannot fire against a lock
+   * that is being, or has just been, released. Unlocking is best effort: a failure is logged and
+   * the recorded lock ID is kept, while a successful unlock clears it.
+   */
+  private void releaseHmsLock() {
+    // Cancel first: a heartbeat for a lock we are about to drop is at best wasted work.
+    if (hiveLockHeartbeat != null) {
+      hiveLockHeartbeat.cancel();
+    }
+    if (hmsLockId.isPresent()) {
+      try {
+        metaClients.run(
+            client -> {
+              client.unlock(hmsLockId.get());
+              return null;
+            });
+        // Only forget the lock ID once the metastore has accepted the unlock.
+        hmsLockId = Optional.empty();
+      } catch (Exception e) {
+        LOG.warn("Failed to unlock {}.{}", databaseName, tableName, e);
+      }
+    }
+  }
+
+  private void acquireJvmLock() {
+    if (jvmLock.isPresent()) {

Review Comment:
   I'm just wondering: as we check both (jvmLock and lockId) for null, it seems this only protects against the case where a single caller calls acquire twice. It won't protect against calls from multiple threads, as these are only Optionals and not atomic, and the acquire method is not synchronized. Would a simple boolean suffice then, to avoid the two null checks?



##########
hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCommitLock.java:
##########
@@ -0,0 +1,306 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.hive;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.Optional;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReentrantLock;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.LockComponent;
+import org.apache.hadoop.hive.metastore.api.LockLevel;
+import org.apache.hadoop.hive.metastore.api.LockRequest;
+import org.apache.hadoop.hive.metastore.api.LockResponse;
+import org.apache.hadoop.hive.metastore.api.LockState;
+import org.apache.hadoop.hive.metastore.api.LockType;
+import org.apache.iceberg.ClientPool;
+import org.apache.iceberg.exceptions.CommitFailedException;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.iceberg.util.Tasks;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class HiveCommitLock {
+  private static final Logger LOG = LoggerFactory.getLogger(HiveCommitLock.class);
+
+  private static final String HIVE_ACQUIRE_LOCK_TIMEOUT_MS = "iceberg.hive.lock-timeout-ms";
+  private static final String HIVE_LOCK_CHECK_MIN_WAIT_MS = "iceberg.hive.lock-check-min-wait-ms";
+  private static final String HIVE_LOCK_CHECK_MAX_WAIT_MS = "iceberg.hive.lock-check-max-wait-ms";
+  private static final String HIVE_LOCK_HEARTBEAT_INTERVAL_MS =
+      "iceberg.hive.lock-heartbeat-interval-ms";
+  private static final String HIVE_TABLE_LEVEL_LOCK_EVICT_MS =
+      "iceberg.hive.table-level-lock-evict-ms";
+
+  private static final long HIVE_ACQUIRE_LOCK_TIMEOUT_MS_DEFAULT = 3 * 60 * 1000; // 3 minutes
+  private static final long HIVE_LOCK_CHECK_MIN_WAIT_MS_DEFAULT = 50; // 50 milliseconds
+  private static final long HIVE_LOCK_CHECK_MAX_WAIT_MS_DEFAULT = 5 * 1000; // 5 seconds
+  private static final long HIVE_LOCK_HEARTBEAT_INTERVAL_MS_DEFAULT = 4 * 60 * 1000; // 4 minutes
+  private static final long HIVE_TABLE_LEVEL_LOCK_EVICT_MS_DEFAULT = TimeUnit.MINUTES.toMillis(10);
+  private static Cache<String, ReentrantLock> commitLockCache;
+
+  /**
+   * Creates the shared per-table commit lock cache the first time it is called. Later calls leave
+   * the existing cache in place, so the eviction timeout of the first call is the one that applies.
+   *
+   * @param evictionTimeout expire-after-access timeout for cache entries, in milliseconds
+   */
+  private static synchronized void initTableLevelLockCache(long evictionTimeout) {
+    if (commitLockCache != null) {
+      return;
+    }
+    commitLockCache =
+        Caffeine.newBuilder().expireAfterAccess(evictionTimeout, TimeUnit.MILLISECONDS).build();
+  }
+
+  /**
+   * Unchecked exception carrying a "still waiting for the lock" message; used as a marker type
+   * within this class.
+   */
+  private static class WaitingForLockException extends RuntimeException {
+    WaitingForLockException(String message) {
+      super(message);
+    }
+  }
+
+  private final String fullName;
+  private final String databaseName;
+  private final String tableName;
+  private final long lockAcquireTimeout;
+  private final long lockCheckMinWaitTime;
+  private final long lockCheckMaxWaitTime;
+  private final long lockHeartbeatIntervalTime;
+
+  private final ClientPool<IMetaStoreClient, TException> metaClients;
+  private final ScheduledExecutorService exitingScheduledExecutorService;
+
+  private Optional<Long> hmsLockId = Optional.empty();
+  private Optional<ReentrantLock> jvmLock = Optional.empty();
+  private HiveLockHeartbeat hiveLockHeartbeat = null;
+
+  /**
+   * Creates a commit lock for one table, reading the lock timing settings from the configuration.
+   *
+   * @param conf configuration consulted for the lock timeout, lock check wait times, heartbeat
+   *     interval and lock cache eviction timeout; each falls back to its default constant
+   * @param metaClients client pool used for the metastore calls
+   * @param catalogName catalog name, the first part of the full table name
+   * @param databaseName database of the table to lock
+   * @param tableName name of the table to lock
+   */
+  public HiveCommitLock(
+      Configuration conf,
+      ClientPool<IMetaStoreClient, TException> metaClients,
+      String catalogName,
+      String databaseName,
+      String tableName) {
+    // Table identity.
+    this.databaseName = databaseName;
+    this.tableName = tableName;
+    this.fullName = catalogName + "." + databaseName + "." + tableName;
+    this.metaClients = metaClients;
+
+    // Lock timing settings, all in milliseconds.
+    this.lockAcquireTimeout =
+        conf.getLong(HIVE_ACQUIRE_LOCK_TIMEOUT_MS, HIVE_ACQUIRE_LOCK_TIMEOUT_MS_DEFAULT);
+    this.lockCheckMinWaitTime =
+        conf.getLong(HIVE_LOCK_CHECK_MIN_WAIT_MS, HIVE_LOCK_CHECK_MIN_WAIT_MS_DEFAULT);
+    this.lockCheckMaxWaitTime =
+        conf.getLong(HIVE_LOCK_CHECK_MAX_WAIT_MS, HIVE_LOCK_CHECK_MAX_WAIT_MS_DEFAULT);
+    this.lockHeartbeatIntervalTime =
+        conf.getLong(HIVE_LOCK_HEARTBEAT_INTERVAL_MS, HIVE_LOCK_HEARTBEAT_INTERVAL_MS_DEFAULT);
+    long lockCacheEvictMs =
+        conf.getLong(HIVE_TABLE_LEVEL_LOCK_EVICT_MS, HIVE_TABLE_LEVEL_LOCK_EVICT_MS_DEFAULT);
+
+    // Single daemon thread that runs the lock heartbeat for this instance.
+    this.exitingScheduledExecutorService =
+        Executors.newSingleThreadScheduledExecutor(
+            new ThreadFactoryBuilder()
+                .setDaemon(true)
+                .setNameFormat("iceberg-hive-lock-heartbeat-%d")
+                .build());
+    initTableLevelLockCache(lockCacheEvictMs);
+  }
+
+  /**
+   * Acquires the commit lock in two steps: the JVM-level lock first, then the HMS lock.
+   *
+   * <p>Taking the process-level, per-table lock first avoids concurrent commit attempts to the same
+   * table from the same JVM process, which would result in unnecessary and costly HMS lock
+   * acquisition requests.
+   *
+   * @throws UnknownHostException declared by the HMS lock step, which looks up the local host
+   * @throws TException declared by the HMS lock step
+   * @throws InterruptedException declared by the HMS lock step
+   */
+  public void acquire() throws UnknownHostException, TException, InterruptedException {
+    acquireJvmLock();
+    acquireLockFromHms();
+  }
+
+  /**
+   * Releases the commit lock in the reverse order of {@link #acquire()}: the HMS lock first, then
+   * the JVM-level lock.
+   */
+  public void release() {
+    releaseHmsLock();
+    releaseJvmLock();
+  }
+
+  /**
+   * Reports whether the lock heartbeat is currently considered active.
+   *
+   * @return {@code true} only when a heartbeat exists, its future is non-null and not cancelled,
+   *     and it has not recorded an exception
+   */
+  public boolean isHeartbeatInProgress() {
+    // No heartbeat object, or one without a future, cannot be in progress.
+    if (hiveLockHeartbeat == null || hiveLockHeartbeat.future == null) {
+      return false;
+    }
+    return !hiveLockHeartbeat.future.isCancelled()
+        && hiveLockHeartbeat.encounteredException == null;
+  }
+
+  /**
+   * Requests an exclusive, table-level lock from the Hive Metastore and waits for it to be granted.
+   *
+   * <p>If the initial response is WAITING, the lock is re-checked with exponential backoff
+   * (configured with the min/max check wait times and the lock acquire timeout) until it leaves
+   * the WAITING state or the check loop gives up. Whenever the final state is not ACQUIRED the HMS
+   * lock is released again. On success a heartbeat for the lock is scheduled.
+   *
+   * @throws IllegalArgumentException if an HMS lock ID is already recorded on this instance
+   * @throws CommitFailedException if the check loop ended while still waiting, or the lock request
+   *     ended in any state other than ACQUIRED
+   * @throws UnknownHostException if the local host name cannot be resolved for the lock request
+   * @throws TException declared for the metastore lock calls
+   * @throws InterruptedException declared for the metastore lock call
+   */
+  private void acquireLockFromHms() throws UnknownHostException, TException, InterruptedException {
+    // Refuse to request a second HMS lock while a lock ID is still recorded.
+    if (hmsLockId.isPresent()) {
+      throw new IllegalArgumentException(
+          String.format(
+              "HMS lock ID=%s already acquired for table %s.%s",
+              hmsLockId.get(), databaseName, tableName));
+    }
+    // Build an exclusive, table-level lock request for databaseName.tableName.
+    final LockComponent lockComponent =
+        new LockComponent(LockType.EXCLUSIVE, LockLevel.TABLE, databaseName);
+    lockComponent.setTablename(tableName);
+    final LockRequest lockRequest =
+        new LockRequest(
+            Lists.newArrayList(lockComponent),
+            System.getProperty("user.name"),
+            InetAddress.getLocalHost().getHostName());
+    LockResponse lockResponse = metaClients.run(client -> client.lock(lockRequest));
+    AtomicReference<LockState> state = new AtomicReference<>(lockResponse.getState());
+    long lockId = lockResponse.getLockid();
+    // Record the lock ID right away so releaseHmsLock() can unlock it on any failure below.
+    this.hmsLockId = Optional.of(lockId);
+
+    final long start = System.currentTimeMillis();
+    long duration = 0;
+    boolean timeout = false;
+
+    try {
+      if (state.get().equals(LockState.WAITING)) {
+        // Retry count is the typical "upper bound of retries" for the Tasks.run() function. In
+        // fact, the maximum number of attempts Tasks.run() would try is `retries + 1`. Here, for
+        // checking locks, we use the timeout as the upper bound of retries, so it is reasonable
+        // to set a large retry count. However, if we set Integer.MAX_VALUE, the above logic of
+        // `retries + 1` would overflow into Integer.MIN_VALUE. Hence, the retry count is set
+        // conservatively to `Integer.MAX_VALUE - 100` so it doesn't hit any boundary issues.
+        Tasks.foreach(lockId)
+            .retry(Integer.MAX_VALUE - 100)
+            .exponentialBackoff(lockCheckMinWaitTime, lockCheckMaxWaitTime, lockAcquireTimeout, 1.5)
+            .throwFailureWhenFinished()
+            .onlyRetryOn(WaitingForLockException.class)
+            .run(
+                id -> {
+                  try {
+                    LockResponse response = metaClients.run(client -> client.checkLock(id));
+                    LockState newState = response.getState();
+                    state.set(newState);
+                    // Still WAITING: throw the one exception type the loop retries on.
+                    if (newState.equals(LockState.WAITING)) {
+                      throw new WaitingForLockException(
+                          String.format(
+                              "Waiting for lock on table %s.%s", databaseName, tableName));
+                    }
+                  } catch (InterruptedException e) {
+                    // NOTE(review): the interrupt flag is cleared and the interruption is only
+                    // logged, so this task returns normally with `state` left as it was; confirm
+                    // that not propagating the interrupt is intended.
+                    Thread.interrupted(); // Clear the interrupt status flag
+                    LOG.warn(
+                        "Interrupted while waiting for lock on table {}.{}",
+                        databaseName,
+                        tableName,
+                        e);
+                  }
+                },
+                TException.class);
+      }
+    } catch (WaitingForLockException waitingForLockException) {
+      // The check loop ended while the lock was still WAITING; this is treated as a timeout.
+      timeout = true;
+      duration = System.currentTimeMillis() - start;
+    } finally {
+      // Any outcome other than ACQUIRED (including an exception from the check loop) unlocks.
+      if (!state.get().equals(LockState.ACQUIRED)) {
+        releaseHmsLock();
+      }
+    }
+
+    // timeout and do not have lock acquired
+    if (timeout && !state.get().equals(LockState.ACQUIRED)) {
+      throw new CommitFailedException(
+          "Timed out after %s ms waiting for lock on %s.%s", duration, databaseName, tableName);
+    }
+
+    // No timeout, but the request still ended in some state other than ACQUIRED.
+    if (!state.get().equals(LockState.ACQUIRED)) {
+      throw new CommitFailedException(
+          "Could not acquire the lock on %s.%s, " + "lock request ended in state %s",
+          databaseName, tableName, state);
+    }
+
+    // Lock is held: schedule the heartbeat for it on this instance's executor.
+    hiveLockHeartbeat = new HiveLockHeartbeat(metaClients, lockId, lockHeartbeatIntervalTime);
+    hiveLockHeartbeat.schedule(exitingScheduledExecutorService);
+  }
+
+  private void releaseHmsLock() {
+    if (hmsLockId.isPresent()) {
+      try {
+        metaClients.run(
+            client -> {
+              client.unlock(hmsLockId.get());
+              return null;
+            });
+        hmsLockId = Optional.empty();
+      } catch (Exception e) {
+        LOG.warn("Failed to unlock {}.{}", databaseName, tableName, e);
+      }
+    }
+    if (hiveLockHeartbeat != null) {

Review Comment:
   In the original code, cancelling seemed to run before the unlock; should we keep it that way?



##########
hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCommitLock.java:
##########
@@ -0,0 +1,306 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.hive;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.Optional;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReentrantLock;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.LockComponent;
+import org.apache.hadoop.hive.metastore.api.LockLevel;
+import org.apache.hadoop.hive.metastore.api.LockRequest;
+import org.apache.hadoop.hive.metastore.api.LockResponse;
+import org.apache.hadoop.hive.metastore.api.LockState;
+import org.apache.hadoop.hive.metastore.api.LockType;
+import org.apache.iceberg.ClientPool;
+import org.apache.iceberg.exceptions.CommitFailedException;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.iceberg.util.Tasks;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class HiveCommitLock {
+  private static final Logger LOG = LoggerFactory.getLogger(HiveCommitLock.class);
+
+  private static final String HIVE_ACQUIRE_LOCK_TIMEOUT_MS = "iceberg.hive.lock-timeout-ms";
+  private static final String HIVE_LOCK_CHECK_MIN_WAIT_MS = "iceberg.hive.lock-check-min-wait-ms";
+  private static final String HIVE_LOCK_CHECK_MAX_WAIT_MS = "iceberg.hive.lock-check-max-wait-ms";
+  private static final String HIVE_LOCK_HEARTBEAT_INTERVAL_MS =
+      "iceberg.hive.lock-heartbeat-interval-ms";
+  private static final String HIVE_TABLE_LEVEL_LOCK_EVICT_MS =
+      "iceberg.hive.table-level-lock-evict-ms";
+
+  private static final long HIVE_ACQUIRE_LOCK_TIMEOUT_MS_DEFAULT = 3 * 60 * 1000; // 3 minutes
+  private static final long HIVE_LOCK_CHECK_MIN_WAIT_MS_DEFAULT = 50; // 50 milliseconds
+  private static final long HIVE_LOCK_CHECK_MAX_WAIT_MS_DEFAULT = 5 * 1000; // 5 seconds
+  private static final long HIVE_LOCK_HEARTBEAT_INTERVAL_MS_DEFAULT = 4 * 60 * 1000; // 4 minutes
+  private static final long HIVE_TABLE_LEVEL_LOCK_EVICT_MS_DEFAULT = TimeUnit.MINUTES.toMillis(10);
+  private static Cache<String, ReentrantLock> commitLockCache;
+
+  private static synchronized void initTableLevelLockCache(long evictionTimeout) {
+    if (commitLockCache == null) {
+      commitLockCache =
+          Caffeine.newBuilder().expireAfterAccess(evictionTimeout, TimeUnit.MILLISECONDS).build();
+    }
+  }
+
+  private static class WaitingForLockException extends RuntimeException {
+    WaitingForLockException(String message) {
+      super(message);
+    }
+  }
+
+  private final String fullName;
+  private final String databaseName;
+  private final String tableName;
+  private final long lockAcquireTimeout;
+  private final long lockCheckMinWaitTime;
+  private final long lockCheckMaxWaitTime;
+  private final long lockHeartbeatIntervalTime;
+
+  private final ClientPool<IMetaStoreClient, TException> metaClients;
+  private final ScheduledExecutorService exitingScheduledExecutorService;
+
+  private Optional<Long> hmsLockId = Optional.empty();
+  private Optional<ReentrantLock> jvmLock = Optional.empty();
+  private HiveLockHeartbeat hiveLockHeartbeat = null;
+
+  public HiveCommitLock(
+      Configuration conf,
+      ClientPool<IMetaStoreClient, TException> metaClients,
+      String catalogName,
+      String databaseName,
+      String tableName) {
+    this.metaClients = metaClients;
+    this.fullName = catalogName + "." + databaseName + "." + tableName;
+    this.databaseName = databaseName;
+    this.tableName = tableName;
+    this.lockAcquireTimeout =
+        conf.getLong(HIVE_ACQUIRE_LOCK_TIMEOUT_MS, HIVE_ACQUIRE_LOCK_TIMEOUT_MS_DEFAULT);
+    this.lockCheckMinWaitTime =
+        conf.getLong(HIVE_LOCK_CHECK_MIN_WAIT_MS, HIVE_LOCK_CHECK_MIN_WAIT_MS_DEFAULT);
+    this.lockCheckMaxWaitTime =
+        conf.getLong(HIVE_LOCK_CHECK_MAX_WAIT_MS, HIVE_LOCK_CHECK_MAX_WAIT_MS_DEFAULT);
+    this.lockHeartbeatIntervalTime =
+        conf.getLong(HIVE_LOCK_HEARTBEAT_INTERVAL_MS, HIVE_LOCK_HEARTBEAT_INTERVAL_MS_DEFAULT);
+    long tableLevelLockCacheEvictionTimeout =
+        conf.getLong(HIVE_TABLE_LEVEL_LOCK_EVICT_MS, HIVE_TABLE_LEVEL_LOCK_EVICT_MS_DEFAULT);
+    this.exitingScheduledExecutorService =
+        Executors.newSingleThreadScheduledExecutor(
+            new ThreadFactoryBuilder()
+                .setDaemon(true)
+                .setNameFormat("iceberg-hive-lock-heartbeat-%d")
+                .build());
+    initTableLevelLockCache(tableLevelLockCacheEvictionTimeout);
+  }
+
+  public void acquire() throws UnknownHostException, TException, InterruptedException {
+    // getting a process-level lock per table to avoid concurrent commit attempts to the same table
+    // from the same JVM process, which would result in unnecessary and costly HMS lock acquisition
+    // requests
+    acquireJvmLock();
+    acquireLockFromHms();
+  }
+
+  public void release() {
+    releaseHmsLock();
+    releaseJvmLock();
+  }
+
+  public boolean isHeartbeatInProgress() {
+    return hiveLockHeartbeat != null
+        && hiveLockHeartbeat.future != null
+        && !hiveLockHeartbeat.future.isCancelled()
+        && hiveLockHeartbeat.encounteredException == null;
+  }
+
+  private void acquireLockFromHms() throws UnknownHostException, TException, InterruptedException {
+    if (hmsLockId.isPresent()) {
+      throw new IllegalArgumentException(
+          String.format(
+              "HMS lock ID=%s already acquired for table %s.%s",
+              hmsLockId.get(), databaseName, tableName));
+    }
+    final LockComponent lockComponent =

Review Comment:
   I do realize it's copy and paste, but I know from @aokolnychyi's comments that we don't tend to use `final` on inline variables, as there's little significance. Could we remove them here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org