Posted to issues@iceberg.apache.org by "amogh-jahagirdar (via GitHub)" <gi...@apache.org> on 2023/01/23 15:08:42 UTC

[GitHub] [iceberg] amogh-jahagirdar commented on a diff in pull request #6648: Hive: Refactor commit lock mechanism from HiveTableOperations

amogh-jahagirdar commented on code in PR #6648:
URL: https://github.com/apache/iceberg/pull/6648#discussion_r1084169823


##########
hive-metastore/src/main/java/org/apache/iceberg/hive/MetastoreLock.java:
##########
@@ -0,0 +1,531 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.hive;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReentrantLock;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.LockComponent;
+import org.apache.hadoop.hive.metastore.api.LockLevel;
+import org.apache.hadoop.hive.metastore.api.LockRequest;
+import org.apache.hadoop.hive.metastore.api.LockResponse;
+import org.apache.hadoop.hive.metastore.api.LockState;
+import org.apache.hadoop.hive.metastore.api.LockType;
+import org.apache.hadoop.hive.metastore.api.ShowLocksRequest;
+import org.apache.hadoop.hive.metastore.api.ShowLocksResponse;
+import org.apache.hadoop.hive.metastore.api.ShowLocksResponseElement;
+import org.apache.iceberg.ClientPool;
+import org.apache.iceberg.exceptions.CommitFailedException;
+import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.iceberg.util.Tasks;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MetastoreLock implements HiveLock {
+  private static final Logger LOG = LoggerFactory.getLogger(MetastoreLock.class);
+  private static final String HIVE_ACQUIRE_LOCK_TIMEOUT_MS = "iceberg.hive.lock-timeout-ms";
+  private static final String HIVE_LOCK_CHECK_MIN_WAIT_MS = "iceberg.hive.lock-check-min-wait-ms";
+  private static final String HIVE_LOCK_CHECK_MAX_WAIT_MS = "iceberg.hive.lock-check-max-wait-ms";
+  private static final String HIVE_LOCK_CREATION_TIMEOUT_MS =
+      "iceberg.hive.lock-creation-timeout-ms";
+  private static final String HIVE_LOCK_CREATION_MIN_WAIT_MS =
+      "iceberg.hive.lock-creation-min-wait-ms";
+  private static final String HIVE_LOCK_CREATION_MAX_WAIT_MS =
+      "iceberg.hive.lock-creation-max-wait-ms";
+  private static final String HIVE_LOCK_HEARTBEAT_INTERVAL_MS =
+      "iceberg.hive.lock-heartbeat-interval-ms";
+  private static final String HIVE_TABLE_LEVEL_LOCK_EVICT_MS =
+      "iceberg.hive.table-level-lock-evict-ms";
+
+  private static final long HIVE_ACQUIRE_LOCK_TIMEOUT_MS_DEFAULT = 3 * 60 * 1000; // 3 minutes
+  private static final long HIVE_LOCK_CHECK_MIN_WAIT_MS_DEFAULT = 50; // 50 milliseconds
+  private static final long HIVE_LOCK_CHECK_MAX_WAIT_MS_DEFAULT = 5 * 1000; // 5 seconds
+  private static final long HIVE_LOCK_CREATION_TIMEOUT_MS_DEFAULT = 3 * 60 * 1000; // 3 minutes
+  private static final long HIVE_LOCK_CREATION_MIN_WAIT_MS_DEFAULT = 50; // 50 milliseconds
+  private static final long HIVE_LOCK_CREATION_MAX_WAIT_MS_DEFAULT = 5 * 1000; // 5 seconds
+  private static final long HIVE_LOCK_HEARTBEAT_INTERVAL_MS_DEFAULT = 4 * 60 * 1000; // 4 minutes
+  private static final long HIVE_TABLE_LEVEL_LOCK_EVICT_MS_DEFAULT = TimeUnit.MINUTES.toMillis(10);
+
+  private final ClientPool<IMetaStoreClient, TException> metaClients;
+  private final String databaseName;
+  private final String tableName;
+  private final String fullName;
+  private final long lockAcquireTimeout;
+  private final long lockCheckMinWaitTime;
+  private final long lockCheckMaxWaitTime;
+  private final long lockCreationTimeout;
+  private final long lockCreationMinWaitTime;
+  private final long lockCreationMaxWaitTime;
+  private final long lockHeartbeatIntervalTime;
+  private final ScheduledExecutorService exitingScheduledExecutorService;
+
+  private final String agentInfo;
+
+  private static Cache<String, ReentrantLock> commitLockCache;
+
+  private Optional<Long> hmsLockId = Optional.empty();
+  private Optional<ReentrantLock> jvmLock = Optional.empty();
+  private HiveLockHeartbeat hiveLockHeartbeat = null;
+
+  public MetastoreLock(
+      Configuration conf,
+      ClientPool<IMetaStoreClient, TException> metaClients,
+      String catalogName,
+      String databaseName,
+      String tableName) {
+    this.metaClients = metaClients;
+    this.fullName = catalogName + "." + databaseName + "." + tableName;
+    this.databaseName = databaseName;
+    this.tableName = tableName;
+
+    this.lockAcquireTimeout =
+        conf.getLong(HIVE_ACQUIRE_LOCK_TIMEOUT_MS, HIVE_ACQUIRE_LOCK_TIMEOUT_MS_DEFAULT);
+    this.lockCheckMinWaitTime =
+        conf.getLong(HIVE_LOCK_CHECK_MIN_WAIT_MS, HIVE_LOCK_CHECK_MIN_WAIT_MS_DEFAULT);
+    this.lockCheckMaxWaitTime =
+        conf.getLong(HIVE_LOCK_CHECK_MAX_WAIT_MS, HIVE_LOCK_CHECK_MAX_WAIT_MS_DEFAULT);
+    this.lockCreationTimeout =
+        conf.getLong(HIVE_LOCK_CREATION_TIMEOUT_MS, HIVE_LOCK_CREATION_TIMEOUT_MS_DEFAULT);
+    this.lockCreationMinWaitTime =
+        conf.getLong(HIVE_LOCK_CREATION_MIN_WAIT_MS, HIVE_LOCK_CREATION_MIN_WAIT_MS_DEFAULT);
+    this.lockCreationMaxWaitTime =
+        conf.getLong(HIVE_LOCK_CREATION_MAX_WAIT_MS, HIVE_LOCK_CREATION_MAX_WAIT_MS_DEFAULT);
+    this.lockHeartbeatIntervalTime =
+        conf.getLong(HIVE_LOCK_HEARTBEAT_INTERVAL_MS, HIVE_LOCK_HEARTBEAT_INTERVAL_MS_DEFAULT);
+    long tableLevelLockCacheEvictionTimeout =
+        conf.getLong(HIVE_TABLE_LEVEL_LOCK_EVICT_MS, HIVE_TABLE_LEVEL_LOCK_EVICT_MS_DEFAULT);
+
+    this.agentInfo = "Iceberg-" + UUID.randomUUID();
+
+    this.exitingScheduledExecutorService =
+        Executors.newSingleThreadScheduledExecutor(
+            new ThreadFactoryBuilder()
+                .setDaemon(true)
+                .setNameFormat("iceberg-hive-lock-heartbeat-" + fullName + "-%d")
+                .build());
+
+    initTableLevelLockCache(tableLevelLockCacheEvictionTimeout);
+  }
+
+  @Override
+  public void lock() throws LockException {
+    // getting a process-level lock per table to avoid concurrent commit attempts to the same table
+    // from the same JVM process, which would result in unnecessary and costly HMS lock acquisition
+    // requests
+    acquireJvmLock();
+
+    // Getting HMS lock
+    hmsLockId = Optional.of(acquireLock());
+
+    // Starting heartbeat for the HMS lock
+    hiveLockHeartbeat =
+        new HiveLockHeartbeat(metaClients, hmsLockId.get(), lockHeartbeatIntervalTime);
+    hiveLockHeartbeat.schedule(exitingScheduledExecutorService);

Review Comment:
   Here, again, I wonder whether we can leverage the existing LockManager (perhaps extending it to accept a custom heartbeat runnable).
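A minimal sketch of what "extending it to accept a custom heartbeat runnable" could look like, assuming a lock manager with backend-specific acquire/release hooks. HeartbeatingLockManager and its tryAcquire/tryRelease hooks are hypothetical names, not the actual Iceberg LockManager API; the Runnable would carry the HMS-specific work that HiveLockHeartbeat is scheduled to do in the diff above.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical lock manager whose acquire() takes a caller-supplied heartbeat Runnable.
// This is NOT the real org.apache.iceberg.LockManager API; it only illustrates the idea.
abstract class HeartbeatingLockManager implements AutoCloseable {
  private final ScheduledExecutorService scheduler =
      Executors.newSingleThreadScheduledExecutor(
          runnable -> {
            Thread thread = new Thread(runnable, "lock-heartbeat");
            thread.setDaemon(true); // do not keep the JVM alive just for heartbeats
            return thread;
          });
  private final Map<String, ScheduledFuture<?>> heartbeats = new ConcurrentHashMap<>();
  private final long heartbeatIntervalMs;

  HeartbeatingLockManager(long heartbeatIntervalMs) {
    this.heartbeatIntervalMs = heartbeatIntervalMs;
  }

  // Backend-specific acquisition, e.g. issuing an HMS lock request (hypothetical hook).
  protected abstract boolean tryAcquire(String entityId, String ownerId);

  // Backend-specific release (hypothetical hook).
  protected abstract boolean tryRelease(String entityId, String ownerId);

  // On success, schedules the caller's heartbeat at a fixed rate.
  boolean acquire(String entityId, String ownerId, Runnable heartbeat) {
    if (!tryAcquire(entityId, ownerId)) {
      return false;
    }
    heartbeats.put(
        entityId,
        scheduler.scheduleAtFixedRate(
            heartbeat, heartbeatIntervalMs, heartbeatIntervalMs, TimeUnit.MILLISECONDS));
    return true;
  }

  // Cancels the heartbeat first so no new heartbeat runs are started, then releases.
  boolean release(String entityId, String ownerId) {
    ScheduledFuture<?> future = heartbeats.remove(entityId);
    if (future != null) {
      future.cancel(false);
    }
    return tryRelease(entityId, ownerId);
  }

  @Override
  public void close() {
    scheduler.shutdownNow();
  }
}
```

Keeping the scheduling in the shared class while only the Runnable stays backend-specific would let each catalog supply just the part that differs.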



##########
hive-metastore/src/main/java/org/apache/iceberg/hive/MetastoreLock.java:
##########
@@ -0,0 +1,531 @@
+  @Override
+  public void lock() throws LockException {
+    // getting a process-level lock per table to avoid concurrent commit attempts to the same table
+    // from the same JVM process, which would result in unnecessary and costly HMS lock acquisition
+    // requests
+    acquireJvmLock();

Review Comment:
   Maintaining an in-process lock and acquiring it before using the distributed locking mechanism is probably something we want in all cases. If we use a lock manager abstraction, I think we can move this into the parent class and then delegate to acquireLock, which acquires the distributed lock.
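To illustrate the suggested split, here is a sketch of a template-method parent class, assuming a plain per-table ReentrantLock map in place of the PR's Caffeine commitLockCache. TwoLevelTableLock and releaseLock are illustrative names; acquireLock echoes the method mentioned above, but here it returns nothing, whereas the PR's version returns an HMS lock id.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical parent class: always takes a per-table, in-process lock first, then
// delegates to the subclass for the distributed lock (e.g. the HMS lock).
abstract class TwoLevelTableLock {
  // One JVM-wide lock per table. Unlike the Caffeine cache in the PR, entries are never evicted.
  private static final ConcurrentMap<String, ReentrantLock> JVM_LOCKS = new ConcurrentHashMap<>();

  private final String tableName;
  private final long jvmLockTimeoutMs;
  private ReentrantLock heldJvmLock;

  TwoLevelTableLock(String tableName, long jvmLockTimeoutMs) {
    this.tableName = tableName;
    this.jvmLockTimeoutMs = jvmLockTimeoutMs;
  }

  // Subclasses acquire and release only the distributed lock.
  protected abstract void acquireLock();

  protected abstract void releaseLock();

  final void lock() {
    ReentrantLock jvmLock = JVM_LOCKS.computeIfAbsent(tableName, name -> new ReentrantLock(true));
    boolean gotJvmLock;
    try {
      gotJvmLock = jvmLock.tryLock(jvmLockTimeoutMs, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("Interrupted waiting for in-process lock on " + tableName, e);
    }
    if (!gotJvmLock) {
      throw new IllegalStateException("Timed out waiting for in-process lock on " + tableName);
    }

    boolean gotDistributedLock = false;
    try {
      acquireLock(); // other threads in this JVM contending for the table wait on jvmLock
      gotDistributedLock = true;
    } finally {
      if (!gotDistributedLock) {
        jvmLock.unlock(); // do not leak the in-process lock if the distributed lock fails
      }
    }
    heldJvmLock = jvmLock;
  }

  final void unlock() {
    if (heldJvmLock == null) {
      return; // not locked by this instance
    }
    try {
      releaseLock();
    } finally {
      heldJvmLock.unlock();
      heldJvmLock = null;
    }
  }
}
```

With this split, a subclass such as MetastoreLock would only implement the HMS request, and threads in the same JVM that contend for a table never send HMS lock requests while another thread holds it.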



##########
hive-metastore/src/main/java/org/apache/iceberg/hive/HiveLock.java:
##########
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.hive;
+
+public interface HiveLock {
+  void lock() throws LockException;
+
+  void ensureActive() throws LockException;
+
+  void unlock();
+}

Review Comment:
   There's already a LockManager interface with acquire/release, and a BaseLockManager that has a heartbeating mechanism. Would it make sense to leverage these existing abstractions?
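A rough sketch of how the HiveLock interface from the diff could be backed by an acquire/release abstraction. AcquireReleaseLockManager is a hypothetical stand-in that only mirrors the acquire/release shape (it is not the Iceberg LockManager interface), LockManagerHiveLock is an illustrative name, and LockException is a local stand-in because its definition is not part of this diff.

```java
import java.util.UUID;

// Hypothetical stand-in mirroring an acquire/release lock manager; not the Iceberg class.
interface AcquireReleaseLockManager {
  boolean acquire(String entityId, String ownerId);

  boolean release(String entityId, String ownerId);
}

// Local stand-in for the PR's LockException, whose definition is not shown in the diff.
class LockException extends Exception {
  LockException(String message) {
    super(message);
  }
}

// HiveLock as declared in the diff above.
interface HiveLock {
  void lock() throws LockException;

  void ensureActive() throws LockException;

  void unlock();
}

// Hypothetical adapter: implements HiveLock by delegating to the acquire/release abstraction.
class LockManagerHiveLock implements HiveLock {
  private final AcquireReleaseLockManager manager;
  private final String entityId;
  private final String ownerId = "Iceberg-" + UUID.randomUUID(); // same format as agentInfo
  private boolean held = false;

  LockManagerHiveLock(AcquireReleaseLockManager manager, String entityId) {
    this.manager = manager;
    this.entityId = entityId;
  }

  @Override
  public void lock() throws LockException {
    if (!manager.acquire(entityId, ownerId)) {
      throw new LockException("Failed to acquire lock on " + entityId);
    }
    held = true;
  }

  // Only checks local state in this sketch.
  @Override
  public void ensureActive() throws LockException {
    if (!held) {
      throw new LockException("Lock on " + entityId + " is not held");
    }
  }

  @Override
  public void unlock() {
    if (held) {
      manager.release(entityId, ownerId);
      held = false;
    }
  }
}
```

Because ensureActive here only checks local state, an implementation built on BaseLockManager would presumably also need to surface heartbeat failures through it.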



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


For additional commands, e-mail: issues-help@iceberg.apache.org