You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by dw...@apache.org on 2021/06/23 03:33:35 UTC

[iceberg] branch master updated: AWS: add DynamoDb catalog (#2688)

This is an automated email from the ASF dual-hosted git repository.

dweeks pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new f81d8ad  AWS: add DynamoDb catalog (#2688)
f81d8ad is described below

commit f81d8adc9e4413265b63ca092ca744a22a0c9c65
Author: Jack Ye <yz...@amazon.com>
AuthorDate: Tue Jun 22 20:33:23 2021 -0700

    AWS: add DynamoDb catalog (#2688)
    
    * AWS: add DynamoDb catalog
    
    * fix spacing, add comment
---
 .../iceberg/aws/dynamodb/DynamoDbCatalogTest.java  | 289 ++++++++++
 .../java/org/apache/iceberg/aws/AwsProperties.java |  22 +
 .../iceberg/aws/dynamodb/DynamoDbCatalog.java      | 613 +++++++++++++++++++++
 .../aws/dynamodb/DynamoDbTableOperations.java      | 215 ++++++++
 4 files changed, 1139 insertions(+)

diff --git a/aws/src/integration/java/org/apache/iceberg/aws/dynamodb/DynamoDbCatalogTest.java b/aws/src/integration/java/org/apache/iceberg/aws/dynamodb/DynamoDbCatalogTest.java
new file mode 100644
index 0000000..8d4273c
--- /dev/null
+++ b/aws/src/integration/java/org/apache/iceberg/aws/dynamodb/DynamoDbCatalogTest.java
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.aws.dynamodb;
+
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ForkJoinPool;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.aws.AwsClientFactories;
+import org.apache.iceberg.aws.AwsClientFactory;
+import org.apache.iceberg.aws.AwsIntegTestUtil;
+import org.apache.iceberg.aws.AwsProperties;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.types.Types;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
+import software.amazon.awssdk.services.dynamodb.model.DeleteTableRequest;
+import software.amazon.awssdk.services.dynamodb.model.GetItemRequest;
+import software.amazon.awssdk.services.dynamodb.model.GetItemResponse;
+import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.services.s3.model.HeadObjectRequest;
+import software.amazon.awssdk.services.s3.model.NoSuchKeyException;
+
+public class DynamoDbCatalogTest {
+
+  private static final ForkJoinPool POOL = new ForkJoinPool(16);
+  private static final Schema SCHEMA = new Schema(Types.NestedField.required(1, "id", Types.StringType.get()));
+
+  private static String catalogTableName;
+  private static DynamoDbClient dynamo;
+  private static S3Client s3;
+  private static DynamoDbCatalog catalog;
+  private static String testBucket;
+
+  /**
+   * Creates the AWS clients from the default client factory and initializes a catalog that uses a randomly
+   * named DynamoDB table and a random warehouse prefix inside the integration test bucket.
+   */
+  @BeforeClass
+  public static void beforeClass() {
+    AwsClientFactory clientFactory = AwsClientFactories.defaultFactory();
+    dynamo = clientFactory.dynamo();
+    s3 = clientFactory.s3();
+    testBucket = AwsIntegTestUtil.testBucketName();
+    catalogTableName = genRandomName();
+
+    String warehouse = "s3://" + testBucket + "/" + genRandomName();
+    Map<String, String> catalogProperties = ImmutableMap.of(
+        AwsProperties.DYNAMODB_TABLE_NAME, catalogTableName,
+        CatalogProperties.WAREHOUSE_LOCATION, warehouse);
+    catalog = new DynamoDbCatalog();
+    catalog.initialize("test", catalogProperties);
+  }
+
+  @AfterClass
+  public static void afterClass() {
+    dynamo.deleteTable(DeleteTableRequest.builder().tableName(catalogTableName).build());
+  }
+
+  /**
+   * Checks that creating a namespace stores an item under the namespace primary key, and that creating the
+   * same namespace a second time fails with {@link AlreadyExistsException}.
+   */
+  @Test
+  public void testCreateNamespace() {
+    Namespace namespace = Namespace.of(genRandomName());
+    catalog.createNamespace(namespace);
+
+    GetItemRequest lookup = GetItemRequest.builder()
+        .tableName(catalogTableName)
+        .key(DynamoDbCatalog.namespacePrimaryKey(namespace))
+        .build();
+    GetItemResponse stored = dynamo.getItem(lookup);
+    Assert.assertTrue("namespace must exist", stored.hasItem());
+
+    String storedNamespace = stored.item().get("namespace").s();
+    Assert.assertEquals("namespace must be stored in DynamoDB", namespace.toString(), storedNamespace);
+
+    AssertHelpers.assertThrows("should not create duplicated namespace",
+        AlreadyExistsException.class,
+        "already exists",
+        () -> catalog.createNamespace(namespace));
+  }
+
+  @Test
+  public void testCreateNamespaceBadName() {
+    AssertHelpers.assertThrows("should not create namespace with empty level",
+        ValidationException.class,
+        "must not be empty",
+        () -> catalog.createNamespace(Namespace.of("a", "", "b")));
+
+    AssertHelpers.assertThrows("should not create namespace with dot in level",
+        ValidationException.class,
+        "must not contain dot",
+        () -> catalog.createNamespace(Namespace.of("a", "b.c")));
+  }
+
+  @Test
+  public void testListSubNamespaces() {
+    Namespace parent = Namespace.of(genRandomName());
+    List<Namespace> namespaceList = IntStream.range(0, 3)
+        .mapToObj(i -> Namespace.of(parent.toString(), genRandomName()))
+        .collect(Collectors.toList());
+    catalog.createNamespace(parent);
+    namespaceList.forEach(ns -> catalog.createNamespace(ns));
+    Assert.assertEquals(4, catalog.listNamespaces(parent).size());
+  }
+
+  /**
+   * Walks namespace properties through create, set (one new key and one overwritten key) and remove, checking
+   * after every step that the loaded metadata equals the expected map.
+   */
+  @Test
+  public void testNamespaceProperties() {
+    Namespace namespace = Namespace.of(genRandomName());
+    Map<String, String> expected = Maps.newHashMap();
+    expected.put("key1", "val1");
+    expected.put("key2", "val2");
+    catalog.createNamespace(namespace, expected);
+    Map<String, String> afterCreate = catalog.loadNamespaceMetadata(namespace);
+    Assert.assertEquals(expected, afterCreate);
+
+    // add a new key and overwrite an existing one
+    expected.put("key3", "val3");
+    expected.put("key2", "val2-1");
+    catalog.setProperties(namespace, expected);
+    Map<String, String> afterSet = catalog.loadNamespaceMetadata(namespace);
+    Assert.assertEquals(expected, afterSet);
+
+    // remove the key that was just added
+    catalog.removeProperties(namespace, Sets.newHashSet("key3"));
+    expected.remove("key3");
+    Map<String, String> afterRemove = catalog.loadNamespaceMetadata(namespace);
+    Assert.assertEquals(expected, afterRemove);
+  }
+
+  /**
+   * Checks that creating a table stores an item keyed by the table identifier and the namespace, and that
+   * creating the same table again fails with {@link AlreadyExistsException}.
+   */
+  @Test
+  public void testCreateTable() {
+    Namespace namespace = Namespace.of(genRandomName());
+    catalog.createNamespace(namespace);
+    TableIdentifier tableIdentifier = TableIdentifier.of(namespace, genRandomName());
+    catalog.createTable(tableIdentifier, SCHEMA);
+
+    GetItemRequest lookup = GetItemRequest.builder()
+        .tableName(catalogTableName)
+        .key(DynamoDbCatalog.tablePrimaryKey(tableIdentifier))
+        .build();
+    GetItemResponse stored = dynamo.getItem(lookup);
+    Assert.assertTrue("table must exist", stored.hasItem());
+
+    String storedIdentifier = stored.item().get("identifier").s();
+    Assert.assertEquals("table must be stored in DynamoDB with table identifier as partition key",
+        tableIdentifier.toString(), storedIdentifier);
+
+    String storedNamespace = stored.item().get("namespace").s();
+    Assert.assertEquals("table must be stored in DynamoDB with namespace as sort key",
+        namespace.toString(), storedNamespace);
+
+    AssertHelpers.assertThrows("should not create duplicated table",
+        AlreadyExistsException.class,
+        "already exists",
+        () -> catalog.createTable(tableIdentifier, SCHEMA));
+  }
+
+  /**
+   * Checks that table creation is rejected with {@link ValidationException} when the identifier has an empty
+   * namespace or when the table name contains a dot.
+   */
+  @Test
+  public void testCreateTableBadName() {
+    Namespace namespace = Namespace.of(genRandomName());
+    catalog.createNamespace(namespace);
+    // case 1: identifier without any namespace level
+    AssertHelpers.assertThrows("should not create table name with empty namespace",
+        ValidationException.class,
+        "Table namespace must not be empty",
+        () -> catalog.createTable(TableIdentifier.of(Namespace.empty(), "a"), SCHEMA));
+
+    // case 2: table name containing a dot, inside the existing namespace
+    AssertHelpers.assertThrows("should not create table name with dot",
+        ValidationException.class,
+        "must not contain dot",
+        () -> catalog.createTable(TableIdentifier.of(namespace, "a.b"), SCHEMA));
+  }
+
+  @Test
+  public void testListTable() {
+    Namespace namespace = Namespace.of(genRandomName());
+    catalog.createNamespace(namespace);
+    List<TableIdentifier> tableIdentifiers = IntStream.range(0, 3)
+        .mapToObj(i -> TableIdentifier.of(namespace, genRandomName()))
+        .collect(Collectors.toList());
+    tableIdentifiers.forEach(id -> catalog.createTable(id, SCHEMA));
+    Assert.assertEquals(3, catalog.listTables(namespace).size());
+  }
+
+  @Test
+  public void testDropTable() {
+    Namespace namespace = Namespace.of(genRandomName());
+    catalog.createNamespace(namespace);
+    TableIdentifier tableIdentifier = TableIdentifier.of(namespace, genRandomName());
+    catalog.createTable(tableIdentifier, SCHEMA);
+    String metadataLocation = dynamo.getItem(GetItemRequest.builder()
+        .tableName(catalogTableName)
+        .key(DynamoDbCatalog.tablePrimaryKey(tableIdentifier)).build())
+        .item().get("p.metadata_location").s();
+    catalog.dropTable(tableIdentifier, true);
+    Assert.assertFalse("table entry should not exist in dynamo",
+        dynamo.getItem(GetItemRequest.builder()
+            .tableName(catalogTableName)
+            .key(DynamoDbCatalog.tablePrimaryKey(tableIdentifier)).build())
+            .hasItem());
+    AssertHelpers.assertThrows("metadata location should be deleted",
+        NoSuchKeyException.class,
+        () -> s3.headObject(HeadObjectRequest.builder()
+            .bucket(testBucket)
+            .key(metadataLocation.substring(testBucket.length() + 6)) // s3:// + end slash
+            .build()));
+  }
+
+  /**
+   * Covers rename failures (source missing, target already present) and a successful rename across
+   * namespaces, after which the new entry must carry the same metadata location as the old one.
+   */
+  @Test
+  public void testRenameTable() {
+    Namespace namespace = Namespace.of(genRandomName());
+    catalog.createNamespace(namespace);
+    Namespace namespace2 = Namespace.of(genRandomName());
+    catalog.createNamespace(namespace2);
+    TableIdentifier tableIdentifier = TableIdentifier.of(namespace, genRandomName());
+    catalog.createTable(tableIdentifier, SCHEMA);
+    TableIdentifier tableIdentifier2 = TableIdentifier.of(namespace2, genRandomName());
+
+    // source table "a" was never created
+    AssertHelpers.assertThrows("should not be able to rename a table not exist",
+        NoSuchTableException.class,
+        "does not exist",
+        () -> catalog.renameTable(TableIdentifier.of(namespace, "a"), tableIdentifier2));
+
+    // renaming onto itself means the target already exists
+    AssertHelpers.assertThrows("should not be able to rename an existing table",
+        AlreadyExistsException.class,
+        "already exists",
+        () -> catalog.renameTable(tableIdentifier, tableIdentifier));
+
+    GetItemRequest lookupSource = GetItemRequest.builder()
+        .tableName(catalogTableName)
+        .key(DynamoDbCatalog.tablePrimaryKey(tableIdentifier))
+        .build();
+    String metadataLocation = dynamo.getItem(lookupSource)
+        .item()
+        .get("p.metadata_location")
+        .s();
+
+    catalog.renameTable(tableIdentifier, tableIdentifier2);
+
+    GetItemRequest lookupTarget = GetItemRequest.builder()
+        .tableName(catalogTableName)
+        .key(DynamoDbCatalog.tablePrimaryKey(tableIdentifier2))
+        .build();
+    String metadataLocation2 = dynamo.getItem(lookupTarget)
+        .item()
+        .get("p.metadata_location")
+        .s();
+
+    Assert.assertEquals("metadata location should be copied to new table entry",
+        metadataLocation, metadataLocation2);
+  }
+
+  @Test
+  public void testUpdateTable() {
+    Namespace namespace = Namespace.of(genRandomName());
+    catalog.createNamespace(namespace);
+    TableIdentifier tableIdentifier = TableIdentifier.of(namespace, genRandomName());
+    catalog.createTable(tableIdentifier, SCHEMA);
+    Table table = catalog.loadTable(tableIdentifier);
+    table.updateSchema().addColumn("data", Types.StringType.get()).commit();
+    table.refresh();
+    Assert.assertEquals(2, table.schema().columns().size());
+  }
+
+  @Test
+  public void testConcurrentCommits() throws Exception {
+    Namespace namespace = Namespace.of(genRandomName());
+    catalog.createNamespace(namespace);
+    TableIdentifier tableIdentifier = TableIdentifier.of(namespace, genRandomName());
+    catalog.createTable(tableIdentifier, SCHEMA);
+    Table table = catalog.loadTable(tableIdentifier);
+    POOL.submit(() -> IntStream.range(0, 16).parallel()
+        .forEach(i -> {
+          try {
+            table.updateSchema().addColumn(genRandomName(), Types.StringType.get()).commit();
+          } catch (Exception e) {
+            // ignore
+          }
+        })).get();
+
+    Assert.assertEquals(2, table.schema().columns().size());
+  }
+
+  /**
+   * Returns a random UUID string with all dashes removed.
+   */
+  private static String genRandomName() {
+    String uuid = UUID.randomUUID().toString();
+    return uuid.replace("-", "");
+  }
+}
diff --git a/aws/src/main/java/org/apache/iceberg/aws/AwsProperties.java b/aws/src/main/java/org/apache/iceberg/aws/AwsProperties.java
index ffc30c1..0985559 100644
--- a/aws/src/main/java/org/apache/iceberg/aws/AwsProperties.java
+++ b/aws/src/main/java/org/apache/iceberg/aws/AwsProperties.java
@@ -21,6 +21,7 @@ package org.apache.iceberg.aws;
 
 import java.io.Serializable;
 import java.util.Map;
+import org.apache.iceberg.aws.dynamodb.DynamoDbCatalog;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.util.PropertyUtil;
 import software.amazon.awssdk.services.s3.model.ObjectCannedACL;
@@ -131,6 +132,12 @@ public class AwsProperties implements Serializable {
   public static final String S3FILEIO_ACL = "s3.acl";
 
   /**
+   * DynamoDB table name for {@link DynamoDbCatalog}
+   */
+  public static final String DYNAMODB_TABLE_NAME = "dynamodb.table-name";
+  public static final String DYNAMODB_TABLE_NAME_DEFAULT = "iceberg";
+
+  /**
    * The implementation class of {@link AwsClientFactory} to customize AWS client configurations.
    * If set, all AWS clients will be initialized by the specified factory.
    * If not set, {@link AwsClientFactories#defaultFactory()} is used as default factory.
@@ -180,6 +187,8 @@ public class AwsProperties implements Serializable {
   private String glueCatalogId;
   private boolean glueCatalogSkipArchive;
 
+  private String dynamoDbTableName;
+
   public AwsProperties() {
     this.s3FileIoSseType = S3FILEIO_SSE_TYPE_NONE;
     this.s3FileIoSseKey = null;
@@ -193,6 +202,8 @@ public class AwsProperties implements Serializable {
 
     this.glueCatalogId = null;
     this.glueCatalogSkipArchive = GLUE_CATALOG_SKIP_ARCHIVE_DEFAULT;
+
+    this.dynamoDbTableName = DYNAMODB_TABLE_NAME_DEFAULT;
   }
 
   public AwsProperties(Map<String, String> properties) {
@@ -236,6 +247,9 @@ public class AwsProperties implements Serializable {
     this.s3FileIoAcl = ObjectCannedACL.fromValue(aclType);
     Preconditions.checkArgument(s3FileIoAcl == null || !s3FileIoAcl.equals(ObjectCannedACL.UNKNOWN_TO_SDK_VERSION),
         "Cannot support S3 CannedACL " + aclType);
+
+    this.dynamoDbTableName = PropertyUtil.propertyAsString(properties, DYNAMODB_TABLE_NAME,
+        DYNAMODB_TABLE_NAME_DEFAULT);
   }
 
   public String s3FileIoSseType() {
@@ -317,4 +331,12 @@ public class AwsProperties implements Serializable {
   public void setS3FileIoAcl(ObjectCannedACL acl) {
     this.s3FileIoAcl = acl;
   }
+
+  /**
+   * Returns the DynamoDB table name configured for {@link DynamoDbCatalog}.
+   */
+  public String dynamoDbTableName() {
+    return this.dynamoDbTableName;
+  }
+
+  /**
+   * Overrides the DynamoDB table name configured for {@link DynamoDbCatalog}.
+   *
+   * @param name new table name
+   */
+  public void setDynamoDbTableName(String name) {
+    dynamoDbTableName = name;
+  }
 }
diff --git a/aws/src/main/java/org/apache/iceberg/aws/dynamodb/DynamoDbCatalog.java b/aws/src/main/java/org/apache/iceberg/aws/dynamodb/DynamoDbCatalog.java
new file mode 100644
index 0000000..f9bef15
--- /dev/null
+++ b/aws/src/main/java/org/apache/iceberg/aws/dynamodb/DynamoDbCatalog.java
@@ -0,0 +1,613 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.aws.dynamodb;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.BaseMetastoreCatalog;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.aws.AwsClientFactories;
+import org.apache.iceberg.aws.AwsProperties;
+import org.apache.iceberg.aws.s3.S3FileIO;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.SupportsNamespaces;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
+import org.apache.iceberg.exceptions.NoSuchNamespaceException;
+import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.base.Joiner;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.Tasks;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
+import software.amazon.awssdk.services.dynamodb.model.AttributeDefinition;
+import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
+import software.amazon.awssdk.services.dynamodb.model.BillingMode;
+import software.amazon.awssdk.services.dynamodb.model.ConditionalCheckFailedException;
+import software.amazon.awssdk.services.dynamodb.model.CreateTableRequest;
+import software.amazon.awssdk.services.dynamodb.model.Delete;
+import software.amazon.awssdk.services.dynamodb.model.DeleteItemRequest;
+import software.amazon.awssdk.services.dynamodb.model.DescribeTableRequest;
+import software.amazon.awssdk.services.dynamodb.model.DescribeTableResponse;
+import software.amazon.awssdk.services.dynamodb.model.GetItemRequest;
+import software.amazon.awssdk.services.dynamodb.model.GetItemResponse;
+import software.amazon.awssdk.services.dynamodb.model.GlobalSecondaryIndex;
+import software.amazon.awssdk.services.dynamodb.model.KeySchemaElement;
+import software.amazon.awssdk.services.dynamodb.model.KeyType;
+import software.amazon.awssdk.services.dynamodb.model.Projection;
+import software.amazon.awssdk.services.dynamodb.model.ProjectionType;
+import software.amazon.awssdk.services.dynamodb.model.Put;
+import software.amazon.awssdk.services.dynamodb.model.PutItemRequest;
+import software.amazon.awssdk.services.dynamodb.model.QueryRequest;
+import software.amazon.awssdk.services.dynamodb.model.QueryResponse;
+import software.amazon.awssdk.services.dynamodb.model.ResourceNotFoundException;
+import software.amazon.awssdk.services.dynamodb.model.ScalarAttributeType;
+import software.amazon.awssdk.services.dynamodb.model.TableStatus;
+import software.amazon.awssdk.services.dynamodb.model.TransactWriteItem;
+import software.amazon.awssdk.services.dynamodb.model.TransactWriteItemsRequest;
+import software.amazon.awssdk.services.dynamodb.model.UpdateItemRequest;
+
+/**
+ * DynamoDB implementation of Iceberg catalog
+ */
+public class DynamoDbCatalog extends BaseMetastoreCatalog implements Closeable, SupportsNamespaces, Configurable {
+
+  private static final Logger LOG = LoggerFactory.getLogger(DynamoDbCatalog.class);
+  private static final int CATALOG_TABLE_CREATION_WAIT_ATTEMPTS_MAX = 5;
+  static final Joiner COMMA = Joiner.on(',');
+
+  private static final String GSI_NAMESPACE_IDENTIFIER = "namespace-identifier";
+  private static final String COL_IDENTIFIER = "identifier";
+  private static final String COL_IDENTIFIER_NAMESPACE = "NAMESPACE";
+  private static final String COL_NAMESPACE = "namespace";
+  private static final String PROPERTY_COL_PREFIX = "p.";
+  private static final String PROPERTY_DEFAULT_LOCATION = "default_location";
+  private static final String COL_CREATED_AT = "created_at";
+  private static final String COL_UPDATED_AT = "updated_at";
+
+  // field used for optimistic locking
+  static final String COL_VERSION = "v";
+
+  private DynamoDbClient dynamo;
+  private Configuration hadoopConf;
+  private String catalogName;
+  private String warehousePath;
+  private AwsProperties awsProperties;
+  private FileIO fileIO;
+
+  /**
+   * Creates a catalog without any state; the name, AWS properties, warehouse path, DynamoDB client and
+   * file IO are assigned later by {@code initialize}.
+   */
+  public DynamoDbCatalog() {
+  }
+
+  @Override
+  public void initialize(String name, Map<String, String> properties) {
+    initialize(
+        name,
+        properties.get(CatalogProperties.WAREHOUSE_LOCATION),
+        new AwsProperties(properties),
+        AwsClientFactories.from(properties).dynamo(),
+        initializeFileIO(properties));
+  }
+
+  /**
+   * Assigns the catalog state from already constructed dependencies and then calls
+   * {@code ensureCatalogTableExistsOrCreate()}.
+   *
+   * @param name catalog name returned by {@link #name()}
+   * @param path warehouse path; stored after being passed through {@code cleanWarehousePath}
+   * @param properties AWS properties, used for the catalog table name
+   * @param client DynamoDB client used for all catalog requests
+   * @param io file IO handed to the table operations
+   */
+  @VisibleForTesting
+  void initialize(String name, String path, AwsProperties properties, DynamoDbClient client, FileIO io) {
+    this.catalogName = name;
+    this.awsProperties = properties;
+    this.warehousePath = cleanWarehousePath(path);
+    this.dynamo = client;
+    this.fileIO = io;
+    // must run last: presumably relies on the client and table name assigned above -- confirm in the helper
+    ensureCatalogTableExistsOrCreate();
+  }
+
+  /**
+   * Returns the catalog name given at initialization.
+   */
+  @Override
+  public String name() {
+    return this.catalogName;
+  }
+
+  /**
+   * Validates the identifier and creates the DynamoDB-backed table operations for it.
+   *
+   * @param tableIdentifier identifier of the table
+   * @return a new {@link DynamoDbTableOperations} sharing this catalog's client, properties and file IO
+   */
+  @Override
+  protected TableOperations newTableOps(TableIdentifier tableIdentifier) {
+    validateTableIdentifier(tableIdentifier);
+    TableOperations ops =
+        new DynamoDbTableOperations(dynamo, awsProperties, catalogName, fileIO, tableIdentifier);
+    return ops;
+  }
+
+  /**
+   * Resolves the default location of a table.
+   * <p>
+   * The namespace entry is read with a consistent read. When the namespace carries a
+   * {@code default_location} property the table is placed directly under it; otherwise the location is
+   * {@code <warehouse>/<namespace>.db/<table>}.
+   *
+   * @param tableIdentifier identifier of the table
+   * @return default location for the table
+   * @throws NoSuchNamespaceException if the namespace of the identifier has no catalog entry
+   */
+  @Override
+  protected String defaultWarehouseLocation(TableIdentifier tableIdentifier) {
+    validateTableIdentifier(tableIdentifier);
+    GetItemResponse response = dynamo.getItem(GetItemRequest.builder()
+        .tableName(awsProperties.dynamoDbTableName())
+        .consistentRead(true)
+        .key(namespacePrimaryKey(tableIdentifier.namespace()))
+        .build());
+
+    if (!response.hasItem()) {
+      throw new NoSuchNamespaceException("Cannot find default warehouse location: namespace %s does not exist",
+          tableIdentifier.namespace());
+    }
+
+    String defaultLocationCol = toPropertyCol(PROPERTY_DEFAULT_LOCATION);
+    if (response.item().containsKey(defaultLocationCol)) {
+      // use the string payload; formatting the AttributeValue itself would embed its toString() form
+      return String.format("%s/%s", response.item().get(defaultLocationCol).s(), tableIdentifier.name());
+    } else {
+      return String.format("%s/%s.db/%s", warehousePath, tableIdentifier.namespace(), tableIdentifier.name());
+    }
+  }
+
+  /**
+   * Creates a namespace entry holding the given metadata as property columns.
+   * <p>
+   * The item is written with a condition that the version column does not exist yet, so an existing
+   * entry is never overwritten.
+   *
+   * @param namespace namespace to create
+   * @param metadata properties to store with the namespace
+   * @throws AlreadyExistsException if the conditional write fails
+   */
+  @Override
+  public void createNamespace(Namespace namespace, Map<String, String> metadata) {
+    validateNamespace(namespace);
+    Map<String, AttributeValue> item = namespacePrimaryKey(namespace);
+    setNewCatalogEntryMetadata(item);
+    for (Map.Entry<String, String> property : metadata.entrySet()) {
+      item.put(toPropertyCol(property.getKey()), AttributeValue.builder().s(property.getValue()).build());
+    }
+
+    PutItemRequest putIfAbsent = PutItemRequest.builder()
+        .tableName(awsProperties.dynamoDbTableName())
+        .conditionExpression("attribute_not_exists(" + DynamoDbCatalog.COL_VERSION + ")")
+        .item(item)
+        .build();
+    try {
+      dynamo.putItem(putIfAbsent);
+    } catch (ConditionalCheckFailedException e) {
+      throw new AlreadyExistsException("Cannot create namespace %s: already exists", namespace);
+    }
+  }
+
+  /**
+   * Lists namespace entries, following result pages until the last evaluated key is empty.
+   * <p>
+   * For a non-empty argument the match is a {@code begins_with} on the namespace column, so the result
+   * contains every stored namespace whose name starts with the given namespace string, including the
+   * namespace itself.
+   *
+   * @param namespace namespace to list under; empty lists all namespace entries
+   * @return matching namespaces
+   */
+  @Override
+  public List<Namespace> listNamespaces(Namespace namespace) throws NoSuchNamespaceException {
+    validateNamespace(namespace);
+
+    Map<String, AttributeValue> queryValues = Maps.newHashMap();
+    queryValues.put(":identifier", AttributeValue.builder().s(COL_IDENTIFIER_NAMESPACE).build());
+    String keyCondition = COL_IDENTIFIER + " = :identifier";
+    if (!namespace.isEmpty()) {
+      keyCondition = keyCondition + " AND " + "begins_with(" + COL_NAMESPACE + ",:ns)";
+      queryValues.put(":ns", AttributeValue.builder().s(namespace.toString()).build());
+    }
+
+    List<Namespace> found = Lists.newArrayList();
+    Map<String, AttributeValue> startKey = null;
+    do {
+      QueryRequest pageRequest = QueryRequest.builder()
+          .tableName(awsProperties.dynamoDbTableName())
+          .consistentRead(true)
+          .keyConditionExpression(keyCondition)
+          .expressionAttributeValues(queryValues)
+          .exclusiveStartKey(startKey)
+          .build();
+      QueryResponse page = dynamo.query(pageRequest);
+
+      if (page.hasItems()) {
+        page.items().stream()
+            .map(item -> item.get(COL_NAMESPACE).s())
+            .map(ns -> Namespace.of(ns.split("\\.")))
+            .forEach(found::add);
+      }
+
+      startKey = page.lastEvaluatedKey();
+    } while (!startKey.isEmpty());
+
+    return found;
+  }
+
+  /**
+   * Loads the properties of a namespace: every column recognized by {@code isProperty} is returned under
+   * its property key.
+   *
+   * @param namespace namespace to load
+   * @return namespace properties
+   * @throws NoSuchNamespaceException if the namespace has no catalog entry
+   */
+  @Override
+  public Map<String, String> loadNamespaceMetadata(Namespace namespace) throws NoSuchNamespaceException {
+    validateNamespace(namespace);
+    GetItemRequest lookup = GetItemRequest.builder()
+        .tableName(awsProperties.dynamoDbTableName())
+        .consistentRead(true)
+        .key(namespacePrimaryKey(namespace))
+        .build();
+    GetItemResponse response = dynamo.getItem(lookup);
+
+    if (!response.hasItem()) {
+      throw new NoSuchNamespaceException("Cannot find namespace %s", namespace);
+    }
+
+    Map<String, AttributeValue> item = response.item();
+    return item.entrySet().stream()
+        .filter(attribute -> isProperty(attribute.getKey()))
+        .collect(Collectors.toMap(
+            attribute -> toPropertyKey(attribute.getKey()),
+            attribute -> attribute.getValue().s()));
+  }
+
+  /**
+   * Drops a namespace that contains no tables.
+   * <p>
+   * The delete is conditional on the namespace column being present on the item, so a missing entry
+   * results in {@code false} instead of a silent no-op.
+   *
+   * @param namespace namespace to drop
+   * @return true if the entry was deleted, false if the conditional delete failed
+   * @throws NamespaceNotEmptyException if the namespace still lists tables
+   */
+  @Override
+  public boolean dropNamespace(Namespace namespace) throws NamespaceNotEmptyException {
+    validateNamespace(namespace);
+    if (!listTables(namespace).isEmpty()) {
+      throw new NamespaceNotEmptyException("Cannot delete non-empty namespace %s", namespace);
+    }
+
+    try {
+      dynamo.deleteItem(DeleteItemRequest.builder()
+          .tableName(awsProperties.dynamoDbTableName())
+          .key(namespacePrimaryKey(namespace))
+          // check the namespace column; using the namespace value here would test an attribute path
+          // named after the namespace, which is not a column of the entry
+          .conditionExpression("attribute_exists(" + COL_NAMESPACE + ")")
+          .build());
+      return true;
+    } catch (ConditionalCheckFailedException e) {
+      return false;
+    }
+  }
+
+  /**
+   * Sets namespace properties with a single SET update expression.
+   * <p>
+   * Each property gets a {@code #k<i>} name placeholder and a {@code :v<i>} value placeholder; the entry
+   * metadata update is appended to the same SET clause before the update is handed to
+   * {@code updateProperties}.
+   *
+   * @param namespace namespace to update
+   * @param properties properties to set
+   * @return the result of {@code updateProperties}
+   */
+  @Override
+  public boolean setProperties(Namespace namespace, Map<String, String> properties) throws NoSuchNamespaceException {
+    List<String> assignments = Lists.newArrayList();
+    Map<String, String> names = Maps.newHashMap();
+    Map<String, AttributeValue> values = Maps.newHashMap();
+    int position = 0;
+    for (Map.Entry<String, String> entry : properties.entrySet()) {
+      String namePlaceholder = "#k" + position;
+      String valuePlaceholder = ":v" + position;
+      position++;
+      assignments.add(namePlaceholder + " = " + valuePlaceholder);
+      names.put(namePlaceholder, toPropertyCol(entry.getKey()));
+      values.put(valuePlaceholder, AttributeValue.builder().s(entry.getValue()).build());
+    }
+
+    updateCatalogEntryMetadata(assignments, values);
+    return updateProperties(namespace, "SET " + COMMA.join(assignments), values, names);
+  }
+
+  /**
+   * Removes namespace properties with a REMOVE clause and updates the entry metadata with a SET clause in
+   * the same update expression.
+   * <p>
+   * Only {@code #k<i>} placeholders are written into the REMOVE clause; the property column names are
+   * bound through the attribute name map, so property keys are never interpreted as expression syntax.
+   *
+   * @param namespace namespace to update
+   * @param properties property keys to remove
+   * @return the result of {@code updateProperties}
+   */
+  @Override
+  public boolean removeProperties(Namespace namespace, Set<String> properties) throws NoSuchNamespaceException {
+    // start empty: raw property names must not end up in the expression next to their placeholders
+    List<String> removeParts = Lists.newArrayList();
+    Map<String, String> attributeNames = Maps.newHashMap();
+    Map<String, AttributeValue> attributeValues = Maps.newHashMap();
+    int idx = 0;
+    for (String property : properties) {
+      String attributeKey = "#k" + idx;
+      idx++;
+      removeParts.add(attributeKey);
+      attributeNames.put(attributeKey, toPropertyCol(property));
+    }
+
+    List<String> updateParts = Lists.newArrayList();
+    updateCatalogEntryMetadata(updateParts, attributeValues);
+    String updateExpression = "REMOVE " + COMMA.join(removeParts) + " SET " + COMMA.join(updateParts);
+    return updateProperties(namespace, updateExpression, attributeValues, attributeNames);
+  }
+
+  /**
+   * Lists the tables of a namespace by querying the namespace-identifier index, following result pages
+   * until the last evaluated key is empty. The namespace entry itself is filtered out of the result.
+   *
+   * @param namespace namespace to list
+   * @return identifiers of the tables stored under the namespace
+   */
+  @Override
+  public List<TableIdentifier> listTables(Namespace namespace) {
+    List<TableIdentifier> identifiers = Lists.newArrayList();
+    // null on the first request, then the key returned by the previous page
+    Map<String, AttributeValue> lastEvaluatedKey = null;
+    String condition = COL_NAMESPACE + " = :ns";
+    Map<String, AttributeValue> conditionValues = ImmutableMap.of(
+        ":ns", AttributeValue.builder().s(namespace.toString()).build());
+    do {
+      QueryResponse response = dynamo.query(QueryRequest.builder()
+          .tableName(awsProperties.dynamoDbTableName())
+          .indexName(GSI_NAMESPACE_IDENTIFIER)
+          .keyConditionExpression(condition)
+          .expressionAttributeValues(conditionValues)
+          // without the start key every request returns the first page again and the loop never ends
+          .exclusiveStartKey(lastEvaluatedKey)
+          .build());
+
+      if (response.hasItems()) {
+        for (Map<String, AttributeValue> item : response.items()) {
+          String identifier = item.get(COL_IDENTIFIER).s();
+          if (!COL_IDENTIFIER_NAMESPACE.equals(identifier)) {
+            identifiers.add(TableIdentifier.of(identifier.split("\\.")));
+          }
+        }
+      }
+
+      lastEvaluatedKey = response.lastEvaluatedKey();
+    } while (!lastEvaluatedKey.isEmpty());
+    return identifiers;
+  }
+
+  /**
+   * Drops a table from the DynamoDb catalog, optionally purging its data.
+   * <p>
+   * The catalog entry is fetched with a consistent read and then deleted with a condition on the version
+   * that was read, so a concurrent change to the entry makes the delete fail instead of being lost.
+   *
+   * @param identifier identifier of the table to drop
+   * @param purge whether to also delete the table data once the catalog entry is removed
+   * @return true if the catalog entry was deleted, false if the conditional delete failed
+   * @throws NoSuchTableException if the catalog has no entry for the identifier
+   */
+  @Override
+  public boolean dropTable(TableIdentifier identifier, boolean purge) {
+    Map<String, AttributeValue> tableKey = tablePrimaryKey(identifier);
+    try {
+      GetItemRequest lookup = GetItemRequest.builder()
+          .tableName(awsProperties.dynamoDbTableName())
+          .consistentRead(true)
+          .key(tableKey)
+          .build();
+      GetItemResponse existing = dynamo.getItem(lookup);
+      if (!existing.hasItem()) {
+        throw new NoSuchTableException("Cannot find table %s to drop", identifier);
+      }
+
+      // read the current metadata before the entry is removed, it is needed for the purge below
+      TableOperations tableOps = newTableOps(identifier);
+      TableMetadata metadataBeforeDrop = tableOps.current();
+
+      // only delete the entry if nobody changed it since it was read above
+      AttributeValue expectedVersion = existing.item().get(COL_VERSION);
+      DeleteItemRequest removal = DeleteItemRequest.builder()
+          .tableName(awsProperties.dynamoDbTableName())
+          .key(tablePrimaryKey(identifier))
+          .conditionExpression(COL_VERSION + " = :v")
+          .expressionAttributeValues(ImmutableMap.of(":v", expectedVersion))
+          .build();
+      dynamo.deleteItem(removal);
+      LOG.info("Successfully dropped table {} from DynamoDb catalog", identifier);
+
+      boolean shouldPurge = purge && metadataBeforeDrop != null;
+      if (shouldPurge) {
+        CatalogUtil.dropTableData(tableOps.io(), metadataBeforeDrop);
+        LOG.info("Table {} data purged", identifier);
+      }
+
+      LOG.info("Dropped table: {}", identifier);
+      return true;
+    } catch (ConditionalCheckFailedException conflict) {
+      LOG.error("Cannot complete drop table operation for {}: commit conflict", identifier, conflict);
+      return false;
+    } catch (Exception unexpected) {
+      LOG.error("Cannot complete drop table operation for {}: unexpected exception", identifier, unexpected);
+      throw unexpected;
+    }
+  }
+
+  /**
+   * Renames a table by moving its catalog entry to a new primary key.
+   * <p>
+   * The property columns of the source entry are copied into a new entry, which receives fresh catalog
+   * entry metadata. The delete of the source and the put of the target are sent as one DynamoDb
+   * transaction: the delete is conditioned on the source version that was read, and the put is conditioned
+   * on the target not having a version attribute yet.
+   *
+   * @param from identifier of the existing table
+   * @param to identifier the table should be renamed to
+   * @throws NoSuchTableException if the source table has no catalog entry
+   * @throws AlreadyExistsException if the target identifier already has a catalog entry
+   */
+  @Override
+  public void renameTable(TableIdentifier from, TableIdentifier to) {
+    Map<String, AttributeValue> sourceKey = tablePrimaryKey(from);
+    // starts out as the target primary key and is later filled up to become the full target entry
+    Map<String, AttributeValue> targetEntry = tablePrimaryKey(to);
+
+    GetItemRequest sourceLookup = GetItemRequest.builder()
+        .tableName(awsProperties.dynamoDbTableName())
+        .consistentRead(true)
+        .key(sourceKey)
+        .build();
+    GetItemResponse source = dynamo.getItem(sourceLookup);
+    if (!source.hasItem()) {
+      throw new NoSuchTableException("Cannot rename table %s to %s: %s does not exist", from, to, from);
+    }
+
+    GetItemRequest targetLookup = GetItemRequest.builder()
+        .tableName(awsProperties.dynamoDbTableName())
+        .consistentRead(true)
+        .key(targetEntry)
+        .build();
+    GetItemResponse target = dynamo.getItem(targetLookup);
+    if (target.hasItem()) {
+      throw new AlreadyExistsException("Cannot rename table %s to %s: %s already exists", from, to, to);
+    }
+
+    // carry over the property columns only, the bookkeeping columns are regenerated below
+    for (Map.Entry<String, AttributeValue> attribute : source.item().entrySet()) {
+      if (isProperty(attribute.getKey())) {
+        targetEntry.put(attribute.getKey(), attribute.getValue());
+      }
+    }
+
+    setNewCatalogEntryMetadata(targetEntry);
+
+    Delete removeSource = Delete.builder()
+        .tableName(awsProperties.dynamoDbTableName())
+        .key(sourceKey)
+        .conditionExpression(COL_VERSION + " = :v")
+        .expressionAttributeValues(ImmutableMap.of(":v", source.item().get(COL_VERSION)))
+        .build();
+    Put createTarget = Put.builder()
+        .tableName(awsProperties.dynamoDbTableName())
+        .item(targetEntry)
+        .conditionExpression("attribute_not_exists(" + COL_VERSION + ")")
+        .build();
+    TransactWriteItemsRequest move = TransactWriteItemsRequest.builder()
+        .transactItems(
+            TransactWriteItem.builder().delete(removeSource).build(),
+            TransactWriteItem.builder().put(createTarget).build())
+        .build();
+    dynamo.transactWriteItems(move);
+
+    LOG.info("Successfully renamed table from {} to {}", from, to);
+  }
+
+  /**
+   * Stores the configuration that is handed to {@code CatalogUtil.loadFileIO} when a custom FileIO
+   * implementation is configured.
+   *
+   * @param conf configuration to keep in this catalog
+   */
+  @Override
+  public void setConf(Configuration conf) {
+    this.hadoopConf = conf;
+  }
+
+  /**
+   * Returns the configuration previously stored through {@link #setConf(Configuration)}.
+   *
+   * @return the stored configuration
+   */
+  @Override
+  public Configuration getConf() {
+    return this.hadoopConf;
+  }
+
+  /**
+   * Closes the DynamoDb client held by this catalog.
+   *
+   * @throws IOException declared by the overridden method signature
+   */
+  @Override
+  public void close() throws IOException {
+    this.dynamo.close();
+  }
+
+  /**
+   * Returns the namespace property key used to set a default location for tables in a namespace.
+   * <p>
+   * Call {@link #setProperties(Namespace, Map)} with this key and a path value on a namespace; all tables
+   * in that namespace will then have their default table root path under the given path.
+   *
+   * @return default location property key
+   */
+  public static String defaultLocationProperty() {
+    return PROPERTY_DEFAULT_LOCATION;
+  }
+
+  /**
+   * Converts a property key to the name of the DynamoDb column that stores it, by prepending the
+   * property column prefix.
+   *
+   * @param propertyKey property key
+   * @return column name for the property
+   */
+  static String toPropertyCol(String propertyKey) {
+    return PROPERTY_COL_PREFIX + propertyKey;
+  }
+
+  /**
+   * Checks whether a DynamoDb column name starts with the property column prefix.
+   *
+   * @param dynamoCol DynamoDb column name
+   * @return true if the column name carries the property column prefix
+   */
+  static boolean isProperty(String dynamoCol) {
+    return dynamoCol.startsWith(PROPERTY_COL_PREFIX);
+  }
+
+  /**
+   * Converts a property column name back to its property key by cutting off as many leading characters
+   * as the property column prefix is long.
+   *
+   * @param propertyCol DynamoDb column name of a property
+   * @return property key without the column prefix
+   */
+  static String toPropertyKey(String propertyCol) {
+    int prefixLength = PROPERTY_COL_PREFIX.length();
+    return propertyCol.substring(prefixLength);
+  }
+
+  /**
+   * Builds the primary key of the catalog entry that represents a namespace.
+   * <p>
+   * The identifier column holds the fixed namespace marker value and the namespace column holds the
+   * string form of the namespace.
+   *
+   * @param namespace namespace to build the key for
+   * @return a new mutable map with the two key attributes
+   */
+  static Map<String, AttributeValue> namespacePrimaryKey(Namespace namespace) {
+    AttributeValue markerValue = AttributeValue.builder().s(COL_IDENTIFIER_NAMESPACE).build();
+    AttributeValue namespaceValue = AttributeValue.builder().s(namespace.toString()).build();
+
+    Map<String, AttributeValue> primaryKey = Maps.newHashMap();
+    primaryKey.put(COL_IDENTIFIER, markerValue);
+    primaryKey.put(COL_NAMESPACE, namespaceValue);
+    return primaryKey;
+  }
+
+  /**
+   * Builds the primary key of the catalog entry that represents a table.
+   * <p>
+   * The identifier column holds the string form of the full table identifier and the namespace column
+   * holds the string form of the table's namespace.
+   *
+   * @param identifier table identifier to build the key for
+   * @return a new mutable map with the two key attributes
+   */
+  static Map<String, AttributeValue> tablePrimaryKey(TableIdentifier identifier) {
+    AttributeValue identifierValue = AttributeValue.builder().s(identifier.toString()).build();
+    AttributeValue namespaceValue = AttributeValue.builder().s(identifier.namespace().toString()).build();
+
+    Map<String, AttributeValue> primaryKey = Maps.newHashMap();
+    primaryKey.put(COL_IDENTIFIER, identifierValue);
+    primaryKey.put(COL_NAMESPACE, namespaceValue);
+    return primaryKey;
+  }
+
+  /**
+   * Adds the bookkeeping attributes of a brand new catalog entry to the given attribute map.
+   * <p>
+   * Created-at and updated-at are both set to the current time in milliseconds, and the version is set to
+   * a newly generated random UUID string.
+   *
+   * @param values attribute map of the entry, modified in place
+   */
+  static void setNewCatalogEntryMetadata(Map<String, AttributeValue> values) {
+    String nowMillis = Long.toString(System.currentTimeMillis());
+    AttributeValue createdAt = AttributeValue.builder().n(nowMillis).build();
+    values.put(COL_CREATED_AT, createdAt);
+    AttributeValue updatedAt = AttributeValue.builder().n(nowMillis).build();
+    values.put(COL_UPDATED_AT, updatedAt);
+    String freshVersion = UUID.randomUUID().toString();
+    values.put(COL_VERSION, AttributeValue.builder().s(freshVersion).build());
+  }
+
+  /**
+   * Appends the bookkeeping changes of a catalog entry update to an update expression under construction.
+   * <p>
+   * Two assignments are added: the updated-at column is set to the current time in milliseconds (bound to
+   * {@code :uat}) and the version column is set to a newly generated random UUID string (bound to
+   * {@code :uv}).
+   *
+   * @param updateParts assignment parts of the update expression, appended to in place
+   * @param attributeValues expression attribute values, added to in place
+   */
+  static void updateCatalogEntryMetadata(List<String> updateParts, Map<String, AttributeValue> attributeValues) {
+    String nowMillis = Long.toString(System.currentTimeMillis());
+    updateParts.add(COL_UPDATED_AT + " = :uat");
+    attributeValues.put(":uat", AttributeValue.builder().n(nowMillis).build());
+
+    String nextVersion = UUID.randomUUID().toString();
+    updateParts.add(COL_VERSION + " = :uv");
+    attributeValues.put(":uv", AttributeValue.builder().s(nextVersion).build());
+  }
+
+  /**
+   * Creates the FileIO for this catalog from the catalog properties.
+   * <p>
+   * If the FileIO implementation property is set, that implementation is loaded through
+   * {@code CatalogUtil.loadFileIO}; otherwise a new {@code S3FileIO} is created and initialized with the
+   * properties.
+   *
+   * @param properties catalog properties
+   * @return the FileIO to use
+   */
+  private FileIO initializeFileIO(Map<String, String> properties) {
+    String customImpl = properties.get(CatalogProperties.FILE_IO_IMPL);
+    if (customImpl != null) {
+      return CatalogUtil.loadFileIO(customImpl, properties, hadoopConf);
+    }
+
+    FileIO s3FileIO = new S3FileIO();
+    s3FileIO.initialize(properties);
+    return s3FileIO;
+  }
+
+  /**
+   * Validates the warehouse path and removes one trailing slash, if present.
+   *
+   * @param path warehouse path, must be neither null nor empty
+   * @return the path without its last character when that character is a slash, otherwise the path as given
+   */
+  private String cleanWarehousePath(String path) {
+    Preconditions.checkArgument(path != null && !path.isEmpty(),
+        "Cannot initialize DynamoDbCatalog because warehousePath must not be null");
+    // the path is known to be non-empty here, so checking the suffix is the same as checking the last char
+    return path.endsWith("/") ? path.substring(0, path.length() - 1) : path;
+  }
+
+  /**
+   * Checks that every level of the namespace is non-empty and free of dots.
+   *
+   * @param namespace namespace to validate
+   * @throws ValidationException if a level is null, empty, or contains a dot
+   */
+  private void validateNamespace(Namespace namespace) {
+    for (String part : namespace.levels()) {
+      boolean hasContent = part != null && !part.isEmpty();
+      ValidationException.check(hasContent,
+          "Namespace level must not be empty: %s", namespace);
+
+      boolean dotFree = part.indexOf('.') < 0;
+      ValidationException.check(dotFree,
+          "Namespace level must not contain dot, but found %s in %s", part, namespace);
+    }
+  }
+
+  /**
+   * Checks that a table identifier has a valid, non-empty namespace and a table name without dots.
+   *
+   * @param identifier table identifier to validate
+   * @throws ValidationException if the namespace is invalid or empty, or the table name contains a dot
+   */
+  private void validateTableIdentifier(TableIdentifier identifier) {
+    validateNamespace(identifier.namespace());
+
+    boolean namespacePresent = identifier.hasNamespace();
+    ValidationException.check(namespacePresent,
+        "Table namespace must not be empty: %s", identifier);
+
+    String name = identifier.name();
+    boolean dotFree = name.indexOf('.') < 0;
+    ValidationException.check(dotFree,
+        "Table name must not contain dot: %s", name);
+  }
+
+  /**
+   * Checks whether a DynamoDb table exists by describing it.
+   *
+   * @param tableName name of the DynamoDb table
+   * @return true if the describe call succeeds, false if it reports that the resource was not found
+   */
+  private boolean dynamoDbTableExists(String tableName) {
+    DescribeTableRequest describe = DescribeTableRequest.builder()
+        .tableName(tableName)
+        .build();
+    try {
+      dynamo.describeTable(describe);
+    } catch (ResourceNotFoundException notFound) {
+      return false;
+    }
+
+    return true;
+  }
+
+  /**
+   * Creates the DynamoDb table backing this catalog if it does not exist yet, then waits for it to become
+   * active.
+   * <p>
+   * The table is keyed by identifier (hash) and namespace (range), and has a keys-only global secondary
+   * index keyed by namespace (hash) and identifier (range). It uses pay-per-request billing.
+   */
+  private void ensureCatalogTableExistsOrCreate() {
+    String catalogTable = awsProperties.dynamoDbTableName();
+    if (dynamoDbTableExists(catalogTable)) {
+      return;
+    }
+
+    LOG.info("DynamoDb catalog table {} not found, trying to create", catalogTable);
+
+    GlobalSecondaryIndex namespaceIndex = GlobalSecondaryIndex.builder()
+        .indexName(GSI_NAMESPACE_IDENTIFIER)
+        .keySchema(
+            KeySchemaElement.builder().attributeName(COL_NAMESPACE).keyType(KeyType.HASH).build(),
+            KeySchemaElement.builder().attributeName(COL_IDENTIFIER).keyType(KeyType.RANGE).build())
+        .projection(Projection.builder().projectionType(ProjectionType.KEYS_ONLY).build())
+        .build();
+
+    CreateTableRequest creation = CreateTableRequest.builder()
+        .tableName(catalogTable)
+        .keySchema(
+            KeySchemaElement.builder().attributeName(COL_IDENTIFIER).keyType(KeyType.HASH).build(),
+            KeySchemaElement.builder().attributeName(COL_NAMESPACE).keyType(KeyType.RANGE).build())
+        .attributeDefinitions(
+            AttributeDefinition.builder().attributeName(COL_IDENTIFIER).attributeType(ScalarAttributeType.S).build(),
+            AttributeDefinition.builder().attributeName(COL_NAMESPACE).attributeType(ScalarAttributeType.S).build())
+        .globalSecondaryIndexes(namespaceIndex)
+        .billingMode(BillingMode.PAY_PER_REQUEST)
+        .build();
+    dynamo.createTable(creation);
+
+    // wait for the dynamo table to complete provisioning, which takes around 10 seconds
+    Tasks.foreach(catalogTable)
+        .retry(CATALOG_TABLE_CREATION_WAIT_ATTEMPTS_MAX)
+        .throwFailureWhenFinished()
+        .onlyRetryOn(IllegalStateException.class)
+        .run(this::checkTableActive);
+  }
+
+  /**
+   * Verifies that the given DynamoDb table is in {@code ACTIVE} status.
+   * <p>
+   * Both a table that cannot be found and a table in any other status are reported as
+   * {@link IllegalStateException}, so that a caller retrying on that exception type keeps polling until
+   * the table is ready.
+   *
+   * @param tableName name of the DynamoDb table to check
+   * @throws IllegalStateException if the table is missing or not active
+   */
+  private void checkTableActive(String tableName) {
+    try {
+      DescribeTableResponse response = dynamo.describeTable(DescribeTableRequest.builder()
+          .tableName(tableName)
+          .build());
+      TableStatus currentStatus = response.table().tableStatus();
+      // identity comparison on the enum also covers a missing status, which would otherwise surface as a
+      // NullPointerException instead of the IllegalStateException this method is meant to signal
+      if (currentStatus != TableStatus.ACTIVE) {
+        throw new IllegalStateException(String.format("Dynamo catalog table %s is not active, current status: %s",
+            tableName, currentStatus));
+      }
+    } catch (ResourceNotFoundException e) {
+      // keep the original exception as the cause so the service error details are not lost
+      throw new IllegalStateException(String.format("Cannot find Dynamo catalog table %s", tableName), e);
+    }
+  }
+
+  /**
+   * Applies an update expression to a namespace entry, guarded by the entry's current version.
+   * <p>
+   * The namespace entry is fetched with a consistent read; its version is bound to {@code :v} in the given
+   * attribute values and used as the condition of the update.
+   *
+   * @param namespace namespace whose entry is updated
+   * @param updateExpression DynamoDb update expression to apply
+   * @param attributeValues expression attribute values; the {@code :v} binding is added to this map
+   * @param attributeNames expression attribute names
+   * @return true if the update was applied, false if the version condition failed
+   * @throws NoSuchNamespaceException if the namespace has no catalog entry
+   */
+  private boolean updateProperties(Namespace namespace, String updateExpression,
+                                   Map<String, AttributeValue> attributeValues,
+                                   Map<String, String> attributeNames) {
+    validateNamespace(namespace);
+    Map<String, AttributeValue> namespaceKey = namespacePrimaryKey(namespace);
+    try {
+      GetItemRequest lookup = GetItemRequest.builder()
+          .tableName(awsProperties.dynamoDbTableName())
+          .consistentRead(true)
+          .key(namespaceKey)
+          .build();
+      GetItemResponse existing = dynamo.getItem(lookup);
+      if (!existing.hasItem()) {
+        throw new NoSuchNamespaceException("Cannot find namespace %s", namespace);
+      }
+
+      // the update only goes through if the entry still has the version that was just read
+      attributeValues.put(":v", existing.item().get(COL_VERSION));
+      UpdateItemRequest update = UpdateItemRequest.builder()
+          .tableName(awsProperties.dynamoDbTableName())
+          .key(namespaceKey)
+          .conditionExpression(COL_VERSION + " = :v")
+          .updateExpression(updateExpression)
+          .expressionAttributeValues(attributeValues)
+          .expressionAttributeNames(attributeNames)
+          .build();
+      dynamo.updateItem(update);
+      return true;
+    } catch (ConditionalCheckFailedException conflict) {
+      return false;
+    }
+  }
+}
diff --git a/aws/src/main/java/org/apache/iceberg/aws/dynamodb/DynamoDbTableOperations.java b/aws/src/main/java/org/apache/iceberg/aws/dynamodb/DynamoDbTableOperations.java
new file mode 100644
index 0000000..81157e9
--- /dev/null
+++ b/aws/src/main/java/org/apache/iceberg/aws/dynamodb/DynamoDbTableOperations.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.aws.dynamodb;
+
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
+import org.apache.iceberg.BaseMetastoreTableOperations;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.aws.AwsProperties;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.CommitFailedException;
+import org.apache.iceberg.exceptions.CommitStateUnknownException;
+import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
+import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
+import software.amazon.awssdk.services.dynamodb.model.ConditionalCheckFailedException;
+import software.amazon.awssdk.services.dynamodb.model.GetItemRequest;
+import software.amazon.awssdk.services.dynamodb.model.GetItemResponse;
+import software.amazon.awssdk.services.dynamodb.model.PutItemRequest;
+import software.amazon.awssdk.services.dynamodb.model.UpdateItemRequest;
+
+/**
+ * Table operations that keep the current metadata location of an Iceberg table in a DynamoDb catalog table.
+ * <p>
+ * Each table is one item in the catalog table. Table properties, including the metadata location, are
+ * stored in property columns of that item, and every write is conditioned on the item's version column so
+ * that concurrent commits are detected.
+ */
+class DynamoDbTableOperations extends BaseMetastoreTableOperations {
+
+  private static final Logger LOG = LoggerFactory.getLogger(DynamoDbTableOperations.class);
+
+  private final DynamoDbClient dynamo;
+  private final AwsProperties awsProperties;
+  private final TableIdentifier tableIdentifier;
+  private final String fullTableName;
+  private final FileIO fileIO;
+
+  /**
+   * Creates table operations for one table of a DynamoDb catalog.
+   *
+   * @param dynamo client used for all catalog reads and writes
+   * @param awsProperties properties that supply the name of the DynamoDb catalog table
+   * @param catalogName catalog name, used as the prefix of the full table name
+   * @param fileIO FileIO returned by {@link #io()}
+   * @param tableIdentifier identifier of the table these operations work on
+   */
+  DynamoDbTableOperations(
+      DynamoDbClient dynamo,
+      AwsProperties awsProperties,
+      String catalogName,
+      FileIO fileIO,
+      TableIdentifier tableIdentifier) {
+    this.dynamo = dynamo;
+    this.awsProperties = awsProperties;
+    this.fullTableName = String.format("%s.%s", catalogName, tableIdentifier);
+    this.tableIdentifier = tableIdentifier;
+    this.fileIO = fileIO;
+  }
+
+  /**
+   * Returns the table name in the form {@code catalogName.tableIdentifier}.
+   */
+  @Override
+  protected String tableName() {
+    return fullTableName;
+  }
+
+  @Override
+  public FileIO io() {
+    return fileIO;
+  }
+
+  /**
+   * Reads the table's catalog item with a consistent read and refreshes from the metadata location stored
+   * in it.
+   * <p>
+   * If the item is missing and no metadata location was loaded before, the refresh is done with a null
+   * location.
+   *
+   * @throws NoSuchTableException if the item is missing although a metadata location was loaded before
+   */
+  @Override
+  protected void doRefresh() {
+    String metadataLocation = null;
+    GetItemResponse table = dynamo.getItem(GetItemRequest.builder()
+        .tableName(awsProperties.dynamoDbTableName())
+        .consistentRead(true)
+        .key(DynamoDbCatalog.tablePrimaryKey(tableIdentifier))
+        .build());
+    if (table.hasItem()) {
+      metadataLocation = getMetadataLocation(table);
+    } else if (currentMetadataLocation() != null) {
+      throw new NoSuchTableException("Cannot find table %s after refresh, " +
+          "maybe another process deleted it or revoked your access permission", tableName());
+    }
+
+    refreshFromMetadataLocation(metadataLocation);
+  }
+
+  /**
+   * Writes the new metadata file and points the table's catalog item at it.
+   * <p>
+   * The catalog item is read with a consistent read, its metadata location is compared with the base
+   * metadata, and the item is then written conditionally on its version. If the commit is known to have
+   * failed, the newly written metadata file is deleted again.
+   *
+   * @param base metadata the commit is based on, or null when there is no base metadata
+   * @param metadata new metadata to commit
+   * @throws CommitFailedException if the commit is known to have failed
+   * @throws CommitStateUnknownException if the write failed and the resulting commit status is unknown
+   */
+  @Override
+  protected void doCommit(TableMetadata base, TableMetadata metadata) {
+    String newMetadataLocation = writeNewMetadata(metadata, currentVersion() + 1);
+    CommitStatus commitStatus = CommitStatus.FAILURE;
+    Map<String, AttributeValue> tableKey = DynamoDbCatalog.tablePrimaryKey(tableIdentifier);
+    try {
+      GetItemResponse table = dynamo.getItem(GetItemRequest.builder()
+          .tableName(awsProperties.dynamoDbTableName())
+          .consistentRead(true)
+          .key(tableKey)
+          .build());
+      checkMetadataLocation(table, base);
+      Map<String, String> properties = prepareProperties(table, newMetadataLocation);
+      persistTable(tableKey, table, properties);
+      commitStatus = CommitStatus.SUCCESS;
+    } catch (ConditionalCheckFailedException e) {
+      throw new CommitFailedException(e, "Cannot commit %s: concurrent update detected", tableName());
+    } catch (CommitFailedException e) {
+      // an explicit commit failure (for example a stale base metadata location) is already definite:
+      // pass it on as is instead of treating it as an unexpected error and re-checking the commit status
+      throw e;
+    } catch (RuntimeException persistFailure) {
+      LOG.error("Confirming if commit to {} indeed failed to persist, attempting to reconnect and check.",
+          fullTableName, persistFailure);
+      commitStatus = checkCommitStatus(newMetadataLocation, metadata);
+
+      switch (commitStatus) {
+        case SUCCESS:
+          break;
+        case FAILURE:
+          throw new CommitFailedException(persistFailure,
+              "Cannot commit %s due to unexpected exception", tableName());
+        case UNKNOWN:
+          throw new CommitStateUnknownException(persistFailure);
+      }
+    } finally {
+      if (commitStatus == CommitStatus.FAILURE) {
+        cleanupUncommittedMetadata(newMetadataLocation);
+      }
+    }
+  }
+
+  /**
+   * Deletes a metadata file that was written for a commit that failed.
+   * <p>
+   * This runs from the {@code finally} block of {@link #doCommit} while the commit exception is in
+   * flight. A cleanup failure is therefore only logged: throwing it would replace the commit exception the
+   * caller needs to see.
+   *
+   * @param metadataLocation location of the uncommitted metadata file
+   */
+  private void cleanupUncommittedMetadata(String metadataLocation) {
+    try {
+      // if anything went wrong, clean up the uncommitted metadata file
+      io().deleteFile(metadataLocation);
+    } catch (RuntimeException e) {
+      LOG.error("Fail to cleanup metadata file at {}", metadataLocation, e);
+    }
+  }
+
+  /**
+   * Checks that the metadata location in the catalog item equals the location of the base metadata.
+   * <p>
+   * A missing item and a null base both count as a null location.
+   *
+   * @param table response of the catalog item read
+   * @param base metadata the commit is based on, may be null
+   * @throws CommitFailedException if the two locations differ
+   */
+  private void checkMetadataLocation(GetItemResponse table, TableMetadata base) {
+    String dynamoMetadataLocation = table.hasItem() ? getMetadataLocation(table) : null;
+    String baseMetadataLocation = base != null ? base.metadataFileLocation() : null;
+    if (!Objects.equals(baseMetadataLocation, dynamoMetadataLocation)) {
+      throw new CommitFailedException(
+          "Cannot commit %s because base metadata location '%s' is not same as the current DynamoDb location '%s'",
+          tableName(), baseMetadataLocation, dynamoMetadataLocation);
+    }
+  }
+
+  /**
+   * Reads the metadata location property column from a catalog item.
+   *
+   * @param table response that contains the catalog item
+   * @return string value of the metadata location column
+   */
+  private String getMetadataLocation(GetItemResponse table) {
+    return table.item().get(DynamoDbCatalog.toPropertyCol(METADATA_LOCATION_PROP)).s();
+  }
+
+  /**
+   * Builds the properties to store for a commit.
+   * <p>
+   * Starts from the properties of the existing item, if any, then sets the table type and the new metadata
+   * location, and records the currently loaded metadata location as the previous one when it is set.
+   *
+   * @param response response of the catalog item read
+   * @param newMetadataLocation location of the metadata file being committed
+   * @return properties to persist, keyed by property key
+   */
+  private Map<String, String> prepareProperties(GetItemResponse response, String newMetadataLocation) {
+    Map<String, String> properties = response.hasItem() ? getProperties(response) : Maps.newHashMap();
+    properties.put(TABLE_TYPE_PROP, ICEBERG_TABLE_TYPE_VALUE.toUpperCase(Locale.ENGLISH));
+    properties.put(METADATA_LOCATION_PROP, newMetadataLocation);
+    if (currentMetadataLocation() != null && !currentMetadataLocation().isEmpty()) {
+      properties.put(PREVIOUS_METADATA_LOCATION_PROP, currentMetadataLocation());
+    }
+
+    return properties;
+  }
+
+  /**
+   * Extracts the property columns of a catalog item as a map from property key to string value.
+   *
+   * @param table response that contains the catalog item
+   * @return properties stored in the item
+   */
+  private Map<String, String> getProperties(GetItemResponse table) {
+    return table.item().entrySet().stream()
+        .filter(e -> DynamoDbCatalog.isProperty(e.getKey()))
+        .collect(Collectors.toMap(e -> DynamoDbCatalog.toPropertyKey(e.getKey()), e -> e.getValue().s()));
+  }
+
+  /**
+   * Writes the given properties to the table's catalog item.
+   * <p>
+   * An existing item is updated in place, conditioned on its version still being the one that was read. A
+   * missing item is created with a put that is conditioned on the item not having a version attribute.
+   *
+   * @param tableKey primary key of the catalog item
+   * @param table response of the catalog item read that preceded this write
+   * @param parameters properties to store, keyed by property key
+   */
+  void persistTable(Map<String, AttributeValue> tableKey, GetItemResponse table, Map<String, String> parameters) {
+    if (table.hasItem()) {
+      LOG.debug("Committing existing DynamoDb catalog table: {}", tableName());
+      List<String> updateParts = Lists.newArrayList();
+      Map<String, String> attributeNames = Maps.newHashMap();
+      Map<String, AttributeValue> attributeValues = Maps.newHashMap();
+      // every property gets its own numbered name and value placeholder (#k0/:v0, #k1/:v1, ...)
+      int idx = 0;
+      for (Map.Entry<String, String> property : parameters.entrySet()) {
+        String attributeValue = ":v" + idx;
+        String attributeKey = "#k" + idx;
+        idx++;
+        updateParts.add(attributeKey + " = " + attributeValue);
+        attributeNames.put(attributeKey, DynamoDbCatalog.toPropertyCol(property.getKey()));
+        attributeValues.put(attributeValue, AttributeValue.builder().s(property.getValue()).build());
+      }
+      DynamoDbCatalog.updateCatalogEntryMetadata(updateParts, attributeValues);
+      String updateExpression = "SET " + DynamoDbCatalog.COMMA.join(updateParts);
+      // ":v" carries the version that was read and is only used by the condition expression
+      attributeValues.put(":v", table.item().get(DynamoDbCatalog.COL_VERSION));
+      dynamo.updateItem(UpdateItemRequest.builder()
+          .tableName(awsProperties.dynamoDbTableName())
+          .key(tableKey)
+          .conditionExpression(DynamoDbCatalog.COL_VERSION + " = :v")
+          .updateExpression(updateExpression)
+          .expressionAttributeValues(attributeValues)
+          .expressionAttributeNames(attributeNames)
+          .build());
+    } else {
+      LOG.debug("Committing new DynamoDb catalog table: {}", tableName());
+      Map<String, AttributeValue> values = Maps.newHashMap(tableKey);
+      parameters.forEach((k, v) -> values.put(DynamoDbCatalog.toPropertyCol(k),
+          AttributeValue.builder().s(v).build()));
+      DynamoDbCatalog.setNewCatalogEntryMetadata(values);
+
+      dynamo.putItem(PutItemRequest.builder()
+          .tableName(awsProperties.dynamoDbTableName())
+          .item(values)
+          .conditionExpression("attribute_not_exists(" + DynamoDbCatalog.COL_VERSION + ")")
+          .build());
+    }
+  }
+}