You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2023/01/16 16:30:39 UTC

[GitHub] [iceberg] deniskuzZ commented on a diff in pull request #6570: Hive: Use EnvironmentContext instead of Hive Locks to provide transactional commits after HIVE-26882

deniskuzZ commented on code in PR #6570:
URL: https://github.com/apache/iceberg/pull/6570#discussion_r1071409985


##########
hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java:
##########
@@ -809,168 +603,38 @@ private static boolean hiveEngineEnabled(TableMetadata metadata, Configuration c
   }
 
   /**
-   * Tries to create a lock. If the lock creation fails, and it is possible then retries the lock
-   * creation a few times. If the lock creation is successful then a {@link LockInfo} is returned,
-   * otherwise an appropriate exception is thrown.
+   * Returns if the hive locking should be enabled on the table, or not.
    *
-   * @param agentInfo The agentInfo which should be used during lock creation
-   * @return The created lock
-   * @throws UnknownHostException When we are not able to fill the hostname for lock creation
-   * @throws TException When there is an error during lock creation
-   */
-  @SuppressWarnings("ReverseDnsLookup")
-  private LockInfo tryLock(String agentInfo) throws UnknownHostException, TException {
-    LockInfo lockInfo = new LockInfo();
-
-    final LockComponent lockComponent =
-        new LockComponent(LockType.EXCLUSIVE, LockLevel.TABLE, database);
-    lockComponent.setTablename(tableName);
-    final LockRequest lockRequest =
-        new LockRequest(
-            Lists.newArrayList(lockComponent),
-            System.getProperty("user.name"),
-            InetAddress.getLocalHost().getHostName());
-
-    // Only works in Hive 2 or later.
-    if (HiveVersion.min(HiveVersion.HIVE_2)) {
-      lockRequest.setAgentInfo(agentInfo);
-    }
-
-    Tasks.foreach(lockRequest)
-        .retry(Integer.MAX_VALUE - 100)
-        .exponentialBackoff(
-            lockCreationMinWaitTime, lockCreationMaxWaitTime, lockCreationTimeout, 2.0)
-        .shouldRetryTest(e -> e instanceof TException && HiveVersion.min(HiveVersion.HIVE_2))
-        .throwFailureWhenFinished()
-        .run(
-            request -> {
-              try {
-                LockResponse lockResponse = metaClients.run(client -> client.lock(request));
-                lockInfo.lockId = lockResponse.getLockid();
-                lockInfo.lockState = lockResponse.getState();
-              } catch (TException te) {
-                LOG.warn("Failed to acquire lock {}", request, te);
-                try {
-                  // If we can not check for lock, or we do not find it, then rethrow the exception
-                  // Otherwise we are happy as the findLock sets the lockId and the state correctly
-                  if (!HiveVersion.min(HiveVersion.HIVE_2)) {
-                    LockInfo lockFound = findLock(agentInfo);
-                    if (lockFound != null) {
-                      lockInfo.lockId = lockFound.lockId;
-                      lockInfo.lockState = lockFound.lockState;
-                      LOG.info("Found lock {} by agentInfo {}", lockInfo, agentInfo);
-                      return;
-                    }
-                  }
-
-                  throw te;
-                } catch (InterruptedException e) {
-                  Thread.currentThread().interrupt();
-                  LOG.warn(
-                      "Interrupted while checking for lock on table {}.{}", database, tableName, e);
-                  throw new RuntimeException("Interrupted while checking for lock", e);
-                }
-              } catch (InterruptedException e) {
-                Thread.currentThread().interrupt();
-                LOG.warn("Interrupted while acquiring lock on table {}.{}", database, tableName, e);
-                throw new RuntimeException("Interrupted while acquiring lock", e);
-              }
-            },
-            TException.class);
-
-    // This should be initialized always, or exception should be thrown.
-    LOG.debug("Lock {} created for table {}.{}", lockInfo, database, tableName);
-    return lockInfo;
-  }
-
-  /**
-   * Search for the locks using HMSClient.showLocks identified by the agentInfo. If the lock is
-   * there, then a {@link LockInfo} object is returned. If the lock is not found <code>null</code>
-   * is returned.
+   * <p>The decision is made like this:
+   *
+   * <ol>
+   *   <li>Table property value {@link TableProperties#LOCK_HIVE_ENABLED}
+   *   <li>If the table property is not set then check the hive-site.xml property value {@link
+   *       ConfigProperties#LOCK_HIVE_ENABLED}
+   *   <li>If none of the above is enabled then use the default value {@link
+   *       TableProperties#LOCK_HIVE_ENABLED_DEFAULT}
+   * </ol>
    *
-   * @param agentInfo The key for searching the locks
-   * @return The {@link LockInfo} for the found lock, or <code>null</code> if nothing found
+   * @param metadata Table metadata to use
+   * @param conf The hive configuration to use
+   * @return if the hive engine related values should be enabled or not
    */
-  private LockInfo findLock(String agentInfo) throws TException, InterruptedException {
-    Preconditions.checkArgument(
-        HiveVersion.min(HiveVersion.HIVE_2),
-        "Minimally Hive 2 HMS client is needed to find the Lock using the showLocks API call");
-    ShowLocksRequest showLocksRequest = new ShowLocksRequest();
-    showLocksRequest.setDbname(database);
-    showLocksRequest.setTablename(tableName);
-    ShowLocksResponse response = metaClients.run(client -> client.showLocks(showLocksRequest));
-    for (ShowLocksResponseElement lock : response.getLocks()) {
-      if (lock.getAgentInfo().equals(agentInfo)) {
-        // We found our lock
-        return new LockInfo(lock.getLockid(), lock.getState());
-      }
-    }
-
-    // Not found anything
-    return null;
-  }
-
-  private static class HiveLockHeartbeat implements Runnable {
-    private final ClientPool<IMetaStoreClient, TException> hmsClients;
-    private final long lockId;
-    private final long intervalMs;
-    private ScheduledFuture<?> future;
-    private volatile Exception encounteredException = null;
-
-    HiveLockHeartbeat(
-        ClientPool<IMetaStoreClient, TException> hmsClients, long lockId, long intervalMs) {
-      this.hmsClients = hmsClients;
-      this.lockId = lockId;
-      this.intervalMs = intervalMs;
-      this.future = null;
-    }
-
-    @Override
-    public void run() {
-      try {
-        hmsClients.run(
-            client -> {
-              client.heartbeat(0, lockId);
-              return null;
-            });
-      } catch (TException | InterruptedException e) {
-        this.encounteredException = e;
-        throw new CommitFailedException(e, "Failed to heartbeat for lock: %d", lockId);
-      }
-    }
-
-    public void schedule(ScheduledExecutorService scheduler) {
-      future =
-          scheduler.scheduleAtFixedRate(this, intervalMs / 2, intervalMs, TimeUnit.MILLISECONDS);
+  private static boolean hiveLockEnabled(TableMetadata metadata, Configuration conf) {
+    if (metadata.properties().get(TableProperties.LOCK_HIVE_ENABLED) != null) {
+      // We know that the property is set, so default value will not be used,
+      return metadata.propertyAsBoolean(TableProperties.LOCK_HIVE_ENABLED, false);
     }
 
-    public void cancel() {
-      if (future != null) {
-        future.cancel(false);
-      }
-    }
+    return conf.getBoolean(
+        ConfigProperties.LOCK_HIVE_ENABLED, TableProperties.LOCK_HIVE_ENABLED_DEFAULT);
   }
 
-  private static class LockInfo {
-    private long lockId;
-    private LockState lockState;
-
-    private LockInfo() {
-      this.lockId = -1;
-      this.lockState = null;
-    }
-
-    private LockInfo(long lockId, LockState lockState) {
-      this.lockId = lockId;
-      this.lockState = lockState;
-    }
-
-    @Override
-    public String toString() {
-      return MoreObjects.toStringHelper(this)
-          .add("lockId", lockId)
-          .add("lockState", lockState)
-          .toString();
+  @VisibleForTesting
+  HiveLock lockObject(TableMetadata metadata) {
+    if (hiveLockEnabled(metadata, conf)) {
+      return new MetastoreLock(conf, metaClients, catalogName, database, tableName);

Review Comment:
   The previous locking strategy was migrated to MetastoreLock, right?



##########
docs/configuration.md:
##########
@@ -175,8 +175,13 @@ The HMS table locking is a 2-step process:
 | iceberg.hive.lock-heartbeat-interval-ms   | 240000 (4 min)  | The heartbeat interval for the HMS locks.                                    |
 | iceberg.hive.metadata-refresh-max-retries | 2               | Maximum number of retries when the metadata file is missing                  |
 | iceberg.hive.table-level-lock-evict-ms    | 600000 (10 min) | The timeout for the JVM table lock is                                        |
+| iceberg.lock.hive.enabled                 | true            | If enabled HMS locks will be used to ensure of the atomicity of the commits  | 
 
 Note: `iceberg.hive.lock-check-max-wait-ms` and `iceberg.hive.lock-heartbeat-interval-ms` should be less than the [transaction timeout](https://cwiki.apache.org/confluence/display/Hive/Configuration+Properties#ConfigurationProperties-hive.txn.timeout) 
 of the Hive Metastore (`hive.txn.timeout` or `metastore.txn.timeout` in the newer versions). Otherwise, the heartbeats on the lock (which happens during the lock checks) would end up expiring in the 
 Hive Metastore before the lock is retried from Iceberg.
 
+Note: `iceberg.lock.hive.enabled` should only be set to `false` if [HIVE-26882](https://issues.apache.org/jira/browse/HIVE-26882)

Review Comment:
   Could this cause issues in HA deployments where the HMS configuration is not consistent across instances?



##########
core/src/main/java/org/apache/iceberg/TableProperties.java:
##########
@@ -303,6 +303,9 @@ private TableProperties() {}
   public static final String ENGINE_HIVE_ENABLED = "engine.hive.enabled";
   public static final boolean ENGINE_HIVE_ENABLED_DEFAULT = false;
 
+  public static final String LOCK_HIVE_ENABLED = "lock.hive.enabled";

Review Comment:
   Minor: why not `hive.lock.enabled`?



##########
hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java:
##########
@@ -809,168 +603,38 @@ private static boolean hiveEngineEnabled(TableMetadata metadata, Configuration c
   }
 
   /**
-   * Tries to create a lock. If the lock creation fails, and it is possible then retries the lock
-   * creation a few times. If the lock creation is successful then a {@link LockInfo} is returned,
-   * otherwise an appropriate exception is thrown.
+   * Returns if the hive locking should be enabled on the table, or not.
    *
-   * @param agentInfo The agentInfo which should be used during lock creation
-   * @return The created lock
-   * @throws UnknownHostException When we are not able to fill the hostname for lock creation
-   * @throws TException When there is an error during lock creation
-   */
-  @SuppressWarnings("ReverseDnsLookup")
-  private LockInfo tryLock(String agentInfo) throws UnknownHostException, TException {
-    LockInfo lockInfo = new LockInfo();
-
-    final LockComponent lockComponent =
-        new LockComponent(LockType.EXCLUSIVE, LockLevel.TABLE, database);
-    lockComponent.setTablename(tableName);
-    final LockRequest lockRequest =
-        new LockRequest(
-            Lists.newArrayList(lockComponent),
-            System.getProperty("user.name"),
-            InetAddress.getLocalHost().getHostName());
-
-    // Only works in Hive 2 or later.
-    if (HiveVersion.min(HiveVersion.HIVE_2)) {
-      lockRequest.setAgentInfo(agentInfo);
-    }
-
-    Tasks.foreach(lockRequest)
-        .retry(Integer.MAX_VALUE - 100)
-        .exponentialBackoff(
-            lockCreationMinWaitTime, lockCreationMaxWaitTime, lockCreationTimeout, 2.0)
-        .shouldRetryTest(e -> e instanceof TException && HiveVersion.min(HiveVersion.HIVE_2))
-        .throwFailureWhenFinished()
-        .run(
-            request -> {
-              try {
-                LockResponse lockResponse = metaClients.run(client -> client.lock(request));
-                lockInfo.lockId = lockResponse.getLockid();
-                lockInfo.lockState = lockResponse.getState();
-              } catch (TException te) {
-                LOG.warn("Failed to acquire lock {}", request, te);
-                try {
-                  // If we can not check for lock, or we do not find it, then rethrow the exception
-                  // Otherwise we are happy as the findLock sets the lockId and the state correctly
-                  if (!HiveVersion.min(HiveVersion.HIVE_2)) {
-                    LockInfo lockFound = findLock(agentInfo);
-                    if (lockFound != null) {
-                      lockInfo.lockId = lockFound.lockId;
-                      lockInfo.lockState = lockFound.lockState;
-                      LOG.info("Found lock {} by agentInfo {}", lockInfo, agentInfo);
-                      return;
-                    }
-                  }
-
-                  throw te;
-                } catch (InterruptedException e) {
-                  Thread.currentThread().interrupt();
-                  LOG.warn(
-                      "Interrupted while checking for lock on table {}.{}", database, tableName, e);
-                  throw new RuntimeException("Interrupted while checking for lock", e);
-                }
-              } catch (InterruptedException e) {
-                Thread.currentThread().interrupt();
-                LOG.warn("Interrupted while acquiring lock on table {}.{}", database, tableName, e);
-                throw new RuntimeException("Interrupted while acquiring lock", e);
-              }
-            },
-            TException.class);
-
-    // This should be initialized always, or exception should be thrown.
-    LOG.debug("Lock {} created for table {}.{}", lockInfo, database, tableName);
-    return lockInfo;
-  }
-
-  /**
-   * Search for the locks using HMSClient.showLocks identified by the agentInfo. If the lock is
-   * there, then a {@link LockInfo} object is returned. If the lock is not found <code>null</code>
-   * is returned.
+   * <p>The decision is made like this:
+   *
+   * <ol>
+   *   <li>Table property value {@link TableProperties#LOCK_HIVE_ENABLED}
+   *   <li>If the table property is not set then check the hive-site.xml property value {@link
+   *       ConfigProperties#LOCK_HIVE_ENABLED}
+   *   <li>If none of the above is enabled then use the default value {@link
+   *       TableProperties#LOCK_HIVE_ENABLED_DEFAULT}
+   * </ol>
    *
-   * @param agentInfo The key for searching the locks
-   * @return The {@link LockInfo} for the found lock, or <code>null</code> if nothing found
+   * @param metadata Table metadata to use
+   * @param conf The hive configuration to use
+   * @return if the hive engine related values should be enabled or not
    */
-  private LockInfo findLock(String agentInfo) throws TException, InterruptedException {
-    Preconditions.checkArgument(
-        HiveVersion.min(HiveVersion.HIVE_2),
-        "Minimally Hive 2 HMS client is needed to find the Lock using the showLocks API call");
-    ShowLocksRequest showLocksRequest = new ShowLocksRequest();
-    showLocksRequest.setDbname(database);
-    showLocksRequest.setTablename(tableName);
-    ShowLocksResponse response = metaClients.run(client -> client.showLocks(showLocksRequest));
-    for (ShowLocksResponseElement lock : response.getLocks()) {
-      if (lock.getAgentInfo().equals(agentInfo)) {
-        // We found our lock
-        return new LockInfo(lock.getLockid(), lock.getState());
-      }
-    }
-
-    // Not found anything
-    return null;
-  }
-
-  private static class HiveLockHeartbeat implements Runnable {
-    private final ClientPool<IMetaStoreClient, TException> hmsClients;
-    private final long lockId;
-    private final long intervalMs;
-    private ScheduledFuture<?> future;
-    private volatile Exception encounteredException = null;
-
-    HiveLockHeartbeat(
-        ClientPool<IMetaStoreClient, TException> hmsClients, long lockId, long intervalMs) {
-      this.hmsClients = hmsClients;
-      this.lockId = lockId;
-      this.intervalMs = intervalMs;
-      this.future = null;
-    }
-
-    @Override
-    public void run() {
-      try {
-        hmsClients.run(
-            client -> {
-              client.heartbeat(0, lockId);
-              return null;
-            });
-      } catch (TException | InterruptedException e) {
-        this.encounteredException = e;
-        throw new CommitFailedException(e, "Failed to heartbeat for lock: %d", lockId);
-      }
-    }
-
-    public void schedule(ScheduledExecutorService scheduler) {
-      future =
-          scheduler.scheduleAtFixedRate(this, intervalMs / 2, intervalMs, TimeUnit.MILLISECONDS);
+  private static boolean hiveLockEnabled(TableMetadata metadata, Configuration conf) {
+    if (metadata.properties().get(TableProperties.LOCK_HIVE_ENABLED) != null) {
+      // We know that the property is set, so default value will not be used,
+      return metadata.propertyAsBoolean(TableProperties.LOCK_HIVE_ENABLED, false);
     }
 
-    public void cancel() {
-      if (future != null) {
-        future.cancel(false);
-      }
-    }
+    return conf.getBoolean(
+        ConfigProperties.LOCK_HIVE_ENABLED, TableProperties.LOCK_HIVE_ENABLED_DEFAULT);
   }
 
-  private static class LockInfo {
-    private long lockId;
-    private LockState lockState;
-
-    private LockInfo() {
-      this.lockId = -1;
-      this.lockState = null;
-    }
-
-    private LockInfo(long lockId, LockState lockState) {
-      this.lockId = lockId;
-      this.lockState = lockState;
-    }
-
-    @Override
-    public String toString() {
-      return MoreObjects.toStringHelper(this)
-          .add("lockId", lockId)
-          .add("lockState", lockState)
-          .toString();
+  @VisibleForTesting
+  HiveLock lockObject(TableMetadata metadata) {

Review Comment:
   Could we delegate the decision about which lock to create to the LockManager? I think that would be easier to extend in the future. WDYT?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org