You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by ja...@apache.org on 2023/04/10 21:04:18 UTC
[iceberg] branch master updated: AWS: Check commit status after failed commit if AWS client performed retries (#7198)
This is an automated email from the ASF dual-hosted git repository.
jackye pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 9f3707bc7f AWS: Check commit status after failed commit if AWS client performed retries (#7198)
9f3707bc7f is described below
commit 9f3707bc7fd133735b83c245a6545a83521e4e02
Author: Christina Larsen <ch...@gmail.com>
AuthorDate: Mon Apr 10 14:04:11 2023 -0700
AWS: Check commit status after failed commit if AWS client performed retries (#7198)
---
.../org/apache/iceberg/aws/glue/GlueTestBase.java | 11 +-
.../aws/glue/TestGlueCatalogCommitFailure.java | 87 +++++++++++-
.../aws/dynamodb/DynamoDbTableOperations.java | 35 +++--
.../iceberg/aws/glue/GlueTableOperations.java | 93 +++++++-----
.../org/apache/iceberg/aws/util/RetryDetector.java | 51 +++++++
.../apache/iceberg/aws/util/TestRetryDetector.java | 156 +++++++++++++++++++++
6 files changed, 375 insertions(+), 58 deletions(-)
diff --git a/aws/src/integration/java/org/apache/iceberg/aws/glue/GlueTestBase.java b/aws/src/integration/java/org/apache/iceberg/aws/glue/GlueTestBase.java
index c0623da796..122deda1a7 100644
--- a/aws/src/integration/java/org/apache/iceberg/aws/glue/GlueTestBase.java
+++ b/aws/src/integration/java/org/apache/iceberg/aws/glue/GlueTestBase.java
@@ -34,7 +34,6 @@ import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.types.Types;
-import org.apache.iceberg.util.LockManagers;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.slf4j.Logger;
@@ -83,13 +82,7 @@ public class GlueTestBase {
AwsProperties properties = new AwsProperties();
properties.setS3FileIoDeleteBatchSize(10);
glueCatalog.initialize(
- catalogName,
- testBucketPath,
- properties,
- glue,
- LockManagers.defaultLockManager(),
- fileIO,
- ImmutableMap.of());
+ catalogName, testBucketPath, properties, glue, null, fileIO, ImmutableMap.of());
glueCatalogWithSkipNameValidation = new GlueCatalog();
AwsProperties propertiesSkipNameValidation = new AwsProperties();
@@ -99,7 +92,7 @@ public class GlueTestBase {
testBucketPath,
propertiesSkipNameValidation,
glue,
- LockManagers.defaultLockManager(),
+ null,
fileIO,
ImmutableMap.of());
}
diff --git a/aws/src/integration/java/org/apache/iceberg/aws/glue/TestGlueCatalogCommitFailure.java b/aws/src/integration/java/org/apache/iceberg/aws/glue/TestGlueCatalogCommitFailure.java
index 66c87be242..ea8316fb56 100644
--- a/aws/src/integration/java/org/apache/iceberg/aws/glue/TestGlueCatalogCommitFailure.java
+++ b/aws/src/integration/java/org/apache/iceberg/aws/glue/TestGlueCatalogCommitFailure.java
@@ -26,15 +26,19 @@ import org.apache.iceberg.HasTableOperations;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.aws.s3.S3TestUtil;
+import org.apache.iceberg.aws.util.RetryDetector;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.CommitFailedException;
import org.apache.iceberg.exceptions.CommitStateUnknownException;
import org.apache.iceberg.exceptions.ForbiddenException;
import org.apache.iceberg.exceptions.NotFoundException;
import org.apache.iceberg.types.Types;
+import org.assertj.core.api.Assertions;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
+import software.amazon.awssdk.core.metrics.CoreMetric;
+import software.amazon.awssdk.metrics.MetricCollector;
import software.amazon.awssdk.services.glue.model.AccessDeniedException;
import software.amazon.awssdk.services.glue.model.ConcurrentModificationException;
import software.amazon.awssdk.services.glue.model.EntityNotFoundException;
@@ -123,6 +127,77 @@ public class TestGlueCatalogCommitFailure extends GlueTestBase {
Assert.assertEquals("No new metadata files should exist", 2, metadataFileCount(ops.current()));
}
+ @Test
+ public void testCheckCommitStatusAfterRetries() {
+ String namespace = createNamespace();
+ String tableName = createTable(namespace);
+ TableIdentifier tableId = TableIdentifier.of(namespace, tableName);
+
+ GlueTableOperations spyOps =
+ Mockito.spy((GlueTableOperations) glueCatalog.newTableOps(tableId));
+ GlueCatalog spyCatalog = Mockito.spy(glueCatalog);
+ Mockito.doReturn(spyOps).when(spyCatalog).newTableOps(Mockito.eq(tableId));
+ Table table = spyCatalog.loadTable(tableId);
+
+ TableMetadata metadataV1 = spyOps.current();
+ simulateRetriedCommit(spyOps, true /* report retry */);
+ updateTable(table, spyOps);
+
+ Assert.assertNotEquals("Current metadata should have changed", metadataV1, spyOps.current());
+ Assert.assertTrue("Current metadata should still exist", metadataFileExists(spyOps.current()));
+ Assert.assertEquals(
+ "No new metadata files should exist", 2, metadataFileCount(spyOps.current()));
+ }
+
+ @Test
+ public void testNoRetryAwarenessCorruptsTable() {
+ // This test reproduces the issue that the prior test validates the fix for.
+ // See https://github.com/apache/iceberg/issues/7151
+ String namespace = createNamespace();
+ String tableName = createTable(namespace);
+ TableIdentifier tableId = TableIdentifier.of(namespace, tableName);
+
+ GlueTableOperations spyOps =
+ Mockito.spy((GlueTableOperations) glueCatalog.newTableOps(tableId));
+ GlueCatalog spyCatalog = Mockito.spy(glueCatalog);
+ Mockito.doReturn(spyOps).when(spyCatalog).newTableOps(Mockito.eq(tableId));
+ Table table = spyCatalog.loadTable(tableId);
+
+ // It's possible that Glue or DynamoDB might someday make changes that render the retry detection
+ // mechanism unnecessary. At that time, this test would start failing while the prior one would
+ // still work. If or when that happens, we can re-evaluate whether the mechanism is still
+ // necessary.
+ simulateRetriedCommit(spyOps, false /* hide retry */);
+ Assertions.assertThatThrownBy(() -> updateTable(table, spyOps))
+ .as("Hidden retry causes writer to conflict with itself")
+ .isInstanceOf(CommitFailedException.class)
+ .hasMessageContaining("Glue detected concurrent update")
+ .cause()
+ .isInstanceOf(ConcurrentModificationException.class);
+
+ Assertions.assertThatThrownBy(() -> glueCatalog.loadTable(tableId))
+ .as("Table still accessible despite hidden retry, underlying assumptions may have changed")
+ .isInstanceOf(NotFoundException.class)
+ .hasMessageContaining("Location does not exist");
+ }
+
+ private void simulateRetriedCommit(GlueTableOperations spyOps, boolean reportRetry) {
+ // Perform the real commit, then invoke it a second time as a retried request would; in between,
+ // publish a RETRY_COUNT of 1 (reportRetry) or 0 (retry hidden) to the RetryDetector argument
+ Mockito.doAnswer(
+ i -> {
+ final MetricCollector metrics = MetricCollector.create("test");
+ metrics.reportMetric(CoreMetric.RETRY_COUNT, reportRetry ? 1 : 0);
+
+ i.callRealMethod();
+ i.getArgument(3, RetryDetector.class).publish(metrics.collect());
+ i.callRealMethod();
+ return null;
+ })
+ .when(spyOps)
+ .persistGlueTable(Mockito.any(), Mockito.anyMap(), Mockito.any(), Mockito.any());
+ }
+
@Test
public void testCommitThrowsExceptionWhileSucceeded() {
Table table = setupTable();
@@ -253,7 +328,8 @@ public class TestGlueCatalogCommitFailure extends GlueTestBase {
realOps.persistGlueTable(
i.getArgument(0, software.amazon.awssdk.services.glue.model.Table.class),
mapProperties,
- i.getArgument(2, TableMetadata.class));
+ i.getArgument(2, TableMetadata.class),
+ i.getArgument(3, RetryDetector.class));
// new metadata location is stored in map property, and used for locking
String newMetadataLocation =
@@ -267,7 +343,7 @@ public class TestGlueCatalogCommitFailure extends GlueTestBase {
throw new RuntimeException("Datacenter on fire");
})
.when(spyOperations)
- .persistGlueTable(Mockito.any(), Mockito.anyMap(), Mockito.any());
+ .persistGlueTable(Mockito.any(), Mockito.anyMap(), Mockito.any(), Mockito.any());
}
@Test
@@ -422,11 +498,12 @@ public class TestGlueCatalogCommitFailure extends GlueTestBase {
realOps.persistGlueTable(
i.getArgument(0, software.amazon.awssdk.services.glue.model.Table.class),
i.getArgument(1, Map.class),
- i.getArgument(2, TableMetadata.class));
+ i.getArgument(2, TableMetadata.class),
+ i.getArgument(3, RetryDetector.class));
throw new RuntimeException("Datacenter on fire");
})
.when(spyOps)
- .persistGlueTable(Mockito.any(), Mockito.anyMap(), Mockito.any());
+ .persistGlueTable(Mockito.any(), Mockito.anyMap(), Mockito.any(), Mockito.any());
}
private void failCommitAndThrowException(GlueTableOperations spyOps) {
@@ -436,7 +513,7 @@ public class TestGlueCatalogCommitFailure extends GlueTestBase {
private void failCommitAndThrowException(GlueTableOperations spyOps, Exception exceptionToThrow) {
Mockito.doThrow(exceptionToThrow)
.when(spyOps)
- .persistGlueTable(Mockito.any(), Mockito.anyMap(), Mockito.any());
+ .persistGlueTable(Mockito.any(), Mockito.anyMap(), Mockito.any(), Mockito.any());
}
private void breakFallbackCatalogCommitCheck(GlueTableOperations spyOperations) {
diff --git a/aws/src/main/java/org/apache/iceberg/aws/dynamodb/DynamoDbTableOperations.java b/aws/src/main/java/org/apache/iceberg/aws/dynamodb/DynamoDbTableOperations.java
index e38aa0a867..a1a330b118 100644
--- a/aws/src/main/java/org/apache/iceberg/aws/dynamodb/DynamoDbTableOperations.java
+++ b/aws/src/main/java/org/apache/iceberg/aws/dynamodb/DynamoDbTableOperations.java
@@ -26,6 +26,7 @@ import java.util.stream.Collectors;
import org.apache.iceberg.BaseMetastoreTableOperations;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.aws.AwsProperties;
+import org.apache.iceberg.aws.util.RetryDetector;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.CommitFailedException;
import org.apache.iceberg.exceptions.CommitStateUnknownException;
@@ -105,6 +106,7 @@ class DynamoDbTableOperations extends BaseMetastoreTableOperations {
boolean newTable = base == null;
String newMetadataLocation = writeNewMetadataIfRequired(newTable, metadata);
CommitStatus commitStatus = CommitStatus.FAILURE;
+ RetryDetector retryDetector = new RetryDetector();
Map<String, AttributeValue> tableKey = DynamoDbCatalog.tablePrimaryKey(tableIdentifier);
try {
GetItemResponse table =
@@ -116,20 +118,28 @@ class DynamoDbTableOperations extends BaseMetastoreTableOperations {
.build());
checkMetadataLocation(table, base);
Map<String, String> properties = prepareProperties(table, newMetadataLocation);
- persistTable(tableKey, table, properties);
+ persistTable(tableKey, table, properties, retryDetector);
commitStatus = CommitStatus.SUCCESS;
- } catch (ConditionalCheckFailedException e) {
- throw new CommitFailedException(
- e, "Cannot commit %s: concurrent update detected", tableName());
} catch (CommitFailedException e) {
// any explicit commit failures are passed up and out to the retry handler
throw e;
} catch (RuntimeException persistFailure) {
- LOG.error(
- "Confirming if commit to {} indeed failed to persist, attempting to reconnect and check.",
- fullTableName,
- persistFailure);
- commitStatus = checkCommitStatus(newMetadataLocation, metadata);
+ boolean conditionCheckFailed = persistFailure instanceof ConditionalCheckFailedException;
+
+ // If we got an exception we weren't expecting, or we got a ConditionalCheckFailedException
+ // but retries were performed, attempt to reconcile the actual commit status.
+ if (!conditionCheckFailed || retryDetector.retried()) {
+ LOG.warn(
+ "Received unexpected failure when committing to {}, validating if commit ended up succeeding.",
+ fullTableName,
+ persistFailure);
+ commitStatus = checkCommitStatus(newMetadataLocation, metadata);
+ }
+
+ if (commitStatus != CommitStatus.SUCCESS && conditionCheckFailed) {
+ throw new CommitFailedException(
+ persistFailure, "Cannot commit %s: concurrent update detected", tableName());
+ }
switch (commitStatus) {
case SUCCESS:
@@ -188,7 +198,10 @@ class DynamoDbTableOperations extends BaseMetastoreTableOperations {
}
void persistTable(
- Map<String, AttributeValue> tableKey, GetItemResponse table, Map<String, String> parameters) {
+ Map<String, AttributeValue> tableKey,
+ GetItemResponse table,
+ Map<String, String> parameters,
+ RetryDetector retryDetector) {
if (table.hasItem()) {
LOG.debug("Committing existing DynamoDb catalog table: {}", tableName());
List<String> updateParts = Lists.newArrayList();
@@ -209,6 +222,7 @@ class DynamoDbTableOperations extends BaseMetastoreTableOperations {
attributeValues.put(":v", table.item().get(DynamoDbCatalog.COL_VERSION));
dynamo.updateItem(
UpdateItemRequest.builder()
+ .overrideConfiguration(c -> c.addMetricPublisher(retryDetector))
.tableName(awsProperties.dynamoDbTableName())
.key(tableKey)
.conditionExpression(DynamoDbCatalog.COL_VERSION + " = :v")
@@ -226,6 +240,7 @@ class DynamoDbTableOperations extends BaseMetastoreTableOperations {
dynamo.putItem(
PutItemRequest.builder()
+ .overrideConfiguration(c -> c.addMetricPublisher(retryDetector))
.tableName(awsProperties.dynamoDbTableName())
.item(values)
.conditionExpression("attribute_not_exists(" + DynamoDbCatalog.COL_VERSION + ")")
diff --git a/aws/src/main/java/org/apache/iceberg/aws/glue/GlueTableOperations.java b/aws/src/main/java/org/apache/iceberg/aws/glue/GlueTableOperations.java
index 84887cf253..6673d50c97 100644
--- a/aws/src/main/java/org/apache/iceberg/aws/glue/GlueTableOperations.java
+++ b/aws/src/main/java/org/apache/iceberg/aws/glue/GlueTableOperations.java
@@ -28,6 +28,7 @@ import org.apache.iceberg.LockManager;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.aws.AwsProperties;
import org.apache.iceberg.aws.s3.S3FileIO;
+import org.apache.iceberg.aws.util.RetryDetector;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.common.DynMethods;
import org.apache.iceberg.exceptions.AlreadyExistsException;
@@ -142,6 +143,7 @@ class GlueTableOperations extends BaseMetastoreTableOperations {
@Override
protected void doCommit(TableMetadata base, TableMetadata metadata) {
CommitStatus commitStatus = CommitStatus.FAILURE;
+ RetryDetector retryDetector = new RetryDetector();
String newMetadataLocation = null;
boolean glueTempTableCreated = false;
@@ -154,47 +156,29 @@ class GlueTableOperations extends BaseMetastoreTableOperations {
Table glueTable = getGlueTable();
checkMetadataLocation(glueTable, base);
Map<String, String> properties = prepareProperties(glueTable, newMetadataLocation);
- persistGlueTable(glueTable, properties, metadata);
+ persistGlueTable(glueTable, properties, metadata, retryDetector);
commitStatus = CommitStatus.SUCCESS;
} catch (CommitFailedException e) {
throw e;
- } catch (ConcurrentModificationException e) {
- throw new CommitFailedException(
- e, "Cannot commit %s because Glue detected concurrent update", tableName());
- } catch (software.amazon.awssdk.services.glue.model.AlreadyExistsException e) {
- throw new AlreadyExistsException(
- e,
- "Cannot commit %s because its Glue table already exists when trying to create one",
- tableName());
- } catch (EntityNotFoundException e) {
- throw new NotFoundException(
- e, "Cannot commit %s because Glue cannot find the requested entity", tableName());
- } catch (AccessDeniedException e) {
- throw new ForbiddenException(
- e, "Cannot commit %s because Glue cannot access the requested resources", tableName());
- } catch (software.amazon.awssdk.services.glue.model.ValidationException e) {
- throw new ValidationException(
- e,
- "Cannot commit %s because Glue encountered a validation exception "
- + "while accessing requested resources",
- tableName());
} catch (RuntimeException persistFailure) {
- LOG.error(
- "Confirming if commit to {} indeed failed to persist, attempting to reconnect and check.",
- fullTableName,
- persistFailure);
+ boolean isAwsServiceException = persistFailure instanceof AwsServiceException;
- if (persistFailure instanceof AwsServiceException) {
- int statusCode = ((AwsServiceException) persistFailure).statusCode();
- if (statusCode >= 500 && statusCode < 600) {
- commitStatus = CommitStatus.FAILURE;
- } else {
- throw persistFailure;
- }
- } else {
+ // If we got an exception we weren't expecting, or we got an AWS service exception
+ // but retries were performed, attempt to reconcile the actual commit status.
+ if (!isAwsServiceException || retryDetector.retried()) {
+ LOG.warn(
+ "Received unexpected failure when committing to {}, validating if commit ended up succeeding.",
+ fullTableName,
+ persistFailure);
commitStatus = checkCommitStatus(newMetadataLocation, metadata);
}
+ // If we got an AWS exception we would usually handle, but find we
+ // succeeded on a retry that threw an exception, skip the exception.
+ if (commitStatus != CommitStatus.SUCCESS && isAwsServiceException) {
+ handleAWSExceptions((AwsServiceException) persistFailure);
+ }
+
switch (commitStatus) {
case SUCCESS:
break;
@@ -298,11 +282,16 @@ class GlueTableOperations extends BaseMetastoreTableOperations {
}
@VisibleForTesting
- void persistGlueTable(Table glueTable, Map<String, String> parameters, TableMetadata metadata) {
+ void persistGlueTable(
+ Table glueTable,
+ Map<String, String> parameters,
+ TableMetadata metadata,
+ RetryDetector retryDetector) {
if (glueTable != null) {
LOG.debug("Committing existing Glue table: {}", tableName());
UpdateTableRequest.Builder updateTableRequest =
UpdateTableRequest.builder()
+ .overrideConfiguration(c -> c.addMetricPublisher(retryDetector))
.catalogId(awsProperties.glueCatalogId())
.databaseName(databaseName)
.skipArchive(awsProperties.glueCatalogSkipArchive())
@@ -325,6 +314,7 @@ class GlueTableOperations extends BaseMetastoreTableOperations {
LOG.debug("Committing new Glue table: {}", tableName());
glue.createTable(
CreateTableRequest.builder()
+ .overrideConfiguration(c -> c.addMetricPublisher(retryDetector))
.catalogId(awsProperties.glueCatalogId())
.databaseName(databaseName)
.tableInput(
@@ -340,6 +330,41 @@ class GlueTableOperations extends BaseMetastoreTableOperations {
}
}
+ private void handleAWSExceptions(AwsServiceException persistFailure) { // maps Glue errors
+ if (persistFailure instanceof ConcurrentModificationException) {
+ throw new CommitFailedException(
+ persistFailure, "Cannot commit %s because Glue detected concurrent update", tableName());
+ } else if (persistFailure
+ instanceof software.amazon.awssdk.services.glue.model.AlreadyExistsException) {
+ throw new AlreadyExistsException(
+ persistFailure,
+ "Cannot commit %s because its Glue table already exists when trying to create one",
+ tableName());
+ } else if (persistFailure instanceof EntityNotFoundException) {
+ throw new NotFoundException(
+ persistFailure,
+ "Cannot commit %s because Glue cannot find the requested entity",
+ tableName());
+ } else if (persistFailure instanceof AccessDeniedException) {
+ throw new ForbiddenException(
+ persistFailure,
+ "Cannot commit %s because Glue cannot access the requested resources",
+ tableName());
+ } else if (persistFailure
+ instanceof software.amazon.awssdk.services.glue.model.ValidationException) {
+ throw new ValidationException(
+ persistFailure,
+ "Cannot commit %s because Glue encountered a validation exception "
+ + "while accessing requested resources",
+ tableName());
+ } else { // any other AWS service error has no Iceberg-specific mapping
+ int statusCode = persistFailure.statusCode();
+ if (statusCode < 500 || statusCode >= 600) { // a 5xx is not rethrown; the method just returns
+ throw persistFailure;
+ }
+ }
+ }
+
@VisibleForTesting
void cleanupMetadataAndUnlock(CommitStatus commitStatus, String metadataLocation) {
try {
diff --git a/aws/src/main/java/org/apache/iceberg/aws/util/RetryDetector.java b/aws/src/main/java/org/apache/iceberg/aws/util/RetryDetector.java
new file mode 100644
index 0000000000..a5df36a2c8
--- /dev/null
+++ b/aws/src/main/java/org/apache/iceberg/aws/util/RetryDetector.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.aws.util;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import software.amazon.awssdk.core.metrics.CoreMetric;
+import software.amazon.awssdk.metrics.MetricCollection;
+import software.amazon.awssdk.metrics.MetricPublisher;
+
+/**
+ * Metrics are the only reliable way provided by the AWS SDK to determine if an API call was
+ * retried. Attach an instance to an AWS API call as a metric publisher and check {@link
+ * #retried()} afterwards; it turns true once a published RETRY_COUNT value is greater than zero.
+ */
+public class RetryDetector implements MetricPublisher {
+ private final AtomicBoolean retried = new AtomicBoolean(false);
+
+ @Override
+ public void publish(MetricCollection metricCollection) {
+ if (!retried.get()) { // once a retry has been seen, further collections are not inspected
+ if (metricCollection.metricValues(CoreMetric.RETRY_COUNT).stream().anyMatch(i -> i > 0)) {
+ retried.set(true);
+ } else {
+ metricCollection.children().forEach(this::publish); // recurse into nested collections
+ }
+ }
+ }
+
+ @Override
+ public void close() {} // no resources to release
+
+ public boolean retried() {
+ return retried.get();
+ }
+}
diff --git a/aws/src/test/java/org/apache/iceberg/aws/util/TestRetryDetector.java b/aws/src/test/java/org/apache/iceberg/aws/util/TestRetryDetector.java
new file mode 100644
index 0000000000..7403d2557c
--- /dev/null
+++ b/aws/src/test/java/org/apache/iceberg/aws/util/TestRetryDetector.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.aws.util;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+import software.amazon.awssdk.core.metrics.CoreMetric;
+import software.amazon.awssdk.metrics.MetricCollection;
+import software.amazon.awssdk.metrics.MetricCollector;
+
+public class TestRetryDetector {
+ private static final String METRICS_NAME = "name";
+
+ @Test
+ public void testNoMetrics() {
+ RetryDetector detector = new RetryDetector();
+ Assert.assertFalse("Should default to false", detector.retried());
+ }
+
+ @Test
+ public void testRetryCountMissing() {
+ MetricCollector metrics = MetricCollector.create(METRICS_NAME);
+
+ RetryDetector detector = new RetryDetector();
+ detector.publish(metrics.collect());
+ Assert.assertFalse(
+ "Should not detect retries if RETRY_COUNT metric is not reported", detector.retried());
+ }
+
+ @Test
+ public void testRetryCountZero() {
+ MetricCollector metrics = MetricCollector.create(METRICS_NAME);
+ metrics.reportMetric(CoreMetric.RETRY_COUNT, 0);
+
+ RetryDetector detector = new RetryDetector();
+ detector.publish(metrics.collect());
+ Assert.assertFalse("Should not detect retries if RETRY_COUNT is zero", detector.retried());
+ }
+
+ @Test
+ public void testRetryCountNonZero() {
+ MetricCollector metrics = MetricCollector.create(METRICS_NAME);
+ metrics.reportMetric(CoreMetric.RETRY_COUNT, 1);
+
+ RetryDetector detector = new RetryDetector();
+ detector.publish(metrics.collect());
+ Assert.assertTrue("Should detect retries if RETRY_COUNT is non-zero", detector.retried());
+ }
+
+ @Test
+ public void testMultipleRetryCounts() {
+ MetricCollector metrics = MetricCollector.create(METRICS_NAME);
+ metrics.reportMetric(CoreMetric.RETRY_COUNT, 0);
+ metrics.reportMetric(CoreMetric.RETRY_COUNT, 1);
+
+ RetryDetector detector = new RetryDetector();
+ detector.publish(metrics.collect());
+ Assert.assertTrue(
+ "Should detect retries if even one RETRY_COUNT is non-zero", detector.retried());
+ }
+
+ @Test
+ public void testNestedRetryCountZero() {
+ MetricCollector metrics = MetricCollector.create(METRICS_NAME);
+ metrics.reportMetric(CoreMetric.RETRY_COUNT, 0);
+ MetricCollector childMetrics = metrics.createChild("child1").createChild("child2");
+ childMetrics.reportMetric(CoreMetric.RETRY_COUNT, 0);
+
+ RetryDetector detector = new RetryDetector();
+ detector.publish(metrics.collect());
+ Assert.assertFalse(
+ "Should not detect retries if nested RETRY_COUNT is zero", detector.retried());
+ }
+
+ @Test
+ public void testNestedRetryCountNonZero() {
+ MetricCollector metrics = MetricCollector.create(METRICS_NAME);
+ metrics.reportMetric(CoreMetric.RETRY_COUNT, 0);
+ MetricCollector childMetrics = metrics.createChild("child1").createChild("child2");
+ childMetrics.reportMetric(CoreMetric.RETRY_COUNT, 1);
+
+ RetryDetector detector = new RetryDetector();
+ detector.publish(metrics.collect());
+ Assert.assertTrue(
+ "Should detect retries if nested RETRY_COUNT is non-zero", detector.retried());
+ }
+
+ @Test
+ public void testNestedRetryCountMultipleChildren() {
+ MetricCollector metrics = MetricCollector.create(METRICS_NAME);
+ metrics.reportMetric(CoreMetric.RETRY_COUNT, 0);
+ for (int i = 0; i < 5; i++) {
+ MetricCollector childMetrics = metrics.createChild("child" + i);
+ childMetrics.reportMetric(CoreMetric.RETRY_COUNT, 0);
+ }
+
+ MetricCollector childMetrics = metrics.createChild("child10");
+ childMetrics.reportMetric(CoreMetric.RETRY_COUNT, 1);
+
+ RetryDetector detector = new RetryDetector();
+ detector.publish(metrics.collect());
+ Assert.assertTrue(
+ "Should detect retries if even one nested RETRY_COUNT is non-zero", detector.retried());
+ }
+
+ @Test
+ public void testMultipleCollectionsReported() {
+ MetricCollector metrics1 = MetricCollector.create(METRICS_NAME);
+ metrics1.reportMetric(CoreMetric.RETRY_COUNT, 0);
+ MetricCollector metrics2 = MetricCollector.create(METRICS_NAME);
+ metrics2.reportMetric(CoreMetric.RETRY_COUNT, 1);
+
+ RetryDetector detector = new RetryDetector();
+ detector.publish(metrics1.collect());
+ Assert.assertFalse("Should not detect retries if RETRY_COUNT is zero", detector.retried());
+ detector.publish(metrics2.collect());
+ Assert.assertTrue(
+ "Should continue detecting retries in additional metrics", detector.retried());
+ }
+
+ @Test
+ public void testNoOpAfterDetection() {
+ MetricCollector metrics1 = MetricCollector.create(METRICS_NAME);
+ metrics1.reportMetric(CoreMetric.RETRY_COUNT, 1);
+ MetricCollection metrics1Spy = Mockito.spy(metrics1.collect());
+ MetricCollector metrics2 = MetricCollector.create(METRICS_NAME);
+ metrics2.reportMetric(CoreMetric.RETRY_COUNT, 0);
+ MetricCollection metrics2Spy = Mockito.spy(metrics2.collect());
+
+ RetryDetector detector = new RetryDetector();
+ detector.publish(metrics1Spy); // RETRY_COUNT is 1 here, so the detector flips to true
+ Assert.assertTrue("Should detect retries if RETRY_COUNT is non-zero", detector.retried());
+ detector.publish(metrics2Spy); // must be a no-op now that a retry was already detected
+ Assert.assertTrue("Should remain true once a retry is detected", detector.retried());
+
+ Mockito.verify(metrics1Spy).metricValues(Mockito.eq(CoreMetric.RETRY_COUNT));
+ Mockito.verifyNoMoreInteractions(metrics1Spy, metrics2Spy); // second collection never read
+ }
+}