Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2021/01/06 00:12:46 UTC

[GitHub] [iceberg] jackye1995 opened a new pull request #2034: AWS: add DynamoDB implementation of Glue lock manager

jackye1995 opened a new pull request #2034:
URL: https://github.com/apache/iceberg/pull/2034


   As discussed, the DynamoDB part of #1823 
   Also updates the documentation for `LockManager.release()`.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #2034: AWS: add DynamoDB implementation of Glue lock manager

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2034:
URL: https://github.com/apache/iceberg/pull/2034#discussion_r552296863



##########
File path: aws/src/main/java/org/apache/iceberg/aws/glue/LockManager.java
##########
@@ -36,9 +36,12 @@
 
   /**
    * Release a lock
+   *
+   * @apiNote exception must not be thrown for this method.
+   *
    * @param entityId ID of the entity to lock
    * @param ownerId ID of the owner of the lock
-   * @return if the lock for the entity of the owner is released
+   * @return if the owner held the key and successfully released it.

Review comment:
       Key? I think you might still mean "lock"?






[GitHub] [iceberg] rdblue commented on a change in pull request #2034: AWS: add DynamoDB implementation of Glue lock manager

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2034:
URL: https://github.com/apache/iceberg/pull/2034#discussion_r552316891



##########
File path: aws/src/main/java/org/apache/iceberg/aws/glue/DynamoLockManager.java
##########
@@ -0,0 +1,307 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.aws.glue;
+
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.aws.AwsClientFactories;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.Tasks;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
+import software.amazon.awssdk.services.dynamodb.model.AttributeDefinition;
+import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
+import software.amazon.awssdk.services.dynamodb.model.BillingMode;
+import software.amazon.awssdk.services.dynamodb.model.ConditionalCheckFailedException;
+import software.amazon.awssdk.services.dynamodb.model.CreateTableRequest;
+import software.amazon.awssdk.services.dynamodb.model.DeleteItemRequest;
+import software.amazon.awssdk.services.dynamodb.model.DescribeTableRequest;
+import software.amazon.awssdk.services.dynamodb.model.DescribeTableResponse;
+import software.amazon.awssdk.services.dynamodb.model.DynamoDbException;
+import software.amazon.awssdk.services.dynamodb.model.GetItemRequest;
+import software.amazon.awssdk.services.dynamodb.model.GetItemResponse;
+import software.amazon.awssdk.services.dynamodb.model.InternalServerErrorException;
+import software.amazon.awssdk.services.dynamodb.model.KeySchemaElement;
+import software.amazon.awssdk.services.dynamodb.model.KeyType;
+import software.amazon.awssdk.services.dynamodb.model.ProvisionedThroughputExceededException;
+import software.amazon.awssdk.services.dynamodb.model.PutItemRequest;
+import software.amazon.awssdk.services.dynamodb.model.RequestLimitExceededException;
+import software.amazon.awssdk.services.dynamodb.model.ResourceNotFoundException;
+import software.amazon.awssdk.services.dynamodb.model.ScalarAttributeType;
+import software.amazon.awssdk.services.dynamodb.model.TableStatus;
+import software.amazon.awssdk.services.dynamodb.model.TransactionConflictException;
+
+/**
+ * DynamoDB implementation for the lock manager.
+ */
+class DynamoLockManager extends LockManagers.BaseLockManager {
+
+  private static final Logger LOG = LoggerFactory.getLogger(DynamoLockManager.class);
+
+  private static final String COL_LOCK_ENTITY_ID = "entityId";
+  private static final String COL_LEASE_DURATION_MS = "leaseDurationMs";
+  private static final String COL_VERSION = "version";
+  private static final String COL_LOCK_OWNER_ID = "ownerId";
+
+  private static final String CONDITION_LOCK_ID_MATCH = String.format(
+      "%s = :eid AND %s = :oid",
+      COL_LOCK_ENTITY_ID, COL_LOCK_OWNER_ID);
+  private static final String CONDITION_LOCK_ENTITY_NOT_EXIST = String.format(
+      "attribute_not_exists(%s)",
+      COL_LOCK_ENTITY_ID);
+  private static final String CONDITION_LOCK_ENTITY_NOT_EXIST_OR_VERSION_MATCH = String.format(
+      "attribute_not_exists(%s) OR (%s = :eid AND %s = :vid)",
+      COL_LOCK_ENTITY_ID, COL_LOCK_ENTITY_ID, COL_VERSION);
+
+  private static final int LOCK_TABLE_CREATION_WAIT_ATTEMPTS_MAX = 5;
+
+  private static final List<KeySchemaElement> LOCK_TABLE_SCHEMA = Lists.newArrayList(
+      KeySchemaElement.builder()
+          .attributeName(COL_LOCK_ENTITY_ID)
+          .keyType(KeyType.HASH)
+          .build());
+
+  private static final List<AttributeDefinition> LOCK_TABLE_COL_DEFINITIONS = Lists.newArrayList(
+      AttributeDefinition.builder()
+          .attributeName(COL_LOCK_ENTITY_ID)
+          .attributeType(ScalarAttributeType.S)
+          .build());
+
+  private final Map<String, ScheduledFuture<?>> heartbeats = Maps.newHashMap();
+
+  private DynamoDbClient dynamo;
+  private String lockTableName;
+
+  /**
+   * constructor for dynamic initialization, {@link #initialize(Map)} must be called later.
+   */
+  DynamoLockManager() {
+  }
+
+  /**
+   * constructor used for testing purpose
+   * @param dynamo dynamo client
+   * @param lockTableName lock table name
+   */
+  DynamoLockManager(DynamoDbClient dynamo, String lockTableName) {
+    super.initialize(Maps.newHashMap());
+    this.dynamo = dynamo;
+    this.lockTableName = lockTableName;
+    ensureLockTableExistsOrCreate();
+  }
+
+  private void ensureLockTableExistsOrCreate() {
+
+    if (tableExists(lockTableName)) {
+      return;
+    }
+
+    LOG.info("Dynamo lock table {} not found, trying to create", lockTableName);
+    dynamo.createTable(CreateTableRequest.builder()
+        .tableName(lockTableName)
+        .keySchema(lockTableSchema())
+        .attributeDefinitions(lockTableColDefinitions())
+        .billingMode(BillingMode.PAY_PER_REQUEST)
+        .build());
+
+    Tasks.foreach(lockTableName)
+        .retry(LOCK_TABLE_CREATION_WAIT_ATTEMPTS_MAX)
+        .throwFailureWhenFinished()
+        .onlyRetryOn(IllegalStateException.class)
+        .run(this::checkTableActive);
+  }
+
+  @VisibleForTesting
+  boolean tableExists(String tableName) {
+    try {
+      dynamo.describeTable(DescribeTableRequest.builder()
+          .tableName(tableName)
+          .build());
+      return true;
+    } catch (ResourceNotFoundException e) {
+      return false;
+    }
+  }
+
+  private void checkTableActive(String tableName) {
+    try {
+      DescribeTableResponse response = dynamo.describeTable(DescribeTableRequest.builder()
+          .tableName(tableName)
+          .build());
+      TableStatus currentStatus = response.table().tableStatus();
+      if (!currentStatus.equals(TableStatus.ACTIVE)) {
+        throw new IllegalStateException(String.format("Dynamo table %s is not active, current status: %s",
+            tableName, currentStatus));
+      }
+    } catch (ResourceNotFoundException e) {
+      throw new IllegalStateException(String.format("Cannot find Dynamo table %s", tableName));
+    }
+  }
+
+  @Override
+  public void initialize(Map<String, String> properties) {
+    super.initialize(properties);
+    this.dynamo = AwsClientFactories.from(properties).dynamo();
+    this.lockTableName = properties.get(CatalogProperties.LOCK_TABLE);
+    Preconditions.checkNotNull(lockTableName, "DynamoDB lock table name must not be null");
+    ensureLockTableExistsOrCreate();
+  }
+
+  @Override
+  public boolean acquire(String entityId, String ownerId) {
+    try {
+      Tasks.foreach(entityId)
+          .throwFailureWhenFinished()
+          .retry(Integer.MAX_VALUE - 1)
+          .exponentialBackoff(acquireIntervalMs(), acquireIntervalMs(), acquireTimeoutMs(), 1)
+          .onlyRetryOn(
+              ConditionalCheckFailedException.class,
+              ProvisionedThroughputExceededException.class,
+              TransactionConflictException.class,
+              RequestLimitExceededException.class,
+              InternalServerErrorException.class)
+          .run(id -> acquireOnce(id, ownerId));
+      return true;
+    } catch (DynamoDbException e) {
+      return false;
+    }
+  }
+
+  @VisibleForTesting
+  void acquireOnce(String entityId, String ownerId) {
+    GetItemResponse response = dynamo.getItem(GetItemRequest.builder()
+        .tableName(lockTableName)
+        .key(toKey(entityId))
+        .build());
+
+    if (!response.hasItem()) {
+      dynamo.putItem(PutItemRequest.builder()
+          .tableName(lockTableName)
+          .item(toNewItem(entityId, ownerId))
+          .conditionExpression(CONDITION_LOCK_ENTITY_NOT_EXIST)
+          .build());
+    } else {
+      Map<String, AttributeValue> currentItem = response.item();
+
+      try {
+        Thread.sleep(Long.parseLong(currentItem.get(COL_LEASE_DURATION_MS).n()));
+      } catch (InterruptedException e) {
+        throw new IllegalStateException(
+            String.format("Fail to acquire lock %s by %s, interrupted during sleep", entityId, ownerId), e);
+      }
+
+      dynamo.putItem(PutItemRequest.builder()
+          .tableName(lockTableName)
+          .item(toNewItem(entityId, ownerId))
+          .conditionExpression(CONDITION_LOCK_ENTITY_NOT_EXIST_OR_VERSION_MATCH)
+          .expressionAttributeValues(ImmutableMap.of(
+              ":eid", AttributeValue.builder().s(entityId).build(),
+              ":vid", AttributeValue.builder().s(currentItem.get(COL_VERSION).s()).build()))
+          .build());
+    }
+
+    startNewHeartbeat(entityId, ownerId);
+  }
+
+  private void startNewHeartbeat(String entityId, String ownerId) {
+    if (heartbeats.containsKey(entityId)) {
+      heartbeats.remove(entityId).cancel(false);
+    }
+
+    heartbeats.put(entityId, scheduler().scheduleAtFixedRate(() -> dynamo.putItem(PutItemRequest.builder()
+        .tableName(lockTableName)
+        .item(toNewItem(entityId, ownerId))
+        .conditionExpression(CONDITION_LOCK_ID_MATCH)
+        .expressionAttributeValues(toLockIdValues(entityId, ownerId))
+        .build()), 0, heartbeatIntervalMs(), TimeUnit.MILLISECONDS));

Review comment:
       I think this task needs to handle exceptions and keep running.
   
   If there is a connection issue that causes this to fail, then the task will not be rescheduled. Instead, the future will contain the error. Then other concurrent processes could hold the lock even though this process thinks it holds the lock.
   
   At a minimum, I think that this thread should log any exception thrown by dynamo and keep trying to update its lease.
   
   It would be great if we could also detect if the lock is no longer held (any put fails with `ConditionalCheckFailedException`) and stop any commit attempts, but I don't see a way to do that. This should definitely log an error if `ConditionalCheckFailedException` is thrown and note that two processes may think they both hold the lock.
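
   The rescheduling behavior described above can be reproduced with a minimal sketch that uses only `java.util.concurrent`. The class and method names here (`HeartbeatSuppressionDemo`, `runsOfThrowingTask`) are hypothetical and not part of the PR, and the thrown exception only stands in for a DynamoDB failure.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not part of the PR.
public class HeartbeatSuppressionDemo {

  // Schedules a periodic task that always throws and reports how often it ran.
  static int runsOfThrowingTask() {
    ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    AtomicInteger runs = new AtomicInteger();
    try {
      ScheduledFuture<?> future = scheduler.scheduleAtFixedRate(() -> {
        runs.incrementAndGet();
        // Stand-in for a failed dynamo.putItem(...) call.
        throw new IllegalStateException("simulated connection failure");
      }, 0, 10, TimeUnit.MILLISECONDS);

      try {
        // For a periodic task, get() only returns once the task has failed
        // or been cancelled; the failure is stored in the future.
        future.get();
      } catch (ExecutionException e) {
        System.out.println("heartbeat stopped: " + e.getCause().getMessage());
      }

      // Wait for several intervals to show that no further runs happen.
      Thread.sleep(100);
      return runs.get();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("interrupted while waiting", e);
    } finally {
      scheduler.shutdownNow();
    }
  }

  public static void main(String[] args) {
    System.out.println("runs: " + runsOfThrowingTask());
  }
}
```

   Nothing inspects the future in the heartbeat code path, so a failure like this would go unnoticed while the lease expires.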






[GitHub] [iceberg] rdblue commented on a change in pull request #2034: AWS: add DynamoDB implementation of Glue lock manager

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2034:
URL: https://github.com/apache/iceberg/pull/2034#discussion_r553039382



##########
File path: aws/src/main/java/org/apache/iceberg/aws/glue/DynamoLockManager.java
##########
@@ -0,0 +1,373 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.aws.glue;
+
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.aws.AwsClientFactories;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.Tasks;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
+import software.amazon.awssdk.services.dynamodb.model.AttributeDefinition;
+import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
+import software.amazon.awssdk.services.dynamodb.model.BillingMode;
+import software.amazon.awssdk.services.dynamodb.model.ConditionalCheckFailedException;
+import software.amazon.awssdk.services.dynamodb.model.CreateTableRequest;
+import software.amazon.awssdk.services.dynamodb.model.DeleteItemRequest;
+import software.amazon.awssdk.services.dynamodb.model.DescribeTableRequest;
+import software.amazon.awssdk.services.dynamodb.model.DescribeTableResponse;
+import software.amazon.awssdk.services.dynamodb.model.DynamoDbException;
+import software.amazon.awssdk.services.dynamodb.model.GetItemRequest;
+import software.amazon.awssdk.services.dynamodb.model.GetItemResponse;
+import software.amazon.awssdk.services.dynamodb.model.InternalServerErrorException;
+import software.amazon.awssdk.services.dynamodb.model.KeySchemaElement;
+import software.amazon.awssdk.services.dynamodb.model.KeyType;
+import software.amazon.awssdk.services.dynamodb.model.ProvisionedThroughputExceededException;
+import software.amazon.awssdk.services.dynamodb.model.PutItemRequest;
+import software.amazon.awssdk.services.dynamodb.model.RequestLimitExceededException;
+import software.amazon.awssdk.services.dynamodb.model.ResourceNotFoundException;
+import software.amazon.awssdk.services.dynamodb.model.ScalarAttributeType;
+import software.amazon.awssdk.services.dynamodb.model.TableStatus;
+import software.amazon.awssdk.services.dynamodb.model.TransactionConflictException;
+
+/**
+ * DynamoDB implementation for the lock manager.
+ */
+class DynamoLockManager extends LockManagers.BaseLockManager {
+
+  private static final Logger LOG = LoggerFactory.getLogger(DynamoLockManager.class);
+
+  private static final String COL_LOCK_ENTITY_ID = "entityId";
+  private static final String COL_LEASE_DURATION_MS = "leaseDurationMs";
+  private static final String COL_VERSION = "version";
+  private static final String COL_LOCK_OWNER_ID = "ownerId";
+
+  private static final String CONDITION_LOCK_ID_MATCH = String.format(
+      "%s = :eid AND %s = :oid",
+      COL_LOCK_ENTITY_ID, COL_LOCK_OWNER_ID);
+  private static final String CONDITION_LOCK_ENTITY_NOT_EXIST = String.format(
+      "attribute_not_exists(%s)",
+      COL_LOCK_ENTITY_ID);
+  private static final String CONDITION_LOCK_ENTITY_NOT_EXIST_OR_VERSION_MATCH = String.format(
+      "attribute_not_exists(%s) OR (%s = :eid AND %s = :vid)",
+      COL_LOCK_ENTITY_ID, COL_LOCK_ENTITY_ID, COL_VERSION);
+
+  private static final int LOCK_TABLE_CREATION_WAIT_ATTEMPTS_MAX = 5;
+  private static final int RELEASE_RETRY_ATTEMPTS_MAX = 5;
+
+  private static final List<KeySchemaElement> LOCK_TABLE_SCHEMA = Lists.newArrayList(
+      KeySchemaElement.builder()
+          .attributeName(COL_LOCK_ENTITY_ID)
+          .keyType(KeyType.HASH)
+          .build());
+
+  private static final List<AttributeDefinition> LOCK_TABLE_COL_DEFINITIONS = Lists.newArrayList(
+      AttributeDefinition.builder()
+          .attributeName(COL_LOCK_ENTITY_ID)
+          .attributeType(ScalarAttributeType.S)
+          .build());
+
+  private final Map<String, DynamoHeartbeat> heartbeats = Maps.newHashMap();
+
+  private DynamoDbClient dynamo;
+  private String lockTableName;
+
+  /**
+   * constructor for dynamic initialization, {@link #initialize(Map)} must be called later.
+   */
+  DynamoLockManager() {
+  }
+
+  /**
+   * constructor used for testing purpose
+   * @param dynamo dynamo client
+   * @param lockTableName lock table name
+   */
+  DynamoLockManager(DynamoDbClient dynamo, String lockTableName) {
+    super.initialize(Maps.newHashMap());
+    this.dynamo = dynamo;
+    this.lockTableName = lockTableName;
+    ensureLockTableExistsOrCreate();
+  }
+
+  private void ensureLockTableExistsOrCreate() {
+
+    if (tableExists(lockTableName)) {
+      return;
+    }
+
+    LOG.info("Dynamo lock table {} not found, trying to create", lockTableName);
+    dynamo.createTable(CreateTableRequest.builder()
+        .tableName(lockTableName)
+        .keySchema(lockTableSchema())
+        .attributeDefinitions(lockTableColDefinitions())
+        .billingMode(BillingMode.PAY_PER_REQUEST)
+        .build());
+
+    Tasks.foreach(lockTableName)
+        .retry(LOCK_TABLE_CREATION_WAIT_ATTEMPTS_MAX)
+        .throwFailureWhenFinished()
+        .onlyRetryOn(IllegalStateException.class)
+        .run(this::checkTableActive);
+  }
+
+  @VisibleForTesting
+  boolean tableExists(String tableName) {
+    try {
+      dynamo.describeTable(DescribeTableRequest.builder()
+          .tableName(tableName)
+          .build());
+      return true;
+    } catch (ResourceNotFoundException e) {
+      return false;
+    }
+  }
+
+  private void checkTableActive(String tableName) {
+    try {
+      DescribeTableResponse response = dynamo.describeTable(DescribeTableRequest.builder()
+          .tableName(tableName)
+          .build());
+      TableStatus currentStatus = response.table().tableStatus();
+      if (!currentStatus.equals(TableStatus.ACTIVE)) {
+        throw new IllegalStateException(String.format("Dynamo table %s is not active, current status: %s",
+            tableName, currentStatus));
+      }
+    } catch (ResourceNotFoundException e) {
+      throw new IllegalStateException(String.format("Cannot find Dynamo table %s", tableName));
+    }
+  }
+
+  @Override
+  public void initialize(Map<String, String> properties) {
+    super.initialize(properties);
+    this.dynamo = AwsClientFactories.from(properties).dynamo();
+    this.lockTableName = properties.get(CatalogProperties.LOCK_TABLE);
+    Preconditions.checkNotNull(lockTableName, "DynamoDB lock table name must not be null");
+    ensureLockTableExistsOrCreate();
+  }
+
+  @Override
+  public boolean acquire(String entityId, String ownerId) {
+    try {
+      Tasks.foreach(entityId)
+          .throwFailureWhenFinished()
+          .retry(Integer.MAX_VALUE - 1)
+          .exponentialBackoff(acquireIntervalMs(), acquireIntervalMs(), acquireTimeoutMs(), 1)
+          .onlyRetryOn(
+              ConditionalCheckFailedException.class,
+              ProvisionedThroughputExceededException.class,
+              TransactionConflictException.class,
+              RequestLimitExceededException.class,
+              InternalServerErrorException.class)
+          .run(id -> acquireOnce(id, ownerId));
+      return true;
+    } catch (DynamoDbException e) {
+      return false;
+    }
+  }
+
+  @VisibleForTesting
+  void acquireOnce(String entityId, String ownerId) {
+    GetItemResponse response = dynamo.getItem(GetItemRequest.builder()
+        .tableName(lockTableName)
+        .key(toKey(entityId))
+        .build());
+
+    if (!response.hasItem()) {
+      dynamo.putItem(PutItemRequest.builder()
+          .tableName(lockTableName)
+          .item(toNewItem(entityId, ownerId, heartbeatTimeoutMs()))
+          .conditionExpression(CONDITION_LOCK_ENTITY_NOT_EXIST)
+          .build());
+    } else {
+      Map<String, AttributeValue> currentItem = response.item();
+
+      try {
+        Thread.sleep(Long.parseLong(currentItem.get(COL_LEASE_DURATION_MS).n()));
+      } catch (InterruptedException e) {
+        throw new IllegalStateException(
+            String.format("Fail to acquire lock %s by %s, interrupted during sleep", entityId, ownerId), e);
+      }
+
+      dynamo.putItem(PutItemRequest.builder()
+          .tableName(lockTableName)
+          .item(toNewItem(entityId, ownerId, heartbeatTimeoutMs()))
+          .conditionExpression(CONDITION_LOCK_ENTITY_NOT_EXIST_OR_VERSION_MATCH)
+          .expressionAttributeValues(ImmutableMap.of(
+              ":eid", AttributeValue.builder().s(entityId).build(),
+              ":vid", AttributeValue.builder().s(currentItem.get(COL_VERSION).s()).build()))
+          .build());
+    }
+
+    startNewHeartbeat(entityId, ownerId);
+  }
+
+  private void startNewHeartbeat(String entityId, String ownerId) {
+    if (heartbeats.containsKey(entityId)) {
+      heartbeats.remove(entityId).cancel();
+    }
+
+    DynamoHeartbeat heartbeat = new DynamoHeartbeat(dynamo, lockTableName,
+        heartbeatIntervalMs(), heartbeatTimeoutMs(), entityId, ownerId);
+    heartbeat.schedule(scheduler());
+    heartbeats.put(entityId, heartbeat);
+  }
+
+  @Override
+  public boolean release(String entityId, String ownerId) {
+    boolean succeeded = false;
+    DynamoHeartbeat heartbeat = heartbeats.get(entityId);
+    try {
+      Tasks.foreach(entityId)
+          .retry(RELEASE_RETRY_ATTEMPTS_MAX)
+          .throwFailureWhenFinished()
+          .onlyRetryOn(
+              ProvisionedThroughputExceededException.class,
+              TransactionConflictException.class,
+              RequestLimitExceededException.class,
+              InternalServerErrorException.class)
+          .run(id -> dynamo.deleteItem(DeleteItemRequest.builder()
+              .tableName(lockTableName)
+              .key(toKey(id))
+              .conditionExpression(CONDITION_LOCK_ID_MATCH)
+              .expressionAttributeValues(toLockIdValues(id, ownerId))
+              .build()));
+      succeeded = true;
+    } catch (ConditionalCheckFailedException e) {
+      LOG.error("Failed to release lock for entity: {}, owner: {}, lock entity does not exist or owner not match",
+          entityId, ownerId, e);
+    } catch (DynamoDbException e) {
+      LOG.error("Failed to release lock for entity: {}, owner: {}, encountered unexpected DynamoDB exception",
+          entityId, ownerId, e);
+    } finally {
+      if (heartbeat != null && heartbeat.ownerId().equals(ownerId)) {
+        heartbeat.cancel();
+      }
+    }
+
+    return succeeded;
+  }
+
+  private static Map<String, AttributeValue> toKey(String entityId) {
+    return ImmutableMap.of(COL_LOCK_ENTITY_ID, AttributeValue.builder().s(entityId).build());
+  }
+
+  private static Map<String, AttributeValue> toNewItem(String entityId, String ownerId, long heartbeatTimeoutMs) {
+    return ImmutableMap.of(
+        COL_LOCK_ENTITY_ID, AttributeValue.builder().s(entityId).build(),
+        COL_LOCK_OWNER_ID, AttributeValue.builder().s(ownerId).build(),
+        COL_VERSION, AttributeValue.builder().s(UUID.randomUUID().toString()).build(),
+        COL_LEASE_DURATION_MS, AttributeValue.builder().n(Long.toString(heartbeatTimeoutMs)).build());
+  }
+
+  private static Map<String, AttributeValue> toLockIdValues(String entityId, String ownerId) {
+    return ImmutableMap.of(
+        ":eid", AttributeValue.builder().s(entityId).build(),
+        ":oid", AttributeValue.builder().s(ownerId).build());
+  }
+
+  @Override
+  public void close() {
+    dynamo.close();
+    heartbeats.values().forEach(DynamoHeartbeat::cancel);
+    heartbeats.clear();
+  }
+
+  /**
+   * The lock table schema, for users who would like to create the table separately
+   * @return lock table schema
+   */
+  public static List<KeySchemaElement> lockTableSchema() {
+    return LOCK_TABLE_SCHEMA;
+  }
+
+  /**
+   * The lock table column definition, for users who would like to create the table separately
+   * @return lock table column definition
+   */
+  public static List<AttributeDefinition> lockTableColDefinitions() {
+    return LOCK_TABLE_COL_DEFINITIONS;
+  }
+
+  private static class DynamoHeartbeat implements Runnable {
+
+    private final DynamoDbClient dynamo;
+    private final String lockTableName;
+    private final long intervalMs;
+    private final long timeoutMs;
+    private final String entityId;
+    private final String ownerId;
+    private ScheduledFuture<?> future;
+
+    DynamoHeartbeat(DynamoDbClient dynamo, String lockTableName, long intervalMs, long timeoutMs,
+                           String entityId, String ownerId) {
+      this.dynamo = dynamo;
+      this.lockTableName = lockTableName;
+      this.intervalMs = intervalMs;
+      this.timeoutMs = timeoutMs;
+      this.entityId = entityId;
+      this.ownerId = ownerId;
+      this.future = null;
+    }
+
+    @Override
+    public void run() {
+      try {
+        dynamo.putItem(PutItemRequest.builder()
+            .tableName(lockTableName)
+            .item(toNewItem(entityId, ownerId, timeoutMs))
+            .conditionExpression(CONDITION_LOCK_ID_MATCH)
+            .expressionAttributeValues(toLockIdValues(entityId, ownerId))
+            .build());
+      } catch (ConditionalCheckFailedException e) {
+        LOG.error("Fail to heartbeat for entity: {}, owner: {} due to conditional check failure, " +
+                "unsafe concurrent commits might be going on", entityId, ownerId, e);
+      } catch (DynamoDbException e) {

Review comment:
       This is one of the few times I think that we actually need to broaden the catch clause to `Exception` or `RuntimeException` instead of `DynamoDbException`. If anything happens that causes the heartbeat to fail, like an unrelated `NullPointerException` inside the AWS SDK, then we want this to keep trying as long as the process thinks it holds the lock. Catching `RuntimeException` ensures that no exceptions should stop this from getting scheduled again.
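
   One possible shape for this, as a sketch rather than the PR's actual code: wrap the heartbeat body so that no `RuntimeException` reaches the scheduler. `GuardedHeartbeatDemo`, `guarded`, and `keepsRunning` are hypothetical names, and the `NullPointerException` only simulates an unrelated failure inside the SDK.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the PR.
public class GuardedHeartbeatDemo {

  // Wraps an action so that a RuntimeException is logged instead of escaping
  // to the scheduler, which would otherwise stop rescheduling the task.
  static Runnable guarded(Runnable action) {
    return () -> {
      try {
        action.run();
      } catch (RuntimeException e) {
        System.err.println("heartbeat failed, retrying at next interval: " + e);
      }
    };
  }

  // Returns true if a task that always throws still runs expectedRuns times.
  static boolean keepsRunning(int expectedRuns) {
    ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    CountDownLatch latch = new CountDownLatch(expectedRuns);
    try {
      scheduler.scheduleAtFixedRate(guarded(() -> {
        latch.countDown();
        // Stand-in for an unexpected failure inside the AWS SDK.
        throw new NullPointerException("simulated failure inside the SDK");
      }), 0, 10, TimeUnit.MILLISECONDS);
      return latch.await(5, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("interrupted while waiting", e);
    } finally {
      scheduler.shutdownNow();
    }
  }

  public static void main(String[] args) {
    System.out.println("kept running: " + keepsRunning(3));
  }
}
```

   Catching `RuntimeException` rather than `Throwable` leaves errors such as `OutOfMemoryError` alone, which seems like the safer default for this sketch.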






[GitHub] [iceberg] rdblue commented on a change in pull request #2034: AWS: add DynamoDB implementation of Glue lock manager

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2034:
URL: https://github.com/apache/iceberg/pull/2034#discussion_r552315105



##########
File path: aws/src/main/java/org/apache/iceberg/aws/glue/DynamoLockManager.java
##########
@@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.aws.glue;
+
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.aws.AwsClientFactories;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.Tasks;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
+import software.amazon.awssdk.services.dynamodb.model.AttributeDefinition;
+import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
+import software.amazon.awssdk.services.dynamodb.model.BillingMode;
+import software.amazon.awssdk.services.dynamodb.model.ConditionalCheckFailedException;
+import software.amazon.awssdk.services.dynamodb.model.CreateTableRequest;
+import software.amazon.awssdk.services.dynamodb.model.DeleteItemRequest;
+import software.amazon.awssdk.services.dynamodb.model.DescribeTableRequest;
+import software.amazon.awssdk.services.dynamodb.model.DescribeTableResponse;
+import software.amazon.awssdk.services.dynamodb.model.GetItemRequest;
+import software.amazon.awssdk.services.dynamodb.model.GetItemResponse;
+import software.amazon.awssdk.services.dynamodb.model.InternalServerErrorException;
+import software.amazon.awssdk.services.dynamodb.model.KeySchemaElement;
+import software.amazon.awssdk.services.dynamodb.model.KeyType;
+import software.amazon.awssdk.services.dynamodb.model.ProvisionedThroughputExceededException;
+import software.amazon.awssdk.services.dynamodb.model.PutItemRequest;
+import software.amazon.awssdk.services.dynamodb.model.RequestLimitExceededException;
+import software.amazon.awssdk.services.dynamodb.model.ResourceNotFoundException;
+import software.amazon.awssdk.services.dynamodb.model.ScalarAttributeType;
+import software.amazon.awssdk.services.dynamodb.model.TableStatus;
+import software.amazon.awssdk.services.dynamodb.model.TransactionConflictException;
+
+/**
+ * DynamoDB implementation for the lock manager.
+ */
+class DynamoLockManager extends LockManagers.BaseLockManager {
+
+  private static final Logger LOG = LoggerFactory.getLogger(DynamoLockManager.class);
+
+  private static final String COL_LOCK_ENTITY_ID = "entityId";
+  private static final String COL_LEASE_DURATION_MS = "leaseDurationMs";
+  private static final String COL_VERSION = "version";
+  private static final String COL_LOCK_OWNER_ID = "ownerId";
+
+  private static final String CONDITION_LOCK_ID_MATCH = String.format(
+      "%s = :eid AND %s = :oid",
+      COL_LOCK_ENTITY_ID, COL_LOCK_OWNER_ID);
+  private static final String CONDITION_LOCK_ENTITY_NOT_EXIST = String.format(
+      "attribute_not_exists(%s)",
+      COL_LOCK_ENTITY_ID);
+  private static final String CONDITION_LOCK_ENTITY_NOT_EXIST_OR_VERSION_MATCH = String.format(
+      "attribute_not_exists(%s) OR (%s = :eid AND %s = :vid)",
+      COL_LOCK_ENTITY_ID, COL_LOCK_ENTITY_ID, COL_VERSION);
+
+  private static final int LOCK_TABLE_CREATION_WAIT_ATTEMPTS_MAX = 5;
+
+  private static final List<KeySchemaElement> LOCK_TABLE_SCHEMA = Lists.newArrayList(
+      KeySchemaElement.builder()
+          .attributeName(COL_LOCK_ENTITY_ID)
+          .keyType(KeyType.HASH)
+          .build()
+  );
+
+  private static final List<AttributeDefinition> LOCK_TABLE_COL_DEFINITIONS = Lists.newArrayList(
+      AttributeDefinition.builder()
+          .attributeName(COL_LOCK_ENTITY_ID)
+          .attributeType(ScalarAttributeType.S)
+          .build()
+  );
+
+  private final Map<String, ScheduledFuture<?>> heartbeats = Maps.newHashMap();
+
+  private DynamoDbClient dynamo;
+  private String lockTableName;
+
+  /**
+   * constructor for dynamic initialization, {@link #initialize(Map)} must be called later.
+   */
+  DynamoLockManager() {
+  }
+
+  /**
+   * constructor used for testing purpose
+   * @param dynamo dynamo client
+   * @param lockTableName lock table name
+   */
+  DynamoLockManager(DynamoDbClient dynamo, String lockTableName) {
+    super.initialize(Maps.newHashMap());
+    this.dynamo = dynamo;
+    this.lockTableName = lockTableName;
+    ensureLockTableExistsOrCreate();
+  }
+
+  private void ensureLockTableExistsOrCreate() {
+
+    if (tableExists(lockTableName)) {
+      return;
+    }
+
+    LOG.info("Dynamo lock table {} not found, trying to create", lockTableName);
+    dynamo.createTable(CreateTableRequest.builder()
+        .tableName(lockTableName)
+        .keySchema(lockTableSchema())
+        .attributeDefinitions(lockTableColDefinitions())
+        .billingMode(BillingMode.PAY_PER_REQUEST)
+        .build());
+
+    Tasks.foreach(lockTableName)
+        .retry(LOCK_TABLE_CREATION_WAIT_ATTEMPTS_MAX)
+        .throwFailureWhenFinished()
+        .onlyRetryOn(IllegalStateException.class)
+        .run(this::checkTableActive);
+  }
+
+  @VisibleForTesting
+  boolean tableExists(String tableName) {
+    try {
+      dynamo.describeTable(DescribeTableRequest.builder()
+          .tableName(tableName)
+          .build());
+      return true;
+    } catch (ResourceNotFoundException e) {
+      return false;
+    }
+  }
+
+  private void checkTableActive(String tableName) {
+    try {
+      DescribeTableResponse response = dynamo.describeTable(DescribeTableRequest.builder()
+          .tableName(tableName)
+          .build());
+      TableStatus currentStatus = response.table().tableStatus();
+      if (!currentStatus.equals(TableStatus.ACTIVE)) {
+        throw new IllegalStateException(String.format("Dynamo table %s is not active, current status: %s",
+            tableName, currentStatus));
+      }
+    } catch (ResourceNotFoundException e) {
+      throw new IllegalStateException(String.format("Cannot find Dynamo table %s", tableName));
+    }
+  }
+
+  @Override
+  public void initialize(Map<String, String> properties) {
+    super.initialize(properties);
+    this.dynamo = AwsClientFactories.from(properties).dynamo();
+    this.lockTableName = properties.get(CatalogProperties.LOCK_TABLE);
+    Preconditions.checkNotNull(lockTableName, "DynamoDB lock table name must not be null");
+    ensureLockTableExistsOrCreate();
+  }
+
+  @Override
+  public boolean acquire(String entityId, String ownerId) {
+    try {
+      Tasks.foreach(entityId)
+          .throwFailureWhenFinished()
+          .retry(Integer.MAX_VALUE - 1)
+          .exponentialBackoff(acquireIntervalMs(), acquireIntervalMs(), acquireTimeoutMs(), 1)
+          .onlyRetryOn(
+              ConditionalCheckFailedException.class,
+              ProvisionedThroughputExceededException.class,
+              TransactionConflictException.class,
+              RequestLimitExceededException.class,
+              InternalServerErrorException.class)
+          .run(id -> acquireOnce(id, ownerId));
+      return true;
+    } catch (RuntimeException e) {
+      return false;
+    }
+  }
+
+  @VisibleForTesting
+  void acquireOnce(String entityId, String ownerId) {
+    GetItemResponse response = dynamo.getItem(GetItemRequest.builder()
+        .tableName(lockTableName)
+        .key(toKey(entityId))
+        .build());
+
+    if (!response.hasItem()) {
+      dynamo.putItem(PutItemRequest.builder()
+          .tableName(lockTableName)
+          .item(toNewItem(entityId, ownerId))
+          .conditionExpression(CONDITION_LOCK_ENTITY_NOT_EXIST)
+          .build());
+    } else {
+      Map<String, AttributeValue> currentItem = response.item();
+
+      try {
+        Thread.sleep(Long.parseLong(currentItem.get(COL_LEASE_DURATION_MS).n()));
+      } catch (InterruptedException e) {
+        throw new IllegalStateException(
+            String.format("Fail to acquire lock %s by %s, interrupted during sleep", entityId, ownerId), e);
+      }
+
+      Map<String, AttributeValue> expressionValues = Maps.newHashMap();
+      expressionValues.put(":eid", AttributeValue.builder().s(entityId).build());
+      expressionValues.put(":vid", AttributeValue.builder().s(currentItem.get(COL_VERSION).s()).build());
+      dynamo.putItem(PutItemRequest.builder()
+          .tableName(lockTableName)
+          .item(toNewItem(entityId, ownerId))
+          .conditionExpression(CONDITION_LOCK_ENTITY_NOT_EXIST_OR_VERSION_MATCH)
+          .expressionAttributeValues(expressionValues)
+          .build());
+    }
+
+    startNewHeartbeat(entityId, ownerId);
+  }
+
+  private void startNewHeartbeat(String entityId, String ownerId) {
+    if (heartbeats.containsKey(entityId)) {
+      heartbeats.remove(entityId).cancel(false);
+    }
+
+    heartbeats.put(entityId, scheduler().scheduleAtFixedRate(() -> dynamo.putItem(PutItemRequest.builder()
+        .tableName(lockTableName)
+        .item(toNewItem(entityId, ownerId))
+        .conditionExpression(CONDITION_LOCK_ID_MATCH)
+        .expressionAttributeValues(toLockIdValues(entityId, ownerId))
+        .build()), 0, heartbeatIntervalMs(), TimeUnit.MILLISECONDS));
+  }
+
+  @Override
+  public boolean release(String entityId, String ownerId) {
+    ScheduledFuture<?> heartbeat = heartbeats.get(entityId);
+    try {
+      dynamo.deleteItem(DeleteItemRequest.builder()
+          .tableName(lockTableName)
+          .key(toKey(entityId))
+          .conditionExpression(CONDITION_LOCK_ID_MATCH)
+          .expressionAttributeValues(toLockIdValues(entityId, ownerId))
+          .build());

Review comment:
       I think this method is a good candidate for `ExceptionUtil.runSafely`.
   
   Cancelling the heartbeat should be done in a `finally` block in case a connection issue causes `deleteItem` to throw an exception. Otherwise, the heartbeat is never cancelled, and if its updates keep succeeding the heartbeat thread could accidentally hold the lock forever.
   
   The reason I suggest `runSafely` is that it automatically suppresses any exceptions thrown in the `finally` block, which I think is the better behavior here. I doubt that `heartbeat.cancel(false)` would throw, but it's good to be careful in case the cancellation code changes in the future.
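
A minimal sketch of the pattern suggested in this comment, written with plain `try`/`finally` and no Iceberg or AWS dependencies. It is an illustration under stated assumptions, not the code that was merged: `ReleaseSketch`, `LockStore`, `deleteIfOwner`, `track`, and `isTracked` are hypothetical names, and `LockStore` merely stands in for the conditional `deleteItem` call in the diff. The sketch does not use `ExceptionUtil.runSafely` itself; it only mimics the behavior described above, where the heartbeat is always cancelled and any failure during cancellation is suppressed.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

final class ReleaseSketch {

  // Hypothetical stand-in for the conditional DynamoDB delete; it may throw.
  interface LockStore {
    void deleteIfOwner(String entityId, String ownerId);
  }

  private final Map<String, ScheduledFuture<?>> heartbeats = new ConcurrentHashMap<>();
  private final LockStore store;

  ReleaseSketch(LockStore store) {
    this.store = store;
  }

  void track(String entityId, ScheduledFuture<?> heartbeat) {
    heartbeats.put(entityId, heartbeat);
  }

  boolean isTracked(String entityId) {
    return heartbeats.containsKey(entityId);
  }

  boolean release(String entityId, String ownerId) {
    try {
      store.deleteIfOwner(entityId, ownerId);
      return true;
    } catch (RuntimeException e) {
      // release() is documented as never throwing, so report failure instead.
      return false;
    } finally {
      // Runs whether or not the delete succeeded, so the heartbeat cannot
      // keep renewing the lock after a failed release.
      ScheduledFuture<?> heartbeat = heartbeats.remove(entityId);
      if (heartbeat != null) {
        try {
          heartbeat.cancel(false);
        } catch (RuntimeException suppressed) {
          // Suppressed on purpose: cleanup must not mask the release result.
        }
      }
    }
  }

  public static void main(String[] args) {
    ScheduledExecutorService pool = Executors.newSingleThreadScheduledExecutor();
    ReleaseSketch manager = new ReleaseSketch((entityId, ownerId) -> {
      throw new IllegalStateException("simulated connection failure");
    });
    manager.track("db.table", pool.scheduleAtFixedRate(() -> { }, 0, 10, TimeUnit.MILLISECONDS));
    System.out.println("released: " + manager.release("db.table", "owner-1"));
    System.out.println("heartbeat tracked: " + manager.isTracked("db.table"));
    pool.shutdownNow();
  }
}
```

In this sketch the simulated failure makes `release` return `false`, yet the heartbeat is still cancelled and removed from the map. One design point is left open: the sketch cancels by `entityId` alone, so whether cancellation should also depend on the owner is a decision for the actual implementation.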






[GitHub] [iceberg] rdblue commented on a change in pull request #2034: AWS: add DynamoDB implementation of Glue lock manager

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2034:
URL: https://github.com/apache/iceberg/pull/2034#discussion_r553050768



##########
File path: aws/src/main/java/org/apache/iceberg/aws/glue/DynamoLockManager.java
##########
@@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.aws.glue;
+
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.aws.AwsClientFactories;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.Tasks;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
+import software.amazon.awssdk.services.dynamodb.model.AttributeDefinition;
+import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
+import software.amazon.awssdk.services.dynamodb.model.BillingMode;
+import software.amazon.awssdk.services.dynamodb.model.ConditionalCheckFailedException;
+import software.amazon.awssdk.services.dynamodb.model.CreateTableRequest;
+import software.amazon.awssdk.services.dynamodb.model.DeleteItemRequest;
+import software.amazon.awssdk.services.dynamodb.model.DescribeTableRequest;
+import software.amazon.awssdk.services.dynamodb.model.DescribeTableResponse;
+import software.amazon.awssdk.services.dynamodb.model.GetItemRequest;
+import software.amazon.awssdk.services.dynamodb.model.GetItemResponse;
+import software.amazon.awssdk.services.dynamodb.model.InternalServerErrorException;
+import software.amazon.awssdk.services.dynamodb.model.KeySchemaElement;
+import software.amazon.awssdk.services.dynamodb.model.KeyType;
+import software.amazon.awssdk.services.dynamodb.model.ProvisionedThroughputExceededException;
+import software.amazon.awssdk.services.dynamodb.model.PutItemRequest;
+import software.amazon.awssdk.services.dynamodb.model.RequestLimitExceededException;
+import software.amazon.awssdk.services.dynamodb.model.ResourceNotFoundException;
+import software.amazon.awssdk.services.dynamodb.model.ScalarAttributeType;
+import software.amazon.awssdk.services.dynamodb.model.TableStatus;
+import software.amazon.awssdk.services.dynamodb.model.TransactionConflictException;
+
+/**
+ * DynamoDB implementation for the lock manager.
+ */
+class DynamoLockManager extends LockManagers.BaseLockManager {
+
+  private static final Logger LOG = LoggerFactory.getLogger(DynamoLockManager.class);
+
+  private static final String COL_LOCK_ENTITY_ID = "entityId";
+  private static final String COL_LEASE_DURATION_MS = "leaseDurationMs";
+  private static final String COL_VERSION = "version";
+  private static final String COL_LOCK_OWNER_ID = "ownerId";
+
+  private static final String CONDITION_LOCK_ID_MATCH = String.format(
+      "%s = :eid AND %s = :oid",
+      COL_LOCK_ENTITY_ID, COL_LOCK_OWNER_ID);
+  private static final String CONDITION_LOCK_ENTITY_NOT_EXIST = String.format(
+      "attribute_not_exists(%s)",
+      COL_LOCK_ENTITY_ID);
+  private static final String CONDITION_LOCK_ENTITY_NOT_EXIST_OR_VERSION_MATCH = String.format(
+      "attribute_not_exists(%s) OR (%s = :eid AND %s = :vid)",
+      COL_LOCK_ENTITY_ID, COL_LOCK_ENTITY_ID, COL_VERSION);
+
+  private static final int LOCK_TABLE_CREATION_WAIT_ATTEMPTS_MAX = 5;
+
+  private static final List<KeySchemaElement> LOCK_TABLE_SCHEMA = Lists.newArrayList(
+      KeySchemaElement.builder()
+          .attributeName(COL_LOCK_ENTITY_ID)
+          .keyType(KeyType.HASH)
+          .build()
+  );
+
+  private static final List<AttributeDefinition> LOCK_TABLE_COL_DEFINITIONS = Lists.newArrayList(
+      AttributeDefinition.builder()
+          .attributeName(COL_LOCK_ENTITY_ID)
+          .attributeType(ScalarAttributeType.S)
+          .build()
+  );
+
+  private final Map<String, ScheduledFuture<?>> heartbeats = Maps.newHashMap();
+
+  private DynamoDbClient dynamo;
+  private String lockTableName;
+
+  /**
+   * constructor for dynamic initialization, {@link #initialize(Map)} must be called later.
+   */
+  DynamoLockManager() {
+  }
+
+  /**
+   * constructor used for testing purpose
+   * @param dynamo dynamo client
+   * @param lockTableName lock table name
+   */
+  DynamoLockManager(DynamoDbClient dynamo, String lockTableName) {
+    super.initialize(Maps.newHashMap());
+    this.dynamo = dynamo;
+    this.lockTableName = lockTableName;
+    ensureLockTableExistsOrCreate();
+  }
+
+  private void ensureLockTableExistsOrCreate() {
+
+    if (tableExists(lockTableName)) {
+      return;
+    }
+
+    LOG.info("Dynamo lock table {} not found, trying to create", lockTableName);
+    dynamo.createTable(CreateTableRequest.builder()
+        .tableName(lockTableName)
+        .keySchema(lockTableSchema())
+        .attributeDefinitions(lockTableColDefinitions())
+        .billingMode(BillingMode.PAY_PER_REQUEST)
+        .build());
+
+    Tasks.foreach(lockTableName)
+        .retry(LOCK_TABLE_CREATION_WAIT_ATTEMPTS_MAX)
+        .throwFailureWhenFinished()
+        .onlyRetryOn(IllegalStateException.class)
+        .run(this::checkTableActive);
+  }
+
+  @VisibleForTesting
+  boolean tableExists(String tableName) {
+    try {
+      dynamo.describeTable(DescribeTableRequest.builder()
+          .tableName(tableName)
+          .build());
+      return true;
+    } catch (ResourceNotFoundException e) {
+      return false;
+    }
+  }
+
+  private void checkTableActive(String tableName) {
+    try {
+      DescribeTableResponse response = dynamo.describeTable(DescribeTableRequest.builder()
+          .tableName(tableName)
+          .build());
+      TableStatus currentStatus = response.table().tableStatus();
+      if (!currentStatus.equals(TableStatus.ACTIVE)) {
+        throw new IllegalStateException(String.format("Dynamo table %s is not active, current status: %s",
+            tableName, currentStatus));
+      }
+    } catch (ResourceNotFoundException e) {
+      throw new IllegalStateException(String.format("Cannot find Dynamo table %s", tableName));
+    }
+  }
+
+  @Override
+  public void initialize(Map<String, String> properties) {
+    super.initialize(properties);
+    this.dynamo = AwsClientFactories.from(properties).dynamo();
+    this.lockTableName = properties.get(CatalogProperties.LOCK_TABLE);
+    Preconditions.checkNotNull(lockTableName, "DynamoDB lock table name must not be null");
+    ensureLockTableExistsOrCreate();
+  }
+
+  @Override
+  public boolean acquire(String entityId, String ownerId) {
+    try {
+      Tasks.foreach(entityId)
+          .throwFailureWhenFinished()
+          .retry(Integer.MAX_VALUE - 1)
+          .exponentialBackoff(acquireIntervalMs(), acquireIntervalMs(), acquireTimeoutMs(), 1)
+          .onlyRetryOn(
+              ConditionalCheckFailedException.class,
+              ProvisionedThroughputExceededException.class,
+              TransactionConflictException.class,
+              RequestLimitExceededException.class,
+              InternalServerErrorException.class)
+          .run(id -> acquireOnce(id, ownerId));
+      return true;
+    } catch (RuntimeException e) {
+      return false;
+    }
+  }
+
+  @VisibleForTesting
+  void acquireOnce(String entityId, String ownerId) {
+    GetItemResponse response = dynamo.getItem(GetItemRequest.builder()
+        .tableName(lockTableName)
+        .key(toKey(entityId))
+        .build());
+
+    if (!response.hasItem()) {
+      dynamo.putItem(PutItemRequest.builder()
+          .tableName(lockTableName)
+          .item(toNewItem(entityId, ownerId))
+          .conditionExpression(CONDITION_LOCK_ENTITY_NOT_EXIST)
+          .build());
+    } else {
+      Map<String, AttributeValue> currentItem = response.item();
+
+      try {
+        Thread.sleep(Long.parseLong(currentItem.get(COL_LEASE_DURATION_MS).n()));
+      } catch (InterruptedException e) {
+        throw new IllegalStateException(
+            String.format("Fail to acquire lock %s by %s, interrupted during sleep", entityId, ownerId), e);
+      }
+
+      Map<String, AttributeValue> expressionValues = Maps.newHashMap();
+      expressionValues.put(":eid", AttributeValue.builder().s(entityId).build());
+      expressionValues.put(":vid", AttributeValue.builder().s(currentItem.get(COL_VERSION).s()).build());
+      dynamo.putItem(PutItemRequest.builder()
+          .tableName(lockTableName)
+          .item(toNewItem(entityId, ownerId))
+          .conditionExpression(CONDITION_LOCK_ENTITY_NOT_EXIST_OR_VERSION_MATCH)
+          .expressionAttributeValues(expressionValues)
+          .build());
+    }
+
+    startNewHeartbeat(entityId, ownerId);
+  }
+
+  private void startNewHeartbeat(String entityId, String ownerId) {
+    if (heartbeats.containsKey(entityId)) {
+      heartbeats.remove(entityId).cancel(false);
+    }
+
+    heartbeats.put(entityId, scheduler().scheduleAtFixedRate(() -> dynamo.putItem(PutItemRequest.builder()
+        .tableName(lockTableName)
+        .item(toNewItem(entityId, ownerId))
+        .conditionExpression(CONDITION_LOCK_ID_MATCH)
+        .expressionAttributeValues(toLockIdValues(entityId, ownerId))
+        .build()), 0, heartbeatIntervalMs(), TimeUnit.MILLISECONDS));
+  }
+
+  @Override
+  public boolean release(String entityId, String ownerId) {
+    ScheduledFuture<?> heartbeat = heartbeats.get(entityId);
+    try {
+      dynamo.deleteItem(DeleteItemRequest.builder()
+          .tableName(lockTableName)
+          .key(toKey(entityId))
+          .conditionExpression(CONDITION_LOCK_ID_MATCH)
+          .expressionAttributeValues(toLockIdValues(entityId, ownerId))
+          .build());

Review comment:
       This looks good. Nice catch with the owner check.






[GitHub] [iceberg] rdblue commented on a change in pull request #2034: AWS: add DynamoDB implementation of Glue lock manager

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2034:
URL: https://github.com/apache/iceberg/pull/2034#discussion_r552303020



##########
File path: aws/src/main/java/org/apache/iceberg/aws/glue/DynamoLockManager.java
##########
@@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.aws.glue;
+
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.aws.AwsClientFactories;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.Tasks;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
+import software.amazon.awssdk.services.dynamodb.model.AttributeDefinition;
+import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
+import software.amazon.awssdk.services.dynamodb.model.BillingMode;
+import software.amazon.awssdk.services.dynamodb.model.ConditionalCheckFailedException;
+import software.amazon.awssdk.services.dynamodb.model.CreateTableRequest;
+import software.amazon.awssdk.services.dynamodb.model.DeleteItemRequest;
+import software.amazon.awssdk.services.dynamodb.model.DescribeTableRequest;
+import software.amazon.awssdk.services.dynamodb.model.DescribeTableResponse;
+import software.amazon.awssdk.services.dynamodb.model.GetItemRequest;
+import software.amazon.awssdk.services.dynamodb.model.GetItemResponse;
+import software.amazon.awssdk.services.dynamodb.model.InternalServerErrorException;
+import software.amazon.awssdk.services.dynamodb.model.KeySchemaElement;
+import software.amazon.awssdk.services.dynamodb.model.KeyType;
+import software.amazon.awssdk.services.dynamodb.model.ProvisionedThroughputExceededException;
+import software.amazon.awssdk.services.dynamodb.model.PutItemRequest;
+import software.amazon.awssdk.services.dynamodb.model.RequestLimitExceededException;
+import software.amazon.awssdk.services.dynamodb.model.ResourceNotFoundException;
+import software.amazon.awssdk.services.dynamodb.model.ScalarAttributeType;
+import software.amazon.awssdk.services.dynamodb.model.TableStatus;
+import software.amazon.awssdk.services.dynamodb.model.TransactionConflictException;
+
+/**
+ * DynamoDB implementation for the lock manager.
+ */
+class DynamoLockManager extends LockManagers.BaseLockManager {
+
+  private static final Logger LOG = LoggerFactory.getLogger(DynamoLockManager.class);
+
+  private static final String COL_LOCK_ENTITY_ID = "entityId";
+  private static final String COL_LEASE_DURATION_MS = "leaseDurationMs";
+  private static final String COL_VERSION = "version";
+  private static final String COL_LOCK_OWNER_ID = "ownerId";
+
+  private static final String CONDITION_LOCK_ID_MATCH = String.format(
+      "%s = :eid AND %s = :oid",
+      COL_LOCK_ENTITY_ID, COL_LOCK_OWNER_ID);
+  private static final String CONDITION_LOCK_ENTITY_NOT_EXIST = String.format(
+      "attribute_not_exists(%s)",
+      COL_LOCK_ENTITY_ID);
+  private static final String CONDITION_LOCK_ENTITY_NOT_EXIST_OR_VERSION_MATCH = String.format(
+      "attribute_not_exists(%s) OR (%s = :eid AND %s = :vid)",
+      COL_LOCK_ENTITY_ID, COL_LOCK_ENTITY_ID, COL_VERSION);
+
+  private static final int LOCK_TABLE_CREATION_WAIT_ATTEMPTS_MAX = 5;
+
+  private static final List<KeySchemaElement> LOCK_TABLE_SCHEMA = Lists.newArrayList(
+      KeySchemaElement.builder()
+          .attributeName(COL_LOCK_ENTITY_ID)
+          .keyType(KeyType.HASH)
+          .build()
+  );
+
+  private static final List<AttributeDefinition> LOCK_TABLE_COL_DEFINITIONS = Lists.newArrayList(
+      AttributeDefinition.builder()
+          .attributeName(COL_LOCK_ENTITY_ID)
+          .attributeType(ScalarAttributeType.S)
+          .build()
+  );
+
+  private final Map<String, ScheduledFuture<?>> heartbeats = Maps.newHashMap();
+
+  private DynamoDbClient dynamo;
+  private String lockTableName;
+
+  /**
+   * constructor for dynamic initialization, {@link #initialize(Map)} must be called later.
+   */
+  DynamoLockManager() {
+  }
+
+  /**
+   * constructor used for testing purpose
+   * @param dynamo dynamo client
+   * @param lockTableName lock table name
+   */
+  DynamoLockManager(DynamoDbClient dynamo, String lockTableName) {
+    super.initialize(Maps.newHashMap());
+    this.dynamo = dynamo;
+    this.lockTableName = lockTableName;
+    ensureLockTableExistsOrCreate();
+  }
+
+  private void ensureLockTableExistsOrCreate() {
+
+    if (tableExists(lockTableName)) {
+      return;
+    }
+
+    LOG.info("Dynamo lock table {} not found, trying to create", lockTableName);
+    dynamo.createTable(CreateTableRequest.builder()
+        .tableName(lockTableName)
+        .keySchema(lockTableSchema())
+        .attributeDefinitions(lockTableColDefinitions())
+        .billingMode(BillingMode.PAY_PER_REQUEST)
+        .build());
+
+    Tasks.foreach(lockTableName)
+        .retry(LOCK_TABLE_CREATION_WAIT_ATTEMPTS_MAX)
+        .throwFailureWhenFinished()
+        .onlyRetryOn(IllegalStateException.class)
+        .run(this::checkTableActive);
+  }
+
+  @VisibleForTesting
+  boolean tableExists(String tableName) {
+    try {
+      dynamo.describeTable(DescribeTableRequest.builder()
+          .tableName(tableName)
+          .build());
+      return true;
+    } catch (ResourceNotFoundException e) {
+      return false;
+    }
+  }
+
+  private void checkTableActive(String tableName) {
+    try {
+      DescribeTableResponse response = dynamo.describeTable(DescribeTableRequest.builder()
+          .tableName(tableName)
+          .build());
+      TableStatus currentStatus = response.table().tableStatus();
+      if (!currentStatus.equals(TableStatus.ACTIVE)) {
+        throw new IllegalStateException(String.format("Dynamo table %s is not active, current status: %s",
+            tableName, currentStatus));
+      }
+    } catch (ResourceNotFoundException e) {
+      throw new IllegalStateException(String.format("Cannot find Dynamo table %s", tableName));
+    }
+  }
+
+  @Override
+  public void initialize(Map<String, String> properties) {
+    super.initialize(properties);
+    this.dynamo = AwsClientFactories.from(properties).dynamo();
+    this.lockTableName = properties.get(CatalogProperties.LOCK_TABLE);
+    Preconditions.checkNotNull(lockTableName, "DynamoDB lock table name must not be null");
+    ensureLockTableExistsOrCreate();
+  }
+
+  @Override
+  public boolean acquire(String entityId, String ownerId) {
+    try {
+      Tasks.foreach(entityId)
+          .throwFailureWhenFinished()
+          .retry(Integer.MAX_VALUE - 1)
+          .exponentialBackoff(acquireIntervalMs(), acquireIntervalMs(), acquireTimeoutMs(), 1)
+          .onlyRetryOn(
+              ConditionalCheckFailedException.class,
+              ProvisionedThroughputExceededException.class,
+              TransactionConflictException.class,
+              RequestLimitExceededException.class,
+              InternalServerErrorException.class)
+          .run(id -> acquireOnce(id, ownerId));
+      return true;
+    } catch (RuntimeException e) {
+      return false;
+    }
+  }
+
+  @VisibleForTesting
+  void acquireOnce(String entityId, String ownerId) {
+    GetItemResponse response = dynamo.getItem(GetItemRequest.builder()
+        .tableName(lockTableName)
+        .key(toKey(entityId))
+        .build());
+
+    if (!response.hasItem()) {
+      dynamo.putItem(PutItemRequest.builder()
+          .tableName(lockTableName)
+          .item(toNewItem(entityId, ownerId))
+          .conditionExpression(CONDITION_LOCK_ENTITY_NOT_EXIST)
+          .build());
+    } else {
+      Map<String, AttributeValue> currentItem = response.item();
+
+      try {
+        Thread.sleep(Long.parseLong(currentItem.get(COL_LEASE_DURATION_MS).n()));
+      } catch (InterruptedException e) {
+        throw new IllegalStateException(
+            String.format("Fail to acquire lock %s by %s, interrupted during sleep", entityId, ownerId), e);
+      }
+
+      Map<String, AttributeValue> expressionValues = Maps.newHashMap();
+      expressionValues.put(":eid", AttributeValue.builder().s(entityId).build());
+      expressionValues.put(":vid", AttributeValue.builder().s(currentItem.get(COL_VERSION).s()).build());
+      dynamo.putItem(PutItemRequest.builder()
+          .tableName(lockTableName)
+          .item(toNewItem(entityId, ownerId))
+          .conditionExpression(CONDITION_LOCK_ENTITY_NOT_EXIST_OR_VERSION_MATCH)
+          .expressionAttributeValues(expressionValues)
+          .build());
+    }
+
+    startNewHeartbeat(entityId, ownerId);
+  }
+
+  private void startNewHeartbeat(String entityId, String ownerId) {
+    if (heartbeats.containsKey(entityId)) {
+      heartbeats.remove(entityId).cancel(false);
+    }
+
+    heartbeats.put(entityId, scheduler().scheduleAtFixedRate(() -> dynamo.putItem(PutItemRequest.builder()
+        .tableName(lockTableName)
+        .item(toNewItem(entityId, ownerId))
+        .conditionExpression(CONDITION_LOCK_ID_MATCH)
+        .expressionAttributeValues(toLockIdValues(entityId, ownerId))
+        .build()), 0, heartbeatIntervalMs(), TimeUnit.MILLISECONDS));
+  }
+
+  @Override
+  public boolean release(String entityId, String ownerId) {
+    ScheduledFuture<?> heartbeat = heartbeats.get(entityId);
+    try {
+      dynamo.deleteItem(DeleteItemRequest.builder()
+          .tableName(lockTableName)
+          .key(toKey(entityId))
+          .conditionExpression(CONDITION_LOCK_ID_MATCH)
+          .expressionAttributeValues(toLockIdValues(entityId, ownerId))
+          .build());
+    } catch (ConditionalCheckFailedException e) {
+      LOG.error("Fail to release lock {} by {}, lock entity does not exist or owner not match", entityId, ownerId, e);
+      return false;
+    }
+
+    if (heartbeat != null) {
+      heartbeat.cancel(false);
+    }
+
+    return true;
+  }
+
+  private Map<String, AttributeValue> toKey(String entityId) {
+    Map<String, AttributeValue> key = Maps.newHashMap();
+    key.put(COL_LOCK_ENTITY_ID, AttributeValue.builder().s(entityId).build());
+    return key;

Review comment:
       What about `ImmutableMap.of(COL_LOCK_ENTITY_ID, ...)`? This is probably a nit, but I tend to prefer immutable classes if possible, especially for small maps where a hash lookup might be avoided because there's just one element.
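
As a rough illustration of the suggestion above, the sketch below builds a one-entry, unmodifiable key map. It is not the PR's code: it uses the JDK's `Collections.singletonMap` as a stand-in for Guava's `ImmutableMap.of` and a plain `String` value instead of `AttributeValue`, so it compiles without the AWS SDK or Guava. The class name `SingleEntryKey` is hypothetical.

```java
import java.util.Collections;
import java.util.Map;

class SingleEntryKey {
  // Same column name as in the diff; redefined here so the sketch is self-contained.
  static final String COL_LOCK_ENTITY_ID = "entityId";

  // Builds a one-entry, unmodifiable key map.
  // Assumption: a String value stands in for the SDK's AttributeValue.
  static Map<String, String> toKey(String entityId) {
    return Collections.singletonMap(COL_LOCK_ENTITY_ID, entityId);
  }

  public static void main(String[] args) {
    Map<String, String> key = toKey("db.table");
    System.out.println(key.get(COL_LOCK_ENTITY_ID));
    try {
      key.put("other", "value"); // mutation is rejected
    } catch (UnsupportedOperationException e) {
      System.out.println("key map is immutable");
    }
  }
}
```

Callers can still read the key, but an accidental `put` fails fast instead of silently changing the request.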






[GitHub] [iceberg] jackye1995 commented on a change in pull request #2034: AWS: add DynamoDB implementation of Glue lock manager

Posted by GitBox <gi...@apache.org>.
jackye1995 commented on a change in pull request #2034:
URL: https://github.com/apache/iceberg/pull/2034#discussion_r552392569



##########
File path: aws/src/main/java/org/apache/iceberg/aws/glue/DynamoLockManager.java
##########
@@ -0,0 +1,307 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.aws.glue;
+
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.aws.AwsClientFactories;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.Tasks;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
+import software.amazon.awssdk.services.dynamodb.model.AttributeDefinition;
+import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
+import software.amazon.awssdk.services.dynamodb.model.BillingMode;
+import software.amazon.awssdk.services.dynamodb.model.ConditionalCheckFailedException;
+import software.amazon.awssdk.services.dynamodb.model.CreateTableRequest;
+import software.amazon.awssdk.services.dynamodb.model.DeleteItemRequest;
+import software.amazon.awssdk.services.dynamodb.model.DescribeTableRequest;
+import software.amazon.awssdk.services.dynamodb.model.DescribeTableResponse;
+import software.amazon.awssdk.services.dynamodb.model.DynamoDbException;
+import software.amazon.awssdk.services.dynamodb.model.GetItemRequest;
+import software.amazon.awssdk.services.dynamodb.model.GetItemResponse;
+import software.amazon.awssdk.services.dynamodb.model.InternalServerErrorException;
+import software.amazon.awssdk.services.dynamodb.model.KeySchemaElement;
+import software.amazon.awssdk.services.dynamodb.model.KeyType;
+import software.amazon.awssdk.services.dynamodb.model.ProvisionedThroughputExceededException;
+import software.amazon.awssdk.services.dynamodb.model.PutItemRequest;
+import software.amazon.awssdk.services.dynamodb.model.RequestLimitExceededException;
+import software.amazon.awssdk.services.dynamodb.model.ResourceNotFoundException;
+import software.amazon.awssdk.services.dynamodb.model.ScalarAttributeType;
+import software.amazon.awssdk.services.dynamodb.model.TableStatus;
+import software.amazon.awssdk.services.dynamodb.model.TransactionConflictException;
+
+/**
+ * DynamoDB implementation for the lock manager.
+ */
+class DynamoLockManager extends LockManagers.BaseLockManager {
+
+  private static final Logger LOG = LoggerFactory.getLogger(DynamoLockManager.class);
+
+  private static final String COL_LOCK_ENTITY_ID = "entityId";
+  private static final String COL_LEASE_DURATION_MS = "leaseDurationMs";
+  private static final String COL_VERSION = "version";
+  private static final String COL_LOCK_OWNER_ID = "ownerId";
+
+  private static final String CONDITION_LOCK_ID_MATCH = String.format(
+      "%s = :eid AND %s = :oid",
+      COL_LOCK_ENTITY_ID, COL_LOCK_OWNER_ID);
+  private static final String CONDITION_LOCK_ENTITY_NOT_EXIST = String.format(
+      "attribute_not_exists(%s)",
+      COL_LOCK_ENTITY_ID);
+  private static final String CONDITION_LOCK_ENTITY_NOT_EXIST_OR_VERSION_MATCH = String.format(
+      "attribute_not_exists(%s) OR (%s = :eid AND %s = :vid)",
+      COL_LOCK_ENTITY_ID, COL_LOCK_ENTITY_ID, COL_VERSION);
+
+  private static final int LOCK_TABLE_CREATION_WAIT_ATTEMPTS_MAX = 5;
+
+  private static final List<KeySchemaElement> LOCK_TABLE_SCHEMA = Lists.newArrayList(
+      KeySchemaElement.builder()
+          .attributeName(COL_LOCK_ENTITY_ID)
+          .keyType(KeyType.HASH)
+          .build());
+
+  private static final List<AttributeDefinition> LOCK_TABLE_COL_DEFINITIONS = Lists.newArrayList(
+      AttributeDefinition.builder()
+          .attributeName(COL_LOCK_ENTITY_ID)
+          .attributeType(ScalarAttributeType.S)
+          .build());
+
+  private final Map<String, ScheduledFuture<?>> heartbeats = Maps.newHashMap();
+
+  private DynamoDbClient dynamo;
+  private String lockTableName;
+
+  /**
+   * constructor for dynamic initialization, {@link #initialize(Map)} must be called later.
+   */
+  DynamoLockManager() {
+  }
+
+  /**
+   * constructor used for testing purpose
+   * @param dynamo dynamo client
+   * @param lockTableName lock table name
+   */
+  DynamoLockManager(DynamoDbClient dynamo, String lockTableName) {
+    super.initialize(Maps.newHashMap());
+    this.dynamo = dynamo;
+    this.lockTableName = lockTableName;
+    ensureLockTableExistsOrCreate();
+  }
+
+  private void ensureLockTableExistsOrCreate() {
+
+    if (tableExists(lockTableName)) {
+      return;
+    }
+
+    LOG.info("Dynamo lock table {} not found, trying to create", lockTableName);
+    dynamo.createTable(CreateTableRequest.builder()
+        .tableName(lockTableName)
+        .keySchema(lockTableSchema())
+        .attributeDefinitions(lockTableColDefinitions())
+        .billingMode(BillingMode.PAY_PER_REQUEST)
+        .build());
+
+    Tasks.foreach(lockTableName)
+        .retry(LOCK_TABLE_CREATION_WAIT_ATTEMPTS_MAX)
+        .throwFailureWhenFinished()
+        .onlyRetryOn(IllegalStateException.class)
+        .run(this::checkTableActive);
+  }
+
+  @VisibleForTesting
+  boolean tableExists(String tableName) {
+    try {
+      dynamo.describeTable(DescribeTableRequest.builder()
+          .tableName(tableName)
+          .build());
+      return true;
+    } catch (ResourceNotFoundException e) {
+      return false;
+    }
+  }
+
+  private void checkTableActive(String tableName) {
+    try {
+      DescribeTableResponse response = dynamo.describeTable(DescribeTableRequest.builder()
+          .tableName(tableName)
+          .build());
+      TableStatus currentStatus = response.table().tableStatus();
+      if (!currentStatus.equals(TableStatus.ACTIVE)) {
+        throw new IllegalStateException(String.format("Dynamo table %s is not active, current status: %s",
+            tableName, currentStatus));
+      }
+    } catch (ResourceNotFoundException e) {
+      throw new IllegalStateException(String.format("Cannot find Dynamo table %s", tableName));
+    }
+  }
+
+  @Override
+  public void initialize(Map<String, String> properties) {
+    super.initialize(properties);
+    this.dynamo = AwsClientFactories.from(properties).dynamo();
+    this.lockTableName = properties.get(CatalogProperties.LOCK_TABLE);
+    Preconditions.checkNotNull(lockTableName, "DynamoDB lock table name must not be null");
+    ensureLockTableExistsOrCreate();
+  }
+
+  @Override
+  public boolean acquire(String entityId, String ownerId) {
+    try {
+      Tasks.foreach(entityId)
+          .throwFailureWhenFinished()
+          .retry(Integer.MAX_VALUE - 1)
+          .exponentialBackoff(acquireIntervalMs(), acquireIntervalMs(), acquireTimeoutMs(), 1)
+          .onlyRetryOn(
+              ConditionalCheckFailedException.class,
+              ProvisionedThroughputExceededException.class,
+              TransactionConflictException.class,
+              RequestLimitExceededException.class,
+              InternalServerErrorException.class)
+          .run(id -> acquireOnce(id, ownerId));
+      return true;
+    } catch (DynamoDbException e) {
+      return false;
+    }
+  }
+
+  @VisibleForTesting
+  void acquireOnce(String entityId, String ownerId) {
+    GetItemResponse response = dynamo.getItem(GetItemRequest.builder()
+        .tableName(lockTableName)
+        .key(toKey(entityId))
+        .build());
+
+    if (!response.hasItem()) {
+      dynamo.putItem(PutItemRequest.builder()
+          .tableName(lockTableName)
+          .item(toNewItem(entityId, ownerId))
+          .conditionExpression(CONDITION_LOCK_ENTITY_NOT_EXIST)
+          .build());
+    } else {
+      Map<String, AttributeValue> currentItem = response.item();
+
+      try {
+        Thread.sleep(Long.parseLong(currentItem.get(COL_LEASE_DURATION_MS).n()));
+      } catch (InterruptedException e) {
+        throw new IllegalStateException(
+            String.format("Fail to acquire lock %s by %s, interrupted during sleep", entityId, ownerId), e);
+      }
+
+      dynamo.putItem(PutItemRequest.builder()
+          .tableName(lockTableName)
+          .item(toNewItem(entityId, ownerId))
+          .conditionExpression(CONDITION_LOCK_ENTITY_NOT_EXIST_OR_VERSION_MATCH)
+          .expressionAttributeValues(ImmutableMap.of(
+              ":eid", AttributeValue.builder().s(entityId).build(),
+              ":vid", AttributeValue.builder().s(currentItem.get(COL_VERSION).s()).build()))
+          .build());
+    }
+
+    startNewHeartbeat(entityId, ownerId);
+  }
+
+  private void startNewHeartbeat(String entityId, String ownerId) {
+    if (heartbeats.containsKey(entityId)) {
+      heartbeats.remove(entityId).cancel(false);
+    }
+
+    heartbeats.put(entityId, scheduler().scheduleAtFixedRate(() -> dynamo.putItem(PutItemRequest.builder()
+        .tableName(lockTableName)
+        .item(toNewItem(entityId, ownerId))
+        .conditionExpression(CONDITION_LOCK_ID_MATCH)
+        .expressionAttributeValues(toLockIdValues(entityId, ownerId))
+        .build()), 0, heartbeatIntervalMs(), TimeUnit.MILLISECONDS));

Review comment:
       Yes, that is definitely a risk. I also considered what you described, but I think the only way to achieve it is to let the lock manager's caller explicitly call a `heartbeat` method to emit heartbeats, and that method would need to be passed through to every place in the commit path to ensure the heartbeat does not time out.
   
   So compared with the current Hive lock, we are basically choosing between:
   1. If we lock without a heartbeat, a very long timeout is needed to ensure the commit can succeed, and if the lock holder dies, all other processes are blocked for that long.
   2. If we lock with a heartbeat, we don't need a long timeout, but we need to choose between:
       1. heartbeating in the background, which keeps the interface more decoupled but leads to this issue of a potentially idle heartbeat thread
       2. heartbeating from the caller, which would require changing the commit interface all over the place to pass the heartbeat around.
   
   I think 2.1 is still the best approach for now, since we have already made sure `release` must be called after a commit, so the heartbeat will be killed. If it is not called, the process must have crashed and the heartbeat thread would also be dead.
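
Option 2.1 can be sketched roughly as follows. This is a minimal illustration under stated assumptions, not the PR's implementation: the class `BackgroundHeartbeatSketch`, its `beats` counter, and the in-memory map are hypothetical, and the counter increment stands in for the conditional `putItem` that refreshes the lease in DynamoDB.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class BackgroundHeartbeatSketch {
  private final ScheduledExecutorService scheduler =
      Executors.newSingleThreadScheduledExecutor(r -> {
        Thread t = new Thread(r, "lock-heartbeat");
        t.setDaemon(true); // the heartbeat thread never keeps the JVM alive on its own
        return t;
      });
  private final Map<String, ScheduledFuture<?>> heartbeats = new ConcurrentHashMap<>();
  final AtomicInteger beats = new AtomicInteger(); // stand-in for the lease-refreshing write

  // Starts (or restarts) the background heartbeat for an entity.
  void acquire(String entityId, long intervalMs) {
    ScheduledFuture<?> previous = heartbeats.put(entityId,
        scheduler.scheduleAtFixedRate(beats::incrementAndGet, 0, intervalMs, TimeUnit.MILLISECONDS));
    if (previous != null) {
      previous.cancel(false);
    }
  }

  // Stops the heartbeat; returns false if no heartbeat was running for the entity.
  boolean release(String entityId) {
    ScheduledFuture<?> heartbeat = heartbeats.remove(entityId);
    return heartbeat != null && heartbeat.cancel(false);
  }

  boolean isHeartbeating(String entityId) {
    return heartbeats.containsKey(entityId);
  }

  public static void main(String[] args) {
    BackgroundHeartbeatSketch locks = new BackgroundHeartbeatSketch();
    locks.acquire("db.table", 10);
    System.out.println(locks.isHeartbeating("db.table"));
    System.out.println(locks.release("db.table"));
    System.out.println(locks.isHeartbeating("db.table"));
  }
}
```

The caller only sees `acquire` and `release`; the trade-off is that a caller that forgets `release` leaves the scheduled task running until the process exits.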






[GitHub] [iceberg] rdblue commented on pull request #2034: AWS: add DynamoDB implementation of Glue lock manager

Posted by GitBox <gi...@apache.org>.
rdblue commented on pull request #2034:
URL: https://github.com/apache/iceberg/pull/2034#issuecomment-755835203


   Looks great, thanks @jackye1995!




[GitHub] [iceberg] rdblue merged pull request #2034: AWS: add DynamoDB implementation of Glue lock manager

Posted by GitBox <gi...@apache.org>.
rdblue merged pull request #2034:
URL: https://github.com/apache/iceberg/pull/2034


   




[GitHub] [iceberg] jackye1995 commented on a change in pull request #2034: AWS: add DynamoDB implementation of Glue lock manager

Posted by GitBox <gi...@apache.org>.
jackye1995 commented on a change in pull request #2034:
URL: https://github.com/apache/iceberg/pull/2034#discussion_r552388451



##########
File path: aws/src/main/java/org/apache/iceberg/aws/glue/DynamoLockManager.java
##########
@@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.aws.glue;
+
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.aws.AwsClientFactories;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.Tasks;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
+import software.amazon.awssdk.services.dynamodb.model.AttributeDefinition;
+import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
+import software.amazon.awssdk.services.dynamodb.model.BillingMode;
+import software.amazon.awssdk.services.dynamodb.model.ConditionalCheckFailedException;
+import software.amazon.awssdk.services.dynamodb.model.CreateTableRequest;
+import software.amazon.awssdk.services.dynamodb.model.DeleteItemRequest;
+import software.amazon.awssdk.services.dynamodb.model.DescribeTableRequest;
+import software.amazon.awssdk.services.dynamodb.model.DescribeTableResponse;
+import software.amazon.awssdk.services.dynamodb.model.GetItemRequest;
+import software.amazon.awssdk.services.dynamodb.model.GetItemResponse;
+import software.amazon.awssdk.services.dynamodb.model.InternalServerErrorException;
+import software.amazon.awssdk.services.dynamodb.model.KeySchemaElement;
+import software.amazon.awssdk.services.dynamodb.model.KeyType;
+import software.amazon.awssdk.services.dynamodb.model.ProvisionedThroughputExceededException;
+import software.amazon.awssdk.services.dynamodb.model.PutItemRequest;
+import software.amazon.awssdk.services.dynamodb.model.RequestLimitExceededException;
+import software.amazon.awssdk.services.dynamodb.model.ResourceNotFoundException;
+import software.amazon.awssdk.services.dynamodb.model.ScalarAttributeType;
+import software.amazon.awssdk.services.dynamodb.model.TableStatus;
+import software.amazon.awssdk.services.dynamodb.model.TransactionConflictException;
+
+/**
+ * DynamoDB implementation for the lock manager.
+ */
+class DynamoLockManager extends LockManagers.BaseLockManager {
+
+  private static final Logger LOG = LoggerFactory.getLogger(DynamoLockManager.class);
+
+  private static final String COL_LOCK_ENTITY_ID = "entityId";
+  private static final String COL_LEASE_DURATION_MS = "leaseDurationMs";
+  private static final String COL_VERSION = "version";
+  private static final String COL_LOCK_OWNER_ID = "ownerId";
+
+  private static final String CONDITION_LOCK_ID_MATCH = String.format(
+      "%s = :eid AND %s = :oid",
+      COL_LOCK_ENTITY_ID, COL_LOCK_OWNER_ID);
+  private static final String CONDITION_LOCK_ENTITY_NOT_EXIST = String.format(
+      "attribute_not_exists(%s)",
+      COL_LOCK_ENTITY_ID);
+  private static final String CONDITION_LOCK_ENTITY_NOT_EXIST_OR_VERSION_MATCH = String.format(
+      "attribute_not_exists(%s) OR (%s = :eid AND %s = :vid)",
+      COL_LOCK_ENTITY_ID, COL_LOCK_ENTITY_ID, COL_VERSION);
+
+  private static final int LOCK_TABLE_CREATION_WAIT_ATTEMPTS_MAX = 5;
+
+  private static final List<KeySchemaElement> LOCK_TABLE_SCHEMA = Lists.newArrayList(
+      KeySchemaElement.builder()
+          .attributeName(COL_LOCK_ENTITY_ID)
+          .keyType(KeyType.HASH)
+          .build()
+  );
+
+  private static final List<AttributeDefinition> LOCK_TABLE_COL_DEFINITIONS = Lists.newArrayList(
+      AttributeDefinition.builder()
+          .attributeName(COL_LOCK_ENTITY_ID)
+          .attributeType(ScalarAttributeType.S)
+          .build()
+  );
+
+  private final Map<String, ScheduledFuture<?>> heartbeats = Maps.newHashMap();
+
+  private DynamoDbClient dynamo;
+  private String lockTableName;
+
+  /**
+   * constructor for dynamic initialization, {@link #initialize(Map)} must be called later.
+   */
+  DynamoLockManager() {
+  }
+
+  /**
+   * constructor used for testing purpose
+   * @param dynamo dynamo client
+   * @param lockTableName lock table name
+   */
+  DynamoLockManager(DynamoDbClient dynamo, String lockTableName) {
+    super.initialize(Maps.newHashMap());
+    this.dynamo = dynamo;
+    this.lockTableName = lockTableName;
+    ensureLockTableExistsOrCreate();
+  }
+
+  private void ensureLockTableExistsOrCreate() {
+
+    if (tableExists(lockTableName)) {
+      return;
+    }
+
+    LOG.info("Dynamo lock table {} not found, trying to create", lockTableName);
+    dynamo.createTable(CreateTableRequest.builder()
+        .tableName(lockTableName)
+        .keySchema(lockTableSchema())
+        .attributeDefinitions(lockTableColDefinitions())
+        .billingMode(BillingMode.PAY_PER_REQUEST)
+        .build());
+
+    Tasks.foreach(lockTableName)
+        .retry(LOCK_TABLE_CREATION_WAIT_ATTEMPTS_MAX)
+        .throwFailureWhenFinished()
+        .onlyRetryOn(IllegalStateException.class)
+        .run(this::checkTableActive);
+  }
+
+  @VisibleForTesting
+  boolean tableExists(String tableName) {
+    try {
+      dynamo.describeTable(DescribeTableRequest.builder()
+          .tableName(tableName)
+          .build());
+      return true;
+    } catch (ResourceNotFoundException e) {
+      return false;
+    }
+  }
+
+  private void checkTableActive(String tableName) {
+    try {
+      DescribeTableResponse response = dynamo.describeTable(DescribeTableRequest.builder()
+          .tableName(tableName)
+          .build());
+      TableStatus currentStatus = response.table().tableStatus();
+      if (!currentStatus.equals(TableStatus.ACTIVE)) {
+        throw new IllegalStateException(String.format("Dynamo table %s is not active, current status: %s",
+            tableName, currentStatus));
+      }
+    } catch (ResourceNotFoundException e) {
+      throw new IllegalStateException(String.format("Cannot find Dynamo table %s", tableName));
+    }
+  }
+
+  @Override
+  public void initialize(Map<String, String> properties) {
+    super.initialize(properties);
+    this.dynamo = AwsClientFactories.from(properties).dynamo();
+    this.lockTableName = properties.get(CatalogProperties.LOCK_TABLE);
+    Preconditions.checkNotNull(lockTableName, "DynamoDB lock table name must not be null");
+    ensureLockTableExistsOrCreate();
+  }
+
+  @Override
+  public boolean acquire(String entityId, String ownerId) {
+    try {
+      Tasks.foreach(entityId)
+          .throwFailureWhenFinished()
+          .retry(Integer.MAX_VALUE - 1)
+          .exponentialBackoff(acquireIntervalMs(), acquireIntervalMs(), acquireTimeoutMs(), 1)
+          .onlyRetryOn(
+              ConditionalCheckFailedException.class,
+              ProvisionedThroughputExceededException.class,
+              TransactionConflictException.class,
+              RequestLimitExceededException.class,
+              InternalServerErrorException.class)
+          .run(id -> acquireOnce(id, ownerId));
+      return true;
+    } catch (RuntimeException e) {
+      return false;
+    }
+  }
+
+  @VisibleForTesting
+  void acquireOnce(String entityId, String ownerId) {
+    GetItemResponse response = dynamo.getItem(GetItemRequest.builder()
+        .tableName(lockTableName)
+        .key(toKey(entityId))
+        .build());
+
+    if (!response.hasItem()) {
+      dynamo.putItem(PutItemRequest.builder()
+          .tableName(lockTableName)
+          .item(toNewItem(entityId, ownerId))
+          .conditionExpression(CONDITION_LOCK_ENTITY_NOT_EXIST)
+          .build());
+    } else {
+      Map<String, AttributeValue> currentItem = response.item();
+
+      try {
+        Thread.sleep(Long.parseLong(currentItem.get(COL_LEASE_DURATION_MS).n()));
+      } catch (InterruptedException e) {
+        throw new IllegalStateException(
+            String.format("Fail to acquire lock %s by %s, interrupted during sleep", entityId, ownerId), e);
+      }
+
+      Map<String, AttributeValue> expressionValues = Maps.newHashMap();
+      expressionValues.put(":eid", AttributeValue.builder().s(entityId).build());
+      expressionValues.put(":vid", AttributeValue.builder().s(currentItem.get(COL_VERSION).s()).build());
+      dynamo.putItem(PutItemRequest.builder()
+          .tableName(lockTableName)
+          .item(toNewItem(entityId, ownerId))
+          .conditionExpression(CONDITION_LOCK_ENTITY_NOT_EXIST_OR_VERSION_MATCH)
+          .expressionAttributeValues(expressionValues)
+          .build());
+    }
+
+    startNewHeartbeat(entityId, ownerId);
+  }
+
+  private void startNewHeartbeat(String entityId, String ownerId) {
+    if (heartbeats.containsKey(entityId)) {
+      heartbeats.remove(entityId).cancel(false);
+    }
+
+    heartbeats.put(entityId, scheduler().scheduleAtFixedRate(() -> dynamo.putItem(PutItemRequest.builder()
+        .tableName(lockTableName)
+        .item(toNewItem(entityId, ownerId))
+        .conditionExpression(CONDITION_LOCK_ID_MATCH)
+        .expressionAttributeValues(toLockIdValues(entityId, ownerId))
+        .build()), 0, heartbeatIntervalMs(), TimeUnit.MILLISECONDS));
+  }
+
+  @Override
+  public boolean release(String entityId, String ownerId) {
+    ScheduledFuture<?> heartbeat = heartbeats.get(entityId);
+    try {
+      dynamo.deleteItem(DeleteItemRequest.builder()
+          .tableName(lockTableName)
+          .key(toKey(entityId))
+          .conditionExpression(CONDITION_LOCK_ID_MATCH)
+          .expressionAttributeValues(toLockIdValues(entityId, ownerId))
+          .build());

Review comment:
       I think we cannot always cancel the heartbeat in a `finally` block. If the caller is a different owner, the heartbeat should not be canceled. Therefore, the correct behavior is to check the owner of the lock when trying to release, so that:
   1. if release succeeds, cancel the heartbeat
   2. if release fails and the heartbeat owner is the same, cancel the heartbeat. Something is likely wrong with Dynamo, and canceling the heartbeat allows the lock to expire
   3. if release fails and the heartbeat owner is different, the caller is wrong and we should not cancel the ongoing heartbeat.
   
   To achieve this, I have introduced an internal class `DynamoHeartbeat` that also saves the owner in the `heartbeats` map.
   
   Also, I added retries in release, using `Tasks` to retry on some retryable exceptions.
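
The three cases above can be sketched as below. This is an illustration only, not the code added to the PR: `ReleaseDecisionSketch` and its nested `Heartbeat` class are hypothetical stand-ins for the lock manager and `DynamoHeartbeat`, and the outcome of the conditional delete is passed in as a boolean rather than obtained from DynamoDB.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class ReleaseDecisionSketch {
  // Hypothetical stand-in for DynamoHeartbeat: remembers which owner started the heartbeat.
  static final class Heartbeat {
    final String ownerId;
    boolean cancelled;

    Heartbeat(String ownerId) {
      this.ownerId = ownerId;
    }

    void cancel() {
      cancelled = true;
    }
  }

  final Map<String, Heartbeat> heartbeats = new ConcurrentHashMap<>();

  // deleteSucceeded simulates whether the conditional delete of the lock row succeeded.
  boolean release(String entityId, String ownerId, boolean deleteSucceeded) {
    Heartbeat heartbeat = heartbeats.get(entityId);
    boolean sameOwner = heartbeat != null && heartbeat.ownerId.equals(ownerId);
    // Cases 1 and 2: cancel when the release succeeded, or when it failed but the
    // caller owns the heartbeat, so that the lock can expire on its own.
    // Case 3: a failed release by a different owner leaves the heartbeat running.
    if (heartbeat != null && (deleteSucceeded || sameOwner)) {
      heartbeat.cancel();
      heartbeats.remove(entityId);
    }
    return deleteSucceeded;
  }

  public static void main(String[] args) {
    ReleaseDecisionSketch locks = new ReleaseDecisionSketch();
    Heartbeat heartbeat = new Heartbeat("owner-a");
    locks.heartbeats.put("db.table", heartbeat);

    locks.release("db.table", "owner-b", false); // case 3: wrong caller
    System.out.println(heartbeat.cancelled);

    locks.release("db.table", "owner-a", false); // case 2: failed release, same owner
    System.out.println(heartbeat.cancelled);
  }
}
```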






[GitHub] [iceberg] rdblue commented on a change in pull request #2034: AWS: add DynamoDB implementation of Glue lock manager

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2034:
URL: https://github.com/apache/iceberg/pull/2034#discussion_r552301353



##########
File path: aws/src/main/java/org/apache/iceberg/aws/glue/DynamoLockManager.java
##########
@@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.aws.glue;
+
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.aws.AwsClientFactories;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.Tasks;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
+import software.amazon.awssdk.services.dynamodb.model.AttributeDefinition;
+import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
+import software.amazon.awssdk.services.dynamodb.model.BillingMode;
+import software.amazon.awssdk.services.dynamodb.model.ConditionalCheckFailedException;
+import software.amazon.awssdk.services.dynamodb.model.CreateTableRequest;
+import software.amazon.awssdk.services.dynamodb.model.DeleteItemRequest;
+import software.amazon.awssdk.services.dynamodb.model.DescribeTableRequest;
+import software.amazon.awssdk.services.dynamodb.model.DescribeTableResponse;
+import software.amazon.awssdk.services.dynamodb.model.GetItemRequest;
+import software.amazon.awssdk.services.dynamodb.model.GetItemResponse;
+import software.amazon.awssdk.services.dynamodb.model.InternalServerErrorException;
+import software.amazon.awssdk.services.dynamodb.model.KeySchemaElement;
+import software.amazon.awssdk.services.dynamodb.model.KeyType;
+import software.amazon.awssdk.services.dynamodb.model.ProvisionedThroughputExceededException;
+import software.amazon.awssdk.services.dynamodb.model.PutItemRequest;
+import software.amazon.awssdk.services.dynamodb.model.RequestLimitExceededException;
+import software.amazon.awssdk.services.dynamodb.model.ResourceNotFoundException;
+import software.amazon.awssdk.services.dynamodb.model.ScalarAttributeType;
+import software.amazon.awssdk.services.dynamodb.model.TableStatus;
+import software.amazon.awssdk.services.dynamodb.model.TransactionConflictException;
+
+/**
+ * DynamoDB implementation for the lock manager.
+ */
+class DynamoLockManager extends LockManagers.BaseLockManager {
+
+  private static final Logger LOG = LoggerFactory.getLogger(DynamoLockManager.class);
+
+  private static final String COL_LOCK_ENTITY_ID = "entityId";
+  private static final String COL_LEASE_DURATION_MS = "leaseDurationMs";
+  private static final String COL_VERSION = "version";
+  private static final String COL_LOCK_OWNER_ID = "ownerId";
+
+  private static final String CONDITION_LOCK_ID_MATCH = String.format(
+      "%s = :eid AND %s = :oid",
+      COL_LOCK_ENTITY_ID, COL_LOCK_OWNER_ID);
+  private static final String CONDITION_LOCK_ENTITY_NOT_EXIST = String.format(
+      "attribute_not_exists(%s)",
+      COL_LOCK_ENTITY_ID);
+  private static final String CONDITION_LOCK_ENTITY_NOT_EXIST_OR_VERSION_MATCH = String.format(
+      "attribute_not_exists(%s) OR (%s = :eid AND %s = :vid)",
+      COL_LOCK_ENTITY_ID, COL_LOCK_ENTITY_ID, COL_VERSION);
+
+  private static final int LOCK_TABLE_CREATION_WAIT_ATTEMPTS_MAX = 5;
+
+  private static final List<KeySchemaElement> LOCK_TABLE_SCHEMA = Lists.newArrayList(
+      KeySchemaElement.builder()
+          .attributeName(COL_LOCK_ENTITY_ID)
+          .keyType(KeyType.HASH)
+          .build()
+  );
+
+  private static final List<AttributeDefinition> LOCK_TABLE_COL_DEFINITIONS = Lists.newArrayList(
+      AttributeDefinition.builder()
+          .attributeName(COL_LOCK_ENTITY_ID)
+          .attributeType(ScalarAttributeType.S)
+          .build()
+  );
+
+  private final Map<String, ScheduledFuture<?>> heartbeats = Maps.newHashMap();
+
+  private DynamoDbClient dynamo;
+  private String lockTableName;
+
+  /**
+   * constructor for dynamic initialization, {@link #initialize(Map)} must be called later.
+   */
+  DynamoLockManager() {
+  }
+
+  /**
+   * constructor used for testing purpose
+   * @param dynamo dynamo client
+   * @param lockTableName lock table name
+   */
+  DynamoLockManager(DynamoDbClient dynamo, String lockTableName) {
+    super.initialize(Maps.newHashMap());
+    this.dynamo = dynamo;
+    this.lockTableName = lockTableName;
+    ensureLockTableExistsOrCreate();
+  }
+
+  private void ensureLockTableExistsOrCreate() {
+
+    if (tableExists(lockTableName)) {
+      return;
+    }
+
+    LOG.info("Dynamo lock table {} not found, trying to create", lockTableName);
+    dynamo.createTable(CreateTableRequest.builder()
+        .tableName(lockTableName)
+        .keySchema(lockTableSchema())
+        .attributeDefinitions(lockTableColDefinitions())
+        .billingMode(BillingMode.PAY_PER_REQUEST)
+        .build());
+
+    Tasks.foreach(lockTableName)
+        .retry(LOCK_TABLE_CREATION_WAIT_ATTEMPTS_MAX)
+        .throwFailureWhenFinished()
+        .onlyRetryOn(IllegalStateException.class)
+        .run(this::checkTableActive);
+  }
+
+  @VisibleForTesting
+  boolean tableExists(String tableName) {
+    try {
+      dynamo.describeTable(DescribeTableRequest.builder()
+          .tableName(tableName)
+          .build());
+      return true;
+    } catch (ResourceNotFoundException e) {
+      return false;
+    }
+  }
+
+  private void checkTableActive(String tableName) {
+    try {
+      DescribeTableResponse response = dynamo.describeTable(DescribeTableRequest.builder()
+          .tableName(tableName)
+          .build());
+      TableStatus currentStatus = response.table().tableStatus();
+      if (!currentStatus.equals(TableStatus.ACTIVE)) {
+        throw new IllegalStateException(String.format("Dynamo table %s is not active, current status: %s",
+            tableName, currentStatus));
+      }
+    } catch (ResourceNotFoundException e) {
+      throw new IllegalStateException(String.format("Cannot find Dynamo table %s", tableName));
+    }
+  }
+
+  @Override
+  public void initialize(Map<String, String> properties) {
+    super.initialize(properties);
+    this.dynamo = AwsClientFactories.from(properties).dynamo();
+    this.lockTableName = properties.get(CatalogProperties.LOCK_TABLE);
+    Preconditions.checkNotNull(lockTableName, "DynamoDB lock table name must not be null");
+    ensureLockTableExistsOrCreate();
+  }
+
+  @Override
+  public boolean acquire(String entityId, String ownerId) {
+    try {
+      Tasks.foreach(entityId)
+          .throwFailureWhenFinished()
+          .retry(Integer.MAX_VALUE - 1)
+          .exponentialBackoff(acquireIntervalMs(), acquireIntervalMs(), acquireTimeoutMs(), 1)
+          .onlyRetryOn(
+              ConditionalCheckFailedException.class,
+              ProvisionedThroughputExceededException.class,
+              TransactionConflictException.class,
+              RequestLimitExceededException.class,
+              InternalServerErrorException.class)
+          .run(id -> acquireOnce(id, ownerId));
+      return true;
+    } catch (RuntimeException e) {

Review comment:
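       Catching `RuntimeException` seems too general to me, because the exception is lost once this method returns `false`. Is there a more specific exception that would be appropriate? `AmazonServiceException` or `AmazonDynamoDBException` look like good options (in the AWS SDK v2 that this file imports, the corresponding classes are `AwsServiceException` and `DynamoDbException`).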
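
       A minimal sketch of the narrower catch, not the actual change: `ServiceFailure` and `AcquireSketch` are hypothetical names, with `ServiceFailure` standing in for whichever SDK exception is chosen so that the example compiles without the AWS SDK. The retry logic from `Tasks` is left out on purpose. Only the expected failure is turned into `false`, and it is logged first; anything else propagates to the caller.

```java
// Hypothetical stand-in for a service-side failure (for example the SDK's
// DynamoDB exception type); defined here only so the sketch is self-contained.
class ServiceFailure extends RuntimeException {
  ServiceFailure(String message) {
    super(message);
  }
}

final class AcquireSketch {
  private AcquireSketch() {
  }

  // Returns true if the attempt succeeds and false only for the expected
  // service failure. Any other RuntimeException (for example one caused by
  // a programming error) is not swallowed and reaches the caller.
  static boolean acquire(Runnable attempt) {
    try {
      attempt.run();
      return true;
    } catch (ServiceFailure e) {
      // Report the cause before returning so it is not silently lost.
      System.err.println("Failed to acquire lock: " + e.getMessage());
      return false;
    }
  }

  public static void main(String[] args) {
    // Successful attempt.
    System.out.println(acquire(() -> { }));

    // Expected service failure is reported and mapped to false.
    System.out.println(acquire(() -> {
      throw new ServiceFailure("throttled");
    }));

    // Unexpected failure propagates instead of being hidden.
    try {
      acquire(() -> {
        throw new IllegalStateException("bug");
      });
    } catch (IllegalStateException e) {
      System.out.println("propagated: " + e.getMessage());
    }
  }
}
```

       With this shape, a caller that sees `false` knows the lock was not acquired because of a service-side failure, while unrelated failures surface with their stack traces intact.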






[GitHub] [iceberg] jackye1995 commented on a change in pull request #2034: AWS: add DynamoDB implementation of Glue lock manager

Posted by GitBox <gi...@apache.org>.
jackye1995 commented on a change in pull request #2034:
URL: https://github.com/apache/iceberg/pull/2034#discussion_r552312545



##########
File path: aws/src/main/java/org/apache/iceberg/aws/glue/DynamoLockManager.java
##########
@@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.aws.glue;
+
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.aws.AwsClientFactories;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.Tasks;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
+import software.amazon.awssdk.services.dynamodb.model.AttributeDefinition;
+import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
+import software.amazon.awssdk.services.dynamodb.model.BillingMode;
+import software.amazon.awssdk.services.dynamodb.model.ConditionalCheckFailedException;
+import software.amazon.awssdk.services.dynamodb.model.CreateTableRequest;
+import software.amazon.awssdk.services.dynamodb.model.DeleteItemRequest;
+import software.amazon.awssdk.services.dynamodb.model.DescribeTableRequest;
+import software.amazon.awssdk.services.dynamodb.model.DescribeTableResponse;
+import software.amazon.awssdk.services.dynamodb.model.GetItemRequest;
+import software.amazon.awssdk.services.dynamodb.model.GetItemResponse;
+import software.amazon.awssdk.services.dynamodb.model.InternalServerErrorException;
+import software.amazon.awssdk.services.dynamodb.model.KeySchemaElement;
+import software.amazon.awssdk.services.dynamodb.model.KeyType;
+import software.amazon.awssdk.services.dynamodb.model.ProvisionedThroughputExceededException;
+import software.amazon.awssdk.services.dynamodb.model.PutItemRequest;
+import software.amazon.awssdk.services.dynamodb.model.RequestLimitExceededException;
+import software.amazon.awssdk.services.dynamodb.model.ResourceNotFoundException;
+import software.amazon.awssdk.services.dynamodb.model.ScalarAttributeType;
+import software.amazon.awssdk.services.dynamodb.model.TableStatus;
+import software.amazon.awssdk.services.dynamodb.model.TransactionConflictException;
+
+/**
+ * DynamoDB implementation for the lock manager.
+ */
+class DynamoLockManager extends LockManagers.BaseLockManager {
+
+  private static final Logger LOG = LoggerFactory.getLogger(DynamoLockManager.class);
+
+  private static final String COL_LOCK_ENTITY_ID = "entityId";
+  private static final String COL_LEASE_DURATION_MS = "leaseDurationMs";
+  private static final String COL_VERSION = "version";
+  private static final String COL_LOCK_OWNER_ID = "ownerId";
+
+  private static final String CONDITION_LOCK_ID_MATCH = String.format(
+      "%s = :eid AND %s = :oid",
+      COL_LOCK_ENTITY_ID, COL_LOCK_OWNER_ID);
+  private static final String CONDITION_LOCK_ENTITY_NOT_EXIST = String.format(
+      "attribute_not_exists(%s)",
+      COL_LOCK_ENTITY_ID);
+  private static final String CONDITION_LOCK_ENTITY_NOT_EXIST_OR_VERSION_MATCH = String.format(
+      "attribute_not_exists(%s) OR (%s = :eid AND %s = :vid)",
+      COL_LOCK_ENTITY_ID, COL_LOCK_ENTITY_ID, COL_VERSION);
+
+  private static final int LOCK_TABLE_CREATION_WAIT_ATTEMPTS_MAX = 5;
+
+  private static final List<KeySchemaElement> LOCK_TABLE_SCHEMA = Lists.newArrayList(
+      KeySchemaElement.builder()
+          .attributeName(COL_LOCK_ENTITY_ID)
+          .keyType(KeyType.HASH)
+          .build()
+  );
+
+  private static final List<AttributeDefinition> LOCK_TABLE_COL_DEFINITIONS = Lists.newArrayList(
+      AttributeDefinition.builder()
+          .attributeName(COL_LOCK_ENTITY_ID)
+          .attributeType(ScalarAttributeType.S)
+          .build()
+  );
+
+  private final Map<String, ScheduledFuture<?>> heartbeats = Maps.newHashMap();
+
+  private DynamoDbClient dynamo;
+  private String lockTableName;
+
+  /**
+   * constructor for dynamic initialization, {@link #initialize(Map)} must be called later.
+   */
+  DynamoLockManager() {
+  }
+
+  /**
+   * constructor used for testing purpose
+   * @param dynamo dynamo client
+   * @param lockTableName lock table name
+   */
+  DynamoLockManager(DynamoDbClient dynamo, String lockTableName) {
+    super.initialize(Maps.newHashMap());
+    this.dynamo = dynamo;
+    this.lockTableName = lockTableName;
+    ensureLockTableExistsOrCreate();
+  }
+
+  private void ensureLockTableExistsOrCreate() {
+
+    if (tableExists(lockTableName)) {
+      return;
+    }
+
+    LOG.info("Dynamo lock table {} not found, trying to create", lockTableName);
+    dynamo.createTable(CreateTableRequest.builder()
+        .tableName(lockTableName)
+        .keySchema(lockTableSchema())
+        .attributeDefinitions(lockTableColDefinitions())
+        .billingMode(BillingMode.PAY_PER_REQUEST)
+        .build());
+
+    Tasks.foreach(lockTableName)
+        .retry(LOCK_TABLE_CREATION_WAIT_ATTEMPTS_MAX)
+        .throwFailureWhenFinished()
+        .onlyRetryOn(IllegalStateException.class)
+        .run(this::checkTableActive);
+  }
+
+  @VisibleForTesting
+  boolean tableExists(String tableName) {
+    try {
+      dynamo.describeTable(DescribeTableRequest.builder()
+          .tableName(tableName)
+          .build());
+      return true;
+    } catch (ResourceNotFoundException e) {
+      return false;
+    }
+  }
+
+  private void checkTableActive(String tableName) {
+    try {
+      DescribeTableResponse response = dynamo.describeTable(DescribeTableRequest.builder()
+          .tableName(tableName)
+          .build());
+      TableStatus currentStatus = response.table().tableStatus();
+      if (!currentStatus.equals(TableStatus.ACTIVE)) {
+        throw new IllegalStateException(String.format("Dynamo table %s is not active, current status: %s",
+            tableName, currentStatus));
+      }
+    } catch (ResourceNotFoundException e) {
+      throw new IllegalStateException(String.format("Cannot find Dynamo table %s", tableName));
+    }
+  }
+
+  @Override
+  public void initialize(Map<String, String> properties) {
+    super.initialize(properties);
+    this.dynamo = AwsClientFactories.from(properties).dynamo();
+    this.lockTableName = properties.get(CatalogProperties.LOCK_TABLE);
+    Preconditions.checkNotNull(lockTableName, "DynamoDB lock table name must not be null");
+    ensureLockTableExistsOrCreate();
+  }
+
+  @Override
+  public boolean acquire(String entityId, String ownerId) {
+    try {
+      Tasks.foreach(entityId)
+          .throwFailureWhenFinished()
+          .retry(Integer.MAX_VALUE - 1)
+          .exponentialBackoff(acquireIntervalMs(), acquireIntervalMs(), acquireTimeoutMs(), 1)
+          .onlyRetryOn(
+              ConditionalCheckFailedException.class,
+              ProvisionedThroughputExceededException.class,
+              TransactionConflictException.class,
+              RequestLimitExceededException.class,
+              InternalServerErrorException.class)
+          .run(id -> acquireOnce(id, ownerId));
+      return true;
+    } catch (RuntimeException e) {
+      return false;
+    }
+  }
+
+  @VisibleForTesting
+  void acquireOnce(String entityId, String ownerId) {
+    GetItemResponse response = dynamo.getItem(GetItemRequest.builder()
+        .tableName(lockTableName)
+        .key(toKey(entityId))
+        .build());
+
+    if (!response.hasItem()) {
+      dynamo.putItem(PutItemRequest.builder()
+          .tableName(lockTableName)
+          .item(toNewItem(entityId, ownerId))
+          .conditionExpression(CONDITION_LOCK_ENTITY_NOT_EXIST)
+          .build());
+    } else {
+      Map<String, AttributeValue> currentItem = response.item();
+
+      try {
+        Thread.sleep(Long.parseLong(currentItem.get(COL_LEASE_DURATION_MS).n()));
+      } catch (InterruptedException e) {
+        throw new IllegalStateException(
+            String.format("Fail to acquire lock %s by %s, interrupted during sleep", entityId, ownerId), e);
+      }
+
+      Map<String, AttributeValue> expressionValues = Maps.newHashMap();
+      expressionValues.put(":eid", AttributeValue.builder().s(entityId).build());
+      expressionValues.put(":vid", AttributeValue.builder().s(currentItem.get(COL_VERSION).s()).build());
+      dynamo.putItem(PutItemRequest.builder()
+          .tableName(lockTableName)
+          .item(toNewItem(entityId, ownerId))
+          .conditionExpression(CONDITION_LOCK_ENTITY_NOT_EXIST_OR_VERSION_MATCH)
+          .expressionAttributeValues(expressionValues)
+          .build());
+    }
+
+    startNewHeartbeat(entityId, ownerId);
+  }
+
+  private void startNewHeartbeat(String entityId, String ownerId) {
+    if (heartbeats.containsKey(entityId)) {
+      heartbeats.remove(entityId).cancel(false);
+    }
+
+    heartbeats.put(entityId, scheduler().scheduleAtFixedRate(() -> dynamo.putItem(PutItemRequest.builder()
+        .tableName(lockTableName)
+        .item(toNewItem(entityId, ownerId))
+        .conditionExpression(CONDITION_LOCK_ID_MATCH)
+        .expressionAttributeValues(toLockIdValues(entityId, ownerId))
+        .build()), 0, heartbeatIntervalMs(), TimeUnit.MILLISECONDS));
+  }
+
+  @Override
+  public boolean release(String entityId, String ownerId) {
+    ScheduledFuture<?> heartbeat = heartbeats.get(entityId);
+    try {
+      dynamo.deleteItem(DeleteItemRequest.builder()
+          .tableName(lockTableName)
+          .key(toKey(entityId))
+          .conditionExpression(CONDITION_LOCK_ID_MATCH)
+          .expressionAttributeValues(toLockIdValues(entityId, ownerId))
+          .build());
+    } catch (ConditionalCheckFailedException e) {
+      LOG.error("Fail to release lock {} by {}, lock entity does not exist or owner not match", entityId, ownerId, e);
+      return false;
+    }
+
+    if (heartbeat != null) {
+      heartbeat.cancel(false);
+    }
+
+    return true;
+  }
+
+  private Map<String, AttributeValue> toKey(String entityId) {
+    Map<String, AttributeValue> key = Maps.newHashMap();
+    key.put(COL_LOCK_ENTITY_ID, AttributeValue.builder().s(entityId).build());
+    return key;

Review comment:
       Fixed, also updated other similar usages.




