You are viewing a plain-text version of this content. Its canonical link ("here" in the original page) was a hyperlink that is not preserved in this plain-text copy.
Posted to commits@iceberg.apache.org by yy...@apache.org on 2021/04/06 01:05:40 UTC
[iceberg] branch master updated: AWS: handle uncertain catalog
state for glue (#2402)
This is an automated email from the ASF dual-hosted git repository.
yyan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 68c48ec AWS: handle uncertain catalog state for glue (#2402)
68c48ec is described below
commit 68c48ec9bd8c3c3ae14a365f937d1538a4fbf826
Author: yyanyy <ya...@amazon.com>
AuthorDate: Mon Apr 5 18:04:36 2021 -0700
AWS: handle uncertain catalog state for glue (#2402)
---
.../aws/glue/GlueCatalogCommitFailureTest.java | 287 +++++++++++++++++++++
.../iceberg/aws/glue/GlueTableOperations.java | 34 ++-
.../iceberg/BaseMetastoreTableOperations.java | 55 ++++
.../apache/iceberg/hive/HiveTableOperations.java | 57 ----
4 files changed, 367 insertions(+), 66 deletions(-)
diff --git a/aws/src/integration/java/org/apache/iceberg/aws/glue/GlueCatalogCommitFailureTest.java b/aws/src/integration/java/org/apache/iceberg/aws/glue/GlueCatalogCommitFailureTest.java
new file mode 100644
index 0000000..efada18
--- /dev/null
+++ b/aws/src/integration/java/org/apache/iceberg/aws/glue/GlueCatalogCommitFailureTest.java
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.aws.glue;
+
+import com.amazonaws.SdkBaseException;
+import com.amazonaws.services.s3.AmazonS3URI;
+import java.io.File;
+import java.util.Map;
+import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.BaseMetastoreTableOperations;
+import org.apache.iceberg.HasTableOperations;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.CommitFailedException;
+import org.apache.iceberg.exceptions.CommitStateUnknownException;
+import org.apache.iceberg.types.Types;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Matchers;
+import org.mockito.Mockito;
+import software.amazon.awssdk.services.glue.model.ConcurrentModificationException;
+import software.amazon.awssdk.services.s3.model.HeadObjectRequest;
+import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
+import software.amazon.awssdk.services.s3.model.NoSuchKeyException;
+
+public class GlueCatalogCommitFailureTest extends GlueTestBase {
+
+  @Test
+  public void testFailedCommit() {
+    Table table = setupTable();
+    GlueTableOperations ops = (GlueTableOperations) ((HasTableOperations) table).operations();
+
+    TableMetadata metadataV1 = ops.current();
+    TableMetadata metadataV2 = updateTable(table, ops);
+
+    GlueTableOperations spyOps = Mockito.spy(ops);
+    failCommitAndThrowException(spyOps);
+
+    AssertHelpers.assertThrows("We should wrap the error to CommitFailedException if the " +
+        "commit actually doesn't succeed", CommitFailedException.class, "unexpected exception",
+        () -> spyOps.commit(metadataV2, metadataV1));
+    // an ambiguous persist failure triggers the commit status check, which reloads the table once
+    Mockito.verify(spyOps, Mockito.times(1)).refresh();
+
+    ops.refresh();
+    Assert.assertEquals("Current metadata should not have changed", metadataV2, ops.current());
+    Assert.assertTrue("Current metadata should still exist", metadataFileExists(metadataV2));
+    Assert.assertEquals("No new metadata files should exist", 2, metadataFileCount(ops.current()));
+  }
+
+  @Test
+  public void testConcurrentModificationExceptionDoesNotCheckCommitStatus() {
+    Table table = setupTable();
+    GlueTableOperations ops = (GlueTableOperations) ((HasTableOperations) table).operations();
+
+    TableMetadata metadataV1 = ops.current();
+    TableMetadata metadataV2 = updateTable(table, ops);
+
+    GlueTableOperations spyOps = Mockito.spy(ops);
+    failCommitAndThrowException(spyOps, ConcurrentModificationException.builder().build());
+
+    try {
+      spyOps.commit(metadataV2, metadataV1);
+      // without this the test would pass silently if the commit did not throw at all
+      Assert.fail("Commit should have failed with CommitFailedException");
+    } catch (CommitFailedException e) {
+      Assert.assertTrue("Exception message should mention concurrent exception",
+          e.getMessage().contains("Glue detected concurrent update"));
+      Assert.assertTrue("Cause should be concurrent modification exception",
+          e.getCause() instanceof ConcurrentModificationException);
+    }
+    // a Glue concurrent modification is a definite failure, so no status check (refresh) may happen
+    Mockito.verify(spyOps, Mockito.times(0)).refresh();
+
+    ops.refresh();
+    Assert.assertEquals("Current metadata should not have changed", metadataV2, ops.current());
+    Assert.assertTrue("Current metadata should still exist", metadataFileExists(metadataV2));
+    Assert.assertEquals("No new metadata files should exist", 2, metadataFileCount(ops.current()));
+  }
+
+  @Test
+  public void testCommitThrowsExceptionWhileSucceeded() {
+    Table table = setupTable();
+    GlueTableOperations ops = (GlueTableOperations) ((HasTableOperations) table).operations();
+
+    TableMetadata metadataV1 = ops.current();
+    TableMetadata metadataV2 = updateTable(table, ops);
+
+    GlueTableOperations spyOps = Mockito.spy(ops);
+
+    // Simulate a communication error after a successful commit
+    commitAndThrowException(ops, spyOps);
+
+    // Shouldn't throw because the commit actually succeeds even though persistTable throws an exception
+    spyOps.commit(metadataV2, metadataV1);
+
+    ops.refresh();
+    Assert.assertNotEquals("Current metadata should have changed", metadataV2, ops.current());
+    Assert.assertTrue("Current metadata file should still exist", metadataFileExists(ops.current()));
+    Assert.assertEquals("Commit should have been successful and new metadata file should be made",
+        3, metadataFileCount(ops.current()));
+  }
+
+  @Test
+  public void testFailedCommitThrowsUnknownException() {
+    Table table = setupTable();
+    GlueTableOperations ops = (GlueTableOperations) ((HasTableOperations) table).operations();
+
+    TableMetadata metadataV1 = ops.current();
+    TableMetadata metadataV2 = updateTable(table, ops);
+
+    GlueTableOperations spyOps = Mockito.spy(ops);
+    failCommitAndThrowException(spyOps);
+    breakFallbackCatalogCommitCheck(spyOps);
+
+    AssertHelpers.assertThrows("Should throw CommitStateUnknownException since the catalog check was blocked",
+        CommitStateUnknownException.class, "Datacenter on fire",
+        () -> spyOps.commit(metadataV2, metadataV1));
+
+    ops.refresh();
+
+    Assert.assertEquals("Current metadata should not have changed", metadataV2, ops.current());
+    Assert.assertTrue("Current metadata file should still exist", metadataFileExists(ops.current()));
+    Assert.assertEquals("Client could not determine outcome so new metadata file should also exist",
+        3, metadataFileCount(ops.current()));
+  }
+
+  @Test
+  public void testSucceededCommitThrowsUnknownException() {
+    Table table = setupTable();
+    GlueTableOperations ops = (GlueTableOperations) ((HasTableOperations) table).operations();
+
+    TableMetadata metadataV1 = ops.current();
+    TableMetadata metadataV2 = updateTable(table, ops);
+
+    GlueTableOperations spyOps = Mockito.spy(ops);
+    commitAndThrowException(ops, spyOps);
+    breakFallbackCatalogCommitCheck(spyOps);
+
+    AssertHelpers.assertThrows("Should throw CommitStateUnknownException since the catalog check was blocked",
+        CommitStateUnknownException.class, "Datacenter on fire",
+        () -> spyOps.commit(metadataV2, metadataV1));
+
+    ops.refresh();
+
+    Assert.assertNotEquals("Current metadata should have changed", metadataV2, ops.current());
+    Assert.assertTrue("Current metadata file should still exist", metadataFileExists(ops.current()));
+  }
+
+  /**
+   * Pretends we threw an exception while persisting, the commit succeeded, the lock expired,
+   * and a second committer placed a commit on top of ours before the first committer was able to check
+   * if their commit succeeded or not.
+   * <p>
+   * Timeline:
+   * <ol>
+   *   <li>Client 1 commits, which throws an exception but actually succeeds</li>
+   *   <li>Client 1's lock expires while waiting to do the recheck for commit success</li>
+   *   <li>Client 2 acquires a lock, commits successfully on top of client 1's commit and releases the lock</li>
+   *   <li>Client 1 checks to see if its commit was successful</li>
+   * </ol>
+   * <p>
+   * This tests to make sure a disconnected client 1 doesn't think its commit failed just because it isn't the
+   * current one during the recheck phase.
+   */
+  @Test
+  public void testExceptionThrownInConcurrentCommit() {
+    Table table = setupTable();
+    GlueTableOperations ops = (GlueTableOperations) ((HasTableOperations) table).operations();
+
+    TableMetadata metadataV1 = ops.current();
+    TableMetadata metadataV2 = updateTable(table, ops);
+
+    GlueTableOperations spyOps = Mockito.spy(ops);
+    concurrentCommitAndThrowException(ops, spyOps, table);
+
+    /*
+    This commit and our concurrent commit should succeed even though this commit throws an exception
+    after the persist operation succeeds
+    */
+    spyOps.commit(metadataV2, metadataV1);
+
+    ops.refresh();
+    Assert.assertNotEquals("Current metadata should have changed", metadataV2, ops.current());
+    Assert.assertTrue("Current metadata file should still exist", metadataFileExists(ops.current()));
+    Assert.assertEquals("The column addition from the concurrent commit should have been successful",
+        2, ops.current().schema().columns().size());
+  }
+
+  /**
+   * Stubs {@code persistGlueTable} on the spy so that it really persists, releases the lock, lets another
+   * committer commit on top, and then throws as if the response had been lost.
+   */
+  private void concurrentCommitAndThrowException(GlueTableOperations realOps, GlueTableOperations spyOperations,
+      Table table) {
+    // Simulate a communication error after a successful commit
+    Mockito.doAnswer(i -> {
+      @SuppressWarnings("unchecked")
+      Map<String, String> mapProperties = i.getArgumentAt(1, Map.class);
+      realOps.persistGlueTable(
+          i.getArgumentAt(0, software.amazon.awssdk.services.glue.model.Table.class),
+          mapProperties);
+
+      // new metadata location is stored in map property, and used for locking
+      String newMetadataLocation = mapProperties.get(BaseMetastoreTableOperations.METADATA_LOCATION_PROP);
+
+      // Simulate lock expiration or removal, use commit status null to avoid deleting data
+      realOps.cleanupMetadataAndUnlock(null, newMetadataLocation);
+
+      table.refresh();
+      table.updateSchema().addColumn("newCol", Types.IntegerType.get()).commit();
+      throw new SdkBaseException("Datacenter on fire");
+    }).when(spyOperations).persistGlueTable(Matchers.any(), Matchers.anyMap());
+  }
+
+  /** Creates a fresh namespace and table and loads it through the Glue catalog. */
+  private Table setupTable() {
+    String namespace = createNamespace();
+    String tableName = createTable(namespace);
+    return glueCatalog.loadTable(TableIdentifier.of(namespace, tableName));
+  }
+
+  /** Adds one column to the table and returns the refreshed metadata (expected to have two columns). */
+  private TableMetadata updateTable(Table table, GlueTableOperations ops) {
+    table.updateSchema()
+        .addColumn("n", Types.IntegerType.get())
+        .commit();
+
+    ops.refresh();
+
+    TableMetadata metadataV2 = ops.current();
+
+    Assert.assertEquals(2, metadataV2.schema().columns().size());
+    return metadataV2;
+  }
+
+  /** Stubs {@code persistGlueTable} so that the real persist succeeds but the call still throws afterwards. */
+  private void commitAndThrowException(GlueTableOperations realOps, GlueTableOperations spyOps) {
+    Mockito.doAnswer(i -> {
+      @SuppressWarnings("unchecked")
+      Map<String, String> properties = i.getArgumentAt(1, Map.class);
+      realOps.persistGlueTable(
+          i.getArgumentAt(0, software.amazon.awssdk.services.glue.model.Table.class),
+          properties);
+      throw new SdkBaseException("Datacenter on fire");
+    }).when(spyOps).persistGlueTable(Matchers.any(), Matchers.anyMap());
+  }
+
+  /** Stubs {@code persistGlueTable} to throw without persisting anything. */
+  private void failCommitAndThrowException(GlueTableOperations spyOps) {
+    failCommitAndThrowException(spyOps, new SdkBaseException("Datacenter on fire"));
+  }
+
+  private void failCommitAndThrowException(GlueTableOperations spyOps, Exception exceptionToThrow) {
+    Mockito.doThrow(exceptionToThrow)
+        .when(spyOps).persistGlueTable(Matchers.any(), Matchers.anyMap());
+  }
+
+  /** Makes every {@code refresh()} on the spy throw, so the commit status check can never succeed. */
+  private void breakFallbackCatalogCommitCheck(GlueTableOperations spyOperations) {
+    // doThrow().when(spy) stubs without invoking the real refresh(); when(spy.refresh()) would call it once
+    Mockito.doThrow(new RuntimeException("Still on fire")) // Failure on commit check
+        .when(spyOperations).refresh();
+  }
+
+  /** Returns whether the metadata file of {@code metadata} exists in S3. */
+  private boolean metadataFileExists(TableMetadata metadata) {
+    AmazonS3URI amazonS3URI = new AmazonS3URI(metadata.metadataFileLocation());
+
+    try {
+      s3.headObject(HeadObjectRequest.builder()
+          .bucket(amazonS3URI.getBucket())
+          .key(amazonS3URI.getKey())
+          .build());
+      return true;
+    } catch (NoSuchKeyException e) {
+      return false;
+    }
+  }
+
+  /** Counts {@code *metadata.json} objects in the S3 folder holding the metadata file of {@code metadata}. */
+  private int metadataFileCount(TableMetadata metadata) {
+    AmazonS3URI amazonS3URI = new AmazonS3URI(metadata.metadataFileLocation());
+    return (int) s3.listObjectsV2(ListObjectsV2Request.builder()
+        .bucket(amazonS3URI.getBucket())
+        .prefix(new File(amazonS3URI.getKey()).getParent())
+        .build())
+        .contents().stream().filter(s3Object -> s3Object.key().endsWith("metadata.json")).count();
+  }
+}
diff --git a/aws/src/main/java/org/apache/iceberg/aws/glue/GlueTableOperations.java b/aws/src/main/java/org/apache/iceberg/aws/glue/GlueTableOperations.java
index 832c45d..e982246 100644
--- a/aws/src/main/java/org/apache/iceberg/aws/glue/GlueTableOperations.java
+++ b/aws/src/main/java/org/apache/iceberg/aws/glue/GlueTableOperations.java
@@ -28,12 +28,13 @@ import org.apache.iceberg.aws.AwsProperties;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.AlreadyExistsException;
import org.apache.iceberg.exceptions.CommitFailedException;
+import org.apache.iceberg.exceptions.CommitStateUnknownException;
import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import software.amazon.awssdk.core.exception.SdkException;
import software.amazon.awssdk.services.glue.GlueClient;
import software.amazon.awssdk.services.glue.model.ConcurrentModificationException;
import software.amazon.awssdk.services.glue.model.CreateTableRequest;
@@ -103,23 +104,36 @@ class GlueTableOperations extends BaseMetastoreTableOperations {
@Override
protected void doCommit(TableMetadata base, TableMetadata metadata) {
String newMetadataLocation = writeNewMetadata(metadata, currentVersion() + 1);
- boolean exceptionThrown = true;
+ CommitStatus commitStatus = CommitStatus.FAILURE;
+
try {
lock(newMetadataLocation);
Table glueTable = getGlueTable();
checkMetadataLocation(glueTable, base);
Map<String, String> properties = prepareProperties(glueTable, newMetadataLocation);
persistGlueTable(glueTable, properties);
- exceptionThrown = false;
+ commitStatus = CommitStatus.SUCCESS;
} catch (ConcurrentModificationException e) {
throw new CommitFailedException(e, "Cannot commit %s because Glue detected concurrent update", tableName());
} catch (software.amazon.awssdk.services.glue.model.AlreadyExistsException e) {
throw new AlreadyExistsException(e,
"Cannot commit %s because its Glue table already exists when trying to create one", tableName());
- } catch (SdkException e) {
- throw new CommitFailedException(e, "Cannot commit %s because unexpected exception contacting AWS", tableName());
+ } catch (RuntimeException persistFailure) {
+ LOG.error("Confirming if commit to {} indeed failed to persist, attempting to reconnect and check.",
+ fullTableName, persistFailure);
+ commitStatus = checkCommitStatus(newMetadataLocation, metadata);
+
+ switch (commitStatus) {
+ case SUCCESS:
+ break;
+ case FAILURE:
+ throw new CommitFailedException(persistFailure,
+ "Cannot commit %s due to unexpected exception", tableName());
+ case UNKNOWN:
+ throw new CommitStateUnknownException(persistFailure);
+ }
} finally {
- cleanupMetadataAndUnlock(exceptionThrown, newMetadataLocation);
+ cleanupMetadataAndUnlock(commitStatus, newMetadataLocation);
}
}
@@ -164,7 +178,8 @@ class GlueTableOperations extends BaseMetastoreTableOperations {
return properties;
}
- private void persistGlueTable(Table glueTable, Map<String, String> parameters) {
+ @VisibleForTesting
+ void persistGlueTable(Table glueTable, Map<String, String> parameters) {
if (glueTable != null) {
LOG.debug("Committing existing Glue table: {}", tableName());
glue.updateTable(UpdateTableRequest.builder()
@@ -191,9 +206,10 @@ class GlueTableOperations extends BaseMetastoreTableOperations {
}
}
- private void cleanupMetadataAndUnlock(boolean exceptionThrown, String metadataLocation) {
+ @VisibleForTesting
+ void cleanupMetadataAndUnlock(CommitStatus commitStatus, String metadataLocation) {
try {
- if (exceptionThrown) {
+ if (commitStatus == CommitStatus.FAILURE) {
// if anything went wrong, clean up the uncommitted metadata file
io().deleteFile(metadataLocation);
}
diff --git a/core/src/main/java/org/apache/iceberg/BaseMetastoreTableOperations.java b/core/src/main/java/org/apache/iceberg/BaseMetastoreTableOperations.java
index 157d437..d8e6e93 100644
--- a/core/src/main/java/org/apache/iceberg/BaseMetastoreTableOperations.java
+++ b/core/src/main/java/org/apache/iceberg/BaseMetastoreTableOperations.java
@@ -32,10 +32,14 @@ import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.relocated.com.google.common.base.Objects;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.util.PropertyUtil;
import org.apache.iceberg.util.Tasks;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static org.apache.iceberg.TableProperties.COMMIT_NUM_STATUS_CHECKS;
+import static org.apache.iceberg.TableProperties.COMMIT_NUM_STATUS_CHECKS_DEFAULT;
+
public abstract class BaseMetastoreTableOperations implements TableOperations {
private static final Logger LOG = LoggerFactory.getLogger(BaseMetastoreTableOperations.class);
@@ -46,6 +50,8 @@ public abstract class BaseMetastoreTableOperations implements TableOperations {
private static final String METADATA_FOLDER_NAME = "metadata";
+ private static final int COMMIT_STATUS_CHECK_WAIT_MS = 1000;
+
private TableMetadata currentMetadata = null;
private String currentMetadataLocation = null;
private boolean shouldRefresh = true;
@@ -245,6 +251,55 @@ public abstract class BaseMetastoreTableOperations implements TableOperations {
};
}
+  /**
+   * Outcome of a metastore commit attempt as seen by the committer. UNKNOWN means it could not be determined
+   * whether the new metadata location was committed.
+   */
+  protected enum CommitStatus {
+    /** The new metadata location was not committed. */
+    FAILURE,
+
+    /** The new metadata location was committed. */
+    SUCCESS,
+
+    /** Whether the new metadata location was committed could not be determined. */
+    UNKNOWN
+  }
+
+  /**
+   * Decides whether a commit whose outcome is ambiguous actually landed. It reloads the table and looks for
+   * {@code newMetadataLocation} as either the current metadata file or one of the previous metadata files.
+   * <p>
+   * This is a last resort for exceptions that suggest, but do not prove, that the commit failed. Previous
+   * locations are searched too, because a second committer may already have committed on top of ours.
+   * <p>
+   * The table is reloaded via {@link #refresh()}. Reload errors are retried according to the
+   * {@code COMMIT_NUM_STATUS_CHECKS} table property, backing off between attempts.
+   *
+   * @param newMetadataLocation the path of the new commit file
+   * @param config metadata to use for configuration
+   * @return SUCCESS if the location was found, FAILURE if a reload completed without finding it, or UNKNOWN if
+   *     no reload attempt completed
+   */
+  protected CommitStatus checkCommitStatus(String newMetadataLocation, TableMetadata config) {
+    int maxAttempts = PropertyUtil.propertyAsInt(config.properties(), COMMIT_NUM_STATUS_CHECKS,
+        COMMIT_NUM_STATUS_CHECKS_DEFAULT);
+
+    // Remains UNKNOWN if no attempt completes.
+    AtomicReference<CommitStatus> outcome = new AtomicReference<>(CommitStatus.UNKNOWN);
+
+    Tasks.foreach(newMetadataLocation)
+        .retry(maxAttempts)
+        .suppressFailureWhenFinished()
+        .exponentialBackoff(COMMIT_STATUS_CHECK_WAIT_MS, COMMIT_STATUS_CHECK_WAIT_MS, Long.MAX_VALUE, 2.0)
+        .onFailure((location, checkException) ->
+            LOG.error("Cannot check if commit to {} exists.", tableName(), checkException))
+        .run(location -> {
+          TableMetadata reloaded = refresh();
+          boolean isCurrent = reloaded.metadataFileLocation().equals(newMetadataLocation);
+          boolean committed = isCurrent ||
+              reloaded.previousFiles().stream().anyMatch(entry -> entry.file().equals(newMetadataLocation));
+          if (!committed) {
+            LOG.info("Commit status check: Commit to {} of {} failed", tableName(), newMetadataLocation);
+            outcome.set(CommitStatus.FAILURE);
+            return;
+          }
+          LOG.info("Commit status check: Commit to {} of {} succeeded", tableName(), newMetadataLocation);
+          outcome.set(CommitStatus.SUCCESS);
+        });
+
+    CommitStatus result = outcome.get();
+    if (result == CommitStatus.UNKNOWN) {
+      LOG.error("Cannot determine commit state to {}. Failed during checking {} times. " +
+          "Treating commit state as unknown.", tableName(), maxAttempts);
+    }
+    return result;
+  }
+
private String newTableMetadataFilePath(TableMetadata meta, int newVersion) {
String codecName = meta.property(
TableProperties.METADATA_COMPRESSION, TableProperties.METADATA_COMPRESSION_DEFAULT);
diff --git a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java
index 04f42ac..db9fe09 100644
--- a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java
+++ b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java
@@ -62,15 +62,11 @@ import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
-import org.apache.iceberg.util.PropertyUtil;
import org.apache.iceberg.util.Tasks;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import static org.apache.iceberg.TableProperties.COMMIT_NUM_STATUS_CHECKS;
-import static org.apache.iceberg.TableProperties.COMMIT_NUM_STATUS_CHECKS_DEFAULT;
-
/**
* TODO we should be able to extract some more commonalities to BaseMetastoreTableOperations to
* avoid code duplication between this class and Metacat Tables.
@@ -78,8 +74,6 @@ import static org.apache.iceberg.TableProperties.COMMIT_NUM_STATUS_CHECKS_DEFAUL
public class HiveTableOperations extends BaseMetastoreTableOperations {
private static final Logger LOG = LoggerFactory.getLogger(HiveTableOperations.class);
- private static final int COMMIT_STATUS_CHECK_WAIT_MS = 1000;
-
private static final String HIVE_ACQUIRE_LOCK_TIMEOUT_MS = "iceberg.hive.lock-timeout-ms";
private static final String HIVE_LOCK_CHECK_MIN_WAIT_MS = "iceberg.hive.lock-check-min-wait-ms";
private static final String HIVE_LOCK_CHECK_MAX_WAIT_MS = "iceberg.hive.lock-check-max-wait-ms";
@@ -99,12 +93,6 @@ public class HiveTableOperations extends BaseMetastoreTableOperations {
}
}
- private enum CommitStatus {
- FAILURE,
- SUCCESS,
- UNKNOWN
- }
-
private final String fullName;
private final String database;
private final String tableName;
@@ -234,7 +222,6 @@ public class HiveTableOperations extends BaseMetastoreTableOperations {
throw new CommitStateUnknownException(persistFailure);
}
}
-
} catch (org.apache.hadoop.hive.metastore.api.AlreadyExistsException e) {
throw new AlreadyExistsException("Table already exists: %s.%s", database, tableName);
@@ -256,50 +243,6 @@ public class HiveTableOperations extends BaseMetastoreTableOperations {
}
}
- /**
- * Attempt to load the table and see if any current or past metadata location matches the one we were attempting
- * to set. This is used as a last resort when we are dealing with exceptions that may indicate the commit has
- * failed but are not proof that this is the case. Past locations must also be searched on the chance that a second
- * committer was able to successfully commit on top of our commit.
- *
- * @param newMetadataLocation the path of the new commit file
- * @param config metadata to use for configuration
- * @return Commit Status of Success, Failure or Unknown
- */
- private CommitStatus checkCommitStatus(String newMetadataLocation, TableMetadata config) {
- int maxAttempts = PropertyUtil.propertyAsInt(config.properties(), COMMIT_NUM_STATUS_CHECKS,
- COMMIT_NUM_STATUS_CHECKS_DEFAULT);
-
- AtomicReference<CommitStatus> status = new AtomicReference<>(CommitStatus.UNKNOWN);
-
- Tasks.foreach(newMetadataLocation)
- .retry(maxAttempts)
- .suppressFailureWhenFinished()
- .exponentialBackoff(COMMIT_STATUS_CHECK_WAIT_MS, COMMIT_STATUS_CHECK_WAIT_MS, Long.MAX_VALUE, 2.0)
- .onFailure((location, checkException) ->
- LOG.error("Cannot check if commit to {}.{} exists.", database, tableName, checkException))
- .run(location -> {
- TableMetadata metadata = refresh();
- String currentMetadataLocation = metadata.metadataFileLocation();
- boolean commitSuccess = currentMetadataLocation.equals(newMetadataLocation) ||
- metadata.previousFiles().stream().anyMatch(log -> log.file().equals(newMetadataLocation));
- if (commitSuccess) {
- LOG.info("Commit status check: Commit to {}.{} of {} succeeded", database, tableName, newMetadataLocation);
- status.set(CommitStatus.SUCCESS);
- } else {
- LOG.info("Commit status check: Commit to {}.{} of {} failed", database, tableName, newMetadataLocation);
- status.set(CommitStatus.FAILURE);
- }
- });
-
- if (status.get() == CommitStatus.UNKNOWN) {
- LOG.error("Cannot determine commit state to {}.{}. Failed during checking {} times. " +
- "Treating commit state as unknown.",
- database, tableName, maxAttempts);
- }
- return status.get();
- }
-
@VisibleForTesting
void persistTable(Table hmsTable, boolean updateHiveTable) throws TException, InterruptedException {
if (updateHiveTable) {