You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by dw...@apache.org on 2021/06/23 03:33:35 UTC
[iceberg] branch master updated: AWS: add DynamoDb catalog (#2688)
This is an automated email from the ASF dual-hosted git repository.
dweeks pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new f81d8ad AWS: add DynamoDb catalog (#2688)
f81d8ad is described below
commit f81d8adc9e4413265b63ca092ca744a22a0c9c65
Author: Jack Ye <yz...@amazon.com>
AuthorDate: Tue Jun 22 20:33:23 2021 -0700
AWS: add DynamoDb catalog (#2688)
* AWS: add DynamoDb catalog
* fix spacing, add comment
---
.../iceberg/aws/dynamodb/DynamoDbCatalogTest.java | 289 ++++++++++
.../java/org/apache/iceberg/aws/AwsProperties.java | 22 +
.../iceberg/aws/dynamodb/DynamoDbCatalog.java | 613 +++++++++++++++++++++
.../aws/dynamodb/DynamoDbTableOperations.java | 215 ++++++++
4 files changed, 1139 insertions(+)
diff --git a/aws/src/integration/java/org/apache/iceberg/aws/dynamodb/DynamoDbCatalogTest.java b/aws/src/integration/java/org/apache/iceberg/aws/dynamodb/DynamoDbCatalogTest.java
new file mode 100644
index 0000000..8d4273c
--- /dev/null
+++ b/aws/src/integration/java/org/apache/iceberg/aws/dynamodb/DynamoDbCatalogTest.java
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.aws.dynamodb;
+
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ForkJoinPool;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.aws.AwsClientFactories;
+import org.apache.iceberg.aws.AwsClientFactory;
+import org.apache.iceberg.aws.AwsIntegTestUtil;
+import org.apache.iceberg.aws.AwsProperties;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.types.Types;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
+import software.amazon.awssdk.services.dynamodb.model.DeleteTableRequest;
+import software.amazon.awssdk.services.dynamodb.model.GetItemRequest;
+import software.amazon.awssdk.services.dynamodb.model.GetItemResponse;
+import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.services.s3.model.HeadObjectRequest;
+import software.amazon.awssdk.services.s3.model.NoSuchKeyException;
+
+/**
+ * Integration tests for {@link DynamoDbCatalog} running against real DynamoDB and S3.
+ * <p>
+ * All tests share one randomly named catalog table, which is deleted in {@link #afterClass()}.
+ */
+public class DynamoDbCatalogTest {
+
+  private static final ForkJoinPool POOL = new ForkJoinPool(16);
+  private static final Schema SCHEMA = new Schema(Types.NestedField.required(1, "id", Types.StringType.get()));
+
+  private static String catalogTableName;
+  private static DynamoDbClient dynamo;
+  private static S3Client s3;
+  private static DynamoDbCatalog catalog;
+  private static String testBucket;
+
+  @BeforeClass
+  public static void beforeClass() {
+    catalogTableName = genRandomName();
+    AwsClientFactory clientFactory = AwsClientFactories.defaultFactory();
+    dynamo = clientFactory.dynamo();
+    s3 = clientFactory.s3();
+    catalog = new DynamoDbCatalog();
+    testBucket = AwsIntegTestUtil.testBucketName();
+    catalog.initialize("test", ImmutableMap.of(
+        AwsProperties.DYNAMODB_TABLE_NAME, catalogTableName,
+        CatalogProperties.WAREHOUSE_LOCATION, "s3://" + testBucket + "/" + genRandomName()));
+  }
+
+  /**
+   * Deletes the catalog table, then releases the thread pool and every client opened by this class,
+   * even if the table deletion fails.
+   */
+  @AfterClass
+  public static void afterClass() throws Exception {
+    try {
+      dynamo.deleteTable(DeleteTableRequest.builder().tableName(catalogTableName).build());
+    } finally {
+      POOL.shutdown();
+      catalog.close();
+      dynamo.close();
+      s3.close();
+    }
+  }
+
+  @Test
+  public void testCreateNamespace() {
+    Namespace namespace = Namespace.of(genRandomName());
+    catalog.createNamespace(namespace);
+    GetItemResponse response = dynamo.getItem(GetItemRequest.builder()
+        .tableName(catalogTableName)
+        .key(DynamoDbCatalog.namespacePrimaryKey(namespace))
+        .build());
+    Assert.assertTrue("namespace must exist", response.hasItem());
+    Assert.assertEquals("namespace must be stored in DynamoDB",
+        namespace.toString(), response.item().get("namespace").s());
+
+    AssertHelpers.assertThrows("should not create duplicated namespace",
+        AlreadyExistsException.class,
+        "already exists",
+        () -> catalog.createNamespace(namespace));
+  }
+
+  @Test
+  public void testCreateNamespaceBadName() {
+    AssertHelpers.assertThrows("should not create namespace with empty level",
+        ValidationException.class,
+        "must not be empty",
+        () -> catalog.createNamespace(Namespace.of("a", "", "b")));
+
+    AssertHelpers.assertThrows("should not create namespace with dot in level",
+        ValidationException.class,
+        "must not contain dot",
+        () -> catalog.createNamespace(Namespace.of("a", "b.c")));
+  }
+
+  @Test
+  public void testListSubNamespaces() {
+    Namespace parent = Namespace.of(genRandomName());
+    List<Namespace> namespaceList = IntStream.range(0, 3)
+        .mapToObj(i -> Namespace.of(parent.toString(), genRandomName()))
+        .collect(Collectors.toList());
+    catalog.createNamespace(parent);
+    namespaceList.forEach(ns -> catalog.createNamespace(ns));
+    // the listing contains the parent itself plus its three children
+    Assert.assertEquals(4, catalog.listNamespaces(parent).size());
+  }
+
+  @Test
+  public void testNamespaceProperties() {
+    Namespace namespace = Namespace.of(genRandomName());
+    Map<String, String> properties = Maps.newHashMap();
+    properties.put("key1", "val1");
+    properties.put("key2", "val2");
+    catalog.createNamespace(namespace, properties);
+    Assert.assertEquals(properties, catalog.loadNamespaceMetadata(namespace));
+
+    properties.put("key3", "val3");
+    properties.put("key2", "val2-1");
+    catalog.setProperties(namespace, properties);
+    Assert.assertEquals(properties, catalog.loadNamespaceMetadata(namespace));
+
+    properties.remove("key3");
+    catalog.removeProperties(namespace, Sets.newHashSet("key3"));
+    Assert.assertEquals(properties, catalog.loadNamespaceMetadata(namespace));
+  }
+
+  @Test
+  public void testCreateTable() {
+    Namespace namespace = Namespace.of(genRandomName());
+    catalog.createNamespace(namespace);
+    TableIdentifier tableIdentifier = TableIdentifier.of(namespace, genRandomName());
+    catalog.createTable(tableIdentifier, SCHEMA);
+    GetItemResponse response = dynamo.getItem(GetItemRequest.builder()
+        .tableName(catalogTableName)
+        .key(DynamoDbCatalog.tablePrimaryKey(tableIdentifier))
+        .build());
+    Assert.assertTrue("table must exist", response.hasItem());
+    Assert.assertEquals("table must be stored in DynamoDB with table identifier as partition key",
+        tableIdentifier.toString(), response.item().get("identifier").s());
+    Assert.assertEquals("table must be stored in DynamoDB with namespace as sort key",
+        namespace.toString(), response.item().get("namespace").s());
+
+    AssertHelpers.assertThrows("should not create duplicated table",
+        AlreadyExistsException.class,
+        "already exists",
+        () -> catalog.createTable(tableIdentifier, SCHEMA));
+  }
+
+  @Test
+  public void testCreateTableBadName() {
+    Namespace namespace = Namespace.of(genRandomName());
+    catalog.createNamespace(namespace);
+    AssertHelpers.assertThrows("should not create table name with empty namespace",
+        ValidationException.class,
+        "Table namespace must not be empty",
+        () -> catalog.createTable(TableIdentifier.of(Namespace.empty(), "a"), SCHEMA));
+
+    AssertHelpers.assertThrows("should not create table name with dot",
+        ValidationException.class,
+        "must not contain dot",
+        () -> catalog.createTable(TableIdentifier.of(namespace, "a.b"), SCHEMA));
+  }
+
+  @Test
+  public void testListTable() {
+    Namespace namespace = Namespace.of(genRandomName());
+    catalog.createNamespace(namespace);
+    List<TableIdentifier> tableIdentifiers = IntStream.range(0, 3)
+        .mapToObj(i -> TableIdentifier.of(namespace, genRandomName()))
+        .collect(Collectors.toList());
+    tableIdentifiers.forEach(id -> catalog.createTable(id, SCHEMA));
+    Assert.assertEquals(3, catalog.listTables(namespace).size());
+  }
+
+  @Test
+  public void testDropTable() {
+    Namespace namespace = Namespace.of(genRandomName());
+    catalog.createNamespace(namespace);
+    TableIdentifier tableIdentifier = TableIdentifier.of(namespace, genRandomName());
+    catalog.createTable(tableIdentifier, SCHEMA);
+    String metadataLocation = dynamo.getItem(GetItemRequest.builder()
+        .tableName(catalogTableName)
+        .key(DynamoDbCatalog.tablePrimaryKey(tableIdentifier)).build())
+        .item().get("p.metadata_location").s();
+    catalog.dropTable(tableIdentifier, true);
+    Assert.assertFalse("table entry should not exist in dynamo",
+        dynamo.getItem(GetItemRequest.builder()
+            .tableName(catalogTableName)
+            .key(DynamoDbCatalog.tablePrimaryKey(tableIdentifier)).build())
+            .hasItem());
+    AssertHelpers.assertThrows("metadata location should be deleted",
+        NoSuchKeyException.class,
+        () -> s3.headObject(HeadObjectRequest.builder()
+            .bucket(testBucket)
+            .key(metadataLocation.substring(testBucket.length() + 6)) // s3:// + end slash
+            .build()));
+  }
+
+  @Test
+  public void testRenameTable() {
+    Namespace namespace = Namespace.of(genRandomName());
+    catalog.createNamespace(namespace);
+    Namespace namespace2 = Namespace.of(genRandomName());
+    catalog.createNamespace(namespace2);
+    TableIdentifier tableIdentifier = TableIdentifier.of(namespace, genRandomName());
+    catalog.createTable(tableIdentifier, SCHEMA);
+    TableIdentifier tableIdentifier2 = TableIdentifier.of(namespace2, genRandomName());
+
+    AssertHelpers.assertThrows("should not be able to rename a table not exist",
+        NoSuchTableException.class,
+        "does not exist",
+        () -> catalog.renameTable(TableIdentifier.of(namespace, "a"), tableIdentifier2));
+
+    AssertHelpers.assertThrows("should not be able to rename an existing table",
+        AlreadyExistsException.class,
+        "already exists",
+        () -> catalog.renameTable(tableIdentifier, tableIdentifier));
+
+    String metadataLocation = dynamo.getItem(GetItemRequest.builder()
+        .tableName(catalogTableName)
+        .key(DynamoDbCatalog.tablePrimaryKey(tableIdentifier)).build())
+        .item().get("p.metadata_location").s();
+
+    catalog.renameTable(tableIdentifier, tableIdentifier2);
+
+    String metadataLocation2 = dynamo.getItem(GetItemRequest.builder()
+        .tableName(catalogTableName)
+        .key(DynamoDbCatalog.tablePrimaryKey(tableIdentifier2)).build())
+        .item().get("p.metadata_location").s();
+
+    Assert.assertEquals("metadata location should be copied to new table entry",
+        metadataLocation, metadataLocation2);
+  }
+
+  @Test
+  public void testUpdateTable() {
+    Namespace namespace = Namespace.of(genRandomName());
+    catalog.createNamespace(namespace);
+    TableIdentifier tableIdentifier = TableIdentifier.of(namespace, genRandomName());
+    catalog.createTable(tableIdentifier, SCHEMA);
+    Table table = catalog.loadTable(tableIdentifier);
+    table.updateSchema().addColumn("data", Types.StringType.get()).commit();
+    table.refresh();
+    Assert.assertEquals(2, table.schema().columns().size());
+  }
+
+  @Test
+  public void testConcurrentCommits() throws Exception {
+    Namespace namespace = Namespace.of(genRandomName());
+    catalog.createNamespace(namespace);
+    TableIdentifier tableIdentifier = TableIdentifier.of(namespace, genRandomName());
+    catalog.createTable(tableIdentifier, SCHEMA);
+    Table table = catalog.loadTable(tableIdentifier);
+    POOL.submit(() -> IntStream.range(0, 16).parallel()
+        .forEach(i -> {
+          try {
+            table.updateSchema().addColumn(genRandomName(), Types.StringType.get()).commit();
+          } catch (Exception e) {
+            // ignore: losing commits are expected to fail; only the final schema is checked
+          }
+        })).get();
+
+    Assert.assertEquals(2, table.schema().columns().size());
+  }
+
+  /** Returns a random 32-character hex name that is safe to use as a namespace, table, or path segment. */
+  private static String genRandomName() {
+    return UUID.randomUUID().toString().replace("-", "");
+  }
+}
diff --git a/aws/src/main/java/org/apache/iceberg/aws/AwsProperties.java b/aws/src/main/java/org/apache/iceberg/aws/AwsProperties.java
index ffc30c1..0985559 100644
--- a/aws/src/main/java/org/apache/iceberg/aws/AwsProperties.java
+++ b/aws/src/main/java/org/apache/iceberg/aws/AwsProperties.java
@@ -21,6 +21,7 @@ package org.apache.iceberg.aws;
import java.io.Serializable;
import java.util.Map;
+import org.apache.iceberg.aws.dynamodb.DynamoDbCatalog;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.util.PropertyUtil;
import software.amazon.awssdk.services.s3.model.ObjectCannedACL;
@@ -131,6 +132,12 @@ public class AwsProperties implements Serializable {
public static final String S3FILEIO_ACL = "s3.acl";
/**
+ * Name of the DynamoDB table in which {@link DynamoDbCatalog} stores its namespace and table entries.
+ */
+ public static final String DYNAMODB_TABLE_NAME = "dynamodb.table-name";
+ public static final String DYNAMODB_TABLE_NAME_DEFAULT = "iceberg";
+
+ /**
* The implementation class of {@link AwsClientFactory} to customize AWS client configurations.
* If set, all AWS clients will be initialized by the specified factory.
* If not set, {@link AwsClientFactories#defaultFactory()} is used as default factory.
@@ -180,6 +187,8 @@ public class AwsProperties implements Serializable {
private String glueCatalogId;
private boolean glueCatalogSkipArchive;
+ private String dynamoDbTableName;
+
public AwsProperties() {
this.s3FileIoSseType = S3FILEIO_SSE_TYPE_NONE;
this.s3FileIoSseKey = null;
@@ -193,6 +202,8 @@ public class AwsProperties implements Serializable {
this.glueCatalogId = null;
this.glueCatalogSkipArchive = GLUE_CATALOG_SKIP_ARCHIVE_DEFAULT;
+
+ this.dynamoDbTableName = DYNAMODB_TABLE_NAME_DEFAULT;
}
public AwsProperties(Map<String, String> properties) {
@@ -236,6 +247,9 @@ public class AwsProperties implements Serializable {
this.s3FileIoAcl = ObjectCannedACL.fromValue(aclType);
Preconditions.checkArgument(s3FileIoAcl == null || !s3FileIoAcl.equals(ObjectCannedACL.UNKNOWN_TO_SDK_VERSION),
"Cannot support S3 CannedACL " + aclType);
+
+ this.dynamoDbTableName = PropertyUtil.propertyAsString(properties, DYNAMODB_TABLE_NAME,
+ DYNAMODB_TABLE_NAME_DEFAULT);
}
public String s3FileIoSseType() {
@@ -317,4 +331,12 @@ public class AwsProperties implements Serializable {
public void setS3FileIoAcl(ObjectCannedACL acl) {
this.s3FileIoAcl = acl;
}
+
+  /**
+   * Returns the name of the DynamoDB table backing the DynamoDB catalog.
+   */
+  public String dynamoDbTableName() {
+    return this.dynamoDbTableName;
+  }
+
+  /**
+   * Overrides the name of the DynamoDB table backing the DynamoDB catalog.
+   *
+   * @param name DynamoDB table name
+   */
+  public void setDynamoDbTableName(String name) {
+    dynamoDbTableName = name;
+  }
}
diff --git a/aws/src/main/java/org/apache/iceberg/aws/dynamodb/DynamoDbCatalog.java b/aws/src/main/java/org/apache/iceberg/aws/dynamodb/DynamoDbCatalog.java
new file mode 100644
index 0000000..f9bef15
--- /dev/null
+++ b/aws/src/main/java/org/apache/iceberg/aws/dynamodb/DynamoDbCatalog.java
@@ -0,0 +1,613 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.aws.dynamodb;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.BaseMetastoreCatalog;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.aws.AwsClientFactories;
+import org.apache.iceberg.aws.AwsProperties;
+import org.apache.iceberg.aws.s3.S3FileIO;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.SupportsNamespaces;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
+import org.apache.iceberg.exceptions.NoSuchNamespaceException;
+import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.base.Joiner;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.Tasks;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
+import software.amazon.awssdk.services.dynamodb.model.AttributeDefinition;
+import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
+import software.amazon.awssdk.services.dynamodb.model.BillingMode;
+import software.amazon.awssdk.services.dynamodb.model.ConditionalCheckFailedException;
+import software.amazon.awssdk.services.dynamodb.model.CreateTableRequest;
+import software.amazon.awssdk.services.dynamodb.model.Delete;
+import software.amazon.awssdk.services.dynamodb.model.DeleteItemRequest;
+import software.amazon.awssdk.services.dynamodb.model.DescribeTableRequest;
+import software.amazon.awssdk.services.dynamodb.model.DescribeTableResponse;
+import software.amazon.awssdk.services.dynamodb.model.GetItemRequest;
+import software.amazon.awssdk.services.dynamodb.model.GetItemResponse;
+import software.amazon.awssdk.services.dynamodb.model.GlobalSecondaryIndex;
+import software.amazon.awssdk.services.dynamodb.model.KeySchemaElement;
+import software.amazon.awssdk.services.dynamodb.model.KeyType;
+import software.amazon.awssdk.services.dynamodb.model.Projection;
+import software.amazon.awssdk.services.dynamodb.model.ProjectionType;
+import software.amazon.awssdk.services.dynamodb.model.Put;
+import software.amazon.awssdk.services.dynamodb.model.PutItemRequest;
+import software.amazon.awssdk.services.dynamodb.model.QueryRequest;
+import software.amazon.awssdk.services.dynamodb.model.QueryResponse;
+import software.amazon.awssdk.services.dynamodb.model.ResourceNotFoundException;
+import software.amazon.awssdk.services.dynamodb.model.ScalarAttributeType;
+import software.amazon.awssdk.services.dynamodb.model.TableStatus;
+import software.amazon.awssdk.services.dynamodb.model.TransactWriteItem;
+import software.amazon.awssdk.services.dynamodb.model.TransactWriteItemsRequest;
+import software.amazon.awssdk.services.dynamodb.model.UpdateItemRequest;
+
+/**
+ * DynamoDB implementation of Iceberg catalog
+ */
+public class DynamoDbCatalog extends BaseMetastoreCatalog implements Closeable, SupportsNamespaces, Configurable {
+
+ private static final Logger LOG = LoggerFactory.getLogger(DynamoDbCatalog.class);
+ // NOTE(review): not referenced in the code visible here; presumably bounds the polling done while the
+ // catalog table is being created (see ensureCatalogTableExistsOrCreate) — confirm
+ private static final int CATALOG_TABLE_CREATION_WAIT_ATTEMPTS_MAX = 5;
+ // joins the clauses of DynamoDB update expressions built by setProperties/removeProperties
+ static final Joiner COMMA = Joiner.on(',');
+
+ // secondary index queried by listTables to find all entries that belong to one namespace
+ private static final String GSI_NAMESPACE_IDENTIFIER = "namespace-identifier";
+ // identifier column: the table identifier string for table entries,
+ // or COL_IDENTIFIER_NAMESPACE for namespace entries
+ private static final String COL_IDENTIFIER = "identifier";
+ // sentinel identifier value that marks a namespace entry (see namespacePrimaryKey)
+ private static final String COL_IDENTIFIER_NAMESPACE = "NAMESPACE";
+ // namespace column: the dotted namespace string, present on both namespace and table entries
+ private static final String COL_NAMESPACE = "namespace";
+ // user-visible properties are stored as columns named with this prefix (see toPropertyCol)
+ private static final String PROPERTY_COL_PREFIX = "p.";
+ // namespace property whose value overrides the default location of tables in that namespace
+ private static final String PROPERTY_DEFAULT_LOCATION = "default_location";
+ // NOTE(review): the two timestamp columns are not referenced in the code visible here; presumably
+ // written by setNewCatalogEntryMetadata/updateCatalogEntryMetadata — confirm
+ private static final String COL_CREATED_AT = "created_at";
+ private static final String COL_UPDATED_AT = "updated_at";
+
+ // field used for optimistic locking
+ static final String COL_VERSION = "v";
+
+ private DynamoDbClient dynamo;
+ // stored by setConf and returned by getConf; not otherwise used in the code visible here
+ private Configuration hadoopConf;
+ private String catalogName;
+ private String warehousePath;
+ private AwsProperties awsProperties;
+ private FileIO fileIO;
+
+  /**
+   * Creates an uninitialized catalog; {@link #initialize(String, Map)} must be called before use.
+   */
+  public DynamoDbCatalog() {
+    // intentionally empty: all state is assigned in initialize
+  }
+
+ @Override
+ public void initialize(String name, Map<String, String> properties) {
+ initialize(
+ name,
+ properties.get(CatalogProperties.WAREHOUSE_LOCATION),
+ new AwsProperties(properties),
+ AwsClientFactories.from(properties).dynamo(),
+ initializeFileIO(properties));
+ }
+
+  /**
+   * Initializes the catalog with explicit collaborators, then makes sure the catalog table is available.
+   *
+   * @param name catalog name
+   * @param path warehouse location, normalized through cleanWarehousePath
+   * @param properties AWS properties (supplies the DynamoDB table name)
+   * @param client DynamoDB client used for all catalog operations
+   * @param io FileIO handed to table operations
+   */
+  @VisibleForTesting
+  void initialize(String name, String path, AwsProperties properties, DynamoDbClient client, FileIO io) {
+    catalogName = name;
+    awsProperties = properties;
+    warehousePath = cleanWarehousePath(path);
+    dynamo = client;
+    fileIO = io;
+    // runs after every field is assigned; presumably relies on dynamo and awsProperties — confirm
+    ensureCatalogTableExistsOrCreate();
+  }
+
+  /** Returns the name this catalog was initialized with. */
+  @Override
+  public String name() {
+    return this.catalogName;
+  }
+
+  /**
+   * Creates DynamoDB-backed table operations for a validated table identifier.
+   */
+  @Override
+  protected TableOperations newTableOps(TableIdentifier tableIdentifier) {
+    validateTableIdentifier(tableIdentifier);
+    TableOperations operations =
+        new DynamoDbTableOperations(dynamo, awsProperties, catalogName, fileIO, tableIdentifier);
+    return operations;
+  }
+
+  /**
+   * Computes the default location of a new table.
+   * <p>
+   * If the table's namespace has the {@link #defaultLocationProperty()} property, the table goes under that
+   * path; otherwise it goes under {@code <warehouse>/<namespace>.db/}.
+   *
+   * @throws NoSuchNamespaceException if the table's namespace does not exist
+   */
+  @Override
+  protected String defaultWarehouseLocation(TableIdentifier tableIdentifier) {
+    validateTableIdentifier(tableIdentifier);
+    GetItemResponse response = dynamo.getItem(GetItemRequest.builder()
+        .tableName(awsProperties.dynamoDbTableName())
+        .consistentRead(true)
+        .key(namespacePrimaryKey(tableIdentifier.namespace()))
+        .build());
+
+    if (!response.hasItem()) {
+      throw new NoSuchNamespaceException("Cannot find default warehouse location: namespace %s does not exist",
+          tableIdentifier.namespace());
+    }
+
+    String defaultLocationCol = toPropertyCol(PROPERTY_DEFAULT_LOCATION);
+    AttributeValue defaultLocation = response.item().get(defaultLocationCol);
+    if (defaultLocation != null && defaultLocation.s() != null) {
+      // format the stored string value; AttributeValue.toString() would yield "AttributeValue(S=...)"
+      return String.format("%s/%s", defaultLocation.s(), tableIdentifier.name());
+    } else {
+      return String.format("%s/%s.db/%s", warehousePath, tableIdentifier.namespace(), tableIdentifier.name());
+    }
+  }
+
+  /**
+   * Creates a namespace entry that carries the given properties as prefixed property columns.
+   *
+   * @throws AlreadyExistsException if an entry for the namespace already exists
+   */
+  @Override
+  public void createNamespace(Namespace namespace, Map<String, String> metadata) {
+    validateNamespace(namespace);
+    Map<String, AttributeValue> item = namespacePrimaryKey(namespace);
+    setNewCatalogEntryMetadata(item);
+    for (Map.Entry<String, String> entry : metadata.entrySet()) {
+      item.put(toPropertyCol(entry.getKey()), AttributeValue.builder().s(entry.getValue()).build());
+    }
+
+    // the put only succeeds when no entry with a version column exists for this key yet
+    PutItemRequest request = PutItemRequest.builder()
+        .tableName(awsProperties.dynamoDbTableName())
+        .conditionExpression("attribute_not_exists(" + COL_VERSION + ")")
+        .item(item)
+        .build();
+    try {
+      dynamo.putItem(request);
+    } catch (ConditionalCheckFailedException e) {
+      throw new AlreadyExistsException("Cannot create namespace %s: already exists", namespace);
+    }
+  }
+
+  /**
+   * Lists namespace entries, following query pagination.
+   * <p>
+   * For an empty namespace, every namespace is returned. Otherwise the result contains the given
+   * namespace itself plus every namespace nested under it.
+   */
+  @Override
+  public List<Namespace> listNamespaces(Namespace namespace) throws NoSuchNamespaceException {
+    validateNamespace(namespace);
+    List<Namespace> namespaces = Lists.newArrayList();
+    Map<String, AttributeValue> lastEvaluatedKey = null;
+    String parent = namespace.toString();
+    String childPrefix = parent + ".";
+    String condition = COL_IDENTIFIER + " = :identifier";
+    Map<String, AttributeValue> conditionValues = Maps.newHashMap();
+    conditionValues.put(":identifier", AttributeValue.builder().s(COL_IDENTIFIER_NAMESPACE).build());
+    if (!namespace.isEmpty()) {
+      condition += " AND " + "begins_with(" + COL_NAMESPACE + ",:ns)";
+      conditionValues.put(":ns", AttributeValue.builder().s(parent).build());
+    }
+
+    do {
+      QueryResponse response = dynamo.query(QueryRequest.builder()
+          .tableName(awsProperties.dynamoDbTableName())
+          .consistentRead(true)
+          .keyConditionExpression(condition)
+          .expressionAttributeValues(conditionValues)
+          .exclusiveStartKey(lastEvaluatedKey)
+          .build());
+
+      if (response.hasItems()) {
+        for (Map<String, AttributeValue> item : response.items()) {
+          String ns = item.get(COL_NAMESPACE).s();
+          // begins_with is a plain string prefix match, so "a" would also match a sibling "ab";
+          // keep only the namespace itself and entries nested under "<namespace>."
+          if (namespace.isEmpty() || ns.equals(parent) || ns.startsWith(childPrefix)) {
+            namespaces.add(Namespace.of(ns.split("\\.")));
+          }
+        }
+      }
+
+      lastEvaluatedKey = response.lastEvaluatedKey();
+    } while (!lastEvaluatedKey.isEmpty());
+
+    return namespaces;
+  }
+
+  /**
+   * Loads a namespace's properties, which are the entry's property columns with their prefix stripped.
+   *
+   * @throws NoSuchNamespaceException if the namespace entry does not exist
+   */
+  @Override
+  public Map<String, String> loadNamespaceMetadata(Namespace namespace) throws NoSuchNamespaceException {
+    validateNamespace(namespace);
+    GetItemRequest request = GetItemRequest.builder()
+        .tableName(awsProperties.dynamoDbTableName())
+        .consistentRead(true)
+        .key(namespacePrimaryKey(namespace))
+        .build();
+    GetItemResponse response = dynamo.getItem(request);
+
+    if (response.hasItem()) {
+      return response.item().entrySet().stream()
+          .filter(column -> isProperty(column.getKey()))
+          .collect(Collectors.toMap(column -> toPropertyKey(column.getKey()), column -> column.getValue().s()));
+    }
+
+    throw new NoSuchNamespaceException("Cannot find namespace %s", namespace);
+  }
+
+  /**
+   * Deletes an empty namespace entry.
+   *
+   * @return true if the entry was deleted, false if no entry existed for the namespace
+   * @throws NamespaceNotEmptyException if any table is still registered under the namespace
+   */
+  @Override
+  public boolean dropNamespace(Namespace namespace) throws NamespaceNotEmptyException {
+    validateNamespace(namespace);
+    if (!listTables(namespace).isEmpty()) {
+      throw new NamespaceNotEmptyException("Cannot delete non-empty namespace %s", namespace);
+    }
+
+    try {
+      dynamo.deleteItem(DeleteItemRequest.builder()
+          .tableName(awsProperties.dynamoDbTableName())
+          .key(namespacePrimaryKey(namespace))
+          // test the key column, not the namespace value: attribute_exists takes an attribute name
+          .conditionExpression("attribute_exists(" + COL_NAMESPACE + ")")
+          .build());
+      return true;
+    } catch (ConditionalCheckFailedException e) {
+      return false;
+    }
+  }
+
+  /**
+   * Adds or overwrites namespace properties with a single SET update expression, which also refreshes the
+   * entry's catalog metadata columns.
+   * <p>
+   * Property column names go through expression attribute names ({@code #kN}), so arbitrary property keys
+   * are safe to use in the expression.
+   */
+  @Override
+  public boolean setProperties(Namespace namespace, Map<String, String> properties) throws NoSuchNamespaceException {
+    // validate like every other namespace operation: e.g. Namespace.of("a.b") must not alias Namespace.of("a", "b")
+    validateNamespace(namespace);
+    List<String> updateParts = Lists.newArrayList();
+    Map<String, String> attributeNames = Maps.newHashMap();
+    Map<String, AttributeValue> attributeValues = Maps.newHashMap();
+    int idx = 0;
+    for (Map.Entry<String, String> property : properties.entrySet()) {
+      String attributeValue = ":v" + idx;
+      String attributeKey = "#k" + idx;
+      idx++;
+      updateParts.add(attributeKey + " = " + attributeValue);
+      attributeNames.put(attributeKey, toPropertyCol(property.getKey()));
+      attributeValues.put(attributeValue, AttributeValue.builder().s(property.getValue()).build());
+    }
+
+    updateCatalogEntryMetadata(updateParts, attributeValues);
+    String updateExpression = "SET " + COMMA.join(updateParts);
+    return updateProperties(namespace, updateExpression, attributeValues, attributeNames);
+  }
+
+  /**
+   * Removes namespace properties and refreshes the entry's catalog metadata columns in one update.
+   * <p>
+   * Only the prefixed property columns are removed, and they are referenced via expression attribute names
+   * ({@code #kN}). Raw property keys never appear in the expression, so they cannot hit reserved words,
+   * dotted paths, or catalog columns such as the version column.
+   */
+  @Override
+  public boolean removeProperties(Namespace namespace, Set<String> properties) throws NoSuchNamespaceException {
+    validateNamespace(namespace);
+    List<String> removeParts = Lists.newArrayList();
+    Map<String, String> attributeNames = Maps.newHashMap();
+    Map<String, AttributeValue> attributeValues = Maps.newHashMap();
+    int idx = 0;
+    for (String property : properties) {
+      String attributeKey = "#k" + idx;
+      idx++;
+      removeParts.add(attributeKey);
+      attributeNames.put(attributeKey, toPropertyCol(property));
+    }
+
+    List<String> updateParts = Lists.newArrayList();
+    updateCatalogEntryMetadata(updateParts, attributeValues);
+    StringBuilder updateExpression = new StringBuilder();
+    if (!removeParts.isEmpty()) {
+      // an empty REMOVE clause is not a valid update expression
+      updateExpression.append("REMOVE ").append(COMMA.join(removeParts)).append(' ');
+    }
+
+    updateExpression.append("SET ").append(COMMA.join(updateParts));
+    return updateProperties(namespace, updateExpression.toString(), attributeValues, attributeNames);
+  }
+
+  /**
+   * Lists the tables registered under a namespace by querying the namespace secondary index, following
+   * query pagination. The namespace's own entry is skipped.
+   */
+  @Override
+  public List<TableIdentifier> listTables(Namespace namespace) {
+    validateNamespace(namespace);
+    List<TableIdentifier> identifiers = Lists.newArrayList();
+    Map<String, AttributeValue> lastEvaluatedKey = null;
+    String condition = COL_NAMESPACE + " = :ns";
+    Map<String, AttributeValue> conditionValues = ImmutableMap.of(
+        ":ns", AttributeValue.builder().s(namespace.toString()).build());
+    do {
+      QueryResponse response = dynamo.query(QueryRequest.builder()
+          .tableName(awsProperties.dynamoDbTableName())
+          .indexName(GSI_NAMESPACE_IDENTIFIER)
+          .keyConditionExpression(condition)
+          .expressionAttributeValues(conditionValues)
+          // resume after the previous page; without this a multi-page result loops forever on page one
+          .exclusiveStartKey(lastEvaluatedKey)
+          .build());
+
+      if (response.hasItems()) {
+        for (Map<String, AttributeValue> item : response.items()) {
+          String identifier = item.get(COL_IDENTIFIER).s();
+          if (!COL_IDENTIFIER_NAMESPACE.equals(identifier)) {
+            identifiers.add(TableIdentifier.of(identifier.split("\\.")));
+          }
+        }
+      }
+
+      lastEvaluatedKey = response.lastEvaluatedKey();
+    } while (!lastEvaluatedKey.isEmpty());
+    return identifiers;
+  }
+
+ @Override
+ public boolean dropTable(TableIdentifier identifier, boolean purge) {
+ Map<String, AttributeValue> key = tablePrimaryKey(identifier);
+ try {
+ GetItemResponse response = dynamo.getItem(GetItemRequest.builder()
+ .tableName(awsProperties.dynamoDbTableName())
+ .consistentRead(true)
+ .key(key)
+ .build());
+
+ if (!response.hasItem()) {
+ throw new NoSuchTableException("Cannot find table %s to drop", identifier);
+ }
+
+ TableOperations ops = newTableOps(identifier);
+ TableMetadata lastMetadata = ops.current();
+ dynamo.deleteItem(DeleteItemRequest.builder()
+ .tableName(awsProperties.dynamoDbTableName())
+ .key(tablePrimaryKey(identifier))
+ .conditionExpression(COL_VERSION + " = :v")
+ .expressionAttributeValues(ImmutableMap.of(":v", response.item().get(COL_VERSION)))
+ .build());
+ LOG.info("Successfully dropped table {} from DynamoDb catalog", identifier);
+
+ if (purge && lastMetadata != null) {
+ CatalogUtil.dropTableData(ops.io(), lastMetadata);
+ LOG.info("Table {} data purged", identifier);
+ }
+
+ LOG.info("Dropped table: {}", identifier);
+ return true;
+ } catch (ConditionalCheckFailedException e) {
+ LOG.error("Cannot complete drop table operation for {}: commit conflict", identifier, e);
+ return false;
+ } catch (Exception e) {
+ LOG.error("Cannot complete drop table operation for {}: unexpected exception", identifier, e);
+ throw e;
+ }
+ }
+
+  /**
+   * Renames a table by deleting its catalog entry and writing an equivalent entry under the new
+   * identifier, both in a single DynamoDB write transaction.
+   * <p>
+   * All property columns of the source entry (including the metadata location) are copied to the new
+   * entry; the created/updated timestamps and the version token of the new entry are regenerated.
+   * NOTE(review): if another writer changes either entry between the reads below and the transaction,
+   * the transaction's conditions fail and the AWS SDK exception is propagated unchanged (it is not
+   * translated into an Iceberg exception here) - confirm this is the intended contract.
+   *
+   * @param from identifier of the existing table
+   * @param to new identifier of the table
+   * @throws ValidationException if {@code to} is not a valid table identifier for this catalog
+   * @throws NoSuchTableException if {@code from} does not exist
+   * @throws NoSuchNamespaceException if the namespace of {@code to} does not exist
+   * @throws AlreadyExistsException if {@code to} already exists
+   */
+  @Override
+  public void renameTable(TableIdentifier from, TableIdentifier to) {
+    // apply the same identifier rules as table creation, so a rename cannot produce an entry
+    // (e.g. a table name containing '.') that could never have been created directly
+    validateTableIdentifier(to);
+
+    Map<String, AttributeValue> fromKey = tablePrimaryKey(from);
+    Map<String, AttributeValue> toKey = tablePrimaryKey(to);
+
+    GetItemResponse fromResponse = dynamo.getItem(GetItemRequest.builder()
+        .tableName(awsProperties.dynamoDbTableName())
+        .consistentRead(true)
+        .key(fromKey)
+        .build());
+
+    if (!fromResponse.hasItem()) {
+      throw new NoSuchTableException("Cannot rename table %s to %s: %s does not exist", from, to, from);
+    }
+
+    // the destination namespace must exist, otherwise the renamed table would be orphaned
+    GetItemResponse toNamespaceResponse = dynamo.getItem(GetItemRequest.builder()
+        .tableName(awsProperties.dynamoDbTableName())
+        .consistentRead(true)
+        .key(namespacePrimaryKey(to.namespace()))
+        .build());
+
+    if (!toNamespaceResponse.hasItem()) {
+      throw new NoSuchNamespaceException("Cannot rename table %s to %s: namespace %s does not exist",
+          from, to, to.namespace());
+    }
+
+    GetItemResponse toResponse = dynamo.getItem(GetItemRequest.builder()
+        .tableName(awsProperties.dynamoDbTableName())
+        .consistentRead(true)
+        .key(toKey)
+        .build());
+
+    if (toResponse.hasItem()) {
+      throw new AlreadyExistsException("Cannot rename table %s to %s: %s already exists", from, to, to);
+    }
+
+    // build the destination item separately so that toKey remains a pure primary key
+    Map<String, AttributeValue> toItem = Maps.newHashMap(toKey);
+    for (Map.Entry<String, AttributeValue> column : fromResponse.item().entrySet()) {
+      if (isProperty(column.getKey())) {
+        toItem.put(column.getKey(), column.getValue());
+      }
+    }
+
+    setNewCatalogEntryMetadata(toItem);
+
+    // delete is guarded by the version read above, put by the destination not existing yet
+    dynamo.transactWriteItems(TransactWriteItemsRequest.builder()
+        .transactItems(
+            TransactWriteItem.builder()
+                .delete(Delete.builder()
+                    .tableName(awsProperties.dynamoDbTableName())
+                    .key(fromKey)
+                    .conditionExpression(COL_VERSION + " = :v")
+                    .expressionAttributeValues(ImmutableMap.of(":v", fromResponse.item().get(COL_VERSION)))
+                    .build())
+                .build(),
+            TransactWriteItem.builder()
+                .put(Put.builder()
+                    .tableName(awsProperties.dynamoDbTableName())
+                    .item(toItem)
+                    .conditionExpression("attribute_not_exists(" + COL_VERSION + ")")
+                    .build())
+                .build())
+        .build());
+
+    LOG.info("Successfully renamed table from {} to {}", from, to);
+  }
+
+  /**
+   * Stores the Hadoop configuration handed to this catalog.
+   *
+   * @param conf Hadoop configuration to remember
+   */
+  @Override
+  public void setConf(Configuration conf) {
+    this.hadoopConf = conf;
+  }
+
+  /**
+   * Returns the Hadoop configuration previously stored by {@link #setConf(Configuration)}.
+   *
+   * @return the stored Hadoop configuration, possibly null if none was set
+   */
+  @Override
+  public Configuration getConf() {
+    return this.hadoopConf;
+  }
+
+  /**
+   * Closes the DynamoDB client used by this catalog.
+   */
+  @Override
+  public void close() throws IOException {
+    this.dynamo.close();
+  }
+
+  /**
+   * Returns the namespace property key that holds a default location for the namespace's tables.
+   * <p>
+   * Set a path under this key via {@link #setProperties(Namespace, Map)}; tables in that namespace
+   * then get their default table root path under the given path.
+   *
+   * @return the property key used for a namespace's default location
+   */
+  public static String defaultLocationProperty() {
+    final String key = PROPERTY_DEFAULT_LOCATION;
+    return key;
+  }
+
+  /**
+   * Maps a user-facing property key to the DynamoDB column name that stores it.
+   *
+   * @param propertyKey property key
+   * @return the key prefixed with the property column prefix
+   */
+  static String toPropertyCol(String propertyKey) {
+    return new StringBuilder(PROPERTY_COL_PREFIX).append(propertyKey).toString();
+  }
+
+ static boolean isProperty(String dynamoCol) {
+ return dynamoCol.startsWith(PROPERTY_COL_PREFIX);
+ }
+
+  /**
+   * Maps a property column name back to its user-facing property key.
+   * <p>
+   * The prefix is not verified here; callers are expected to pass only columns accepted by
+   * {@link #isProperty(String)}.
+   *
+   * @param propertyCol DynamoDB column name of a property
+   * @return the column name with the property column prefix removed
+   */
+  static String toPropertyKey(String propertyCol) {
+    int prefixLength = PROPERTY_COL_PREFIX.length();
+    return propertyCol.substring(prefixLength);
+  }
+
+  /**
+   * Builds the DynamoDB primary key of a namespace entry.
+   * <p>
+   * Namespace entries share a constant identifier value and are distinguished by the namespace column.
+   * The returned map is mutable.
+   *
+   * @param namespace the namespace
+   * @return a new mutable map holding the identifier and namespace key attributes
+   */
+  static Map<String, AttributeValue> namespacePrimaryKey(Namespace namespace) {
+    AttributeValue identifierValue = AttributeValue.builder().s(COL_IDENTIFIER_NAMESPACE).build();
+    AttributeValue namespaceValue = AttributeValue.builder().s(namespace.toString()).build();
+    Map<String, AttributeValue> primaryKey = Maps.newHashMap();
+    primaryKey.put(COL_IDENTIFIER, identifierValue);
+    primaryKey.put(COL_NAMESPACE, namespaceValue);
+    return primaryKey;
+  }
+
+  /**
+   * Builds the DynamoDB primary key of a table entry.
+   * <p>
+   * The returned map is mutable, so callers may add further attributes to it.
+   *
+   * @param identifier the table identifier
+   * @return a new mutable map holding the full identifier and the namespace key attributes
+   */
+  static Map<String, AttributeValue> tablePrimaryKey(TableIdentifier identifier) {
+    AttributeValue identifierValue = AttributeValue.builder().s(identifier.toString()).build();
+    AttributeValue namespaceValue = AttributeValue.builder().s(identifier.namespace().toString()).build();
+    Map<String, AttributeValue> primaryKey = Maps.newHashMap();
+    primaryKey.put(COL_IDENTIFIER, identifierValue);
+    primaryKey.put(COL_NAMESPACE, namespaceValue);
+    return primaryKey;
+  }
+
+  /**
+   * Stamps a brand-new catalog entry with bookkeeping columns.
+   * <p>
+   * The created and updated timestamps are set to the same current epoch-millis value, and the
+   * version column gets a fresh random UUID.
+   *
+   * @param values mutable item attributes to add the bookkeeping columns to
+   */
+  static void setNewCatalogEntryMetadata(Map<String, AttributeValue> values) {
+    AttributeValue now = AttributeValue.builder().n(Long.toString(System.currentTimeMillis())).build();
+    AttributeValue version = AttributeValue.builder().s(UUID.randomUUID().toString()).build();
+    values.put(COL_CREATED_AT, now);
+    values.put(COL_UPDATED_AT, now);
+    values.put(COL_VERSION, version);
+  }
+
+  /**
+   * Appends the bookkeeping assignments for an updated catalog entry to an update expression.
+   * <p>
+   * Adds {@code updated_at = :uat} and {@code version = :uv} parts, binding the current epoch-millis
+   * and a fresh random UUID respectively.
+   *
+   * @param updateParts mutable list of SET-expression parts to append to
+   * @param attributeValues mutable placeholder bindings to add ":uat" and ":uv" to
+   */
+  static void updateCatalogEntryMetadata(List<String> updateParts, Map<String, AttributeValue> attributeValues) {
+    String updatedAt = Long.toString(System.currentTimeMillis());
+    String version = UUID.randomUUID().toString();
+    updateParts.add(COL_UPDATED_AT + " = :uat");
+    updateParts.add(COL_VERSION + " = :uv");
+    attributeValues.put(":uat", AttributeValue.builder().n(updatedAt).build());
+    attributeValues.put(":uv", AttributeValue.builder().s(version).build());
+  }
+
+  /**
+   * Creates the FileIO used by this catalog.
+   *
+   * @param properties catalog properties; {@link CatalogProperties#FILE_IO_IMPL} selects a custom implementation
+   * @return the configured FileIO, or an initialized {@link S3FileIO} when no implementation is configured
+   */
+  private FileIO initializeFileIO(Map<String, String> properties) {
+    String fileIOImpl = properties.get(CatalogProperties.FILE_IO_IMPL);
+    if (fileIOImpl != null) {
+      return CatalogUtil.loadFileIO(fileIOImpl, properties, hadoopConf);
+    }
+
+    // no custom implementation configured: fall back to S3FileIO
+    FileIO defaultIO = new S3FileIO();
+    defaultIO.initialize(properties);
+    return defaultIO;
+  }
+
+  /**
+   * Normalizes the configured warehouse path by removing all trailing slashes.
+   * <p>
+   * Removing every trailing slash (not just one) prevents table locations such as
+   * {@code s3://bucket/wh//db/table}, which on object stores are distinct from the single-slash form.
+   *
+   * @param path configured warehouse path
+   * @return the path without trailing slashes
+   * @throws IllegalArgumentException if the path is null or empty
+   */
+  private String cleanWarehousePath(String path) {
+    Preconditions.checkArgument(path != null && path.length() > 0,
+        "Cannot initialize DynamoDbCatalog because warehousePath must not be null or empty");
+    int end = path.length();
+    while (end > 0 && path.charAt(end - 1) == '/') {
+      end--;
+    }
+
+    return path.substring(0, end);
+  }
+
+  /**
+   * Checks that every level of a namespace is non-empty and free of dots.
+   *
+   * @param namespace namespace to check
+   * @throws ValidationException if a level is null/empty or contains '.'
+   */
+  private void validateNamespace(Namespace namespace) {
+    for (String part : namespace.levels()) {
+      boolean present = part != null && !part.isEmpty();
+      ValidationException.check(present, "Namespace level must not be empty: %s", namespace);
+      boolean dotFree = !part.contains(".");
+      ValidationException.check(dotFree,
+          "Namespace level must not contain dot, but found %s in %s", part, namespace);
+    }
+  }
+
+  /**
+   * Checks that a table identifier has a valid, non-empty namespace and a dot-free table name.
+   *
+   * @param identifier table identifier to check
+   * @throws ValidationException if the namespace is empty or invalid, or the name contains '.'
+   */
+  private void validateTableIdentifier(TableIdentifier identifier) {
+    // an empty namespace has no levels, so checking it first yields the same outcome as
+    // validating the levels first
+    ValidationException.check(identifier.hasNamespace(),
+        "Table namespace must not be empty: %s", identifier);
+    validateNamespace(identifier.namespace());
+    String name = identifier.name();
+    ValidationException.check(!name.contains("."), "Table name must not contain dot: %s", name);
+  }
+
+  /**
+   * Tells whether a DynamoDB table with the given name can be described.
+   *
+   * @param tableName DynamoDB table name
+   * @return false if DescribeTable reports the table as not found, true otherwise
+   */
+  private boolean dynamoDbTableExists(String tableName) {
+    DescribeTableRequest request = DescribeTableRequest.builder().tableName(tableName).build();
+    try {
+      dynamo.describeTable(request);
+    } catch (ResourceNotFoundException e) {
+      return false;
+    }
+
+    return true;
+  }
+
+  /**
+   * Makes sure the DynamoDB table backing this catalog exists, creating it when it is missing.
+   * <p>
+   * The table is created with the identifier as hash key and the namespace as range key, plus a
+   * keys-only GSI with the key roles swapped, in on-demand billing mode. After creation, this method
+   * polls until the table status is ACTIVE, failing after {@code CATALOG_TABLE_CREATION_WAIT_ATTEMPTS_MAX}
+   * retries.
+   */
+  private void ensureCatalogTableExistsOrCreate() {
+    String catalogTableName = awsProperties.dynamoDbTableName();
+    if (dynamoDbTableExists(catalogTableName)) {
+      return;
+    }
+
+    LOG.info("DynamoDb catalog table {} not found, trying to create", catalogTableName);
+    try {
+      dynamo.createTable(CreateTableRequest.builder()
+          .tableName(catalogTableName)
+          .keySchema(
+              KeySchemaElement.builder().attributeName(COL_IDENTIFIER).keyType(KeyType.HASH).build(),
+              KeySchemaElement.builder().attributeName(COL_NAMESPACE).keyType(KeyType.RANGE).build())
+          .attributeDefinitions(
+              AttributeDefinition.builder().attributeName(COL_IDENTIFIER).attributeType(ScalarAttributeType.S).build(),
+              AttributeDefinition.builder().attributeName(COL_NAMESPACE).attributeType(ScalarAttributeType.S).build())
+          .globalSecondaryIndexes(GlobalSecondaryIndex.builder()
+              .indexName(GSI_NAMESPACE_IDENTIFIER)
+              .keySchema(
+                  KeySchemaElement.builder().attributeName(COL_NAMESPACE).keyType(KeyType.HASH).build(),
+                  KeySchemaElement.builder().attributeName(COL_IDENTIFIER).keyType(KeyType.RANGE).build())
+              .projection(Projection.builder().projectionType(ProjectionType.KEYS_ONLY).build())
+              .build())
+          .billingMode(BillingMode.PAY_PER_REQUEST)
+          .build());
+    } catch (software.amazon.awssdk.services.dynamodb.model.ResourceInUseException e) {
+      // another catalog instance created the table between the existence check above and this call;
+      // wait for that table to become active instead of failing catalog initialization
+      LOG.info("DynamoDb catalog table {} is already being created by another process", catalogTableName);
+    }
+
+    // wait for the dynamo table to complete provisioning, which takes around 10 seconds
+    Tasks.foreach(catalogTableName)
+        .retry(CATALOG_TABLE_CREATION_WAIT_ATTEMPTS_MAX)
+        .throwFailureWhenFinished()
+        .onlyRetryOn(IllegalStateException.class)
+        .run(this::checkTableActive);
+  }
+
+  /**
+   * Verifies that the catalog table is ACTIVE.
+   * <p>
+   * Every "not ready yet" condition is reported as {@link IllegalStateException}, because that is the
+   * exception type the creation wait loop retries on.
+   *
+   * @param tableName DynamoDB table name
+   * @throws IllegalStateException if the table is missing or its status is not ACTIVE
+   */
+  private void checkTableActive(String tableName) {
+    try {
+      DescribeTableResponse response = dynamo.describeTable(DescribeTableRequest.builder()
+          .tableName(tableName)
+          .build());
+      TableStatus currentStatus = response.table().tableStatus();
+      // identity comparison: a null status must count as "not active yet" (and be retried)
+      // rather than fail with a NullPointerException
+      if (currentStatus != TableStatus.ACTIVE) {
+        throw new IllegalStateException(String.format("Dynamo catalog table %s is not active, current status: %s",
+            tableName, currentStatus));
+      }
+    } catch (ResourceNotFoundException e) {
+      throw new IllegalStateException(String.format("Cannot find Dynamo catalog table %s", tableName), e);
+    }
+  }
+
+  /**
+   * Applies an update expression to a namespace entry, guarded by the entry's current version.
+   *
+   * @param namespace namespace whose entry is updated
+   * @param updateExpression DynamoDB update expression using placeholders from the two maps below
+   * @param attributeValues value placeholder bindings; ":v" is reserved for the version guard
+   * @param attributeNames name placeholder bindings; may be empty
+   * @return true if the update was applied, false if the entry was modified concurrently
+   * @throws NoSuchNamespaceException if the namespace entry does not exist
+   */
+  private boolean updateProperties(Namespace namespace, String updateExpression,
+                                   Map<String, AttributeValue> attributeValues,
+                                   Map<String, String> attributeNames) {
+    validateNamespace(namespace);
+    Map<String, AttributeValue> key = namespacePrimaryKey(namespace);
+    try {
+      GetItemResponse response = dynamo.getItem(GetItemRequest.builder()
+          .tableName(awsProperties.dynamoDbTableName())
+          .consistentRead(true)
+          .key(key)
+          .build());
+
+      if (!response.hasItem()) {
+        throw new NoSuchNamespaceException("Cannot find namespace %s", namespace);
+      }
+
+      // work on a copy so the caller's map is not mutated with the version placeholder
+      Map<String, AttributeValue> values = Maps.newHashMap(attributeValues);
+      values.put(":v", response.item().get(COL_VERSION));
+      UpdateItemRequest.Builder request = UpdateItemRequest.builder()
+          .tableName(awsProperties.dynamoDbTableName())
+          .key(key)
+          .conditionExpression(COL_VERSION + " = :v")
+          .updateExpression(updateExpression)
+          .expressionAttributeValues(values);
+      // DynamoDB rejects an explicitly empty ExpressionAttributeNames map (e.g. when no properties
+      // are being set), so only send it when it has entries
+      if (attributeNames != null && !attributeNames.isEmpty()) {
+        request.expressionAttributeNames(attributeNames);
+      }
+
+      dynamo.updateItem(request.build());
+      return true;
+    } catch (ConditionalCheckFailedException e) {
+      return false;
+    }
+  }
+}
diff --git a/aws/src/main/java/org/apache/iceberg/aws/dynamodb/DynamoDbTableOperations.java b/aws/src/main/java/org/apache/iceberg/aws/dynamodb/DynamoDbTableOperations.java
new file mode 100644
index 0000000..81157e9
--- /dev/null
+++ b/aws/src/main/java/org/apache/iceberg/aws/dynamodb/DynamoDbTableOperations.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.aws.dynamodb;
+
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
+import org.apache.iceberg.BaseMetastoreTableOperations;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.aws.AwsProperties;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.CommitFailedException;
+import org.apache.iceberg.exceptions.CommitStateUnknownException;
+import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
+import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
+import software.amazon.awssdk.services.dynamodb.model.ConditionalCheckFailedException;
+import software.amazon.awssdk.services.dynamodb.model.GetItemRequest;
+import software.amazon.awssdk.services.dynamodb.model.GetItemResponse;
+import software.amazon.awssdk.services.dynamodb.model.PutItemRequest;
+import software.amazon.awssdk.services.dynamodb.model.UpdateItemRequest;
+
+/**
+ * {@link BaseMetastoreTableOperations} backed by the DynamoDB catalog table.
+ * <p>
+ * Each Iceberg table is one item in the catalog table, keyed by {@link DynamoDbCatalog#tablePrimaryKey}, and the
+ * current metadata file location is stored in a property column. Commits are optimistic: the item is read with
+ * a consistent read and then written with a condition on its version column, so a concurrent update is reported
+ * as a {@link CommitFailedException}.
+ */
+class DynamoDbTableOperations extends BaseMetastoreTableOperations {
+
+  private static final Logger LOG = LoggerFactory.getLogger(DynamoDbTableOperations.class);
+
+  private final DynamoDbClient dynamo;
+  private final AwsProperties awsProperties;
+  private final TableIdentifier tableIdentifier;
+  private final String fullTableName;
+  private final FileIO fileIO;
+
+  DynamoDbTableOperations(
+      DynamoDbClient dynamo,
+      AwsProperties awsProperties,
+      String catalogName,
+      FileIO fileIO,
+      TableIdentifier tableIdentifier) {
+    this.dynamo = dynamo;
+    this.awsProperties = awsProperties;
+    this.fullTableName = String.format("%s.%s", catalogName, tableIdentifier);
+    this.tableIdentifier = tableIdentifier;
+    this.fileIO = fileIO;
+  }
+
+  @Override
+  protected String tableName() {
+    return fullTableName;
+  }
+
+  @Override
+  public FileIO io() {
+    return fileIO;
+  }
+
+  /**
+   * Reloads the metadata location from the catalog entry with a consistent read.
+   * <p>
+   * A missing entry is treated as a not-yet-created table, unless this instance had already loaded
+   * metadata, in which case the table has disappeared.
+   *
+   * @throws NoSuchTableException if a previously loaded table no longer has a catalog entry
+   */
+  @Override
+  protected void doRefresh() {
+    String metadataLocation = null;
+    GetItemResponse table = dynamo.getItem(GetItemRequest.builder()
+        .tableName(awsProperties.dynamoDbTableName())
+        .consistentRead(true)
+        .key(DynamoDbCatalog.tablePrimaryKey(tableIdentifier))
+        .build());
+    if (table.hasItem()) {
+      metadataLocation = getMetadataLocation(table);
+    } else {
+      if (currentMetadataLocation() != null) {
+        throw new NoSuchTableException("Cannot find table %s after refresh, " +
+            "maybe another process deleted it or revoked your access permission", tableName());
+      }
+    }
+
+    refreshFromMetadataLocation(metadataLocation);
+  }
+
+  /**
+   * Writes a new metadata file and swaps the catalog entry to point at it.
+   * <p>
+   * If the outcome of the DynamoDB write is unclear, the commit status is re-checked. The new metadata file
+   * is deleted only when the commit is known to have failed.
+   *
+   * @param base metadata the commit is based on, or null for a new table
+   * @param metadata new metadata to commit
+   * @throws CommitFailedException if the base is stale, a concurrent update wins, or the write failed
+   * @throws CommitStateUnknownException if it cannot be determined whether the write was applied
+   */
+  @Override
+  protected void doCommit(TableMetadata base, TableMetadata metadata) {
+    String newMetadataLocation = writeNewMetadata(metadata, currentVersion() + 1);
+    CommitStatus commitStatus = CommitStatus.FAILURE;
+    Map<String, AttributeValue> tableKey = DynamoDbCatalog.tablePrimaryKey(tableIdentifier);
+    try {
+      GetItemResponse table = dynamo.getItem(GetItemRequest.builder()
+          .tableName(awsProperties.dynamoDbTableName())
+          .consistentRead(true)
+          .key(tableKey)
+          .build());
+      checkMetadataLocation(table, base);
+      Map<String, String> properties = prepareProperties(table, newMetadataLocation);
+      persistTable(tableKey, table, properties);
+      commitStatus = CommitStatus.SUCCESS;
+    } catch (ConditionalCheckFailedException e) {
+      throw new CommitFailedException(e, "Cannot commit %s: concurrent update detected", tableName());
+    } catch (CommitFailedException e) {
+      // explicit commit failures (e.g. a stale base detected by checkMetadataLocation) are definitive;
+      // pass them straight to the retry handler instead of treating them as an unknown persist failure
+      throw e;
+    } catch (RuntimeException persistFailure) {
+      LOG.error("Confirming if commit to {} indeed failed to persist, attempting to reconnect and check.",
+          fullTableName, persistFailure);
+      commitStatus = checkCommitStatus(newMetadataLocation, metadata);
+
+      switch (commitStatus) {
+        case SUCCESS:
+          break;
+        case FAILURE:
+          throw new CommitFailedException(persistFailure,
+              "Cannot commit %s due to unexpected exception", tableName());
+        case UNKNOWN:
+          throw new CommitStateUnknownException(persistFailure);
+      }
+    } finally {
+      cleanupUncommittedMetadata(commitStatus, newMetadataLocation);
+    }
+  }
+
+  /**
+   * Best-effort deletion of the metadata file written by a commit that is known to have failed.
+   * <p>
+   * Every path in doCommit that leaves the status as FAILURE is already throwing, so a cleanup error is
+   * only logged: rethrowing it would replace, and hide, the exception that explains the commit failure.
+   */
+  private void cleanupUncommittedMetadata(CommitStatus commitStatus, String metadataLocation) {
+    if (commitStatus != CommitStatus.FAILURE) {
+      return;
+    }
+
+    try {
+      io().deleteFile(metadataLocation);
+    } catch (RuntimeException e) {
+      LOG.error("Fail to cleanup metadata file at {}", metadataLocation, e);
+    }
+  }
+
+  /**
+   * Fails the commit when the catalog's current metadata location differs from the one the commit is based on.
+   */
+  private void checkMetadataLocation(GetItemResponse table, TableMetadata base) {
+    String dynamoMetadataLocation = table.hasItem() ? getMetadataLocation(table) : null;
+    String baseMetadataLocation = base != null ? base.metadataFileLocation() : null;
+    if (!Objects.equals(baseMetadataLocation, dynamoMetadataLocation)) {
+      throw new CommitFailedException(
+          "Cannot commit %s because base metadata location '%s' is not same as the current DynamoDb location '%s'",
+          tableName(), baseMetadataLocation, dynamoMetadataLocation);
+    }
+  }
+
+  /** Reads the metadata location stored in the item's property column. */
+  private String getMetadataLocation(GetItemResponse table) {
+    return table.item().get(DynamoDbCatalog.toPropertyCol(METADATA_LOCATION_PROP)).s();
+  }
+
+  /**
+   * Builds the properties to persist: the existing properties (if any), with the table type, the new metadata
+   * location and, when known, the previous metadata location overwritten.
+   */
+  private Map<String, String> prepareProperties(GetItemResponse response, String newMetadataLocation) {
+    // copy into a map we own: Collectors.toMap makes no guarantee that its result is mutable
+    Map<String, String> properties = Maps.newHashMap();
+    if (response.hasItem()) {
+      properties.putAll(getProperties(response));
+    }
+
+    properties.put(TABLE_TYPE_PROP, ICEBERG_TABLE_TYPE_VALUE.toUpperCase(Locale.ENGLISH));
+    properties.put(METADATA_LOCATION_PROP, newMetadataLocation);
+    if (currentMetadataLocation() != null && !currentMetadataLocation().isEmpty()) {
+      properties.put(PREVIOUS_METADATA_LOCATION_PROP, currentMetadataLocation());
+    }
+
+    return properties;
+  }
+
+  /** Extracts the property columns of an item, keyed by property name (column prefix stripped). */
+  private Map<String, String> getProperties(GetItemResponse table) {
+    return table.item().entrySet().stream()
+        .filter(e -> DynamoDbCatalog.isProperty(e.getKey()))
+        .collect(Collectors.toMap(e -> DynamoDbCatalog.toPropertyKey(e.getKey()), e -> e.getValue().s()));
+  }
+
+  /**
+   * Writes the given properties to this table's catalog entry.
+   * <p>
+   * An existing entry is updated in place, guarded by the version read in {@code table}. A missing entry is
+   * created, guarded by the version column not existing yet. In both cases a lost race surfaces as
+   * {@link ConditionalCheckFailedException}.
+   *
+   * @param tableKey primary key of the table entry
+   * @param table the entry as read before the commit (without an item for a new table)
+   * @param parameters property names and values, each stored in its own property column
+   */
+  void persistTable(Map<String, AttributeValue> tableKey, GetItemResponse table, Map<String, String> parameters) {
+    if (table.hasItem()) {
+      LOG.debug("Committing existing DynamoDb catalog table: {}", tableName());
+      List<String> updateParts = Lists.newArrayList();
+      Map<String, String> attributeNames = Maps.newHashMap();
+      Map<String, AttributeValue> attributeValues = Maps.newHashMap();
+      // numbered #k<i>/:v<i> placeholders keep arbitrary property names out of the expression;
+      // the bare ":v" placeholder is reserved for the version guard below
+      int idx = 0;
+      for (Map.Entry<String, String> property : parameters.entrySet()) {
+        String attributeValue = ":v" + idx;
+        String attributeKey = "#k" + idx;
+        idx++;
+        updateParts.add(attributeKey + " = " + attributeValue);
+        attributeNames.put(attributeKey, DynamoDbCatalog.toPropertyCol(property.getKey()));
+        attributeValues.put(attributeValue, AttributeValue.builder().s(property.getValue()).build());
+      }
+      DynamoDbCatalog.updateCatalogEntryMetadata(updateParts, attributeValues);
+      String updateExpression = "SET " + DynamoDbCatalog.COMMA.join(updateParts);
+      attributeValues.put(":v", table.item().get(DynamoDbCatalog.COL_VERSION));
+      dynamo.updateItem(UpdateItemRequest.builder()
+          .tableName(awsProperties.dynamoDbTableName())
+          .key(tableKey)
+          .conditionExpression(DynamoDbCatalog.COL_VERSION + " = :v")
+          .updateExpression(updateExpression)
+          .expressionAttributeValues(attributeValues)
+          .expressionAttributeNames(attributeNames)
+          .build());
+    } else {
+      LOG.debug("Committing new DynamoDb catalog table: {}", tableName());
+      Map<String, AttributeValue> values = Maps.newHashMap(tableKey);
+      parameters.forEach((k, v) -> values.put(DynamoDbCatalog.toPropertyCol(k),
+          AttributeValue.builder().s(v).build()));
+      DynamoDbCatalog.setNewCatalogEntryMetadata(values);
+
+      dynamo.putItem(PutItemRequest.builder()
+          .tableName(awsProperties.dynamoDbTableName())
+          .item(values)
+          .conditionExpression("attribute_not_exists(" + DynamoDbCatalog.COL_VERSION + ")")
+          .build());
+    }
+  }
+}