You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by pv...@apache.org on 2022/08/04 07:46:35 UTC

[iceberg] branch master updated: Hive: Fix concurrent transactions overwriting commits by adding hive lock heartbeats. (#5036)

This is an automated email from the ASF dual-hosted git repository.

pvary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 28b5d0ec81 Hive: Fix concurrent transactions overwriting commits by adding hive lock heartbeats. (#5036)
28b5d0ec81 is described below

commit 28b5d0ec81ea615d1d644c2d1c0d0667c4323639
Author: Ashish Singh <as...@pinterest.com>
AuthorDate: Thu Aug 4 00:46:29 2022 -0700

    Hive: Fix concurrent transactions overwriting commits by adding hive lock heartbeats. (#5036)
---
 .../exceptions/CommitStateUnknownException.java    |  4 +
 .../apache/iceberg/hive/HiveTableOperations.java   | 90 +++++++++++++++++++++-
 .../org/apache/iceberg/hive/HiveMetastoreTest.java | 19 ++++-
 .../apache/iceberg/hive/TestHiveCommitLocks.java   | 49 +++++++++++-
 4 files changed, 155 insertions(+), 7 deletions(-)

diff --git a/api/src/main/java/org/apache/iceberg/exceptions/CommitStateUnknownException.java b/api/src/main/java/org/apache/iceberg/exceptions/CommitStateUnknownException.java
index 2ecb97fe12..7bfe9e1853 100644
--- a/api/src/main/java/org/apache/iceberg/exceptions/CommitStateUnknownException.java
+++ b/api/src/main/java/org/apache/iceberg/exceptions/CommitStateUnknownException.java
@@ -35,4 +35,17 @@ public class CommitStateUnknownException extends RuntimeException {
   public CommitStateUnknownException(Throwable cause) {
     super(cause.getMessage() + "\n" + COMMON_INFO, cause);
   }
+
+  /**
+   * Creates the exception with a caller-supplied description placed ahead of the cause's message
+   * and {@code COMMON_INFO}.
+   *
+   * @param message description of the situation in which the commit state became unknown
+   * @param cause underlying failure; may be null, in which case only {@code message} and {@code
+   *     COMMON_INFO} make up the exception message
+   */
+  public CommitStateUnknownException(String message, Throwable cause) {
+    super(
+        message + "\n" + (cause == null ? "" : cause.getMessage() + "\n") + COMMON_INFO, cause);
+  }
 }
diff --git a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java
index 294605a551..1acbfab920 100644
--- a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java
+++ b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java
@@ -31,6 +31,9 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.ReentrantLock;
@@ -74,6 +77,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableBiMap;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.iceberg.util.JsonUtil;
 import org.apache.iceberg.util.Tasks;
 import org.apache.thrift.TException;
@@ -90,6 +94,8 @@ public class HiveTableOperations extends BaseMetastoreTableOperations {
   private static final String HIVE_ACQUIRE_LOCK_TIMEOUT_MS = "iceberg.hive.lock-timeout-ms";
   private static final String HIVE_LOCK_CHECK_MIN_WAIT_MS = "iceberg.hive.lock-check-min-wait-ms";
   private static final String HIVE_LOCK_CHECK_MAX_WAIT_MS = "iceberg.hive.lock-check-max-wait-ms";
+  private static final String HIVE_LOCK_HEARTBEAT_INTERVAL_MS =
+      "iceberg.hive.lock-heartbeat-interval-ms";
   private static final String HIVE_ICEBERG_METADATA_REFRESH_MAX_RETRIES =
       "iceberg.hive.metadata-refresh-max-retries";
   private static final String HIVE_TABLE_LEVEL_LOCK_EVICT_MS =
@@ -104,6 +110,7 @@ public class HiveTableOperations extends BaseMetastoreTableOperations {
   private static final long HIVE_ACQUIRE_LOCK_TIMEOUT_MS_DEFAULT = 3 * 60 * 1000; // 3 minutes
   private static final long HIVE_LOCK_CHECK_MIN_WAIT_MS_DEFAULT = 50; // 50 milliseconds
   private static final long HIVE_LOCK_CHECK_MAX_WAIT_MS_DEFAULT = 5 * 1000; // 5 seconds
+  private static final long HIVE_LOCK_HEARTBEAT_INTERVAL_MS_DEFAULT = 4 * 60 * 1000; // 4 minutes
   private static final int HIVE_ICEBERG_METADATA_REFRESH_MAX_RETRIES_DEFAULT = 2;
   private static final long HIVE_TABLE_LEVEL_LOCK_EVICT_MS_DEFAULT = TimeUnit.MINUTES.toMillis(10);
   private static final BiMap<String, String> ICEBERG_TO_HMS_TRANSLATION =
@@ -153,10 +160,12 @@ public class HiveTableOperations extends BaseMetastoreTableOperations {
   private final long lockAcquireTimeout;
   private final long lockCheckMinWaitTime;
   private final long lockCheckMaxWaitTime;
+  private final long lockHeartbeatIntervalTime;
   private final long maxHiveTablePropertySize;
   private final int metadataRefreshMaxRetries;
   private final FileIO fileIO;
   private final ClientPool<IMetaStoreClient, TException> metaClients;
+  private final ScheduledExecutorService exitingScheduledExecutorService;
 
   protected HiveTableOperations(
       Configuration conf,
@@ -177,6 +186,8 @@ public class HiveTableOperations extends BaseMetastoreTableOperations {
         conf.getLong(HIVE_LOCK_CHECK_MIN_WAIT_MS, HIVE_LOCK_CHECK_MIN_WAIT_MS_DEFAULT);
     this.lockCheckMaxWaitTime =
         conf.getLong(HIVE_LOCK_CHECK_MAX_WAIT_MS, HIVE_LOCK_CHECK_MAX_WAIT_MS_DEFAULT);
+    this.lockHeartbeatIntervalTime =
+        conf.getLong(HIVE_LOCK_HEARTBEAT_INTERVAL_MS, HIVE_LOCK_HEARTBEAT_INTERVAL_MS_DEFAULT);
     this.metadataRefreshMaxRetries =
         conf.getInt(
             HIVE_ICEBERG_METADATA_REFRESH_MAX_RETRIES,
@@ -185,6 +196,12 @@ public class HiveTableOperations extends BaseMetastoreTableOperations {
         conf.getLong(HIVE_TABLE_PROPERTY_MAX_SIZE, HIVE_TABLE_PROPERTY_MAX_SIZE_DEFAULT);
     long tableLevelLockCacheEvictionTimeout =
         conf.getLong(HIVE_TABLE_LEVEL_LOCK_EVICT_MS, HIVE_TABLE_LEVEL_LOCK_EVICT_MS_DEFAULT);
+    this.exitingScheduledExecutorService =
+        Executors.newSingleThreadScheduledExecutor(
+            new ThreadFactoryBuilder()
+                .setDaemon(true)
+                .setNameFormat("iceberg-hive-lock-heartbeat-%d")
+                .build());
     initTableLevelLockCache(tableLevelLockCacheEvictionTimeout);
   }
 
@@ -243,9 +260,12 @@ public class HiveTableOperations extends BaseMetastoreTableOperations {
     // JVM process, which would result in unnecessary and costly HMS lock acquisition requests
     ReentrantLock tableLevelMutex = commitLockCache.get(fullName, t -> new ReentrantLock(true));
     tableLevelMutex.lock();
+    HiveLockHeartbeat hiveLockHeartbeat = null;
     try {
       lockId = Optional.of(acquireLock());
-      // TODO add lock heart beating for cases where default lock timeout is too low.
+      hiveLockHeartbeat =
+          new HiveLockHeartbeat(metaClients, lockId.get(), lockHeartbeatIntervalTime);
+      hiveLockHeartbeat.schedule(exitingScheduledExecutorService);
 
       Table tbl = loadHmsTable();
 
@@ -296,7 +316,24 @@ public class HiveTableOperations extends BaseMetastoreTableOperations {
       }
 
       try {
+        if (hiveLockHeartbeat.future.isCancelled()
+            || hiveLockHeartbeat.encounteredException != null) {
+          throw new CommitFailedException(
+              "Failed to heartbeat for hive lock. %s",
+              hiveLockHeartbeat.encounteredException.getMessage());
+        }
+
         persistTable(tbl, updateHiveTable);
+        if (hiveLockHeartbeat.future.isCancelled()
+            || hiveLockHeartbeat.encounteredException != null) {
+          throw new CommitStateUnknownException(
+              "Failed to heartbeat for hive lock while "
+                  + "committing changes. This can lead to a concurrent commit attempt be able to overwrite this commit. "
+                  + "Please check the commit history. If you are running into this issue, try reducing "
+                  + "iceberg.hive.lock-heartbeat-interval-ms.",
+              hiveLockHeartbeat.encounteredException);
+        }
+
         commitStatus = CommitStatus.SUCCESS;
       } catch (org.apache.hadoop.hive.metastore.api.AlreadyExistsException e) {
         throw new AlreadyExistsException(e, "Table already exists: %s.%s", database, tableName);
@@ -304,6 +341,9 @@ public class HiveTableOperations extends BaseMetastoreTableOperations {
       } catch (InvalidObjectException e) {
         throw new ValidationException(e, "Invalid Hive object for %s.%s", database, tableName);
 
+      } catch (CommitFailedException | CommitStateUnknownException e) {
+        throw e;
+
       } catch (Throwable e) {
         if (e.getMessage() != null
             && e.getMessage().contains("Table/View 'HIVE_LOCKS' does not exist")) {
@@ -338,6 +378,10 @@ public class HiveTableOperations extends BaseMetastoreTableOperations {
       throw new RuntimeException("Interrupted during commit", e);
 
     } finally {
+      if (hiveLockHeartbeat != null) {
+        hiveLockHeartbeat.cancel();
+      }
+
       cleanupMetadataAndUnlock(commitStatus, newMetadataLocation, lockId, tableLevelMutex);
     }
 
@@ -363,7 +407,8 @@ public class HiveTableOperations extends BaseMetastoreTableOperations {
     }
   }
 
-  private Table loadHmsTable() throws TException, InterruptedException {
+  @VisibleForTesting
+  Table loadHmsTable() throws TException, InterruptedException {
     try {
       return metaClients.run(client -> client.getTable(database, tableName));
     } catch (NoSuchObjectException nte) {
@@ -710,4 +755,77 @@ public class HiveTableOperations extends BaseMetastoreTableOperations {
     return conf.getBoolean(
         ConfigProperties.ENGINE_HIVE_ENABLED, TableProperties.ENGINE_HIVE_ENABLED_DEFAULT);
   }
+
+  /**
+   * Periodic task that heartbeats a single HMS lock through the given client pool.
+   *
+   * <p>The task runs on the scheduler's thread. Any failure is stored in
+   * {@code encounteredException}, which the committing thread checks before and after
+   * persisting the table.
+   */
+  private static class HiveLockHeartbeat implements Runnable {
+    private final ClientPool<IMetaStoreClient, TException> hmsClients;
+    private final long lockId;
+    private final long intervalMs;
+    // Handle of the scheduled task; stays null until schedule() has been called.
+    private ScheduledFuture<?> future;
+    // Written by the heartbeat thread and read by the committing thread, hence volatile.
+    private volatile Exception encounteredException = null;
+
+    /**
+     * @param hmsClients pool used to issue the heartbeat calls
+     * @param lockId id of the lock to keep alive
+     * @param intervalMs period between heartbeats, in milliseconds
+     */
+    HiveLockHeartbeat(
+        ClientPool<IMetaStoreClient, TException> hmsClients, long lockId, long intervalMs) {
+      this.hmsClients = hmsClients;
+      this.lockId = lockId;
+      this.intervalMs = intervalMs;
+      this.future = null;
+    }
+
+    /**
+     * Sends one heartbeat for the lock.
+     *
+     * @throws CommitFailedException if the heartbeat fails for any reason; the failure is recorded
+     *     in {@code encounteredException} first
+     */
+    @Override
+    public void run() {
+      try {
+        hmsClients.run(
+            client -> {
+              client.heartbeat(0, lockId);
+              return null;
+            });
+      } catch (Exception e) {
+        // Catch unchecked failures as well: an exception escaping a fixed-rate task suppresses
+        // all later runs, so a failure that is not recorded here would stop the heartbeat without
+        // the committing thread ever noticing.
+        this.encounteredException = e;
+        if (e instanceof InterruptedException) {
+          // Preserve the interrupt status for the code running this task.
+          Thread.currentThread().interrupt();
+        }
+        throw new CommitFailedException(e, "Failed to heartbeat for lock: %d", lockId);
+      }
+    }
+
+    /**
+     * Schedules this task at a fixed rate of {@code intervalMs}, with the first heartbeat sent
+     * after half an interval.
+     */
+    public void schedule(ScheduledExecutorService scheduler) {
+      future =
+          scheduler.scheduleAtFixedRate(this, intervalMs / 2, intervalMs, TimeUnit.MILLISECONDS);
+    }
+
+    /** Stops future heartbeats without interrupting one in flight; no-op if never scheduled. */
+    public void cancel() {
+      if (future != null) {
+        future.cancel(false);
+      }
+    }
+  }
 }
diff --git a/hive-metastore/src/test/java/org/apache/iceberg/hive/HiveMetastoreTest.java b/hive-metastore/src/test/java/org/apache/iceberg/hive/HiveMetastoreTest.java
index b3c31351c3..8c7adbc1f6 100644
--- a/hive-metastore/src/test/java/org/apache/iceberg/hive/HiveMetastoreTest.java
+++ b/hive-metastore/src/test/java/org/apache/iceberg/hive/HiveMetastoreTest.java
@@ -18,6 +18,8 @@
  */
 package org.apache.iceberg.hive;
 
+import java.util.Collections;
+import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
@@ -41,10 +43,21 @@ public abstract class HiveMetastoreTest {
 
   @BeforeClass
   public static void startMetastore() throws Exception {
+    startMetastore(Collections.emptyMap());
+  }
+
+  public static void startMetastore(Map<String, String> hiveConfOverride) throws Exception {
     HiveMetastoreTest.metastore = new TestHiveMetastore();
-    metastore.start();
+    HiveConf hiveConfWithOverrides = new HiveConf(TestHiveMetastore.class);
+    if (hiveConfOverride != null) {
+      for (Map.Entry<String, String> kv : hiveConfOverride.entrySet()) {
+        hiveConfWithOverrides.set(kv.getKey(), kv.getValue());
+      }
+    }
+
+    metastore.start(hiveConfWithOverrides);
     HiveMetastoreTest.hiveConf = metastore.hiveConf();
-    HiveMetastoreTest.metastoreClient = new HiveMetaStoreClient(hiveConf);
+    HiveMetastoreTest.metastoreClient = new HiveMetaStoreClient(hiveConfWithOverrides);
     String dbPath = metastore.getDatabasePath(DB_NAME);
     Database db = new Database(DB_NAME, "description", dbPath, Maps.newHashMap());
     metastoreClient.createDatabase(db);
@@ -56,7 +69,7 @@ public abstract class HiveMetastoreTest {
                 ImmutableMap.of(
                     CatalogProperties.CLIENT_POOL_CACHE_EVICTION_INTERVAL_MS,
                     String.valueOf(EVICTION_INTERVAL)),
-                hiveConf);
+                hiveConfWithOverrides);
   }
 
   @AfterClass
diff --git a/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCommitLocks.java b/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCommitLocks.java
index 37fa9b519c..8b98eb897a 100644
--- a/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCommitLocks.java
+++ b/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCommitLocks.java
@@ -19,8 +19,11 @@
 package org.apache.iceberg.hive;
 
 import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.eq;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.reset;
@@ -36,6 +39,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.IntStream;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.metastore.api.LockRequest;
@@ -46,6 +50,7 @@ import org.apache.iceberg.HasTableOperations;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableMetadata;
 import org.apache.iceberg.exceptions.CommitFailedException;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.types.Types;
 import org.apache.thrift.TException;
 import org.junit.AfterClass;
@@ -53,12 +58,14 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
+import org.mockito.AdditionalAnswers;
+import org.mockito.invocation.InvocationOnMock;
 
 public class TestHiveCommitLocks extends HiveTableBaseTest {
   private static HiveTableOperations spyOps = null;
   private static HiveClientPool spyClientPool = null;
   private static CachedClientPool spyCachedClientPool = null;
-  private static Configuration overriddenHiveConf = new Configuration(hiveConf);
+  private static Configuration overriddenHiveConf;
   private static AtomicReference<IMetaStoreClient> spyClientRef = new AtomicReference<>();
   private static IMetaStoreClient spyClient = null;
   HiveTableOperations ops = null;
@@ -71,10 +78,16 @@ public class TestHiveCommitLocks extends HiveTableBaseTest {
   LockResponse notAcquiredLockResponse = new LockResponse(dummyLockId, LockState.NOT_ACQUIRED);
 
   @BeforeClass
-  public static void initializeSpies() throws Exception {
+  public static void startMetastore() throws Exception {
+    HiveMetastoreTest.startMetastore(
+        ImmutableMap.of(HiveConf.ConfVars.HIVE_TXN_TIMEOUT.varname, "1s"));
+
+    // start spies
+    overriddenHiveConf = new Configuration(hiveConf);
     overriddenHiveConf.setLong("iceberg.hive.lock-timeout-ms", 6 * 1000);
     overriddenHiveConf.setLong("iceberg.hive.lock-check-min-wait-ms", 50);
     overriddenHiveConf.setLong("iceberg.hive.lock-check-max-wait-ms", 5 * 1000);
+    overriddenHiveConf.setLong("iceberg.hive.lock-heartbeat-interval-ms", 100);
 
     // Set up the spy clients as static variables instead of before every test.
     // The spy clients are reused between methods and closed at the end of all tests in this class.
@@ -139,6 +152,7 @@ public class TestHiveCommitLocks extends HiveTableBaseTest {
   public void testLockAcquisitionAtFirstTime() throws TException, InterruptedException {
     doReturn(acquiredLockResponse).when(spyClient).lock(any());
     doNothing().when(spyOps).doUnlock(eq(dummyLockId));
+    doNothing().when(spyClient).heartbeat(eq(0L), eq(dummyLockId));
 
     spyOps.doCommit(metadataV2, metadataV1);
 
@@ -157,6 +171,7 @@ public class TestHiveCommitLocks extends HiveTableBaseTest {
         .when(spyClient)
         .checkLock(eq(dummyLockId));
     doNothing().when(spyOps).doUnlock(eq(dummyLockId));
+    doNothing().when(spyClient).heartbeat(eq(0L), eq(dummyLockId));
 
     spyOps.doCommit(metadataV2, metadataV1);
 
@@ -269,4 +284,34 @@ public class TestHiveCommitLocks extends HiveTableBaseTest {
     // all threads eventually got their turn
     verify(spyClient, times(numConcurrentCommits)).lock(any(LockRequest.class));
   }
+
+  @Test
+  public void testLockHeartbeat() throws TException {
+    doReturn(acquiredLockResponse).when(spyClient).lock(any());
+    doAnswer(AdditionalAnswers.answersWithDelay(2000, InvocationOnMock::callRealMethod))
+        .when(spyClient)
+        .getTable(any(), any());
+    doNothing().when(spyClient).heartbeat(eq(0L), eq(dummyLockId));
+
+    spyOps.doCommit(metadataV2, metadataV1);
+
+    verify(spyClient, atLeastOnce()).heartbeat(eq(0L), eq(dummyLockId));
+  }
+
+  @Test
+  public void testLockHeartbeatFailureDuringCommit() throws TException, InterruptedException {
+    doReturn(acquiredLockResponse).when(spyClient).lock(any());
+    doAnswer(AdditionalAnswers.answersWithDelay(2000, InvocationOnMock::callRealMethod))
+        .when(spyOps)
+        .loadHmsTable();
+    doThrow(new TException("Failed to heart beat."))
+        .when(spyClient)
+        .heartbeat(eq(0L), eq(dummyLockId));
+
+    AssertHelpers.assertThrows(
+        "Expected commit failure due to failure in heartbeat.",
+        CommitFailedException.class,
+        "Failed to heartbeat for hive lock. Failed to heart beat.",
+        () -> spyOps.doCommit(metadataV2, metadataV1));
+  }
 }