You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by ja...@apache.org on 2023/04/10 21:04:18 UTC

[iceberg] branch master updated: AWS: Check commit status after failed commit if AWS client performed retries (#7198)

This is an automated email from the ASF dual-hosted git repository.

jackye pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 9f3707bc7f AWS: Check commit status after failed commit if AWS client performed retries (#7198)
9f3707bc7f is described below

commit 9f3707bc7fd133735b83c245a6545a83521e4e02
Author: Christina Larsen <ch...@gmail.com>
AuthorDate: Mon Apr 10 14:04:11 2023 -0700

    AWS: Check commit status after failed commit if AWS client performed retries (#7198)
---
 .../org/apache/iceberg/aws/glue/GlueTestBase.java  |  11 +-
 .../aws/glue/TestGlueCatalogCommitFailure.java     |  87 +++++++++++-
 .../aws/dynamodb/DynamoDbTableOperations.java      |  35 +++--
 .../iceberg/aws/glue/GlueTableOperations.java      |  93 +++++++-----
 .../org/apache/iceberg/aws/util/RetryDetector.java |  51 +++++++
 .../apache/iceberg/aws/util/TestRetryDetector.java | 156 +++++++++++++++++++++
 6 files changed, 375 insertions(+), 58 deletions(-)

diff --git a/aws/src/integration/java/org/apache/iceberg/aws/glue/GlueTestBase.java b/aws/src/integration/java/org/apache/iceberg/aws/glue/GlueTestBase.java
index c0623da796..122deda1a7 100644
--- a/aws/src/integration/java/org/apache/iceberg/aws/glue/GlueTestBase.java
+++ b/aws/src/integration/java/org/apache/iceberg/aws/glue/GlueTestBase.java
@@ -34,7 +34,6 @@ import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.types.Types;
-import org.apache.iceberg.util.LockManagers;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.slf4j.Logger;
@@ -83,13 +82,7 @@ public class GlueTestBase {
     AwsProperties properties = new AwsProperties();
     properties.setS3FileIoDeleteBatchSize(10);
     glueCatalog.initialize(
-        catalogName,
-        testBucketPath,
-        properties,
-        glue,
-        LockManagers.defaultLockManager(),
-        fileIO,
-        ImmutableMap.of());
+        catalogName, testBucketPath, properties, glue, null, fileIO, ImmutableMap.of());
 
     glueCatalogWithSkipNameValidation = new GlueCatalog();
     AwsProperties propertiesSkipNameValidation = new AwsProperties();
@@ -99,7 +92,7 @@ public class GlueTestBase {
         testBucketPath,
         propertiesSkipNameValidation,
         glue,
-        LockManagers.defaultLockManager(),
+        null,
         fileIO,
         ImmutableMap.of());
   }
diff --git a/aws/src/integration/java/org/apache/iceberg/aws/glue/TestGlueCatalogCommitFailure.java b/aws/src/integration/java/org/apache/iceberg/aws/glue/TestGlueCatalogCommitFailure.java
index 66c87be242..ea8316fb56 100644
--- a/aws/src/integration/java/org/apache/iceberg/aws/glue/TestGlueCatalogCommitFailure.java
+++ b/aws/src/integration/java/org/apache/iceberg/aws/glue/TestGlueCatalogCommitFailure.java
@@ -26,15 +26,19 @@ import org.apache.iceberg.HasTableOperations;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableMetadata;
 import org.apache.iceberg.aws.s3.S3TestUtil;
+import org.apache.iceberg.aws.util.RetryDetector;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.exceptions.CommitFailedException;
 import org.apache.iceberg.exceptions.CommitStateUnknownException;
 import org.apache.iceberg.exceptions.ForbiddenException;
 import org.apache.iceberg.exceptions.NotFoundException;
 import org.apache.iceberg.types.Types;
+import org.assertj.core.api.Assertions;
 import org.junit.Assert;
 import org.junit.Test;
 import org.mockito.Mockito;
+import software.amazon.awssdk.core.metrics.CoreMetric;
+import software.amazon.awssdk.metrics.MetricCollector;
 import software.amazon.awssdk.services.glue.model.AccessDeniedException;
 import software.amazon.awssdk.services.glue.model.ConcurrentModificationException;
 import software.amazon.awssdk.services.glue.model.EntityNotFoundException;
@@ -123,6 +127,77 @@ public class TestGlueCatalogCommitFailure extends GlueTestBase {
     Assert.assertEquals("No new metadata files should exist", 2, metadataFileCount(ops.current()));
   }
 
+  @Test
+  public void testCheckCommitStatusAfterRetries() {
+    // A retried commit that Glue already applied must be recognized as a success.
+    String ns = createNamespace();
+    TableIdentifier identifier = TableIdentifier.of(ns, createTable(ns));
+    GlueTableOperations opsSpy =
+        Mockito.spy((GlueTableOperations) glueCatalog.newTableOps(identifier));
+    GlueCatalog catalogSpy = Mockito.spy(glueCatalog);
+    Mockito.doReturn(opsSpy).when(catalogSpy).newTableOps(Mockito.eq(identifier));
+    Table table = catalogSpy.loadTable(identifier);
+
+    TableMetadata before = opsSpy.current();
+    boolean reportRetry = true;
+    simulateRetriedCommit(opsSpy, reportRetry);
+    updateTable(table, opsSpy);
+
+    Assert.assertNotEquals("Current metadata should have changed", before, opsSpy.current());
+    Assert.assertTrue("Current metadata should still exist", metadataFileExists(opsSpy.current()));
+    Assert.assertEquals(
+        "No new metadata files should exist", 2, metadataFileCount(opsSpy.current()));
+  }
+
+  @Test
+  public void testNoRetryAwarenessCorruptsTable() {
+    // Reproduces the table corruption that testCheckCommitStatusAfterRetries guards against.
+    // See https://github.com/apache/iceberg/issues/7151
+    String ns = createNamespace();
+    TableIdentifier identifier = TableIdentifier.of(ns, createTable(ns));
+
+    GlueTableOperations opsSpy =
+        Mockito.spy((GlueTableOperations) glueCatalog.newTableOps(identifier));
+    GlueCatalog catalogSpy = Mockito.spy(glueCatalog);
+    Mockito.doReturn(opsSpy).when(catalogSpy).newTableOps(Mockito.eq(identifier));
+    Table table = catalogSpy.loadTable(identifier);
+
+    // It's possible that Glue or DynamoDB will someday change in a way that makes retry detection
+    // unnecessary. Then this test would start failing while the previous one still passes, which
+    // is the cue to re-evaluate whether the detection mechanism is still needed.
+    boolean reportRetry = false;
+    simulateRetriedCommit(opsSpy, reportRetry);
+    Assertions.assertThatThrownBy(() -> updateTable(table, opsSpy))
+        .as("Hidden retry causes writer to conflict with itself")
+        .isInstanceOf(CommitFailedException.class)
+        .hasMessageContaining("Glue detected concurrent update")
+        .cause()
+        .isInstanceOf(ConcurrentModificationException.class);
+
+    // The hidden retry leaves Glue pointing at a metadata file that no longer exists.
+    Assertions.assertThatThrownBy(() -> glueCatalog.loadTable(identifier))
+        .as("Table still accessible despite hidden retry, underlying assumptions may have changed")
+        .isInstanceOf(NotFoundException.class)
+        .hasMessageContaining("Location does not exist");
+  }
+
+  private void simulateRetriedCommit(GlueTableOperations spyOps, boolean reportRetry) {
+    // Persist the table twice to mimic an SDK-level retry of a commit that already succeeded. The
+    // RETRY_COUNT published to the detector in between controls whether the retry is visible.
+    Mockito.doAnswer(
+            invocation -> {
+              MetricCollector collector = MetricCollector.create("test");
+              int retryCount = reportRetry ? 1 : 0;
+              collector.reportMetric(CoreMetric.RETRY_COUNT, retryCount);
+              invocation.callRealMethod();
+              invocation.getArgument(3, RetryDetector.class).publish(collector.collect());
+              invocation.callRealMethod();
+              return null;
+            })
+        .when(spyOps)
+        .persistGlueTable(Mockito.any(), Mockito.anyMap(), Mockito.any(), Mockito.any());
+  }
+
   @Test
   public void testCommitThrowsExceptionWhileSucceeded() {
     Table table = setupTable();
@@ -253,7 +328,8 @@ public class TestGlueCatalogCommitFailure extends GlueTestBase {
               realOps.persistGlueTable(
                   i.getArgument(0, software.amazon.awssdk.services.glue.model.Table.class),
                   mapProperties,
-                  i.getArgument(2, TableMetadata.class));
+                  i.getArgument(2, TableMetadata.class),
+                  i.getArgument(3, RetryDetector.class));
 
               // new metadata location is stored in map property, and used for locking
               String newMetadataLocation =
@@ -267,7 +343,7 @@ public class TestGlueCatalogCommitFailure extends GlueTestBase {
               throw new RuntimeException("Datacenter on fire");
             })
         .when(spyOperations)
-        .persistGlueTable(Mockito.any(), Mockito.anyMap(), Mockito.any());
+        .persistGlueTable(Mockito.any(), Mockito.anyMap(), Mockito.any(), Mockito.any());
   }
 
   @Test
@@ -422,11 +498,12 @@ public class TestGlueCatalogCommitFailure extends GlueTestBase {
               realOps.persistGlueTable(
                   i.getArgument(0, software.amazon.awssdk.services.glue.model.Table.class),
                   i.getArgument(1, Map.class),
-                  i.getArgument(2, TableMetadata.class));
+                  i.getArgument(2, TableMetadata.class),
+                  i.getArgument(3, RetryDetector.class));
               throw new RuntimeException("Datacenter on fire");
             })
         .when(spyOps)
-        .persistGlueTable(Mockito.any(), Mockito.anyMap(), Mockito.any());
+        .persistGlueTable(Mockito.any(), Mockito.anyMap(), Mockito.any(), Mockito.any());
   }
 
   private void failCommitAndThrowException(GlueTableOperations spyOps) {
@@ -436,7 +513,7 @@ public class TestGlueCatalogCommitFailure extends GlueTestBase {
   private void failCommitAndThrowException(GlueTableOperations spyOps, Exception exceptionToThrow) {
     Mockito.doThrow(exceptionToThrow)
         .when(spyOps)
-        .persistGlueTable(Mockito.any(), Mockito.anyMap(), Mockito.any());
+        .persistGlueTable(Mockito.any(), Mockito.anyMap(), Mockito.any(), Mockito.any());
   }
 
   private void breakFallbackCatalogCommitCheck(GlueTableOperations spyOperations) {
diff --git a/aws/src/main/java/org/apache/iceberg/aws/dynamodb/DynamoDbTableOperations.java b/aws/src/main/java/org/apache/iceberg/aws/dynamodb/DynamoDbTableOperations.java
index e38aa0a867..a1a330b118 100644
--- a/aws/src/main/java/org/apache/iceberg/aws/dynamodb/DynamoDbTableOperations.java
+++ b/aws/src/main/java/org/apache/iceberg/aws/dynamodb/DynamoDbTableOperations.java
@@ -26,6 +26,7 @@ import java.util.stream.Collectors;
 import org.apache.iceberg.BaseMetastoreTableOperations;
 import org.apache.iceberg.TableMetadata;
 import org.apache.iceberg.aws.AwsProperties;
+import org.apache.iceberg.aws.util.RetryDetector;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.exceptions.CommitFailedException;
 import org.apache.iceberg.exceptions.CommitStateUnknownException;
@@ -105,6 +106,7 @@ class DynamoDbTableOperations extends BaseMetastoreTableOperations {
     boolean newTable = base == null;
     String newMetadataLocation = writeNewMetadataIfRequired(newTable, metadata);
     CommitStatus commitStatus = CommitStatus.FAILURE;
+    RetryDetector retryDetector = new RetryDetector();
     Map<String, AttributeValue> tableKey = DynamoDbCatalog.tablePrimaryKey(tableIdentifier);
     try {
       GetItemResponse table =
@@ -116,20 +118,28 @@ class DynamoDbTableOperations extends BaseMetastoreTableOperations {
                   .build());
       checkMetadataLocation(table, base);
       Map<String, String> properties = prepareProperties(table, newMetadataLocation);
-      persistTable(tableKey, table, properties);
+      persistTable(tableKey, table, properties, retryDetector);
       commitStatus = CommitStatus.SUCCESS;
-    } catch (ConditionalCheckFailedException e) {
-      throw new CommitFailedException(
-          e, "Cannot commit %s: concurrent update detected", tableName());
     } catch (CommitFailedException e) {
       // any explicit commit failures are passed up and out to the retry handler
       throw e;
     } catch (RuntimeException persistFailure) {
-      LOG.error(
-          "Confirming if commit to {} indeed failed to persist, attempting to reconnect and check.",
-          fullTableName,
-          persistFailure);
-      commitStatus = checkCommitStatus(newMetadataLocation, metadata);
+      boolean conditionCheckFailed = persistFailure instanceof ConditionalCheckFailedException;
+
+      // If we got an exception we weren't expecting, or we got a ConditionalCheckFailedException
+      // but retries were performed, attempt to reconcile the actual commit status.
+      if (!conditionCheckFailed || retryDetector.retried()) {
+        LOG.warn(
+            "Received unexpected failure when committing to {}, validating if commit ended up succeeding.",
+            fullTableName,
+            persistFailure);
+        commitStatus = checkCommitStatus(newMetadataLocation, metadata);
+      }
+
+      if (commitStatus != CommitStatus.SUCCESS && conditionCheckFailed) {
+        throw new CommitFailedException(
+            persistFailure, "Cannot commit %s: concurrent update detected", tableName());
+      }
 
       switch (commitStatus) {
         case SUCCESS:
@@ -188,7 +198,10 @@ class DynamoDbTableOperations extends BaseMetastoreTableOperations {
   }
 
   void persistTable(
-      Map<String, AttributeValue> tableKey, GetItemResponse table, Map<String, String> parameters) {
+      Map<String, AttributeValue> tableKey,
+      GetItemResponse table,
+      Map<String, String> parameters,
+      RetryDetector retryDetector) {
     if (table.hasItem()) {
       LOG.debug("Committing existing DynamoDb catalog table: {}", tableName());
       List<String> updateParts = Lists.newArrayList();
@@ -209,6 +222,7 @@ class DynamoDbTableOperations extends BaseMetastoreTableOperations {
       attributeValues.put(":v", table.item().get(DynamoDbCatalog.COL_VERSION));
       dynamo.updateItem(
           UpdateItemRequest.builder()
+              .overrideConfiguration(c -> c.addMetricPublisher(retryDetector))
               .tableName(awsProperties.dynamoDbTableName())
               .key(tableKey)
               .conditionExpression(DynamoDbCatalog.COL_VERSION + " = :v")
@@ -226,6 +240,7 @@ class DynamoDbTableOperations extends BaseMetastoreTableOperations {
 
       dynamo.putItem(
           PutItemRequest.builder()
+              .overrideConfiguration(c -> c.addMetricPublisher(retryDetector))
               .tableName(awsProperties.dynamoDbTableName())
               .item(values)
               .conditionExpression("attribute_not_exists(" + DynamoDbCatalog.COL_VERSION + ")")
diff --git a/aws/src/main/java/org/apache/iceberg/aws/glue/GlueTableOperations.java b/aws/src/main/java/org/apache/iceberg/aws/glue/GlueTableOperations.java
index 84887cf253..6673d50c97 100644
--- a/aws/src/main/java/org/apache/iceberg/aws/glue/GlueTableOperations.java
+++ b/aws/src/main/java/org/apache/iceberg/aws/glue/GlueTableOperations.java
@@ -28,6 +28,7 @@ import org.apache.iceberg.LockManager;
 import org.apache.iceberg.TableMetadata;
 import org.apache.iceberg.aws.AwsProperties;
 import org.apache.iceberg.aws.s3.S3FileIO;
+import org.apache.iceberg.aws.util.RetryDetector;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.common.DynMethods;
 import org.apache.iceberg.exceptions.AlreadyExistsException;
@@ -142,6 +143,7 @@ class GlueTableOperations extends BaseMetastoreTableOperations {
   @Override
   protected void doCommit(TableMetadata base, TableMetadata metadata) {
     CommitStatus commitStatus = CommitStatus.FAILURE;
+    RetryDetector retryDetector = new RetryDetector();
 
     String newMetadataLocation = null;
     boolean glueTempTableCreated = false;
@@ -154,47 +156,29 @@ class GlueTableOperations extends BaseMetastoreTableOperations {
       Table glueTable = getGlueTable();
       checkMetadataLocation(glueTable, base);
       Map<String, String> properties = prepareProperties(glueTable, newMetadataLocation);
-      persistGlueTable(glueTable, properties, metadata);
+      persistGlueTable(glueTable, properties, metadata, retryDetector);
       commitStatus = CommitStatus.SUCCESS;
     } catch (CommitFailedException e) {
       throw e;
-    } catch (ConcurrentModificationException e) {
-      throw new CommitFailedException(
-          e, "Cannot commit %s because Glue detected concurrent update", tableName());
-    } catch (software.amazon.awssdk.services.glue.model.AlreadyExistsException e) {
-      throw new AlreadyExistsException(
-          e,
-          "Cannot commit %s because its Glue table already exists when trying to create one",
-          tableName());
-    } catch (EntityNotFoundException e) {
-      throw new NotFoundException(
-          e, "Cannot commit %s because Glue cannot find the requested entity", tableName());
-    } catch (AccessDeniedException e) {
-      throw new ForbiddenException(
-          e, "Cannot commit %s because Glue cannot access the requested resources", tableName());
-    } catch (software.amazon.awssdk.services.glue.model.ValidationException e) {
-      throw new ValidationException(
-          e,
-          "Cannot commit %s because Glue encountered a validation exception "
-              + "while accessing requested resources",
-          tableName());
     } catch (RuntimeException persistFailure) {
-      LOG.error(
-          "Confirming if commit to {} indeed failed to persist, attempting to reconnect and check.",
-          fullTableName,
-          persistFailure);
+      boolean isAwsServiceException = persistFailure instanceof AwsServiceException;
 
-      if (persistFailure instanceof AwsServiceException) {
-        int statusCode = ((AwsServiceException) persistFailure).statusCode();
-        if (statusCode >= 500 && statusCode < 600) {
-          commitStatus = CommitStatus.FAILURE;
-        } else {
-          throw persistFailure;
-        }
-      } else {
+      // If we got an exception we weren't expecting, or we got an AWS service exception
+      // but retries were performed, attempt to reconcile the actual commit status.
+      if (!isAwsServiceException || retryDetector.retried()) {
+        LOG.warn(
+            "Received unexpected failure when committing to {}, validating if commit ended up succeeding.",
+            fullTableName,
+            persistFailure);
         commitStatus = checkCommitStatus(newMetadataLocation, metadata);
       }
 
+      // If we got an AWS exception we would usually handle, but find we
+      // succeeded on a retry that threw an exception, skip the exception.
+      if (commitStatus != CommitStatus.SUCCESS && isAwsServiceException) {
+        handleAWSExceptions((AwsServiceException) persistFailure);
+      }
+
       switch (commitStatus) {
         case SUCCESS:
           break;
@@ -298,11 +282,16 @@ class GlueTableOperations extends BaseMetastoreTableOperations {
   }
 
   @VisibleForTesting
-  void persistGlueTable(Table glueTable, Map<String, String> parameters, TableMetadata metadata) {
+  void persistGlueTable(
+      Table glueTable,
+      Map<String, String> parameters,
+      TableMetadata metadata,
+      RetryDetector retryDetector) {
     if (glueTable != null) {
       LOG.debug("Committing existing Glue table: {}", tableName());
       UpdateTableRequest.Builder updateTableRequest =
           UpdateTableRequest.builder()
+              .overrideConfiguration(c -> c.addMetricPublisher(retryDetector))
               .catalogId(awsProperties.glueCatalogId())
               .databaseName(databaseName)
               .skipArchive(awsProperties.glueCatalogSkipArchive())
@@ -325,6 +314,7 @@ class GlueTableOperations extends BaseMetastoreTableOperations {
       LOG.debug("Committing new Glue table: {}", tableName());
       glue.createTable(
           CreateTableRequest.builder()
+              .overrideConfiguration(c -> c.addMetricPublisher(retryDetector))
               .catalogId(awsProperties.glueCatalogId())
               .databaseName(databaseName)
               .tableInput(
@@ -340,6 +330,60 @@ class GlueTableOperations extends BaseMetastoreTableOperations {
     }
   }
 
+  /**
+   * Rethrows a Glue service failure as the equivalent Iceberg exception.
+   *
+   * <p>A failure that matches none of the known Glue exception types is rethrown unchanged unless
+   * its status code is in the 5xx (server-side) range; in that case this method returns normally
+   * and the caller decides how to proceed.
+   *
+   * @param persistFailure the AWS service exception caught while committing the table
+   */
+  private void handleAWSExceptions(AwsServiceException persistFailure) {
+    if (persistFailure instanceof ConcurrentModificationException) {
+      throw new CommitFailedException(
+          persistFailure, "Cannot commit %s because Glue detected concurrent update", tableName());
+    }
+
+    if (persistFailure
+        instanceof software.amazon.awssdk.services.glue.model.AlreadyExistsException) {
+      throw new AlreadyExistsException(
+          persistFailure,
+          "Cannot commit %s because its Glue table already exists when trying to create one",
+          tableName());
+    }
+
+    if (persistFailure instanceof EntityNotFoundException) {
+      throw new NotFoundException(
+          persistFailure,
+          "Cannot commit %s because Glue cannot find the requested entity",
+          tableName());
+    }
+
+    if (persistFailure instanceof AccessDeniedException) {
+      throw new ForbiddenException(
+          persistFailure,
+          "Cannot commit %s because Glue cannot access the requested resources",
+          tableName());
+    }
+
+    if (persistFailure
+        instanceof software.amazon.awssdk.services.glue.model.ValidationException) {
+      throw new ValidationException(
+          persistFailure,
+          "Cannot commit %s because Glue encountered a validation exception "
+              + "while accessing requested resources",
+          tableName());
+    }
+
+    // Anything unrecognized propagates as-is, except server-side (5xx) errors.
+    int statusCode = persistFailure.statusCode();
+    boolean isServerError = statusCode >= 500 && statusCode < 600;
+    if (!isServerError) {
+      throw persistFailure;
+    }
+  }
+
   @VisibleForTesting
   void cleanupMetadataAndUnlock(CommitStatus commitStatus, String metadataLocation) {
     try {
diff --git a/aws/src/main/java/org/apache/iceberg/aws/util/RetryDetector.java b/aws/src/main/java/org/apache/iceberg/aws/util/RetryDetector.java
new file mode 100644
index 0000000000..a5df36a2c8
--- /dev/null
+++ b/aws/src/main/java/org/apache/iceberg/aws/util/RetryDetector.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.aws.util;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import software.amazon.awssdk.core.metrics.CoreMetric;
+import software.amazon.awssdk.metrics.MetricCollection;
+import software.amazon.awssdk.metrics.MetricPublisher;
+
+/**
+ * Metrics are the only reliable way provided by the AWS SDK to determine if an API call was
+ * retried. Attach an instance to an AWS API call as a {@link MetricPublisher} and query {@link
+ * #retried()} afterwards to learn whether any retries occurred.
+ */
+public class RetryDetector implements MetricPublisher {
+  private final AtomicBoolean retried = new AtomicBoolean(false);
+
+  /**
+   * Inspects a published metric collection, including its nested child collections, for a
+   * positive {@link CoreMetric#RETRY_COUNT}. Once a retry has been seen, further collections are
+   * ignored.
+   */
+  @Override
+  public void publish(MetricCollection metricCollection) {
+    if (retried.get()) {
+      return;
+    }
+
+    boolean sawRetry =
+        metricCollection.metricValues(CoreMetric.RETRY_COUNT).stream().anyMatch(n -> n > 0);
+    if (sawRetry) {
+      retried.set(true);
+    } else {
+      metricCollection.children().forEach(this::publish);
+    }
+  }
+
+  /** Nothing to release; this publisher holds no external resources. */
+  @Override
+  public void close() {}
+
+  /** Returns true if any collection published so far reported at least one retry. */
+  public boolean retried() {
+    return retried.get();
+  }
+}
diff --git a/aws/src/test/java/org/apache/iceberg/aws/util/TestRetryDetector.java b/aws/src/test/java/org/apache/iceberg/aws/util/TestRetryDetector.java
new file mode 100644
index 0000000000..7403d2557c
--- /dev/null
+++ b/aws/src/test/java/org/apache/iceberg/aws/util/TestRetryDetector.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.aws.util;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+import software.amazon.awssdk.core.metrics.CoreMetric;
+import software.amazon.awssdk.metrics.MetricCollection;
+import software.amazon.awssdk.metrics.MetricCollector;
+
+/**
+ * Unit tests for {@link RetryDetector}. The tests publish hand-built {@link MetricCollection}s to
+ * the detector instead of issuing real AWS SDK calls.
+ */
+public class TestRetryDetector {
+  private static final String METRICS_NAME = "name";
+
+  @Test
+  public void testNoMetrics() {
+    RetryDetector detector = new RetryDetector();
+    Assert.assertFalse("Should default to false", detector.retried());
+  }
+
+  @Test
+  public void testRetryCountMissing() {
+    MetricCollector metrics = MetricCollector.create(METRICS_NAME);
+
+    RetryDetector detector = new RetryDetector();
+    detector.publish(metrics.collect());
+    Assert.assertFalse(
+        "Should not detect retries if RETRY_COUNT metric is not reported", detector.retried());
+  }
+
+  @Test
+  public void testRetryCountZero() {
+    MetricCollector metrics = MetricCollector.create(METRICS_NAME);
+    metrics.reportMetric(CoreMetric.RETRY_COUNT, 0);
+
+    RetryDetector detector = new RetryDetector();
+    detector.publish(metrics.collect());
+    Assert.assertFalse("Should not detect retries if RETRY_COUNT is zero", detector.retried());
+  }
+
+  @Test
+  public void testRetryCountNonZero() {
+    MetricCollector metrics = MetricCollector.create(METRICS_NAME);
+    metrics.reportMetric(CoreMetric.RETRY_COUNT, 1);
+
+    RetryDetector detector = new RetryDetector();
+    detector.publish(metrics.collect());
+    Assert.assertTrue("Should detect retries if RETRY_COUNT is non-zero", detector.retried());
+  }
+
+  @Test
+  public void testMultipleRetryCounts() {
+    MetricCollector metrics = MetricCollector.create(METRICS_NAME);
+    metrics.reportMetric(CoreMetric.RETRY_COUNT, 0);
+    metrics.reportMetric(CoreMetric.RETRY_COUNT, 1);
+
+    RetryDetector detector = new RetryDetector();
+    detector.publish(metrics.collect());
+    Assert.assertTrue(
+        "Should detect retries if even one RETRY_COUNT is non-zero", detector.retried());
+  }
+
+  @Test
+  public void testNestedRetryCountZero() {
+    MetricCollector metrics = MetricCollector.create(METRICS_NAME);
+    metrics.reportMetric(CoreMetric.RETRY_COUNT, 0);
+    MetricCollector childMetrics = metrics.createChild("child1").createChild("child2");
+    childMetrics.reportMetric(CoreMetric.RETRY_COUNT, 0);
+
+    RetryDetector detector = new RetryDetector();
+    detector.publish(metrics.collect());
+    Assert.assertFalse(
+        "Should not detect retries if nested RETRY_COUNT is zero", detector.retried());
+  }
+
+  @Test
+  public void testNestedRetryCountNonZero() {
+    MetricCollector metrics = MetricCollector.create(METRICS_NAME);
+    metrics.reportMetric(CoreMetric.RETRY_COUNT, 0);
+    MetricCollector childMetrics = metrics.createChild("child1").createChild("child2");
+    childMetrics.reportMetric(CoreMetric.RETRY_COUNT, 1);
+
+    RetryDetector detector = new RetryDetector();
+    detector.publish(metrics.collect());
+    Assert.assertTrue(
+        "Should detect retries if nested RETRY_COUNT is non-zero", detector.retried());
+  }
+
+  @Test
+  public void testNestedRetryCountMultipleChildren() {
+    MetricCollector metrics = MetricCollector.create(METRICS_NAME);
+    metrics.reportMetric(CoreMetric.RETRY_COUNT, 0);
+    for (int i = 0; i < 5; i++) {
+      MetricCollector childMetrics = metrics.createChild("child" + i);
+      childMetrics.reportMetric(CoreMetric.RETRY_COUNT, 0);
+    }
+
+    MetricCollector childMetrics = metrics.createChild("child10");
+    childMetrics.reportMetric(CoreMetric.RETRY_COUNT, 1);
+
+    RetryDetector detector = new RetryDetector();
+    detector.publish(metrics.collect());
+    Assert.assertTrue(
+        "Should detect retries if even one nested RETRY_COUNT is non-zero", detector.retried());
+  }
+
+  @Test
+  public void testMultipleCollectionsReported() {
+    MetricCollector metrics1 = MetricCollector.create(METRICS_NAME);
+    metrics1.reportMetric(CoreMetric.RETRY_COUNT, 0);
+    MetricCollector metrics2 = MetricCollector.create(METRICS_NAME);
+    metrics2.reportMetric(CoreMetric.RETRY_COUNT, 1);
+
+    RetryDetector detector = new RetryDetector();
+    detector.publish(metrics1.collect());
+    Assert.assertFalse("Should not detect retries if RETRY_COUNT is zero", detector.retried());
+    detector.publish(metrics2.collect());
+    Assert.assertTrue(
+        "Should continue detecting retries in additional metrics", detector.retried());
+  }
+
+  @Test
+  public void testNoOpAfterDetection() {
+    MetricCollector metrics1 = MetricCollector.create(METRICS_NAME);
+    metrics1.reportMetric(CoreMetric.RETRY_COUNT, 1);
+    MetricCollection metrics1Spy = Mockito.spy(metrics1.collect());
+    MetricCollector metrics2 = MetricCollector.create(METRICS_NAME);
+    metrics2.reportMetric(CoreMetric.RETRY_COUNT, 0);
+    MetricCollection metrics2Spy = Mockito.spy(metrics2.collect());
+
+    RetryDetector detector = new RetryDetector();
+    detector.publish(metrics1Spy);
+    Assert.assertTrue("Should detect retries if RETRY_COUNT is non-zero", detector.retried());
+    detector.publish(metrics2Spy);
+    Assert.assertTrue("Should remain true once a retry is detected", detector.retried());
+
+    // Only the first collection's RETRY_COUNT is read; the second is never inspected.
+    Mockito.verify(metrics1Spy).metricValues(Mockito.eq(CoreMetric.RETRY_COUNT));
+    Mockito.verifyNoMoreInteractions(metrics1Spy, metrics2Spy);
+  }
+}