Posted to issues@iceberg.apache.org by "amogh-jahagirdar (via GitHub)" <gi...@apache.org> on 2023/04/04 05:35:19 UTC

[GitHub] [iceberg] amogh-jahagirdar commented on a diff in pull request #7198: AWS: Check commit status after failed commit if AWS client performed retries

amogh-jahagirdar commented on code in PR #7198:
URL: https://github.com/apache/iceberg/pull/7198#discussion_r1156739798


##########
aws/src/main/java/org/apache/iceberg/aws/dynamodb/DynamoDbTableOperations.java:
##########
@@ -116,20 +118,30 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) {
                   .build());
       checkMetadataLocation(table, base);
       Map<String, String> properties = prepareProperties(table, newMetadataLocation);
-      persistTable(tableKey, table, properties);
+      persistTable(tableKey, table, properties, retryDetector);
       commitStatus = CommitStatus.SUCCESS;
-    } catch (ConditionalCheckFailedException e) {
-      throw new CommitFailedException(
-          e, "Cannot commit %s: concurrent update detected", tableName());
     } catch (CommitFailedException e) {
       // any explicit commit failures are passed up and out to the retry handler
       throw e;
     } catch (RuntimeException persistFailure) {
-      LOG.error(
-          "Confirming if commit to {} indeed failed to persist, attempting to reconnect and check.",
-          fullTableName,
-          persistFailure);
-      commitStatus = checkCommitStatus(newMetadataLocation, metadata);
+      boolean conditionCheckFailed = persistFailure instanceof ConditionalCheckFailedException;
+
+      // If we got an exception we weren't expecting, or we got a ConditionalCheckFailedException
+      // but retries were performed, attempt to reconcile the actual commit status.
+      if (!conditionCheckFailed || retryDetector.retried()) {
+        LOG.error(
+            "Confirming if commit to {} indeed failed to persist, attempting to reconnect and check.",
+            fullTableName,
+            persistFailure);

Review Comment:
   Should this be logged at `warn` level instead? If the commit did end up succeeding, an `error`-level message may mislead people. Also a nit on the message: maybe "Received unexpected failure when committing, validating if commit ended up succeeding".



##########
aws/src/main/java/org/apache/iceberg/aws/util/RetryDetector.java:
##########
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.aws.util;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import software.amazon.awssdk.core.metrics.CoreMetric;
+import software.amazon.awssdk.metrics.MetricCollection;
+import software.amazon.awssdk.metrics.MetricPublisher;
+
+/**
+ * Metrics are the only reliable way provided by the AWS SDK to determine if an API call was
+ * retried. This class can be attached to an AWS API call and checked after to determine if retries
+ * occurred.
+ */
+public class RetryDetector implements MetricPublisher {

Review Comment:
   Could we avoid making this public for now? Since it's used within the `TableOperations` implementation, we should be able to make it package-private. If there's a desire later to expose this outside of the AWS module, we can make it public then.
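
A minimal sketch of the shape being discussed, assuming a simplified stand-in callback rather than the AWS SDK's `MetricPublisher` interface. The class name `RetryFlagSketch` and the method `onCallFinished(int)` are hypothetical; only the `retried()` accessor and the use of `AtomicBoolean` come from the diff above. The class has no `public` modifier, so it is visible only inside its own package.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Package-private on purpose: no `public` modifier on the class or its members.
final class RetryFlagSketch {
  // Thread-safe flag, since a callback may run on a different thread than the caller.
  private final AtomicBoolean retried = new AtomicBoolean(false);

  // Hypothetical callback: stands in for whatever reports the retry count of one API call.
  void onCallFinished(int retryCount) {
    if (retryCount > 0) {
      retried.set(true);
    }
  }

  // Checked by the caller after the API call returns or throws.
  boolean retried() {
    return retried.get();
  }
}
```

The flag only ever moves from `false` to `true`, so one instance should be attached to one commit attempt.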



##########
aws/src/main/java/org/apache/iceberg/aws/glue/GlueTableOperations.java:
##########
@@ -154,47 +156,29 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) {
       Table glueTable = getGlueTable();
       checkMetadataLocation(glueTable, base);
       Map<String, String> properties = prepareProperties(glueTable, newMetadataLocation);
-      persistGlueTable(glueTable, properties, metadata);
+      persistGlueTable(glueTable, properties, metadata, retryDetector);
       commitStatus = CommitStatus.SUCCESS;
     } catch (CommitFailedException e) {
       throw e;
-    } catch (ConcurrentModificationException e) {
-      throw new CommitFailedException(
-          e, "Cannot commit %s because Glue detected concurrent update", tableName());
-    } catch (software.amazon.awssdk.services.glue.model.AlreadyExistsException e) {
-      throw new AlreadyExistsException(
-          e,
-          "Cannot commit %s because its Glue table already exists when trying to create one",
-          tableName());
-    } catch (EntityNotFoundException e) {
-      throw new NotFoundException(
-          e, "Cannot commit %s because Glue cannot find the requested entity", tableName());
-    } catch (AccessDeniedException e) {
-      throw new ForbiddenException(
-          e, "Cannot commit %s because Glue cannot access the requested resources", tableName());
-    } catch (software.amazon.awssdk.services.glue.model.ValidationException e) {
-      throw new ValidationException(
-          e,
-          "Cannot commit %s because Glue encountered a validation exception "
-              + "while accessing requested resources",
-          tableName());
     } catch (RuntimeException persistFailure) {
-      LOG.error(
-          "Confirming if commit to {} indeed failed to persist, attempting to reconnect and check.",
-          fullTableName,
-          persistFailure);
+      boolean isAwsServiceException = persistFailure instanceof AwsServiceException;
 
-      if (persistFailure instanceof AwsServiceException) {
-        int statusCode = ((AwsServiceException) persistFailure).statusCode();
-        if (statusCode >= 500 && statusCode < 600) {
-          commitStatus = CommitStatus.FAILURE;
-        } else {
-          throw persistFailure;
-        }
-      } else {
+      // If we got an exception we weren't expecting, or we got an AWS service exception
+      // but retries were performed, attempt to reconcile the actual commit status.
+      if (!isAwsServiceException || retryDetector.retried()) {
+        LOG.error(

Review Comment:
   Same as above regarding the `warn` level and the message.



##########
aws/src/main/java/org/apache/iceberg/aws/dynamodb/DynamoDbTableOperations.java:
##########
@@ -116,20 +118,30 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) {
                   .build());
       checkMetadataLocation(table, base);
       Map<String, String> properties = prepareProperties(table, newMetadataLocation);
-      persistTable(tableKey, table, properties);
+      persistTable(tableKey, table, properties, retryDetector);
       commitStatus = CommitStatus.SUCCESS;
-    } catch (ConditionalCheckFailedException e) {
-      throw new CommitFailedException(
-          e, "Cannot commit %s: concurrent update detected", tableName());
     } catch (CommitFailedException e) {
       // any explicit commit failures are passed up and out to the retry handler
       throw e;
     } catch (RuntimeException persistFailure) {
-      LOG.error(
-          "Confirming if commit to {} indeed failed to persist, attempting to reconnect and check.",
-          fullTableName,
-          persistFailure);
-      commitStatus = checkCommitStatus(newMetadataLocation, metadata);
+      boolean conditionCheckFailed = persistFailure instanceof ConditionalCheckFailedException;
+
+      // If we got an exception we weren't expecting, or we got a ConditionalCheckFailedException
+      // but retries were performed, attempt to reconcile the actual commit status.
+      if (!conditionCheckFailed || retryDetector.retried()) {
+        LOG.error(
+            "Confirming if commit to {} indeed failed to persist, attempting to reconnect and check.",
+            fullTableName,
+            persistFailure);
+        commitStatus = checkCommitStatus(newMetadataLocation, metadata);
+      }
+
+      // If we got ConditionalCheckFailedException, but find we
+      // succeeded on a retry that threw an exception, skip this exception.
+      if (commitStatus != CommitStatus.SUCCESS && conditionCheckFailed) {
+        throw new CommitFailedException(
+            persistFailure, "Cannot commit %s: concurrent update detected", tableName());
+      }

Review Comment:
   Shouldn't we still throw if the commit status is unknown?



##########
aws/src/main/java/org/apache/iceberg/aws/dynamodb/DynamoDbTableOperations.java:
##########
@@ -116,20 +118,30 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) {
                   .build());
       checkMetadataLocation(table, base);
       Map<String, String> properties = prepareProperties(table, newMetadataLocation);
-      persistTable(tableKey, table, properties);
+      persistTable(tableKey, table, properties, retryDetector);
       commitStatus = CommitStatus.SUCCESS;
-    } catch (ConditionalCheckFailedException e) {
-      throw new CommitFailedException(
-          e, "Cannot commit %s: concurrent update detected", tableName());
     } catch (CommitFailedException e) {
       // any explicit commit failures are passed up and out to the retry handler
       throw e;
     } catch (RuntimeException persistFailure) {
-      LOG.error(
-          "Confirming if commit to {} indeed failed to persist, attempting to reconnect and check.",
-          fullTableName,
-          persistFailure);
-      commitStatus = checkCommitStatus(newMetadataLocation, metadata);
+      boolean conditionCheckFailed = persistFailure instanceof ConditionalCheckFailedException;
+
+      // If we got an exception we weren't expecting, or we got a ConditionalCheckFailedException
+      // but retries were performed, attempt to reconcile the actual commit status.
+      if (!conditionCheckFailed || retryDetector.retried()) {
+        LOG.error(
+            "Confirming if commit to {} indeed failed to persist, attempting to reconnect and check.",
+            fullTableName,
+            persistFailure);
+        commitStatus = checkCommitStatus(newMetadataLocation, metadata);
+      }
+
+      // If we got ConditionalCheckFailedException, but find we
+      // succeeded on a retry that threw an exception, skip this exception.

Review Comment:
   Nit: I get what the inline comment is going for, and it's true, but I would probably just leave it out since it doesn't directly correspond to the block below. Alternatively, the comment could be something like "Only throw after checking the commit status with consideration to retries, and if the condition check failed".



##########
aws/src/main/java/org/apache/iceberg/aws/dynamodb/DynamoDbTableOperations.java:
##########
@@ -116,20 +118,30 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) {
                   .build());
       checkMetadataLocation(table, base);
       Map<String, String> properties = prepareProperties(table, newMetadataLocation);
-      persistTable(tableKey, table, properties);
+      persistTable(tableKey, table, properties, retryDetector);
       commitStatus = CommitStatus.SUCCESS;
-    } catch (ConditionalCheckFailedException e) {
-      throw new CommitFailedException(
-          e, "Cannot commit %s: concurrent update detected", tableName());
     } catch (CommitFailedException e) {
       // any explicit commit failures are passed up and out to the retry handler
       throw e;
     } catch (RuntimeException persistFailure) {
-      LOG.error(
-          "Confirming if commit to {} indeed failed to persist, attempting to reconnect and check.",
-          fullTableName,
-          persistFailure);
-      commitStatus = checkCommitStatus(newMetadataLocation, metadata);
+      boolean conditionCheckFailed = persistFailure instanceof ConditionalCheckFailedException;
+
+      // If we got an exception we weren't expecting, or we got a ConditionalCheckFailedException
+      // but retries were performed, attempt to reconcile the actual commit status.
+      if (!conditionCheckFailed || retryDetector.retried()) {
+        LOG.error(
+            "Confirming if commit to {} indeed failed to persist, attempting to reconnect and check.",
+            fullTableName,
+            persistFailure);
+        commitStatus = checkCommitStatus(newMetadataLocation, metadata);
+      }
+
+      // If we got ConditionalCheckFailedException, but find we
+      // succeeded on a retry that threw an exception, skip this exception.
+      if (commitStatus != CommitStatus.SUCCESS && conditionCheckFailed) {
+        throw new CommitFailedException(
+            persistFailure, "Cannot commit %s: concurrent update detected", tableName());
+      }

Review Comment:
   Ah, never mind, that's still done below. Okay, cool!
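
To make the branch behavior in this hunk easier to follow, here is a self-contained sketch of the decision it encodes. Everything in it is a stand-in: `CommitStatus` is a local enum rather than Iceberg's class, `Outcome` and `decide` are hypothetical names, and the sketch assumes `commitStatus` starts out as `FAILURE` before the `try` block, which the hunk does not show. `FALL_THROUGH` stands for the handling that happens after this block, which is also outside the hunk.

```java
// Hypothetical model of the catch-block logic in the hunk above; not Iceberg code.
final class CommitDecisionSketch {
  enum CommitStatus { SUCCESS, FAILURE, UNKNOWN }

  enum Outcome { COMMITTED, THROW_COMMIT_FAILED, FALL_THROUGH }

  // checkedStatus is what a status check would report if it were run.
  static Outcome decide(boolean conditionCheckFailed, boolean retried, CommitStatus checkedStatus) {
    // Assumption: the status defaults to FAILURE before the commit is attempted.
    CommitStatus commitStatus = CommitStatus.FAILURE;

    // Reconcile only for unexpected exceptions, or for a condition-check failure after retries.
    if (!conditionCheckFailed || retried) {
      commitStatus = checkedStatus;
    }

    // A condition-check failure is reported unless the check proved the commit succeeded.
    if (commitStatus != CommitStatus.SUCCESS && conditionCheckFailed) {
      return Outcome.THROW_COMMIT_FAILED;
    }

    return commitStatus == CommitStatus.SUCCESS ? Outcome.COMMITTED : Outcome.FALL_THROUGH;
  }
}
```

Under these assumptions, a condition-check failure with an `UNKNOWN` status still produces the commit-failed outcome, while an unexpected exception with an `UNKNOWN` status is left to the code after this block.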



##########
aws/src/main/java/org/apache/iceberg/aws/glue/GlueTableOperations.java:
##########
@@ -340,6 +330,39 @@ void persistGlueTable(Table glueTable, Map<String, String> parameters, TableMeta
     }
   }
 
+  private void handleAWSExceptions(AwsServiceException persistFailure) {
+    int statusCode = persistFailure.statusCode();

Review Comment:
   Very minor nit: for readability, I generally try to keep variables as close as possible to where they are read. I think we can move the status code into the else block, or add a separate helper method `shouldThrow(AwsServiceException)` that just returns based on the criteria.
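
A rough sketch of what such a helper could look like. To stay self-contained it takes the integer status code instead of an `AwsServiceException`, and it assumes the criterion is the one from the removed code in the earlier `doCommit` hunk (5xx responses are not rethrown, everything else is). The body of `handleAWSExceptions` is not visible here, so the real criteria may differ; `StatusCodeSketch` and `isServerError` are hypothetical names.

```java
// Hypothetical helper; the criterion is assumed from the removed 5xx check, not confirmed.
final class StatusCodeSketch {
  // True for HTTP 5xx responses, where the server may or may not have applied the change.
  static boolean isServerError(int statusCode) {
    return statusCode >= 500 && statusCode < 600;
  }

  // Rethrow anything that is not a server-side error.
  static boolean shouldThrow(int statusCode) {
    return !isServerError(statusCode);
  }
}
```

With a helper like this, the status code is read only at the point where the decision is made, which is the readability gain the nit is after.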



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

