Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2021/06/09 06:12:59 UTC

[GitHub] [iceberg] jackye1995 opened a new pull request #2688: AWS: add DynamoDb catalog

jackye1995 opened a new pull request #2688:
URL: https://github.com/apache/iceberg/pull/2688


   Add DynamoDB catalog implementation, with the following specifications:
   1. identifier column (partition key): table identifier string, or `NAMESPACE` for namespaces
   2. namespace column (sort key): namespace string
   3. a global secondary index with namespace as partition key, identifier as sort key
   4. version column : UUID string, used for optimistic locking
   5. updated_at column : timestamp long, used to record latest update time
   6. created_at column : timestamp long, used to record initial create time
   7. p.[property_key] column : string, used to store properties (namespace property or Iceberg-defined table properties including `table_type`, `metadata_location` and `previous_metadata_location`)
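
   The row layout above can be sketched roughly as follows. This is only an illustration: plain string maps stand in for DynamoDB items, the class and method names (`CatalogRowSketch`, `tableRow`, `namespaceRow`) are hypothetical, the version and timestamp columns are left out, and it assumes the identifier string is the namespace and table name joined with a dot.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative sketch: plain string maps stand in for DynamoDB items.
class CatalogRowSketch {
  static final String COL_IDENTIFIER = "identifier";   // partition key
  static final String COL_NAMESPACE = "namespace";     // sort key
  static final String NAMESPACE_MARKER = "NAMESPACE";  // identifier value shared by namespace rows
  static final String PROPERTY_PREFIX = "p.";

  // Table row: keyed by the full table identifier, with its namespace as sort key.
  static Map<String, String> tableRow(String namespace, String table, Map<String, String> props) {
    Map<String, String> row = new LinkedHashMap<>();
    row.put(COL_IDENTIFIER, namespace + "." + table);
    row.put(COL_NAMESPACE, namespace);
    addProperties(row, props);
    return row;
  }

  // Namespace row: every namespace uses the fixed NAMESPACE partition key value.
  static Map<String, String> namespaceRow(String namespace, Map<String, String> props) {
    Map<String, String> row = new LinkedHashMap<>();
    row.put(COL_IDENTIFIER, NAMESPACE_MARKER);
    row.put(COL_NAMESPACE, namespace);
    addProperties(row, props);
    return row;
  }

  // Each property becomes its own top-level column named p.<key>.
  private static void addProperties(Map<String, String> row, Map<String, String> props) {
    props.forEach((key, value) -> row.put(PROPERTY_PREFIX + key, value));
  }
}
```

   With this shape, a lookup by table only needs the (identifier, namespace) pair, while the GSI swaps the two keys so that all rows of one namespace can be queried together.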
   
   This design has the following benefits:
   1. the table identifier is used directly as the partition key, which avoids the potential hot-partition issue of using namespace as partition key and table name as sort key
   2. namespace operations are clustered in a single partition to avoid affecting table commit operations
   3. a reverse GSI is used for list table operation, and all other operations are single row ops or single partition query
   4. a string UUID version field is used for optimistic locking instead of updated_at, so that two processes committing within the same millisecond cannot be mistaken for the same version
   5. multi-row transaction is used for `renameTable` to ensure idempotency
   6. storage per row and update overhead is minimized by flattening properties with a `p.` prefix, instead of placing them in a single nested map type column.
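
   To illustrate benefit 4, here is a minimal in-memory sketch of optimistic locking with a UUID version token. It is not the catalog code: a `ConcurrentHashMap` stands in for the DynamoDB table, and `VersionedStoreSketch`, `create` and `commit` are hypothetical names. The compare-and-replace call plays the role that a conditional write on the version column would play against DynamoDB.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative sketch: the map value stands in for a row's version column.
class VersionedStoreSketch {
  private final Map<String, String> versions = new ConcurrentHashMap<>();

  // Creates the row only if it does not exist yet; returns the new version, or null if it exists.
  String create(String key) {
    String version = UUID.randomUUID().toString();
    return versions.putIfAbsent(key, version) == null ? version : null;
  }

  // Commits only if the caller still holds the current version.
  // Returns the new version on success, or null when another writer got there first.
  String commit(String key, String expectedVersion) {
    String next = UUID.randomUUID().toString();
    return versions.replace(key, expectedVersion, next) ? next : null;
  }
}
```

   Two writers that read the same version can both attempt a commit, but only the first replace succeeds; the second sees a mismatch regardless of how close together in time the two commits were.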
   
   Limitations:
   1. To avoid complications in parsing namespace, dot (`.`) is not allowed in any level of namespace
   2. Similarly, to avoid complications in parsing table identifier, dot is not allowed in table name.
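
   The reason for both limitations is that the stored strings are parsed by splitting on dots. A small sketch of that idea, with hypothetical helper names (`joinNamespace`, `tableIdentifier`, `parse`) and hypothetical error messages:

```java
// Illustrative sketch: rejecting dots keeps the dotted string form unambiguous.
class IdentifierValidationSketch {

  // Joins namespace levels with dots after checking that no level contains one.
  static String joinNamespace(String... levels) {
    for (String level : levels) {
      if (level.contains(".")) {
        throw new IllegalArgumentException("Namespace level must not contain dot: " + level);
      }
    }
    return String.join(".", levels);
  }

  // Builds the identifier string for a table inside an already-joined namespace.
  static String tableIdentifier(String namespace, String table) {
    if (table.contains(".")) {
      throw new IllegalArgumentException("Table name must not contain dot: " + table);
    }
    return namespace + "." + table;
  }

  // Splitting on dots recovers the original parts, with the last part being the table name.
  static String[] parse(String identifier) {
    return identifier.split("\\.");
  }
}
```

   If a level such as `a.b` were accepted, the stored string could not be told apart from the two-level namespace `a`, `b`.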
   
   @yyanyy @rdblue @SreeramGarlapati @johnclara @danielcweeks 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] danielcweeks commented on a change in pull request #2688: AWS: add DynamoDb catalog

Posted by GitBox <gi...@apache.org>.
danielcweeks commented on a change in pull request #2688:
URL: https://github.com/apache/iceberg/pull/2688#discussion_r654979357



##########
File path: aws/src/main/java/org/apache/iceberg/aws/dynamodb/DynamoDbCatalog.java
##########
@@ -0,0 +1,613 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.aws.dynamodb;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.BaseMetastoreCatalog;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.aws.AwsClientFactories;
+import org.apache.iceberg.aws.AwsProperties;
+import org.apache.iceberg.aws.s3.S3FileIO;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.SupportsNamespaces;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
+import org.apache.iceberg.exceptions.NoSuchNamespaceException;
+import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.base.Joiner;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.Tasks;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
+import software.amazon.awssdk.services.dynamodb.model.AttributeDefinition;
+import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
+import software.amazon.awssdk.services.dynamodb.model.BillingMode;
+import software.amazon.awssdk.services.dynamodb.model.ConditionalCheckFailedException;
+import software.amazon.awssdk.services.dynamodb.model.CreateTableRequest;
+import software.amazon.awssdk.services.dynamodb.model.Delete;
+import software.amazon.awssdk.services.dynamodb.model.DeleteItemRequest;
+import software.amazon.awssdk.services.dynamodb.model.DescribeTableRequest;
+import software.amazon.awssdk.services.dynamodb.model.DescribeTableResponse;
+import software.amazon.awssdk.services.dynamodb.model.GetItemRequest;
+import software.amazon.awssdk.services.dynamodb.model.GetItemResponse;
+import software.amazon.awssdk.services.dynamodb.model.GlobalSecondaryIndex;
+import software.amazon.awssdk.services.dynamodb.model.KeySchemaElement;
+import software.amazon.awssdk.services.dynamodb.model.KeyType;
+import software.amazon.awssdk.services.dynamodb.model.Projection;
+import software.amazon.awssdk.services.dynamodb.model.ProjectionType;
+import software.amazon.awssdk.services.dynamodb.model.Put;
+import software.amazon.awssdk.services.dynamodb.model.PutItemRequest;
+import software.amazon.awssdk.services.dynamodb.model.QueryRequest;
+import software.amazon.awssdk.services.dynamodb.model.QueryResponse;
+import software.amazon.awssdk.services.dynamodb.model.ResourceNotFoundException;
+import software.amazon.awssdk.services.dynamodb.model.ScalarAttributeType;
+import software.amazon.awssdk.services.dynamodb.model.TableStatus;
+import software.amazon.awssdk.services.dynamodb.model.TransactWriteItem;
+import software.amazon.awssdk.services.dynamodb.model.TransactWriteItemsRequest;
+import software.amazon.awssdk.services.dynamodb.model.UpdateItemRequest;
+
+/**
+ * DynamoDB implementation of Iceberg catalog
+ */
+public class DynamoDbCatalog extends BaseMetastoreCatalog implements Closeable, SupportsNamespaces, Configurable {
+
+  private static final Logger LOG = LoggerFactory.getLogger(DynamoDbCatalog.class);
+  private static final int CATALOG_TABLE_CREATION_WAIT_ATTEMPTS_MAX = 5;
+  static final Joiner COMMA = Joiner.on(',');
+
+  private static final String GSI_NAMESPACE_IDENTIFIER = "namespace-identifier";
+  private static final String COL_IDENTIFIER = "identifier";
+  private static final String COL_IDENTIFIER_NAMESPACE = "NAMESPACE";
+  private static final String COL_NAMESPACE = "namespace";
+  private static final String PROPERTY_COL_PREFIX = "p.";

Review comment:
       ok, sounds good.






[GitHub] [iceberg] danielcweeks commented on a change in pull request #2688: AWS: add DynamoDb catalog

Posted by GitBox <gi...@apache.org>.
danielcweeks commented on a change in pull request #2688:
URL: https://github.com/apache/iceberg/pull/2688#discussion_r654836762



##########
File path: aws/src/main/java/org/apache/iceberg/aws/dynamodb/DynamoDbCatalog.java
##########
@@ -0,0 +1,613 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.aws.dynamodb;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.BaseMetastoreCatalog;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.aws.AwsClientFactories;
+import org.apache.iceberg.aws.AwsProperties;
+import org.apache.iceberg.aws.s3.S3FileIO;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.SupportsNamespaces;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
+import org.apache.iceberg.exceptions.NoSuchNamespaceException;
+import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.base.Joiner;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.Tasks;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
+import software.amazon.awssdk.services.dynamodb.model.AttributeDefinition;
+import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
+import software.amazon.awssdk.services.dynamodb.model.BillingMode;
+import software.amazon.awssdk.services.dynamodb.model.ConditionalCheckFailedException;
+import software.amazon.awssdk.services.dynamodb.model.CreateTableRequest;
+import software.amazon.awssdk.services.dynamodb.model.Delete;
+import software.amazon.awssdk.services.dynamodb.model.DeleteItemRequest;
+import software.amazon.awssdk.services.dynamodb.model.DescribeTableRequest;
+import software.amazon.awssdk.services.dynamodb.model.DescribeTableResponse;
+import software.amazon.awssdk.services.dynamodb.model.GetItemRequest;
+import software.amazon.awssdk.services.dynamodb.model.GetItemResponse;
+import software.amazon.awssdk.services.dynamodb.model.GlobalSecondaryIndex;
+import software.amazon.awssdk.services.dynamodb.model.KeySchemaElement;
+import software.amazon.awssdk.services.dynamodb.model.KeyType;
+import software.amazon.awssdk.services.dynamodb.model.Projection;
+import software.amazon.awssdk.services.dynamodb.model.ProjectionType;
+import software.amazon.awssdk.services.dynamodb.model.Put;
+import software.amazon.awssdk.services.dynamodb.model.PutItemRequest;
+import software.amazon.awssdk.services.dynamodb.model.QueryRequest;
+import software.amazon.awssdk.services.dynamodb.model.QueryResponse;
+import software.amazon.awssdk.services.dynamodb.model.ResourceNotFoundException;
+import software.amazon.awssdk.services.dynamodb.model.ScalarAttributeType;
+import software.amazon.awssdk.services.dynamodb.model.TableStatus;
+import software.amazon.awssdk.services.dynamodb.model.TransactWriteItem;
+import software.amazon.awssdk.services.dynamodb.model.TransactWriteItemsRequest;
+import software.amazon.awssdk.services.dynamodb.model.UpdateItemRequest;
+
+/**
+ * DynamoDB implementation of Iceberg catalog
+ */
+public class DynamoDbCatalog extends BaseMetastoreCatalog implements Closeable, SupportsNamespaces, Configurable {
+
+  private static final Logger LOG = LoggerFactory.getLogger(DynamoDbCatalog.class);
+  private static final int CATALOG_TABLE_CREATION_WAIT_ATTEMPTS_MAX = 5;
+  static final Joiner COMMA = Joiner.on(',');
+
+  private static final String GSI_NAMESPACE_IDENTIFIER = "namespace-identifier";
+  private static final String COL_IDENTIFIER = "identifier";
+  private static final String COL_IDENTIFIER_NAMESPACE = "NAMESPACE";
+  private static final String COL_NAMESPACE = "namespace";
+  private static final String PROPERTY_COL_PREFIX = "p.";

Review comment:
       It's been a while since I've used DynamoDB, but the console viewer used to show all columns (I recognize that DynamoDB itself only requires hash and sort keys).  Maybe that has changed and it's no longer an issue.
   
   For the use case that you mentioned, though, Iceberg does not support anything specific with the secondary index, so it's somewhat external to the catalog implementation.  I'm not fundamentally opposed; I just want to make sure we have good justification for taking this approach, which is somewhat less straightforward than keeping the properties as a map.
   
   






[GitHub] [iceberg] danielcweeks commented on pull request #2688: AWS: add DynamoDb catalog

Posted by GitBox <gi...@apache.org>.
danielcweeks commented on pull request #2688:
URL: https://github.com/apache/iceberg/pull/2688#issuecomment-864602481


   This looks good to me.  Just the two small updates (docs and empty line) and I'm happy to commit.




[GitHub] [iceberg] jackye1995 commented on a change in pull request #2688: AWS: add DynamoDb catalog

Posted by GitBox <gi...@apache.org>.
jackye1995 commented on a change in pull request #2688:
URL: https://github.com/apache/iceberg/pull/2688#discussion_r654958923



##########
File path: aws/src/main/java/org/apache/iceberg/aws/dynamodb/DynamoDbCatalog.java
##########
@@ -0,0 +1,613 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.aws.dynamodb;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.BaseMetastoreCatalog;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.aws.AwsClientFactories;
+import org.apache.iceberg.aws.AwsProperties;
+import org.apache.iceberg.aws.s3.S3FileIO;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.SupportsNamespaces;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
+import org.apache.iceberg.exceptions.NoSuchNamespaceException;
+import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.base.Joiner;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.Tasks;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
+import software.amazon.awssdk.services.dynamodb.model.AttributeDefinition;
+import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
+import software.amazon.awssdk.services.dynamodb.model.BillingMode;
+import software.amazon.awssdk.services.dynamodb.model.ConditionalCheckFailedException;
+import software.amazon.awssdk.services.dynamodb.model.CreateTableRequest;
+import software.amazon.awssdk.services.dynamodb.model.Delete;
+import software.amazon.awssdk.services.dynamodb.model.DeleteItemRequest;
+import software.amazon.awssdk.services.dynamodb.model.DescribeTableRequest;
+import software.amazon.awssdk.services.dynamodb.model.DescribeTableResponse;
+import software.amazon.awssdk.services.dynamodb.model.GetItemRequest;
+import software.amazon.awssdk.services.dynamodb.model.GetItemResponse;
+import software.amazon.awssdk.services.dynamodb.model.GlobalSecondaryIndex;
+import software.amazon.awssdk.services.dynamodb.model.KeySchemaElement;
+import software.amazon.awssdk.services.dynamodb.model.KeyType;
+import software.amazon.awssdk.services.dynamodb.model.Projection;
+import software.amazon.awssdk.services.dynamodb.model.ProjectionType;
+import software.amazon.awssdk.services.dynamodb.model.Put;
+import software.amazon.awssdk.services.dynamodb.model.PutItemRequest;
+import software.amazon.awssdk.services.dynamodb.model.QueryRequest;
+import software.amazon.awssdk.services.dynamodb.model.QueryResponse;
+import software.amazon.awssdk.services.dynamodb.model.ResourceNotFoundException;
+import software.amazon.awssdk.services.dynamodb.model.ScalarAttributeType;
+import software.amazon.awssdk.services.dynamodb.model.TableStatus;
+import software.amazon.awssdk.services.dynamodb.model.TransactWriteItem;
+import software.amazon.awssdk.services.dynamodb.model.TransactWriteItemsRequest;
+import software.amazon.awssdk.services.dynamodb.model.UpdateItemRequest;
+
+/**
+ * DynamoDB implementation of Iceberg catalog
+ */
+public class DynamoDbCatalog extends BaseMetastoreCatalog implements Closeable, SupportsNamespaces, Configurable {
+
+  private static final Logger LOG = LoggerFactory.getLogger(DynamoDbCatalog.class);
+  private static final int CATALOG_TABLE_CREATION_WAIT_ATTEMPTS_MAX = 5;
+  static final Joiner COMMA = Joiner.on(',');
+
+  private static final String GSI_NAMESPACE_IDENTIFIER = "namespace-identifier";
+  private static final String COL_IDENTIFIER = "identifier";
+  private static final String COL_IDENTIFIER_NAMESPACE = "NAMESPACE";
+  private static final String COL_NAMESPACE = "namespace";
+  private static final String PROPERTY_COL_PREFIX = "p.";
+  private static final String PROPERTY_DEFAULT_LOCATION = "default_location";
+  private static final String COL_CREATED_AT = "created_at";
+  private static final String COL_UPDATED_AT = "updated_at";
+
+  // field used for optimistic locking
+  static final String COL_VERSION = "v";
+
+  private DynamoDbClient dynamo;
+  private Configuration hadoopConf;
+  private String catalogName;
+  private String warehousePath;
+  private AwsProperties awsProperties;
+  private FileIO fileIO;
+
+  public DynamoDbCatalog() {
+  }
+
+  @Override
+  public void initialize(String name, Map<String, String> properties) {
+    initialize(
+        name,
+        properties.get(CatalogProperties.WAREHOUSE_LOCATION),
+        new AwsProperties(properties),
+        AwsClientFactories.from(properties).dynamo(),
+        initializeFileIO(properties));
+  }
+
+  @VisibleForTesting
+  void initialize(String name, String path, AwsProperties properties, DynamoDbClient client, FileIO io) {
+    this.catalogName = name;
+    this.awsProperties = properties;
+    this.warehousePath = cleanWarehousePath(path);
+    this.dynamo = client;
+    this.fileIO = io;
+    ensureCatalogTableExistsOrCreate();
+  }
+
+  @Override
+  public String name() {
+    return catalogName;
+  }
+
+  @Override
+  protected TableOperations newTableOps(TableIdentifier tableIdentifier) {
+    validateTableIdentifier(tableIdentifier);
+    return new DynamoDbTableOperations(dynamo, awsProperties, catalogName, fileIO, tableIdentifier);
+  }
+
+  @Override
+  protected String defaultWarehouseLocation(TableIdentifier tableIdentifier) {
+    validateTableIdentifier(tableIdentifier);
+    GetItemResponse response = dynamo.getItem(GetItemRequest.builder()
+        .tableName(awsProperties.dynamoDbTableName())
+        .consistentRead(true)
+        .key(namespacePrimaryKey(tableIdentifier.namespace()))
+        .build());
+
+    if (!response.hasItem()) {
+      throw new NoSuchNamespaceException("Cannot find default warehouse location: namespace %s does not exist",
+          tableIdentifier.namespace());
+    }
+
+    String defaultLocationCol = toPropertyCol(PROPERTY_DEFAULT_LOCATION);
+    if (response.item().containsKey(defaultLocationCol)) {
+      return String.format("%s/%s", response.item().get(defaultLocationCol), tableIdentifier.name());
+    } else {
+      return String.format("%s/%s.db/%s", warehousePath, tableIdentifier.namespace(), tableIdentifier.name());
+    }
+  }
+
+  @Override
+  public void createNamespace(Namespace namespace, Map<String, String> metadata) {
+    validateNamespace(namespace);
+    Map<String, AttributeValue> values = namespacePrimaryKey(namespace);
+    setNewCatalogEntryMetadata(values);
+    metadata.forEach((key, value) -> values.put(toPropertyCol(key), AttributeValue.builder().s(value).build()));
+
+    try {
+      dynamo.putItem(PutItemRequest.builder()
+          .tableName(awsProperties.dynamoDbTableName())
+          .conditionExpression("attribute_not_exists(" + DynamoDbCatalog.COL_VERSION + ")")
+          .item(values)
+          .build());
+    } catch (ConditionalCheckFailedException e) {
+      throw new AlreadyExistsException("Cannot create namespace %s: already exists", namespace);
+    }
+  }
+
+  @Override
+  public List<Namespace> listNamespaces(Namespace namespace) throws NoSuchNamespaceException {
+    validateNamespace(namespace);
+    List<Namespace> namespaces = Lists.newArrayList();
+    Map<String, AttributeValue> lastEvaluatedKey = null;
+    String condition = COL_IDENTIFIER + " = :identifier";
+    Map<String, AttributeValue> conditionValues = Maps.newHashMap();
+    conditionValues.put(":identifier", AttributeValue.builder().s(COL_IDENTIFIER_NAMESPACE).build());
+    if (!namespace.isEmpty()) {
+      condition += " AND " + "begins_with(" + COL_NAMESPACE + ",:ns)";
+      conditionValues.put(":ns", AttributeValue.builder().s(namespace.toString()).build());
+    }
+
+    do {
+      QueryResponse response = dynamo.query(QueryRequest.builder()
+          .tableName(awsProperties.dynamoDbTableName())
+          .consistentRead(true)
+          .keyConditionExpression(condition)
+          .expressionAttributeValues(conditionValues)
+          .exclusiveStartKey(lastEvaluatedKey)
+          .build());
+
+      if (response.hasItems()) {
+        for (Map<String, AttributeValue> item : response.items()) {
+          String ns = item.get(COL_NAMESPACE).s();
+          namespaces.add(Namespace.of(ns.split("\\.")));
+        }
+      }
+
+      lastEvaluatedKey = response.lastEvaluatedKey();
+    } while (!lastEvaluatedKey.isEmpty());
+
+    return namespaces;
+  }
+
+  @Override
+  public Map<String, String> loadNamespaceMetadata(Namespace namespace) throws NoSuchNamespaceException {
+    validateNamespace(namespace);
+    GetItemResponse response = dynamo.getItem(GetItemRequest.builder()
+        .tableName(awsProperties.dynamoDbTableName())
+        .consistentRead(true)
+        .key(namespacePrimaryKey(namespace))
+        .build());
+
+    if (!response.hasItem()) {
+      throw new NoSuchNamespaceException("Cannot find namespace %s", namespace);
+    }
+
+    return response.item().entrySet().stream()
+        .filter(e -> isProperty(e.getKey()))
+        .collect(Collectors.toMap(e -> toPropertyKey(e.getKey()), e -> e.getValue().s()));
+  }
+
+  @Override
+  public boolean dropNamespace(Namespace namespace) throws NamespaceNotEmptyException {
+    validateNamespace(namespace);
+    if (!listTables(namespace).isEmpty()) {
+      throw new NamespaceNotEmptyException("Cannot delete non-empty namespace %s", namespace);
+    }
+
+    try {
+      dynamo.deleteItem(DeleteItemRequest.builder()
+          .tableName(awsProperties.dynamoDbTableName())
+          .key(namespacePrimaryKey(namespace))
+          .conditionExpression("attribute_exists(" + COL_NAMESPACE + ")")
+          .build());
+      return true;
+    } catch (ConditionalCheckFailedException e) {
+      return false;
+    }
+  }
+
+  @Override
+  public boolean setProperties(Namespace namespace, Map<String, String> properties) throws NoSuchNamespaceException {
+    List<String> updateParts = Lists.newArrayList();
+    Map<String, String> attributeNames = Maps.newHashMap();
+    Map<String, AttributeValue> attributeValues = Maps.newHashMap();
+    int idx = 0;
+    for (Map.Entry<String, String> property : properties.entrySet()) {
+      String attributeValue = ":v" + idx;
+      String attributeKey = "#k" + idx;
+      idx++;
+      updateParts.add(attributeKey + " = " + attributeValue);
+      attributeNames.put(attributeKey, toPropertyCol(property.getKey()));
+      attributeValues.put(attributeValue, AttributeValue.builder().s(property.getValue()).build());
+    }
+
+    updateCatalogEntryMetadata(updateParts, attributeValues);
+    String updateExpression = "SET " + COMMA.join(updateParts);
+    return updateProperties(namespace, updateExpression, attributeValues, attributeNames);
+  }
+
+  @Override
+  public boolean removeProperties(Namespace namespace, Set<String> properties) throws NoSuchNamespaceException {
+    List<String> removeParts = Lists.newArrayList();
+    Map<String, String> attributeNames = Maps.newHashMap();
+    Map<String, AttributeValue> attributeValues = Maps.newHashMap();
+    int idx = 0;
+    for (String property : properties) {
+      String attributeKey = "#k" + idx;
+      idx++;
+      removeParts.add(attributeKey);
+      attributeNames.put(attributeKey, toPropertyCol(property));
+    }
+
+    List<String> updateParts = Lists.newArrayList();
+    updateCatalogEntryMetadata(updateParts, attributeValues);
+    String updateExpression = "REMOVE " + COMMA.join(removeParts) + " SET " + COMMA.join(updateParts);
+    return updateProperties(namespace, updateExpression, attributeValues, attributeNames);
+  }
+
+  @Override
+  public List<TableIdentifier> listTables(Namespace namespace) {
+    List<TableIdentifier> identifiers = Lists.newArrayList();
+    Map<String, AttributeValue> lastEvaluatedKey = null;
+    String condition = COL_NAMESPACE + " = :ns";
+    Map<String, AttributeValue> conditionValues = ImmutableMap.of(
+        ":ns", AttributeValue.builder().s(namespace.toString()).build());
+    do {
+      // pass the previous page's last key so that each iteration fetches the next page
+      QueryResponse response = dynamo.query(QueryRequest.builder()
+          .tableName(awsProperties.dynamoDbTableName())
+          .indexName(GSI_NAMESPACE_IDENTIFIER)
+          .keyConditionExpression(condition)
+          .expressionAttributeValues(conditionValues)
+          .exclusiveStartKey(lastEvaluatedKey)
+          .build());
+
+      if (response.hasItems()) {
+        for (Map<String, AttributeValue> item : response.items()) {
+          String identifier = item.get(COL_IDENTIFIER).s();
+          if (!COL_IDENTIFIER_NAMESPACE.equals(identifier)) {
+            identifiers.add(TableIdentifier.of(identifier.split("\\.")));
+          }
+        }
+      }
+
+      lastEvaluatedKey = response.lastEvaluatedKey();
+    } while (!lastEvaluatedKey.isEmpty());
+    return identifiers;
+  }
+
+  @Override
+  public boolean dropTable(TableIdentifier identifier, boolean purge) {
+    Map<String, AttributeValue> key = tablePrimaryKey(identifier);
+    try {
+      GetItemResponse response = dynamo.getItem(GetItemRequest.builder()
+          .tableName(awsProperties.dynamoDbTableName())
+          .consistentRead(true)
+          .key(key)
+          .build());
+
+      if (!response.hasItem()) {
+        throw new NoSuchTableException("Cannot find table %s to drop", identifier);
+      }
+
+      TableOperations ops = newTableOps(identifier);
+      TableMetadata lastMetadata = ops.current();
+      dynamo.deleteItem(DeleteItemRequest.builder()
+          .tableName(awsProperties.dynamoDbTableName())
+          .key(tablePrimaryKey(identifier))
+          .conditionExpression(COL_VERSION + " = :v")
+          .expressionAttributeValues(ImmutableMap.of(":v", response.item().get(COL_VERSION)))
+          .build());
+      LOG.info("Successfully dropped table {} from DynamoDb catalog", identifier);
+
+      if (purge && lastMetadata != null) {
+        CatalogUtil.dropTableData(ops.io(), lastMetadata);
+        LOG.info("Table {} data purged", identifier);
+      }
+
+      LOG.info("Dropped table: {}", identifier);
+      return true;
+    } catch (ConditionalCheckFailedException e) {
+      LOG.error("Cannot complete drop table operation for {}: commit conflict", identifier, e);
+      return false;
+    } catch (Exception e) {
+      LOG.error("Cannot complete drop table operation for {}: unexpected exception", identifier, e);
+      throw e;
+    }
+  }
+
+  @Override
+  public void renameTable(TableIdentifier from, TableIdentifier to) {
+    Map<String, AttributeValue> fromKey = tablePrimaryKey(from);
+    Map<String, AttributeValue> toKey = tablePrimaryKey(to);
+
+    GetItemResponse fromResponse = dynamo.getItem(GetItemRequest.builder()
+        .tableName(awsProperties.dynamoDbTableName())
+        .consistentRead(true)
+        .key(fromKey)
+        .build());
+
+    if (!fromResponse.hasItem()) {
+      throw new NoSuchTableException("Cannot rename table %s to %s: %s does not exist", from, to, from);
+    }
+
+    GetItemResponse toResponse = dynamo.getItem(GetItemRequest.builder()

Review comment:
       Yes, it is only used to make sure the destination does not exist, because when the 2-step transaction fails we cannot tell which step it failed at. The early check gives a clearer `AlreadyExistsException` instead of a generic conditional check failure. It is also much more likely that someone accidentally renames to an already existing table than that one person creates the table while another person renames a table to it at the same time.
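
A minimal sketch of this trade-off, using an in-memory map as a stand-in for the DynamoDB table rather than the real client. `RenameSketch`, `transactRename` and the nested exception classes are hypothetical names used only for illustration. The pre-check turns the common mistake into a specific error, while the atomic step still guards against the rarer race.

```java
import java.util.HashMap;
import java.util.Map;

public class RenameSketch {
  public static class NoSuchTable extends RuntimeException {
    public NoSuchTable(String message) { super(message); }
  }

  public static class AlreadyExists extends RuntimeException {
    public AlreadyExists(String message) { super(message); }
  }

  public static class TransactionCanceled extends RuntimeException {
    public TransactionCanceled(String message) { super(message); }
  }

  // Hypothetical stand-in for the catalog table: identifier -> metadata location.
  private final Map<String, String> rows = new HashMap<>();

  public synchronized void put(String identifier, String metadataLocation) {
    rows.put(identifier, metadataLocation);
  }

  public synchronized boolean exists(String identifier) {
    return rows.containsKey(identifier);
  }

  // Models the 2-step transaction: both conditions are checked and both writes
  // are applied atomically. On failure the caller only learns "canceled".
  public synchronized void transactRename(String from, String to) {
    if (!rows.containsKey(from) || rows.containsKey(to)) {
      throw new TransactionCanceled("conditional check failed");
    }
    rows.put(to, rows.remove(from));
  }

  // Early checks give specific errors; the transaction remains the source of truth.
  public void renameTable(String from, String to) {
    if (!exists(from)) {
      throw new NoSuchTable("Cannot rename " + from + ": does not exist");
    }
    if (exists(to)) {
      throw new AlreadyExists("Cannot rename " + from + " to " + to + ": destination exists");
    }
    transactRename(from, to);
  }
}
```

In this sketch, a concurrent create between the pre-check and the transaction would still surface as the generic `TransactionCanceled`, which is the less likely case the comment refers to.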




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] danielcweeks commented on a change in pull request #2688: AWS: add DynamoDb catalog

Posted by GitBox <gi...@apache.org>.
danielcweeks commented on a change in pull request #2688:
URL: https://github.com/apache/iceberg/pull/2688#discussion_r654838643



##########
File path: aws/src/main/java/org/apache/iceberg/aws/dynamodb/DynamoDbCatalog.java
##########
@@ -0,0 +1,613 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.aws.dynamodb;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.BaseMetastoreCatalog;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.aws.AwsClientFactories;
+import org.apache.iceberg.aws.AwsProperties;
+import org.apache.iceberg.aws.s3.S3FileIO;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.SupportsNamespaces;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
+import org.apache.iceberg.exceptions.NoSuchNamespaceException;
+import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.base.Joiner;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.Tasks;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
+import software.amazon.awssdk.services.dynamodb.model.AttributeDefinition;
+import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
+import software.amazon.awssdk.services.dynamodb.model.BillingMode;
+import software.amazon.awssdk.services.dynamodb.model.ConditionalCheckFailedException;
+import software.amazon.awssdk.services.dynamodb.model.CreateTableRequest;
+import software.amazon.awssdk.services.dynamodb.model.Delete;
+import software.amazon.awssdk.services.dynamodb.model.DeleteItemRequest;
+import software.amazon.awssdk.services.dynamodb.model.DescribeTableRequest;
+import software.amazon.awssdk.services.dynamodb.model.DescribeTableResponse;
+import software.amazon.awssdk.services.dynamodb.model.GetItemRequest;
+import software.amazon.awssdk.services.dynamodb.model.GetItemResponse;
+import software.amazon.awssdk.services.dynamodb.model.GlobalSecondaryIndex;
+import software.amazon.awssdk.services.dynamodb.model.KeySchemaElement;
+import software.amazon.awssdk.services.dynamodb.model.KeyType;
+import software.amazon.awssdk.services.dynamodb.model.Projection;
+import software.amazon.awssdk.services.dynamodb.model.ProjectionType;
+import software.amazon.awssdk.services.dynamodb.model.Put;
+import software.amazon.awssdk.services.dynamodb.model.PutItemRequest;
+import software.amazon.awssdk.services.dynamodb.model.QueryRequest;
+import software.amazon.awssdk.services.dynamodb.model.QueryResponse;
+import software.amazon.awssdk.services.dynamodb.model.ResourceNotFoundException;
+import software.amazon.awssdk.services.dynamodb.model.ScalarAttributeType;
+import software.amazon.awssdk.services.dynamodb.model.TableStatus;
+import software.amazon.awssdk.services.dynamodb.model.TransactWriteItem;
+import software.amazon.awssdk.services.dynamodb.model.TransactWriteItemsRequest;
+import software.amazon.awssdk.services.dynamodb.model.UpdateItemRequest;
+
+/**
+ * DynamoDB implementation of Iceberg catalog
+ */
+public class DynamoDbCatalog extends BaseMetastoreCatalog implements Closeable, SupportsNamespaces, Configurable {
+
+  private static final Logger LOG = LoggerFactory.getLogger(DynamoDbCatalog.class);
+  private static final int CATALOG_TABLE_CREATION_WAIT_ATTEMPTS_MAX = 5;
+  static final Joiner COMMA = Joiner.on(',');
+
+  private static final String GSI_NAMESPACE_IDENTIFIER = "namespace-identifier";
+  private static final String COL_IDENTIFIER = "identifier";
+  private static final String COL_IDENTIFIER_NAMESPACE = "NAMESPACE";
+  private static final String COL_NAMESPACE = "namespace";
+  private static final String PROPERTY_COL_PREFIX = "p.";
+  private static final String PROPERTY_DEFAULT_LOCATION = "default_location";
+  private static final String COL_CREATED_AT = "created_at";
+  private static final String COL_UPDATED_AT = "updated_at";
+
+  // field used for optimistic locking
+  static final String COL_VERSION = "v";
+
+  private DynamoDbClient dynamo;
+  private Configuration hadoopConf;
+  private String catalogName;
+  private String warehousePath;
+  private AwsProperties awsProperties;
+  private FileIO fileIO;
+
+  public DynamoDbCatalog() {
+  }
+
+  @Override
+  public void initialize(String name, Map<String, String> properties) {
+    initialize(
+        name,
+        properties.get(CatalogProperties.WAREHOUSE_LOCATION),
+        new AwsProperties(properties),
+        AwsClientFactories.from(properties).dynamo(),
+        initializeFileIO(properties));
+  }
+
+  @VisibleForTesting
+  void initialize(String name, String path, AwsProperties properties, DynamoDbClient client, FileIO io) {
+    this.catalogName = name;
+    this.awsProperties = properties;
+    this.warehousePath = cleanWarehousePath(path);
+    this.dynamo = client;
+    this.fileIO = io;
+    ensureCatalogTableExistsOrCreate();
+  }
+
+  @Override
+  public String name() {
+    return catalogName;
+  }
+
+  @Override
+  protected TableOperations newTableOps(TableIdentifier tableIdentifier) {
+    validateTableIdentifier(tableIdentifier);
+    return new DynamoDbTableOperations(dynamo, awsProperties, catalogName, fileIO, tableIdentifier);
+  }
+
+  @Override
+  protected String defaultWarehouseLocation(TableIdentifier tableIdentifier) {
+    validateTableIdentifier(tableIdentifier);
+    GetItemResponse response = dynamo.getItem(GetItemRequest.builder()
+        .tableName(awsProperties.dynamoDbTableName())
+        .consistentRead(true)
+        .key(namespacePrimaryKey(tableIdentifier.namespace()))
+        .build());
+
+    if (!response.hasItem()) {
+      throw new NoSuchNamespaceException("Cannot find default warehouse location: namespace %s does not exist",
+          tableIdentifier.namespace());
+    }
+
+    String defaultLocationCol = toPropertyCol(PROPERTY_DEFAULT_LOCATION);
+    if (response.item().containsKey(defaultLocationCol)) {
+      return String.format("%s/%s", response.item().get(defaultLocationCol), tableIdentifier.name());
+    } else {
+      return String.format("%s/%s.db/%s", warehousePath, tableIdentifier.namespace(), tableIdentifier.name());
+    }
+  }
+
+  @Override
+  public void createNamespace(Namespace namespace, Map<String, String> metadata) {
+    validateNamespace(namespace);
+    Map<String, AttributeValue> values = namespacePrimaryKey(namespace);
+    setNewCatalogEntryMetadata(values);
+    metadata.forEach((key, value) -> values.put(toPropertyCol(key), AttributeValue.builder().s(value).build()));
+
+    try {
+      dynamo.putItem(PutItemRequest.builder()
+          .tableName(awsProperties.dynamoDbTableName())
+          .conditionExpression("attribute_not_exists(" + DynamoDbCatalog.COL_VERSION + ")")
+          .item(values)
+          .build());
+    } catch (ConditionalCheckFailedException e) {
+      throw new AlreadyExistsException("Cannot create namespace %s: already exists", namespace);
+    }
+  }
+
+  @Override
+  public List<Namespace> listNamespaces(Namespace namespace) throws NoSuchNamespaceException {
+    validateNamespace(namespace);
+    List<Namespace> namespaces = Lists.newArrayList();
+    Map<String, AttributeValue> lastEvaluatedKey = null;
+    String condition = COL_IDENTIFIER + " = :identifier";
+    Map<String, AttributeValue> conditionValues = Maps.newHashMap();
+    conditionValues.put(":identifier", AttributeValue.builder().s(COL_IDENTIFIER_NAMESPACE).build());
+    if (!namespace.isEmpty()) {
+      condition += " AND " + "begins_with(" + COL_NAMESPACE + ",:ns)";
+      conditionValues.put(":ns", AttributeValue.builder().s(namespace.toString()).build());
+    }
+
+    do {
+      QueryResponse response = dynamo.query(QueryRequest.builder()
+          .tableName(awsProperties.dynamoDbTableName())
+          .consistentRead(true)
+          .keyConditionExpression(condition)
+          .expressionAttributeValues(conditionValues)
+          .exclusiveStartKey(lastEvaluatedKey)
+          .build());
+
+      if (response.hasItems()) {
+        for (Map<String, AttributeValue> item : response.items()) {
+          String ns = item.get(COL_NAMESPACE).s();
+          namespaces.add(Namespace.of(ns.split("\\.")));
+        }
+      }
+
+      lastEvaluatedKey = response.lastEvaluatedKey();
+    } while (!lastEvaluatedKey.isEmpty());
+
+    return namespaces;
+  }
+
+  @Override
+  public Map<String, String> loadNamespaceMetadata(Namespace namespace) throws NoSuchNamespaceException {
+    validateNamespace(namespace);
+    GetItemResponse response = dynamo.getItem(GetItemRequest.builder()
+        .tableName(awsProperties.dynamoDbTableName())
+        .consistentRead(true)
+        .key(namespacePrimaryKey(namespace))
+        .build());
+
+    if (!response.hasItem()) {
+      throw new NoSuchNamespaceException("Cannot find namespace %s", namespace);
+    }
+
+    return response.item().entrySet().stream()
+        .filter(e -> isProperty(e.getKey()))
+        .collect(Collectors.toMap(e -> toPropertyKey(e.getKey()), e -> e.getValue().s()));
+  }
+
+  @Override
+  public boolean dropNamespace(Namespace namespace) throws NamespaceNotEmptyException {
+    validateNamespace(namespace);
+    if (!listTables(namespace).isEmpty()) {
+      throw new NamespaceNotEmptyException("Cannot delete non-empty namespace %s", namespace);
+    }
+
+    try {
+      dynamo.deleteItem(DeleteItemRequest.builder()
+          .tableName(awsProperties.dynamoDbTableName())
+          .key(namespacePrimaryKey(namespace))
+          .conditionExpression("attribute_exists(" + namespace + ")")
+          .build());
+      return true;
+    } catch (ConditionalCheckFailedException e) {
+      return false;
+    }
+  }
+
+  @Override
+  public boolean setProperties(Namespace namespace, Map<String, String> properties) throws NoSuchNamespaceException {
+    List<String> updateParts = Lists.newArrayList();
+    Map<String, String> attributeNames = Maps.newHashMap();
+    Map<String, AttributeValue> attributeValues = Maps.newHashMap();
+    int idx = 0;
+    for (Map.Entry<String, String> property : properties.entrySet()) {
+      String attributeValue = ":v" + idx;
+      String attributeKey = "#k" + idx;
+      idx++;
+      updateParts.add(attributeKey + " = " + attributeValue);
+      attributeNames.put(attributeKey, toPropertyCol(property.getKey()));
+      attributeValues.put(attributeValue, AttributeValue.builder().s(property.getValue()).build());
+    }
+
+    updateCatalogEntryMetadata(updateParts, attributeValues);
+    String updateExpression = "SET " + COMMA.join(updateParts);
+    return updateProperties(namespace, updateExpression, attributeValues, attributeNames);
+  }
+
+  @Override
+  public boolean removeProperties(Namespace namespace, Set<String> properties) throws NoSuchNamespaceException {
+    List<String> removeParts = Lists.newArrayList(properties.iterator());
+    Map<String, String> attributeNames = Maps.newHashMap();
+    Map<String, AttributeValue> attributeValues = Maps.newHashMap();
+    int idx = 0;
+    for (String property : properties) {
+      String attributeKey = "#k" + idx;
+      idx++;
+      removeParts.add(attributeKey);
+      attributeNames.put(attributeKey, toPropertyCol(property));
+    }
+
+    List<String> updateParts = Lists.newArrayList();
+    updateCatalogEntryMetadata(updateParts, attributeValues);
+    String updateExpression = "REMOVE " + COMMA.join(removeParts) + " SET " + COMMA.join(updateParts);
+    return updateProperties(namespace, updateExpression, attributeValues, attributeNames);
+  }
+
+  @Override
+  public List<TableIdentifier> listTables(Namespace namespace) {
+    List<TableIdentifier> identifiers = Lists.newArrayList();
+    Map<String, AttributeValue> lastEvaluatedKey;
+    String condition = COL_NAMESPACE + " = :ns";
+    Map<String, AttributeValue> conditionValues = ImmutableMap.of(
+        ":ns", AttributeValue.builder().s(namespace.toString()).build());
+    do {
+      QueryResponse response = dynamo.query(QueryRequest.builder()
+          .tableName(awsProperties.dynamoDbTableName())
+          .indexName(GSI_NAMESPACE_IDENTIFIER)
+          .keyConditionExpression(condition)
+          .expressionAttributeValues(conditionValues)
+          .build());
+
+      if (response.hasItems()) {
+        for (Map<String, AttributeValue> item : response.items()) {
+          String identifier = item.get(COL_IDENTIFIER).s();
+          if (!COL_IDENTIFIER_NAMESPACE.equals(identifier)) {
+            identifiers.add(TableIdentifier.of(identifier.split("\\.")));
+          }
+        }
+      }
+
+      lastEvaluatedKey = response.lastEvaluatedKey();
+    } while (!lastEvaluatedKey.isEmpty());
+    return identifiers;
+  }
+
+  @Override
+  public boolean dropTable(TableIdentifier identifier, boolean purge) {
+    Map<String, AttributeValue> key = tablePrimaryKey(identifier);
+    try {
+      GetItemResponse response = dynamo.getItem(GetItemRequest.builder()
+          .tableName(awsProperties.dynamoDbTableName())
+          .consistentRead(true)
+          .key(key)
+          .build());
+
+      if (!response.hasItem()) {
+        throw new NoSuchTableException("Cannot find table %s to drop", identifier);
+      }
+
+      TableOperations ops = newTableOps(identifier);
+      TableMetadata lastMetadata = ops.current();
+      dynamo.deleteItem(DeleteItemRequest.builder()
+          .tableName(awsProperties.dynamoDbTableName())
+          .key(tablePrimaryKey(identifier))
+          .conditionExpression(COL_VERSION + " = :v")
+          .expressionAttributeValues(ImmutableMap.of(":v", response.item().get(COL_VERSION)))
+          .build());
+      LOG.info("Successfully dropped table {} from DynamoDb catalog", identifier);
+
+      if (purge && lastMetadata != null) {
+        CatalogUtil.dropTableData(ops.io(), lastMetadata);
+        LOG.info("Table {} data purged", identifier);
+      }
+
+      LOG.info("Dropped table: {}", identifier);
+      return true;
+    } catch (ConditionalCheckFailedException e) {
+      LOG.error("Cannot complete drop table operation for {}: commit conflict", identifier, e);
+      return false;
+    } catch (Exception e) {
+      LOG.error("Cannot complete drop table operation for {}: unexpected exception", identifier, e);
+      throw e;
+    }
+  }
+
+  @Override
+  public void renameTable(TableIdentifier from, TableIdentifier to) {
+    Map<String, AttributeValue> fromKey = tablePrimaryKey(from);
+    Map<String, AttributeValue> toKey = tablePrimaryKey(to);
+
+    GetItemResponse fromResponse = dynamo.getItem(GetItemRequest.builder()
+        .tableName(awsProperties.dynamoDbTableName())
+        .consistentRead(true)
+        .key(fromKey)
+        .build());
+
+    if (!fromResponse.hasItem()) {
+      throw new NoSuchTableException("Cannot rename table %s to %s: %s does not exist", from, to, from);
+    }
+
+    GetItemResponse toResponse = dynamo.getItem(GetItemRequest.builder()

Review comment:
       The `toResponse` is only used for the first existence check, but that's also done by the put conditional expression in the transaction.  We shouldn't need the first check.






[GitHub] [iceberg] jackye1995 commented on pull request #2688: AWS: add DynamoDb catalog

Posted by GitBox <gi...@apache.org>.
jackye1995 commented on pull request #2688:
URL: https://github.com/apache/iceberg/pull/2688#issuecomment-865754476


   @danielcweeks thank you, updated!




[GitHub] [iceberg] jackye1995 commented on a change in pull request #2688: AWS: add DynamoDb catalog

Posted by GitBox <gi...@apache.org>.
jackye1995 commented on a change in pull request #2688:
URL: https://github.com/apache/iceberg/pull/2688#discussion_r649432820



##########
File path: aws/src/main/java/org/apache/iceberg/aws/dynamodb/DynamoDbCatalog.java
##########
@@ -0,0 +1,613 @@
+/**
+ * DynamoDB implementation of Iceberg catalog
+ */
+public class DynamoDbCatalog extends BaseMetastoreCatalog implements Closeable, SupportsNamespaces, Configurable {
+
+  private static final Logger LOG = LoggerFactory.getLogger(DynamoDbCatalog.class);
+  private static final int CATALOG_TABLE_CREATION_WAIT_ATTEMPTS_MAX = 5;
+  static final Joiner COMMA = Joiner.on(',');
+
+  private static final String GSI_NAMESPACE_IDENTIFIER = "namespace-identifier";
+  private static final String COL_IDENTIFIER = "identifier";
+  private static final String COL_IDENTIFIER_NAMESPACE = "NAMESPACE";
+  private static final String COL_NAMESPACE = "namespace";
+  private static final String PROPERTY_COL_PREFIX = "p.";

Review comment:
       > is there a reason we wouldn't just use a map in DynamoDB for these properties
   
   The biggest consideration was that if a user wants to build an additional custom index on a property, that is not possible on a nested type. For example, people often store owner information for tables and namespaces as properties and might build an index to search on it.
   
   > might want to split out some of the well known values to top level properties
   
   The current approach is simple in that namespaces and tables are stored on the same principle: namespace properties and the 3 table properties (metadata_location, previous_metadata_location, table_type) use the same mechanism, which lets them share some util methods. I don't have a strong opinion on this, though. If we think `location` is important, I can move it out, but apart from that I don't see any other similar use cases.
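
A minimal sketch of the flattened layout, assuming plain string values instead of DynamoDB `AttributeValue` objects. The helper names `toPropertyCol`, `isProperty` and `toPropertyKey` match calls in the diff, but their bodies here are assumptions, and `toItem` and `extractProperties` are hypothetical helpers added for illustration.

```java
import java.util.HashMap;
import java.util.Map;

public class PropertyColumns {
  static final String PROPERTY_COL_PREFIX = "p.";

  // Property key -> top-level column name, e.g. "owner" -> "p.owner".
  public static String toPropertyCol(String propertyKey) {
    return PROPERTY_COL_PREFIX + propertyKey;
  }

  public static boolean isProperty(String column) {
    return column.startsWith(PROPERTY_COL_PREFIX);
  }

  // Column name -> property key, e.g. "p.owner" -> "owner".
  public static String toPropertyKey(String column) {
    return column.substring(PROPERTY_COL_PREFIX.length());
  }

  // Build a flat row: key columns plus one top-level column per property.
  public static Map<String, String> toItem(
      String identifier, String namespace, Map<String, String> properties) {
    Map<String, String> item = new HashMap<>();
    item.put("identifier", identifier);
    item.put("namespace", namespace);
    properties.forEach((key, value) -> item.put(toPropertyCol(key), value));
    return item;
  }

  // Recover only the user-visible properties from a flat row.
  public static Map<String, String> extractProperties(Map<String, String> item) {
    Map<String, String> properties = new HashMap<>();
    item.forEach((column, value) -> {
      if (isProperty(column)) {
        properties.put(toPropertyKey(column), value);
      }
    });
    return properties;
  }
}
```

Because each property is its own top-level attribute in this layout, a column such as `p.owner` could serve as an index key, which a value nested inside a map attribute could not.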






[GitHub] [iceberg] danielcweeks commented on a change in pull request #2688: AWS: add DynamoDb catalog

Posted by GitBox <gi...@apache.org>.
danielcweeks commented on a change in pull request #2688:
URL: https://github.com/apache/iceberg/pull/2688#discussion_r654836917



##########
File path: aws/src/main/java/org/apache/iceberg/aws/dynamodb/DynamoDbCatalog.java
##########
@@ -0,0 +1,613 @@
+/**
+ * DynamoDB implementation of Iceberg catalog
+ */
+public class DynamoDbCatalog extends BaseMetastoreCatalog implements Closeable, SupportsNamespaces, Configurable {
+
+  private static final Logger LOG = LoggerFactory.getLogger(DynamoDbCatalog.class);
+  private static final int CATALOG_TABLE_CREATION_WAIT_ATTEMPTS_MAX = 5;
+  static final Joiner COMMA = Joiner.on(',');
+
+  private static final String GSI_NAMESPACE_IDENTIFIER = "namespace-identifier";
+  private static final String COL_IDENTIFIER = "identifier";
+  private static final String COL_IDENTIFIER_NAMESPACE = "NAMESPACE";
+  private static final String COL_NAMESPACE = "namespace";
+  private static final String PROPERTY_COL_PREFIX = "p.";
+  private static final String PROPERTY_DEFAULT_LOCATION = "default_location";
+  private static final String COL_CREATED_AT = "created_at";
+  private static final String COL_UPDATED_AT = "updated_at";
+
+  // field used for optimistic locking
+  static final String COL_VERSION = "v";

Review comment:
       Yeah, I missed how this column was being used because I didn't initially see where the value was updated when entries change. This makes sense.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] jackye1995 commented on a change in pull request #2688: AWS: add DynamoDb catalog

Posted by GitBox <gi...@apache.org>.
jackye1995 commented on a change in pull request #2688:
URL: https://github.com/apache/iceberg/pull/2688#discussion_r654833636



##########
File path: aws/src/main/java/org/apache/iceberg/aws/dynamodb/DynamoDbCatalog.java
##########
@@ -0,0 +1,613 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.aws.dynamodb;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.BaseMetastoreCatalog;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.aws.AwsClientFactories;
+import org.apache.iceberg.aws.AwsProperties;
+import org.apache.iceberg.aws.s3.S3FileIO;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.SupportsNamespaces;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
+import org.apache.iceberg.exceptions.NoSuchNamespaceException;
+import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.base.Joiner;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.Tasks;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
+import software.amazon.awssdk.services.dynamodb.model.AttributeDefinition;
+import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
+import software.amazon.awssdk.services.dynamodb.model.BillingMode;
+import software.amazon.awssdk.services.dynamodb.model.ConditionalCheckFailedException;
+import software.amazon.awssdk.services.dynamodb.model.CreateTableRequest;
+import software.amazon.awssdk.services.dynamodb.model.Delete;
+import software.amazon.awssdk.services.dynamodb.model.DeleteItemRequest;
+import software.amazon.awssdk.services.dynamodb.model.DescribeTableRequest;
+import software.amazon.awssdk.services.dynamodb.model.DescribeTableResponse;
+import software.amazon.awssdk.services.dynamodb.model.GetItemRequest;
+import software.amazon.awssdk.services.dynamodb.model.GetItemResponse;
+import software.amazon.awssdk.services.dynamodb.model.GlobalSecondaryIndex;
+import software.amazon.awssdk.services.dynamodb.model.KeySchemaElement;
+import software.amazon.awssdk.services.dynamodb.model.KeyType;
+import software.amazon.awssdk.services.dynamodb.model.Projection;
+import software.amazon.awssdk.services.dynamodb.model.ProjectionType;
+import software.amazon.awssdk.services.dynamodb.model.Put;
+import software.amazon.awssdk.services.dynamodb.model.PutItemRequest;
+import software.amazon.awssdk.services.dynamodb.model.QueryRequest;
+import software.amazon.awssdk.services.dynamodb.model.QueryResponse;
+import software.amazon.awssdk.services.dynamodb.model.ResourceNotFoundException;
+import software.amazon.awssdk.services.dynamodb.model.ScalarAttributeType;
+import software.amazon.awssdk.services.dynamodb.model.TableStatus;
+import software.amazon.awssdk.services.dynamodb.model.TransactWriteItem;
+import software.amazon.awssdk.services.dynamodb.model.TransactWriteItemsRequest;
+import software.amazon.awssdk.services.dynamodb.model.UpdateItemRequest;
+
+/**
+ * DynamoDB implementation of Iceberg catalog
+ */
+public class DynamoDbCatalog extends BaseMetastoreCatalog implements Closeable, SupportsNamespaces, Configurable {
+
+  private static final Logger LOG = LoggerFactory.getLogger(DynamoDbCatalog.class);
+  private static final int CATALOG_TABLE_CREATION_WAIT_ATTEMPTS_MAX = 5;
+  static final Joiner COMMA = Joiner.on(',');
+
+  private static final String GSI_NAMESPACE_IDENTIFIER = "namespace-identifier";
+  private static final String COL_IDENTIFIER = "identifier";
+  private static final String COL_IDENTIFIER_NAMESPACE = "NAMESPACE";
+  private static final String COL_NAMESPACE = "namespace";
+  private static final String PROPERTY_COL_PREFIX = "p.";
+  private static final String PROPERTY_DEFAULT_LOCATION = "default_location";
+  private static final String COL_CREATED_AT = "created_at";
+  private static final String COL_UPDATED_AT = "updated_at";
+
+  // field used for optimistic locking
+  static final String COL_VERSION = "v";

Review comment:
       Yes, `attribute_not_exists` is used when a row is initially created in the table. But subsequent commits to the table have to leverage this version field. See `DynamoDbTableOperations.persistTable` for more details.
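To make the distinction concrete, here is a minimal sketch of the two guards, assuming an in-memory map as a stand-in for the catalog table. `VersionedCatalogSketch`, `create` and `commit` are hypothetical names for illustration only; they are not part of the PR or of the DynamoDB SDK. An existence check can only protect the first write, because once the row exists it cannot tell two concurrent committers apart. A version comparison can.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Hypothetical in-memory stand-in for the catalog table; not the DynamoDB API.
final class VersionedCatalogSketch {
  // key -> {version, metadataLocation}
  private final Map<String, String[]> rows = new HashMap<>();

  // Models a put guarded by attribute_not_exists(v): succeeds only if no row exists yet.
  // Returns the new version, or null when the guard fails.
  synchronized String create(String key, String metadataLocation) {
    if (rows.containsKey(key)) {
      return null; // analogous to a failed conditional check
    }
    String version = UUID.randomUUID().toString();
    rows.put(key, new String[] {version, metadataLocation});
    return version;
  }

  // Models an update guarded by "v = :v": succeeds only if the stored version
  // still equals the version the committer read before preparing its change.
  synchronized String commit(String key, String expectedVersion, String newMetadataLocation) {
    String[] row = rows.get(key);
    if (row == null || !row[0].equals(expectedVersion)) {
      return null; // another writer committed first, or the row is gone
    }
    String newVersion = UUID.randomUUID().toString();
    rows.put(key, new String[] {newVersion, newMetadataLocation});
    return newVersion;
  }
}
```

In this sketch a committer that read version `v1` loses the race as soon as any other commit replaces `v1`, regardless of timestamps.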






[GitHub] [iceberg] danielcweeks commented on a change in pull request #2688: AWS: add DynamoDb catalog

Posted by GitBox <gi...@apache.org>.
danielcweeks commented on a change in pull request #2688:
URL: https://github.com/apache/iceberg/pull/2688#discussion_r654829416



##########
File path: aws/src/main/java/org/apache/iceberg/aws/dynamodb/DynamoDbCatalog.java
##########
@@ -0,0 +1,613 @@
+
+/**
+ * DynamoDB implementation of Iceberg catalog
+ */
+public class DynamoDbCatalog extends BaseMetastoreCatalog implements Closeable, SupportsNamespaces, Configurable {
+
+  private static final Logger LOG = LoggerFactory.getLogger(DynamoDbCatalog.class);
+  private static final int CATALOG_TABLE_CREATION_WAIT_ATTEMPTS_MAX = 5;
+  static final Joiner COMMA = Joiner.on(',');
+
+  private static final String GSI_NAMESPACE_IDENTIFIER = "namespace-identifier";
+  private static final String COL_IDENTIFIER = "identifier";
+  private static final String COL_IDENTIFIER_NAMESPACE = "NAMESPACE";
+  private static final String COL_NAMESPACE = "namespace";
+  private static final String PROPERTY_COL_PREFIX = "p.";
+  private static final String PROPERTY_DEFAULT_LOCATION = "default_location";
+  private static final String COL_CREATED_AT = "created_at";
+  private static final String COL_UPDATED_AT = "updated_at";
+
+  // field used for optimistic locking
+  static final String COL_VERSION = "v";

Review comment:
       I'm not actually sure this is necessary, since it seems that DynamoDB has options for checking for existence with `if_not_exists` and `attribute_not_exists`. I'm not sure if there is some edge case (I'm not terribly familiar with the conditionals), but it would make this a whole lot more readable if the conditions were more like `attribute_not_exists(identifier)`.






[GitHub] [iceberg] danielcweeks commented on a change in pull request #2688: AWS: add DynamoDb catalog

Posted by GitBox <gi...@apache.org>.
danielcweeks commented on a change in pull request #2688:
URL: https://github.com/apache/iceberg/pull/2688#discussion_r654835681



##########
File path: aws/src/main/java/org/apache/iceberg/aws/dynamodb/DynamoDbCatalog.java
##########
@@ -0,0 +1,613 @@
+
+/**
+ * DynamoDB implementation of Iceberg catalog
+ */
+public class DynamoDbCatalog extends BaseMetastoreCatalog implements Closeable, SupportsNamespaces, Configurable {
+
+  private static final Logger LOG = LoggerFactory.getLogger(DynamoDbCatalog.class);
+  private static final int CATALOG_TABLE_CREATION_WAIT_ATTEMPTS_MAX = 5;
+  static final Joiner COMMA = Joiner.on(',');
+
+  private static final String GSI_NAMESPACE_IDENTIFIER = "namespace-identifier";
+  private static final String COL_IDENTIFIER = "identifier";
+  private static final String COL_IDENTIFIER_NAMESPACE = "NAMESPACE";
+  private static final String COL_NAMESPACE = "namespace";
+  private static final String PROPERTY_COL_PREFIX = "p.";
+  private static final String PROPERTY_DEFAULT_LOCATION = "default_location";
+  private static final String COL_CREATED_AT = "created_at";
+  private static final String COL_UPDATED_AT = "updated_at";
+
+  // field used for optimistic locking
+  static final String COL_VERSION = "v";
+
+  private DynamoDbClient dynamo;
+  private Configuration hadoopConf;
+  private String catalogName;
+  private String warehousePath;
+  private AwsProperties awsProperties;
+  private FileIO fileIO;
+
+  public DynamoDbCatalog() {
+  }
+
+  @Override
+  public void initialize(String name, Map<String, String> properties) {
+    initialize(
+        name,
+        properties.get(CatalogProperties.WAREHOUSE_LOCATION),
+        new AwsProperties(properties),
+        AwsClientFactories.from(properties).dynamo(),
+        initializeFileIO(properties));
+  }
+
+  @VisibleForTesting
+  void initialize(String name, String path, AwsProperties properties, DynamoDbClient client, FileIO io) {
+    this.catalogName = name;
+    this.awsProperties = properties;
+    this.warehousePath = cleanWarehousePath(path);
+    this.dynamo = client;
+    this.fileIO = io;
+    ensureCatalogTableExistsOrCreate();
+  }
+
+  @Override
+  public String name() {
+    return catalogName;
+  }
+
+  @Override
+  protected TableOperations newTableOps(TableIdentifier tableIdentifier) {
+    validateTableIdentifier(tableIdentifier);
+    return new DynamoDbTableOperations(dynamo, awsProperties, catalogName, fileIO, tableIdentifier);
+  }
+
+  @Override
+  protected String defaultWarehouseLocation(TableIdentifier tableIdentifier) {
+    validateTableIdentifier(tableIdentifier);
+    GetItemResponse response = dynamo.getItem(GetItemRequest.builder()
+        .tableName(awsProperties.dynamoDbTableName())
+        .consistentRead(true)
+        .key(namespacePrimaryKey(tableIdentifier.namespace()))
+        .build());
+
+    if (!response.hasItem()) {
+      throw new NoSuchNamespaceException("Cannot find default warehouse location: namespace %s does not exist",
+          tableIdentifier.namespace());
+    }
+
+    String defaultLocationCol = toPropertyCol(PROPERTY_DEFAULT_LOCATION);
+    if (response.item().containsKey(defaultLocationCol)) {
+      return String.format("%s/%s", response.item().get(defaultLocationCol), tableIdentifier.name());
+    } else {
+      return String.format("%s/%s.db/%s", warehousePath, tableIdentifier.namespace(), tableIdentifier.name());
+    }
+  }
+
+  @Override
+  public void createNamespace(Namespace namespace, Map<String, String> metadata) {
+    validateNamespace(namespace);
+    Map<String, AttributeValue> values = namespacePrimaryKey(namespace);
+    setNewCatalogEntryMetadata(values);
+    metadata.forEach((key, value) -> values.put(toPropertyCol(key), AttributeValue.builder().s(value).build()));
+
+    try {
+      dynamo.putItem(PutItemRequest.builder()
+          .tableName(awsProperties.dynamoDbTableName())
+          .conditionExpression("attribute_not_exists(" + DynamoDbCatalog.COL_VERSION + ")")
+          .item(values)
+          .build());
+    } catch (ConditionalCheckFailedException e) {
+      throw new AlreadyExistsException("Cannot create namespace %s: already exists", namespace);
+    }
+  }
+
+  @Override
+  public List<Namespace> listNamespaces(Namespace namespace) throws NoSuchNamespaceException {
+    validateNamespace(namespace);
+    List<Namespace> namespaces = Lists.newArrayList();
+    Map<String, AttributeValue> lastEvaluatedKey = null;
+    String condition = COL_IDENTIFIER + " = :identifier";
+    Map<String, AttributeValue> conditionValues = Maps.newHashMap();
+    conditionValues.put(":identifier", AttributeValue.builder().s(COL_IDENTIFIER_NAMESPACE).build());
+    if (!namespace.isEmpty()) {
+      condition += " AND " + "begins_with(" + COL_NAMESPACE + ",:ns)";
+      conditionValues.put(":ns", AttributeValue.builder().s(namespace.toString()).build());
+    }
+
+    do {
+      QueryResponse response = dynamo.query(QueryRequest.builder()
+          .tableName(awsProperties.dynamoDbTableName())
+          .consistentRead(true)
+          .keyConditionExpression(condition)
+          .expressionAttributeValues(conditionValues)
+          .exclusiveStartKey(lastEvaluatedKey)
+          .build());
+
+      if (response.hasItems()) {
+        for (Map<String, AttributeValue> item : response.items()) {
+          String ns = item.get(COL_NAMESPACE).s();
+          namespaces.add(Namespace.of(ns.split("\\.")));
+        }
+      }
+
+      lastEvaluatedKey = response.lastEvaluatedKey();
+    } while (!lastEvaluatedKey.isEmpty());
+
+    return namespaces;
+  }
+
+  @Override
+  public Map<String, String> loadNamespaceMetadata(Namespace namespace) throws NoSuchNamespaceException {
+    validateNamespace(namespace);
+    GetItemResponse response = dynamo.getItem(GetItemRequest.builder()
+        .tableName(awsProperties.dynamoDbTableName())
+        .consistentRead(true)
+        .key(namespacePrimaryKey(namespace))
+        .build());
+
+    if (!response.hasItem()) {
+      throw new NoSuchNamespaceException("Cannot find namespace %s", namespace);
+    }
+
+    return response.item().entrySet().stream()
+        .filter(e -> isProperty(e.getKey()))
+        .collect(Collectors.toMap(e -> toPropertyKey(e.getKey()), e -> e.getValue().s()));
+  }
+
+  @Override
+  public boolean dropNamespace(Namespace namespace) throws NamespaceNotEmptyException {
+    validateNamespace(namespace);
+    if (!listTables(namespace).isEmpty()) {
+      throw new NamespaceNotEmptyException("Cannot delete non-empty namespace %s", namespace);
+    }
+
+    try {
+      dynamo.deleteItem(DeleteItemRequest.builder()
+          .tableName(awsProperties.dynamoDbTableName())
+          .key(namespacePrimaryKey(namespace))
+          .conditionExpression("attribute_exists(" + COL_NAMESPACE + ")")
+          .build());
+      return true;
+    } catch (ConditionalCheckFailedException e) {
+      return false;
+    }
+  }
+
+  @Override
+  public boolean setProperties(Namespace namespace, Map<String, String> properties) throws NoSuchNamespaceException {
+    List<String> updateParts = Lists.newArrayList();
+    Map<String, String> attributeNames = Maps.newHashMap();
+    Map<String, AttributeValue> attributeValues = Maps.newHashMap();
+    int idx = 0;
+    for (Map.Entry<String, String> property : properties.entrySet()) {
+      String attributeValue = ":v" + idx;
+      String attributeKey = "#k" + idx;
+      idx++;
+      updateParts.add(attributeKey + " = " + attributeValue);
+      attributeNames.put(attributeKey, toPropertyCol(property.getKey()));
+      attributeValues.put(attributeValue, AttributeValue.builder().s(property.getValue()).build());
+    }
+
+    updateCatalogEntryMetadata(updateParts, attributeValues);
+    String updateExpression = "SET " + COMMA.join(updateParts);
+    return updateProperties(namespace, updateExpression, attributeValues, attributeNames);
+  }
+
+  @Override
+  public boolean removeProperties(Namespace namespace, Set<String> properties) throws NoSuchNamespaceException {
+    List<String> removeParts = Lists.newArrayList();
+    Map<String, String> attributeNames = Maps.newHashMap();
+    Map<String, AttributeValue> attributeValues = Maps.newHashMap();
+    int idx = 0;
+    for (String property : properties) {
+      String attributeKey = "#k" + idx;
+      idx++;
+      removeParts.add(attributeKey);
+      attributeNames.put(attributeKey, toPropertyCol(property));
+    }
+
+    List<String> updateParts = Lists.newArrayList();
+    updateCatalogEntryMetadata(updateParts, attributeValues);
+    String updateExpression = "REMOVE " + COMMA.join(removeParts) + " SET " + COMMA.join(updateParts);
+    return updateProperties(namespace, updateExpression, attributeValues, attributeNames);
+  }
+
+  @Override
+  public List<TableIdentifier> listTables(Namespace namespace) {
+    List<TableIdentifier> identifiers = Lists.newArrayList();
+    Map<String, AttributeValue> lastEvaluatedKey = null;
+    String condition = COL_NAMESPACE + " = :ns";
+    Map<String, AttributeValue> conditionValues = ImmutableMap.of(
+        ":ns", AttributeValue.builder().s(namespace.toString()).build());
+    do {
+      // resume from the previous page; null on the first request
+      QueryResponse response = dynamo.query(QueryRequest.builder()
+          .tableName(awsProperties.dynamoDbTableName())
+          .indexName(GSI_NAMESPACE_IDENTIFIER)
+          .keyConditionExpression(condition)
+          .expressionAttributeValues(conditionValues)
+          .exclusiveStartKey(lastEvaluatedKey)
+          .build());
+
+      if (response.hasItems()) {
+        for (Map<String, AttributeValue> item : response.items()) {
+          String identifier = item.get(COL_IDENTIFIER).s();
+          if (!COL_IDENTIFIER_NAMESPACE.equals(identifier)) {
+            identifiers.add(TableIdentifier.of(identifier.split("\\.")));
+          }
+        }
+      }
+
+      lastEvaluatedKey = response.lastEvaluatedKey();
+    } while (!lastEvaluatedKey.isEmpty());
+    return identifiers;
+  }
+
+  @Override
+  public boolean dropTable(TableIdentifier identifier, boolean purge) {
+    Map<String, AttributeValue> key = tablePrimaryKey(identifier);
+    try {
+      GetItemResponse response = dynamo.getItem(GetItemRequest.builder()
+          .tableName(awsProperties.dynamoDbTableName())
+          .consistentRead(true)
+          .key(key)
+          .build());
+
+      if (!response.hasItem()) {
+        throw new NoSuchTableException("Cannot find table %s to drop", identifier);
+      }
+
+      TableOperations ops = newTableOps(identifier);
+      TableMetadata lastMetadata = ops.current();
+      dynamo.deleteItem(DeleteItemRequest.builder()
+          .tableName(awsProperties.dynamoDbTableName())
+          .key(tablePrimaryKey(identifier))
+          .conditionExpression(COL_VERSION + " = :v")
+          .expressionAttributeValues(ImmutableMap.of(":v", response.item().get(COL_VERSION)))
+          .build());
+      LOG.info("Successfully dropped table {} from DynamoDb catalog", identifier);
+
+      if (purge && lastMetadata != null) {
+        CatalogUtil.dropTableData(ops.io(), lastMetadata);
+        LOG.info("Table {} data purged", identifier);
+      }
+
+      LOG.info("Dropped table: {}", identifier);
+      return true;
+    } catch (ConditionalCheckFailedException e) {
+      LOG.error("Cannot complete drop table operation for {}: commit conflict", identifier, e);
+      return false;
+    } catch (Exception e) {
+      LOG.error("Cannot complete drop table operation for {}: unexpected exception", identifier, e);
+      throw e;
+    }
+  }
+
+  @Override
+  public void renameTable(TableIdentifier from, TableIdentifier to) {
+    Map<String, AttributeValue> fromKey = tablePrimaryKey(from);
+    Map<String, AttributeValue> toKey = tablePrimaryKey(to);
+
+    GetItemResponse fromResponse = dynamo.getItem(GetItemRequest.builder()
+        .tableName(awsProperties.dynamoDbTableName())
+        .consistentRead(true)
+        .key(fromKey)
+        .build());
+
+    if (!fromResponse.hasItem()) {
+      throw new NoSuchTableException("Cannot rename table %s to %s: %s does not exist", from, to, from);
+    }
+
+    GetItemResponse toResponse = dynamo.getItem(GetItemRequest.builder()

Review comment:
       Wait, hold on, I updated the wrong comment here.  We need a get for the row identifying the table to be renamed, but I don't see why we need a get for the target row.  There are two gets here: the first is necessary, but the second shouldn't be, from what I can tell.
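
To illustrate why the second get looks redundant, here is a minimal in-memory sketch of the two-item transaction used by `renameTable`. It is not the DynamoDB client API: `RenameSketch`, its `rows` map, and its method names are hypothetical stand-ins. It only models the two conditions in the diff, `v = :v` on the delete and `attribute_not_exists(v)` on the put, applied all-or-nothing.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory model of the catalog table: identifier -> version string.
final class RenameSketch {
  private final Map<String, String> rows = new HashMap<>();

  public void put(String identifier, String version) {
    rows.put(identifier, version);
  }

  public boolean contains(String identifier) {
    return rows.containsKey(identifier);
  }

  // Models the transaction: delete `from` only if its version still matches,
  // and put `to` only if no row exists for it. Either both happen or neither.
  public boolean rename(String from, String to, String expectedVersion, String newVersion) {
    boolean fromUnchanged = expectedVersion.equals(rows.get(from));
    boolean toAbsent = !rows.containsKey(to);
    if (!fromUnchanged || !toAbsent) {
      return false; // the whole transaction is cancelled, nothing is written
    }

    rows.remove(from);
    rows.put(to, newVersion);
    return true;
  }
}
```

In this model an existing target row cancels the rename without any prior read of the target. One trade-off, stated as an assumption about the real client: without the explicit get, the catalog would have to work out from the cancelled transaction which condition failed in order to throw `AlreadyExistsException` rather than a generic commit failure.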






[GitHub] [iceberg] jackye1995 commented on a change in pull request #2688: AWS: add DynamoDb catalog

Posted by GitBox <gi...@apache.org>.
jackye1995 commented on a change in pull request #2688:
URL: https://github.com/apache/iceberg/pull/2688#discussion_r654833389



##########
File path: aws/src/main/java/org/apache/iceberg/aws/dynamodb/DynamoDbCatalog.java
##########
@@ -0,0 +1,613 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.aws.dynamodb;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.BaseMetastoreCatalog;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.aws.AwsClientFactories;
+import org.apache.iceberg.aws.AwsProperties;
+import org.apache.iceberg.aws.s3.S3FileIO;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.SupportsNamespaces;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
+import org.apache.iceberg.exceptions.NoSuchNamespaceException;
+import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.base.Joiner;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.Tasks;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
+import software.amazon.awssdk.services.dynamodb.model.AttributeDefinition;
+import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
+import software.amazon.awssdk.services.dynamodb.model.BillingMode;
+import software.amazon.awssdk.services.dynamodb.model.ConditionalCheckFailedException;
+import software.amazon.awssdk.services.dynamodb.model.CreateTableRequest;
+import software.amazon.awssdk.services.dynamodb.model.Delete;
+import software.amazon.awssdk.services.dynamodb.model.DeleteItemRequest;
+import software.amazon.awssdk.services.dynamodb.model.DescribeTableRequest;
+import software.amazon.awssdk.services.dynamodb.model.DescribeTableResponse;
+import software.amazon.awssdk.services.dynamodb.model.GetItemRequest;
+import software.amazon.awssdk.services.dynamodb.model.GetItemResponse;
+import software.amazon.awssdk.services.dynamodb.model.GlobalSecondaryIndex;
+import software.amazon.awssdk.services.dynamodb.model.KeySchemaElement;
+import software.amazon.awssdk.services.dynamodb.model.KeyType;
+import software.amazon.awssdk.services.dynamodb.model.Projection;
+import software.amazon.awssdk.services.dynamodb.model.ProjectionType;
+import software.amazon.awssdk.services.dynamodb.model.Put;
+import software.amazon.awssdk.services.dynamodb.model.PutItemRequest;
+import software.amazon.awssdk.services.dynamodb.model.QueryRequest;
+import software.amazon.awssdk.services.dynamodb.model.QueryResponse;
+import software.amazon.awssdk.services.dynamodb.model.ResourceNotFoundException;
+import software.amazon.awssdk.services.dynamodb.model.ScalarAttributeType;
+import software.amazon.awssdk.services.dynamodb.model.TableStatus;
+import software.amazon.awssdk.services.dynamodb.model.TransactWriteItem;
+import software.amazon.awssdk.services.dynamodb.model.TransactWriteItemsRequest;
+import software.amazon.awssdk.services.dynamodb.model.UpdateItemRequest;
+
+/**
+ * DynamoDB implementation of Iceberg catalog
+ */
+public class DynamoDbCatalog extends BaseMetastoreCatalog implements Closeable, SupportsNamespaces, Configurable {
+
+  private static final Logger LOG = LoggerFactory.getLogger(DynamoDbCatalog.class);
+  private static final int CATALOG_TABLE_CREATION_WAIT_ATTEMPTS_MAX = 5;
+  static final Joiner COMMA = Joiner.on(',');
+
+  private static final String GSI_NAMESPACE_IDENTIFIER = "namespace-identifier";
+  private static final String COL_IDENTIFIER = "identifier";
+  private static final String COL_IDENTIFIER_NAMESPACE = "NAMESPACE";
+  private static final String COL_NAMESPACE = "namespace";
+  private static final String PROPERTY_COL_PREFIX = "p.";

Review comment:
       > The main concern here is that table properties can get rather messy in that anyone can add values and there's really no limitation of name. This would in turn make the catalog table schema rather messy since it would be the union of all of the table property key names.
   
   DynamoDB is a NoSQL database. As in HBase and Cassandra, the table schema defines only the primary key (in DynamoDB, the hash key and range key, which here are the `identifier` and `namespace` columns); all other columns are not part of the schema. There is no notion of the table schema having to be a union of all columns.
   
   > I feel like since we don't have any use cases specifically called out for secondary indexes, we should use a map to reflect the table properties.
   
   I have listed one use case, regarding ownership, that came up in a customer request. The goal is to avoid having to keep iterating on the implementation as such feature requests arrive.
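
As a rough illustration of the flattened layout, the sketch below models one catalog row as a plain `Map` and recovers its properties by filtering on the `p.` prefix, in the same way `loadNamespaceMetadata` does in the diff. The three helper methods mirror the ones in `DynamoDbCatalog`; the class name `PropertyColumnsSketch`, the `properties` method, and the use of `Map<String, String>` instead of `AttributeValue` are assumptions made to keep the example free of the AWS SDK.

```java
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical, SDK-free model of property columns stored with a "p." prefix.
final class PropertyColumnsSketch {
  static final String PROPERTY_COL_PREFIX = "p.";

  // Column name under which a property key is stored.
  static String toPropertyCol(String propertyKey) {
    return PROPERTY_COL_PREFIX + propertyKey;
  }

  // True when a column holds a property rather than catalog bookkeeping.
  static boolean isProperty(String col) {
    return col.startsWith(PROPERTY_COL_PREFIX);
  }

  // Inverse of toPropertyCol.
  static String toPropertyKey(String propertyCol) {
    return propertyCol.substring(PROPERTY_COL_PREFIX.length());
  }

  // Extracts only the property columns of a row, with the prefix stripped.
  static Map<String, String> properties(Map<String, String> item) {
    return item.entrySet().stream()
        .filter(e -> isProperty(e.getKey()))
        .collect(Collectors.toMap(e -> toPropertyKey(e.getKey()), Map.Entry::getValue));
  }
}
```

Because each row carries only the columns it actually has, two tables with different property keys do not affect each other's rows, and a single property such as a hypothetical `p.owner` stays addressable as a top-level attribute.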
   






[GitHub] [iceberg] danielcweeks commented on a change in pull request #2688: AWS: add DynamoDb catalog

Posted by GitBox <gi...@apache.org>.
danielcweeks commented on a change in pull request #2688:
URL: https://github.com/apache/iceberg/pull/2688#discussion_r654833759



##########
File path: aws/src/main/java/org/apache/iceberg/aws/dynamodb/DynamoDbCatalog.java
##########
@@ -0,0 +1,613 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.aws.dynamodb;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.BaseMetastoreCatalog;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.aws.AwsClientFactories;
+import org.apache.iceberg.aws.AwsProperties;
+import org.apache.iceberg.aws.s3.S3FileIO;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.SupportsNamespaces;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
+import org.apache.iceberg.exceptions.NoSuchNamespaceException;
+import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.base.Joiner;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.Tasks;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
+import software.amazon.awssdk.services.dynamodb.model.AttributeDefinition;
+import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
+import software.amazon.awssdk.services.dynamodb.model.BillingMode;
+import software.amazon.awssdk.services.dynamodb.model.ConditionalCheckFailedException;
+import software.amazon.awssdk.services.dynamodb.model.CreateTableRequest;
+import software.amazon.awssdk.services.dynamodb.model.Delete;
+import software.amazon.awssdk.services.dynamodb.model.DeleteItemRequest;
+import software.amazon.awssdk.services.dynamodb.model.DescribeTableRequest;
+import software.amazon.awssdk.services.dynamodb.model.DescribeTableResponse;
+import software.amazon.awssdk.services.dynamodb.model.GetItemRequest;
+import software.amazon.awssdk.services.dynamodb.model.GetItemResponse;
+import software.amazon.awssdk.services.dynamodb.model.GlobalSecondaryIndex;
+import software.amazon.awssdk.services.dynamodb.model.KeySchemaElement;
+import software.amazon.awssdk.services.dynamodb.model.KeyType;
+import software.amazon.awssdk.services.dynamodb.model.Projection;
+import software.amazon.awssdk.services.dynamodb.model.ProjectionType;
+import software.amazon.awssdk.services.dynamodb.model.Put;
+import software.amazon.awssdk.services.dynamodb.model.PutItemRequest;
+import software.amazon.awssdk.services.dynamodb.model.QueryRequest;
+import software.amazon.awssdk.services.dynamodb.model.QueryResponse;
+import software.amazon.awssdk.services.dynamodb.model.ResourceNotFoundException;
+import software.amazon.awssdk.services.dynamodb.model.ScalarAttributeType;
+import software.amazon.awssdk.services.dynamodb.model.TableStatus;
+import software.amazon.awssdk.services.dynamodb.model.TransactWriteItem;
+import software.amazon.awssdk.services.dynamodb.model.TransactWriteItemsRequest;
+import software.amazon.awssdk.services.dynamodb.model.UpdateItemRequest;
+
+/**
+ * DynamoDB implementation of Iceberg catalog
+ */
+public class DynamoDbCatalog extends BaseMetastoreCatalog implements Closeable, SupportsNamespaces, Configurable {
+
+  private static final Logger LOG = LoggerFactory.getLogger(DynamoDbCatalog.class);
+  private static final int CATALOG_TABLE_CREATION_WAIT_ATTEMPTS_MAX = 5;
+  static final Joiner COMMA = Joiner.on(',');
+
+  private static final String GSI_NAMESPACE_IDENTIFIER = "namespace-identifier";
+  private static final String COL_IDENTIFIER = "identifier";
+  private static final String COL_IDENTIFIER_NAMESPACE = "NAMESPACE";
+  private static final String COL_NAMESPACE = "namespace";
+  private static final String PROPERTY_COL_PREFIX = "p.";
+  private static final String PROPERTY_DEFAULT_LOCATION = "default_location";
+  private static final String COL_CREATED_AT = "created_at";
+  private static final String COL_UPDATED_AT = "updated_at";
+
+  // field used for optimistic locking
+  static final String COL_VERSION = "v";
+
+  private DynamoDbClient dynamo;
+  private Configuration hadoopConf;
+  private String catalogName;
+  private String warehousePath;
+  private AwsProperties awsProperties;
+  private FileIO fileIO;
+
+  public DynamoDbCatalog() {
+  }
+
+  @Override
+  public void initialize(String name, Map<String, String> properties) {
+    initialize(
+        name,
+        properties.get(CatalogProperties.WAREHOUSE_LOCATION),
+        new AwsProperties(properties),
+        AwsClientFactories.from(properties).dynamo(),
+        initializeFileIO(properties));
+  }
+
+  @VisibleForTesting
+  void initialize(String name, String path, AwsProperties properties, DynamoDbClient client, FileIO io) {
+    this.catalogName = name;
+    this.awsProperties = properties;
+    this.warehousePath = cleanWarehousePath(path);
+    this.dynamo = client;
+    this.fileIO = io;
+    ensureCatalogTableExistsOrCreate();
+  }
+
+  @Override
+  public String name() {
+    return catalogName;
+  }
+
+  @Override
+  protected TableOperations newTableOps(TableIdentifier tableIdentifier) {
+    validateTableIdentifier(tableIdentifier);
+    return new DynamoDbTableOperations(dynamo, awsProperties, catalogName, fileIO, tableIdentifier);
+  }
+
+  @Override
+  protected String defaultWarehouseLocation(TableIdentifier tableIdentifier) {
+    validateTableIdentifier(tableIdentifier);
+    GetItemResponse response = dynamo.getItem(GetItemRequest.builder()
+        .tableName(awsProperties.dynamoDbTableName())
+        .consistentRead(true)
+        .key(namespacePrimaryKey(tableIdentifier.namespace()))
+        .build());
+
+    if (!response.hasItem()) {
+      throw new NoSuchNamespaceException("Cannot find default warehouse location: namespace %s does not exist",
+          tableIdentifier.namespace());
+    }
+
+    String defaultLocationCol = toPropertyCol(PROPERTY_DEFAULT_LOCATION);
+    if (response.item().containsKey(defaultLocationCol)) {
+      return String.format("%s/%s", response.item().get(defaultLocationCol), tableIdentifier.name());
+    } else {
+      return String.format("%s/%s.db/%s", warehousePath, tableIdentifier.namespace(), tableIdentifier.name());
+    }
+  }
+
+  @Override
+  public void createNamespace(Namespace namespace, Map<String, String> metadata) {
+    validateNamespace(namespace);
+    Map<String, AttributeValue> values = namespacePrimaryKey(namespace);
+    setNewCatalogEntryMetadata(values);
+    metadata.forEach((key, value) -> values.put(toPropertyCol(key), AttributeValue.builder().s(value).build()));
+
+    try {
+      dynamo.putItem(PutItemRequest.builder()
+          .tableName(awsProperties.dynamoDbTableName())
+          .conditionExpression("attribute_not_exists(" + DynamoDbCatalog.COL_VERSION + ")")
+          .item(values)
+          .build());
+    } catch (ConditionalCheckFailedException e) {
+      throw new AlreadyExistsException("Cannot create namespace %s: already exists", namespace);
+    }
+  }
+
+  @Override
+  public List<Namespace> listNamespaces(Namespace namespace) throws NoSuchNamespaceException {
+    validateNamespace(namespace);
+    List<Namespace> namespaces = Lists.newArrayList();
+    Map<String, AttributeValue> lastEvaluatedKey = null;
+    String condition = COL_IDENTIFIER + " = :identifier";
+    Map<String, AttributeValue> conditionValues = Maps.newHashMap();
+    conditionValues.put(":identifier", AttributeValue.builder().s(COL_IDENTIFIER_NAMESPACE).build());
+    if (!namespace.isEmpty()) {
+      condition += " AND " + "begins_with(" + COL_NAMESPACE + ",:ns)";
+      conditionValues.put(":ns", AttributeValue.builder().s(namespace.toString()).build());
+    }
+
+    do {
+      QueryResponse response = dynamo.query(QueryRequest.builder()
+          .tableName(awsProperties.dynamoDbTableName())
+          .consistentRead(true)
+          .keyConditionExpression(condition)
+          .expressionAttributeValues(conditionValues)
+          .exclusiveStartKey(lastEvaluatedKey)
+          .build());
+
+      if (response.hasItems()) {
+        for (Map<String, AttributeValue> item : response.items()) {
+          String ns = item.get(COL_NAMESPACE).s();
+          namespaces.add(Namespace.of(ns.split("\\.")));
+        }
+      }
+
+      lastEvaluatedKey = response.lastEvaluatedKey();
+    } while (!lastEvaluatedKey.isEmpty());
+
+    return namespaces;
+  }
+
+  @Override
+  public Map<String, String> loadNamespaceMetadata(Namespace namespace) throws NoSuchNamespaceException {
+    validateNamespace(namespace);
+    GetItemResponse response = dynamo.getItem(GetItemRequest.builder()
+        .tableName(awsProperties.dynamoDbTableName())
+        .consistentRead(true)
+        .key(namespacePrimaryKey(namespace))
+        .build());
+
+    if (!response.hasItem()) {
+      throw new NoSuchNamespaceException("Cannot find namespace %s", namespace);
+    }
+
+    return response.item().entrySet().stream()
+        .filter(e -> isProperty(e.getKey()))
+        .collect(Collectors.toMap(e -> toPropertyKey(e.getKey()), e -> e.getValue().s()));
+  }
+
+  @Override
+  public boolean dropNamespace(Namespace namespace) throws NamespaceNotEmptyException {
+    validateNamespace(namespace);
+    if (!listTables(namespace).isEmpty()) {
+      throw new NamespaceNotEmptyException("Cannot delete non-empty namespace %s", namespace);
+    }
+
+    try {
+      dynamo.deleteItem(DeleteItemRequest.builder()
+          .tableName(awsProperties.dynamoDbTableName())
+          .key(namespacePrimaryKey(namespace))
+          .conditionExpression("attribute_exists(" + namespace + ")")
+          .build());
+      return true;
+    } catch (ConditionalCheckFailedException e) {
+      return false;
+    }
+  }
+
+  @Override
+  public boolean setProperties(Namespace namespace, Map<String, String> properties) throws NoSuchNamespaceException {
+    List<String> updateParts = Lists.newArrayList();
+    Map<String, String> attributeNames = Maps.newHashMap();
+    Map<String, AttributeValue> attributeValues = Maps.newHashMap();
+    int idx = 0;
+    for (Map.Entry<String, String> property : properties.entrySet()) {
+      String attributeValue = ":v" + idx;
+      String attributeKey = "#k" + idx;
+      idx++;
+      updateParts.add(attributeKey + " = " + attributeValue);
+      attributeNames.put(attributeKey, toPropertyCol(property.getKey()));
+      attributeValues.put(attributeValue, AttributeValue.builder().s(property.getValue()).build());
+    }
+
+    updateCatalogEntryMetadata(updateParts, attributeValues);
+    String updateExpression = "SET " + COMMA.join(updateParts);
+    return updateProperties(namespace, updateExpression, attributeValues, attributeNames);
+  }
+
+  @Override
+  public boolean removeProperties(Namespace namespace, Set<String> properties) throws NoSuchNamespaceException {
+    List<String> removeParts = Lists.newArrayList(properties.iterator());
+    Map<String, String> attributeNames = Maps.newHashMap();
+    Map<String, AttributeValue> attributeValues = Maps.newHashMap();
+    int idx = 0;
+    for (String property : properties) {
+      String attributeKey = "#k" + idx;
+      idx++;
+      removeParts.add(attributeKey);
+      attributeNames.put(attributeKey, toPropertyCol(property));
+    }
+
+    List<String> updateParts = Lists.newArrayList();
+    updateCatalogEntryMetadata(updateParts, attributeValues);
+    String updateExpression = "REMOVE " + COMMA.join(removeParts) + " SET " + COMMA.join(updateParts);
+    return updateProperties(namespace, updateExpression, attributeValues, attributeNames);
+  }
+
+  @Override
+  public List<TableIdentifier> listTables(Namespace namespace) {
+    List<TableIdentifier> identifiers = Lists.newArrayList();
+    Map<String, AttributeValue> lastEvaluatedKey;
+    String condition = COL_NAMESPACE + " = :ns";
+    Map<String, AttributeValue> conditionValues = ImmutableMap.of(
+        ":ns", AttributeValue.builder().s(namespace.toString()).build());
+    do {
+      QueryResponse response = dynamo.query(QueryRequest.builder()
+          .tableName(awsProperties.dynamoDbTableName())
+          .indexName(GSI_NAMESPACE_IDENTIFIER)
+          .keyConditionExpression(condition)
+          .expressionAttributeValues(conditionValues)
+          .build());
+
+      if (response.hasItems()) {
+        for (Map<String, AttributeValue> item : response.items()) {
+          String identifier = item.get(COL_IDENTIFIER).s();
+          if (!COL_IDENTIFIER_NAMESPACE.equals(identifier)) {
+            identifiers.add(TableIdentifier.of(identifier.split("\\.")));
+          }
+        }
+      }
+
+      lastEvaluatedKey = response.lastEvaluatedKey();
+    } while (!lastEvaluatedKey.isEmpty());
+    return identifiers;
+  }
+
+  @Override
+  public boolean dropTable(TableIdentifier identifier, boolean purge) {
+    Map<String, AttributeValue> key = tablePrimaryKey(identifier);
+    try {
+      GetItemResponse response = dynamo.getItem(GetItemRequest.builder()
+          .tableName(awsProperties.dynamoDbTableName())
+          .consistentRead(true)
+          .key(key)
+          .build());
+
+      if (!response.hasItem()) {
+        throw new NoSuchTableException("Cannot find table %s to drop", identifier);
+      }
+
+      TableOperations ops = newTableOps(identifier);
+      TableMetadata lastMetadata = ops.current();
+      dynamo.deleteItem(DeleteItemRequest.builder()
+          .tableName(awsProperties.dynamoDbTableName())
+          .key(tablePrimaryKey(identifier))
+          .conditionExpression(COL_VERSION + " = :v")
+          .expressionAttributeValues(ImmutableMap.of(":v", response.item().get(COL_VERSION)))
+          .build());
+      LOG.info("Successfully dropped table {} from DynamoDb catalog", identifier);
+
+      if (purge && lastMetadata != null) {
+        CatalogUtil.dropTableData(ops.io(), lastMetadata);
+        LOG.info("Table {} data purged", identifier);
+      }
+
+      LOG.info("Dropped table: {}", identifier);
+      return true;
+    } catch (ConditionalCheckFailedException e) {
+      LOG.error("Cannot complete drop table operation for {}: commit conflict", identifier, e);
+      return false;
+    } catch (Exception e) {
+      LOG.error("Cannot complete drop table operation for {}: unexpected exception", identifier, e);
+      throw e;
+    }
+  }
+
+  @Override
+  public void renameTable(TableIdentifier from, TableIdentifier to) {
+    Map<String, AttributeValue> fromKey = tablePrimaryKey(from);
+    Map<String, AttributeValue> toKey = tablePrimaryKey(to);
+
+    GetItemResponse fromResponse = dynamo.getItem(GetItemRequest.builder()
+        .tableName(awsProperties.dynamoDbTableName())
+        .consistentRead(true)
+        .key(fromKey)
+        .build());
+
+    if (!fromResponse.hasItem()) {
+      throw new NoSuchTableException("Cannot rename table %s to %s: %s does not exist", from, to, from);
+    }
+
+    GetItemResponse toResponse = dynamo.getItem(GetItemRequest.builder()
+        .tableName(awsProperties.dynamoDbTableName())
+        .consistentRead(true)
+        .key(toKey)
+        .build());
+
+    if (toResponse.hasItem()) {
+      throw new AlreadyExistsException("Cannot rename table %s to %s: %s already exists", from, to, to);
+    }
+
+    fromResponse.item().entrySet().stream()
+        .filter(e -> isProperty(e.getKey()))
+        .forEach(e -> toKey.put(e.getKey(), e.getValue()));
+
+    setNewCatalogEntryMetadata(toKey);
+
+    dynamo.transactWriteItems(TransactWriteItemsRequest.builder()
+        .transactItems(
+            TransactWriteItem.builder()
+                .delete(Delete.builder()
+                    .tableName(awsProperties.dynamoDbTableName())
+                    .key(fromKey)
+                    .conditionExpression(COL_VERSION + " = :v")
+                    .expressionAttributeValues(ImmutableMap.of(":v", fromResponse.item().get(COL_VERSION)))
+                    .build())
+                .build(),
+            TransactWriteItem.builder()
+                .put(Put.builder()
+                    .tableName(awsProperties.dynamoDbTableName())
+                    .item(toKey)
+                    .conditionExpression("attribute_not_exists(" + COL_VERSION + ")")
+                    .build())
+                .build())
+            .build());
+
+    LOG.info("Successfully renamed table from {} to {}", from, to);
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    hadoopConf = conf;
+  }
+
+  @Override
+  public Configuration getConf() {
+    return hadoopConf;
+  }
+
+  @Override
+  public void close() throws IOException {
+    dynamo.close();
+  }
+
+  /**
+   * The property used to set a default location for tables in a namespace.
+   * Call {@link #setProperties(Namespace, Map)} to set a path value using this property for a namespace,
+   * then all tables in the namespace will have default table root path under that given path.
+   * @return default location property key
+   */
+  public static String defaultLocationProperty() {
+    return PROPERTY_DEFAULT_LOCATION;
+  }
+
+  static String toPropertyCol(String propertyKey) {
+    return PROPERTY_COL_PREFIX + propertyKey;
+  }
+
+  static boolean isProperty(String dynamoCol) {
+    return dynamoCol.startsWith(PROPERTY_COL_PREFIX);
+  }
+
+  static String toPropertyKey(String propertyCol) {
+    return propertyCol.substring(PROPERTY_COL_PREFIX.length());
+  }
+
+  static Map<String, AttributeValue> namespacePrimaryKey(Namespace namespace) {
+    Map<String, AttributeValue> key = Maps.newHashMap();
+    key.put(COL_IDENTIFIER, AttributeValue.builder().s(COL_IDENTIFIER_NAMESPACE).build());
+    key.put(COL_NAMESPACE, AttributeValue.builder().s(namespace.toString()).build());
+    return key;
+  }
+
+  static Map<String, AttributeValue> tablePrimaryKey(TableIdentifier identifier) {
+    Map<String, AttributeValue> key = Maps.newHashMap();
+    key.put(COL_IDENTIFIER, AttributeValue.builder().s(identifier.toString()).build());
+    key.put(COL_NAMESPACE, AttributeValue.builder().s(identifier.namespace().toString()).build());
+    return key;
+  }
+
+  static void setNewCatalogEntryMetadata(Map<String, AttributeValue> values) {
+    String current = Long.toString(System.currentTimeMillis());
+    values.put(COL_CREATED_AT, AttributeValue.builder().n(current).build());
+    values.put(COL_UPDATED_AT, AttributeValue.builder().n(current).build());
+    values.put(COL_VERSION, AttributeValue.builder().s(UUID.randomUUID().toString()).build());
+  }
+
+  static void updateCatalogEntryMetadata(List<String> updateParts, Map<String, AttributeValue> attributeValues) {
+    updateParts.add(COL_UPDATED_AT + " = :uat");
+    attributeValues.put(":uat", AttributeValue.builder().n(Long.toString(System.currentTimeMillis())).build());
+    updateParts.add(COL_VERSION + " = :uv");
+    attributeValues.put(":uv", AttributeValue.builder().s(UUID.randomUUID().toString()).build());
+  }
+
+  private FileIO initializeFileIO(Map<String, String> properties) {
+    String fileIOImpl = properties.get(CatalogProperties.FILE_IO_IMPL);
+    if (fileIOImpl == null) {
+      FileIO io = new S3FileIO();
+      io.initialize(properties);
+      return io;
+    } else {
+      return CatalogUtil.loadFileIO(fileIOImpl, properties, hadoopConf);
+    }
+  }
+
+  private String cleanWarehousePath(String path) {
+    Preconditions.checkArgument(path != null && path.length() > 0,
+        "Cannot initialize DynamoDbCatalog because warehousePath must not be null");
+    int len = path.length();
+    if (path.charAt(len - 1) == '/') {
+      return path.substring(0, len - 1);
+    } else {
+      return path;
+    }
+  }
+
+  private void validateNamespace(Namespace namespace) {
+    for (String level : namespace.levels()) {
+      ValidationException.check(level != null && !level.isEmpty(),
+          "Namespace level must not be empty: %s", namespace);
+      ValidationException.check(!level.contains("."),
+          "Namespace level must not contain dot, but found %s in %s", level, namespace);
+    }
+  }
+
+  private void validateTableIdentifier(TableIdentifier identifier) {
+    validateNamespace(identifier.namespace());
+    ValidationException.check(identifier.hasNamespace(),
+        "Table namespace must not be empty: %s", identifier);
+    String tableName = identifier.name();
+    ValidationException.check(!tableName.contains("."),
+        "Table name must not contain dot: %s", tableName);
+  }
+
+  private boolean dynamoDbTableExists(String tableName) {
+    try {
+      dynamo.describeTable(DescribeTableRequest.builder()
+          .tableName(tableName)
+          .build());
+      return true;
+    } catch (ResourceNotFoundException e) {
+      return false;
+    }
+  }
+
+  private void ensureCatalogTableExistsOrCreate() {
+

Review comment:
       nit: unnecessary empty line 






[GitHub] [iceberg] danielcweeks commented on a change in pull request #2688: AWS: add DynamoDb catalog

Posted by GitBox <gi...@apache.org>.
danielcweeks commented on a change in pull request #2688:
URL: https://github.com/apache/iceberg/pull/2688#discussion_r654980300



##########
File path: aws/src/main/java/org/apache/iceberg/aws/dynamodb/DynamoDbCatalog.java
##########
@@ -0,0 +1,613 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.aws.dynamodb;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.BaseMetastoreCatalog;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.aws.AwsClientFactories;
+import org.apache.iceberg.aws.AwsProperties;
+import org.apache.iceberg.aws.s3.S3FileIO;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.SupportsNamespaces;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
+import org.apache.iceberg.exceptions.NoSuchNamespaceException;
+import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.base.Joiner;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.Tasks;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
+import software.amazon.awssdk.services.dynamodb.model.AttributeDefinition;
+import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
+import software.amazon.awssdk.services.dynamodb.model.BillingMode;
+import software.amazon.awssdk.services.dynamodb.model.ConditionalCheckFailedException;
+import software.amazon.awssdk.services.dynamodb.model.CreateTableRequest;
+import software.amazon.awssdk.services.dynamodb.model.Delete;
+import software.amazon.awssdk.services.dynamodb.model.DeleteItemRequest;
+import software.amazon.awssdk.services.dynamodb.model.DescribeTableRequest;
+import software.amazon.awssdk.services.dynamodb.model.DescribeTableResponse;
+import software.amazon.awssdk.services.dynamodb.model.GetItemRequest;
+import software.amazon.awssdk.services.dynamodb.model.GetItemResponse;
+import software.amazon.awssdk.services.dynamodb.model.GlobalSecondaryIndex;
+import software.amazon.awssdk.services.dynamodb.model.KeySchemaElement;
+import software.amazon.awssdk.services.dynamodb.model.KeyType;
+import software.amazon.awssdk.services.dynamodb.model.Projection;
+import software.amazon.awssdk.services.dynamodb.model.ProjectionType;
+import software.amazon.awssdk.services.dynamodb.model.Put;
+import software.amazon.awssdk.services.dynamodb.model.PutItemRequest;
+import software.amazon.awssdk.services.dynamodb.model.QueryRequest;
+import software.amazon.awssdk.services.dynamodb.model.QueryResponse;
+import software.amazon.awssdk.services.dynamodb.model.ResourceNotFoundException;
+import software.amazon.awssdk.services.dynamodb.model.ScalarAttributeType;
+import software.amazon.awssdk.services.dynamodb.model.TableStatus;
+import software.amazon.awssdk.services.dynamodb.model.TransactWriteItem;
+import software.amazon.awssdk.services.dynamodb.model.TransactWriteItemsRequest;
+import software.amazon.awssdk.services.dynamodb.model.UpdateItemRequest;
+
+/**
+ * DynamoDB implementation of Iceberg catalog
+ */
+public class DynamoDbCatalog extends BaseMetastoreCatalog implements Closeable, SupportsNamespaces, Configurable {
+
+  private static final Logger LOG = LoggerFactory.getLogger(DynamoDbCatalog.class);
+  private static final int CATALOG_TABLE_CREATION_WAIT_ATTEMPTS_MAX = 5;
+  static final Joiner COMMA = Joiner.on(',');
+
+  private static final String GSI_NAMESPACE_IDENTIFIER = "namespace-identifier";
+  private static final String COL_IDENTIFIER = "identifier";
+  private static final String COL_IDENTIFIER_NAMESPACE = "NAMESPACE";
+  private static final String COL_NAMESPACE = "namespace";
+  private static final String PROPERTY_COL_PREFIX = "p.";
+  private static final String PROPERTY_DEFAULT_LOCATION = "default_location";
+  private static final String COL_CREATED_AT = "created_at";
+  private static final String COL_UPDATED_AT = "updated_at";
+
+  // field used for optimistic locking
+  static final String COL_VERSION = "v";
+
+  private DynamoDbClient dynamo;
+  private Configuration hadoopConf;
+  private String catalogName;
+  private String warehousePath;
+  private AwsProperties awsProperties;
+  private FileIO fileIO;
+
+  public DynamoDbCatalog() {
+  }
+
+  @Override
+  public void initialize(String name, Map<String, String> properties) {
+    initialize(
+        name,
+        properties.get(CatalogProperties.WAREHOUSE_LOCATION),
+        new AwsProperties(properties),
+        AwsClientFactories.from(properties).dynamo(),
+        initializeFileIO(properties));
+  }
+
+  @VisibleForTesting
+  void initialize(String name, String path, AwsProperties properties, DynamoDbClient client, FileIO io) {
+    this.catalogName = name;
+    this.awsProperties = properties;
+    this.warehousePath = cleanWarehousePath(path);
+    this.dynamo = client;
+    this.fileIO = io;
+    ensureCatalogTableExistsOrCreate();
+  }
+
+  @Override
+  public String name() {
+    return catalogName;
+  }
+
+  @Override
+  protected TableOperations newTableOps(TableIdentifier tableIdentifier) {
+    validateTableIdentifier(tableIdentifier);
+    return new DynamoDbTableOperations(dynamo, awsProperties, catalogName, fileIO, tableIdentifier);
+  }
+
+  @Override
+  protected String defaultWarehouseLocation(TableIdentifier tableIdentifier) {
+    validateTableIdentifier(tableIdentifier);
+    GetItemResponse response = dynamo.getItem(GetItemRequest.builder()
+        .tableName(awsProperties.dynamoDbTableName())
+        .consistentRead(true)
+        .key(namespacePrimaryKey(tableIdentifier.namespace()))
+        .build());
+
+    if (!response.hasItem()) {
+      throw new NoSuchNamespaceException("Cannot find default warehouse location: namespace %s does not exist",
+          tableIdentifier.namespace());
+    }
+
+    String defaultLocationCol = toPropertyCol(PROPERTY_DEFAULT_LOCATION);
+    if (response.item().containsKey(defaultLocationCol)) {
+      return String.format("%s/%s", response.item().get(defaultLocationCol).s(), tableIdentifier.name());
+    } else {
+      return String.format("%s/%s.db/%s", warehousePath, tableIdentifier.namespace(), tableIdentifier.name());
+    }
+  }
+
+  @Override
+  public void createNamespace(Namespace namespace, Map<String, String> metadata) {
+    validateNamespace(namespace);
+    Map<String, AttributeValue> values = namespacePrimaryKey(namespace);
+    setNewCatalogEntryMetadata(values);
+    metadata.forEach((key, value) -> values.put(toPropertyCol(key), AttributeValue.builder().s(value).build()));
+
+    try {
+      dynamo.putItem(PutItemRequest.builder()
+          .tableName(awsProperties.dynamoDbTableName())
+          .conditionExpression("attribute_not_exists(" + DynamoDbCatalog.COL_VERSION + ")")
+          .item(values)
+          .build());
+    } catch (ConditionalCheckFailedException e) {
+      throw new AlreadyExistsException("Cannot create namespace %s: already exists", namespace);
+    }
+  }
+
+  @Override
+  public List<Namespace> listNamespaces(Namespace namespace) throws NoSuchNamespaceException {
+    validateNamespace(namespace);
+    List<Namespace> namespaces = Lists.newArrayList();
+    Map<String, AttributeValue> lastEvaluatedKey = null;
+    String condition = COL_IDENTIFIER + " = :identifier";
+    Map<String, AttributeValue> conditionValues = Maps.newHashMap();
+    conditionValues.put(":identifier", AttributeValue.builder().s(COL_IDENTIFIER_NAMESPACE).build());
+    if (!namespace.isEmpty()) {
+      condition += " AND " + "begins_with(" + COL_NAMESPACE + ",:ns)";
+      conditionValues.put(":ns", AttributeValue.builder().s(namespace.toString()).build());
+    }
+
+    do {
+      QueryResponse response = dynamo.query(QueryRequest.builder()
+          .tableName(awsProperties.dynamoDbTableName())
+          .consistentRead(true)
+          .keyConditionExpression(condition)
+          .expressionAttributeValues(conditionValues)
+          .exclusiveStartKey(lastEvaluatedKey)
+          .build());
+
+      if (response.hasItems()) {
+        for (Map<String, AttributeValue> item : response.items()) {
+          String ns = item.get(COL_NAMESPACE).s();
+          namespaces.add(Namespace.of(ns.split("\\.")));
+        }
+      }
+
+      lastEvaluatedKey = response.lastEvaluatedKey();
+    } while (!lastEvaluatedKey.isEmpty());
+
+    return namespaces;
+  }
+
+  @Override
+  public Map<String, String> loadNamespaceMetadata(Namespace namespace) throws NoSuchNamespaceException {
+    validateNamespace(namespace);
+    GetItemResponse response = dynamo.getItem(GetItemRequest.builder()
+        .tableName(awsProperties.dynamoDbTableName())
+        .consistentRead(true)
+        .key(namespacePrimaryKey(namespace))
+        .build());
+
+    if (!response.hasItem()) {
+      throw new NoSuchNamespaceException("Cannot find namespace %s", namespace);
+    }
+
+    return response.item().entrySet().stream()
+        .filter(e -> isProperty(e.getKey()))
+        .collect(Collectors.toMap(e -> toPropertyKey(e.getKey()), e -> e.getValue().s()));
+  }
+
+  @Override
+  public boolean dropNamespace(Namespace namespace) throws NamespaceNotEmptyException {
+    validateNamespace(namespace);
+    if (!listTables(namespace).isEmpty()) {
+      throw new NamespaceNotEmptyException("Cannot delete non-empty namespace %s", namespace);
+    }
+
+    try {
+      dynamo.deleteItem(DeleteItemRequest.builder()
+          .tableName(awsProperties.dynamoDbTableName())
+          .key(namespacePrimaryKey(namespace))
+          .conditionExpression("attribute_exists(" + COL_NAMESPACE + ")")
+          .build());
+      return true;
+    } catch (ConditionalCheckFailedException e) {
+      return false;
+    }
+  }
+
+  @Override
+  public boolean setProperties(Namespace namespace, Map<String, String> properties) throws NoSuchNamespaceException {
+    List<String> updateParts = Lists.newArrayList();
+    Map<String, String> attributeNames = Maps.newHashMap();
+    Map<String, AttributeValue> attributeValues = Maps.newHashMap();
+    int idx = 0;
+    for (Map.Entry<String, String> property : properties.entrySet()) {
+      String attributeValue = ":v" + idx;
+      String attributeKey = "#k" + idx;
+      idx++;
+      updateParts.add(attributeKey + " = " + attributeValue);
+      attributeNames.put(attributeKey, toPropertyCol(property.getKey()));
+      attributeValues.put(attributeValue, AttributeValue.builder().s(property.getValue()).build());
+    }
+
+    updateCatalogEntryMetadata(updateParts, attributeValues);
+    String updateExpression = "SET " + COMMA.join(updateParts);
+    return updateProperties(namespace, updateExpression, attributeValues, attributeNames);
+  }
+
+  @Override
+  public boolean removeProperties(Namespace namespace, Set<String> properties) throws NoSuchNamespaceException {
+    List<String> removeParts = Lists.newArrayList();
+    Map<String, String> attributeNames = Maps.newHashMap();
+    Map<String, AttributeValue> attributeValues = Maps.newHashMap();
+    int idx = 0;
+    for (String property : properties) {
+      String attributeKey = "#k" + idx;
+      idx++;
+      removeParts.add(attributeKey);
+      attributeNames.put(attributeKey, toPropertyCol(property));
+    }
+
+    List<String> updateParts = Lists.newArrayList();
+    updateCatalogEntryMetadata(updateParts, attributeValues);
+    String updateExpression = "REMOVE " + COMMA.join(removeParts) + " SET " + COMMA.join(updateParts);
+    return updateProperties(namespace, updateExpression, attributeValues, attributeNames);
+  }
+
+  @Override
+  public List<TableIdentifier> listTables(Namespace namespace) {
+    List<TableIdentifier> identifiers = Lists.newArrayList();
+    Map<String, AttributeValue> lastEvaluatedKey = null;
+    String condition = COL_NAMESPACE + " = :ns";
+    Map<String, AttributeValue> conditionValues = ImmutableMap.of(
+        ":ns", AttributeValue.builder().s(namespace.toString()).build());
+    do {
+      QueryResponse response = dynamo.query(QueryRequest.builder()
+          .tableName(awsProperties.dynamoDbTableName())
+          .indexName(GSI_NAMESPACE_IDENTIFIER)
+          .keyConditionExpression(condition)
+          .expressionAttributeValues(conditionValues)
+          .exclusiveStartKey(lastEvaluatedKey)
+          .build());
+
+      if (response.hasItems()) {
+        for (Map<String, AttributeValue> item : response.items()) {
+          String identifier = item.get(COL_IDENTIFIER).s();
+          if (!COL_IDENTIFIER_NAMESPACE.equals(identifier)) {
+            identifiers.add(TableIdentifier.of(identifier.split("\\.")));
+          }
+        }
+      }
+
+      lastEvaluatedKey = response.lastEvaluatedKey();
+    } while (!lastEvaluatedKey.isEmpty());
+    return identifiers;
+  }
+
+  @Override
+  public boolean dropTable(TableIdentifier identifier, boolean purge) {
+    Map<String, AttributeValue> key = tablePrimaryKey(identifier);
+    try {
+      GetItemResponse response = dynamo.getItem(GetItemRequest.builder()
+          .tableName(awsProperties.dynamoDbTableName())
+          .consistentRead(true)
+          .key(key)
+          .build());
+
+      if (!response.hasItem()) {
+        throw new NoSuchTableException("Cannot find table %s to drop", identifier);
+      }
+
+      TableOperations ops = newTableOps(identifier);
+      TableMetadata lastMetadata = ops.current();
+      dynamo.deleteItem(DeleteItemRequest.builder()
+          .tableName(awsProperties.dynamoDbTableName())
+          .key(tablePrimaryKey(identifier))
+          .conditionExpression(COL_VERSION + " = :v")
+          .expressionAttributeValues(ImmutableMap.of(":v", response.item().get(COL_VERSION)))
+          .build());
+      LOG.info("Successfully dropped table {} from DynamoDb catalog", identifier);
+
+      if (purge && lastMetadata != null) {
+        CatalogUtil.dropTableData(ops.io(), lastMetadata);
+        LOG.info("Table {} data purged", identifier);
+      }
+
+      LOG.info("Dropped table: {}", identifier);
+      return true;
+    } catch (ConditionalCheckFailedException e) {
+      LOG.error("Cannot complete drop table operation for {}: commit conflict", identifier, e);
+      return false;
+    } catch (Exception e) {
+      LOG.error("Cannot complete drop table operation for {}: unexpected exception", identifier, e);
+      throw e;
+    }
+  }
+
+  @Override
+  public void renameTable(TableIdentifier from, TableIdentifier to) {
+    Map<String, AttributeValue> fromKey = tablePrimaryKey(from);
+    Map<String, AttributeValue> toKey = tablePrimaryKey(to);
+
+    GetItemResponse fromResponse = dynamo.getItem(GetItemRequest.builder()
+        .tableName(awsProperties.dynamoDbTableName())
+        .consistentRead(true)
+        .key(fromKey)
+        .build());
+
+    if (!fromResponse.hasItem()) {
+      throw new NoSuchTableException("Cannot rename table %s to %s: %s does not exist", from, to, from);
+    }
+
+    GetItemResponse toResponse = dynamo.getItem(GetItemRequest.builder()

Review comment:
       Hmm, it's unfortunate that the transactions don't have a better mechanism for handling constraint violations (or custom error messages) on failure. Since this isn't a frequently used operation, I agree the extra check is helpful for users.
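
To illustrate the pattern being discussed, here is a minimal, self-contained Java sketch. It is not the PR's code: the class `RenamePrecheckSketch`, its in-memory map, and the "already exists" message are assumptions made for the example. The real catalog issues DynamoDB `GetItem` reads and a transactional write instead. The sketch only shows why the extra reads help: they turn a generic transaction failure into a specific message, while the atomic step still guards correctness.

```java
import java.util.HashMap;
import java.util.Map;

/** In-memory sketch of "check first for a clear error, then commit atomically". */
public class RenamePrecheckSketch {

  // Hypothetical stand-in for the catalog table: identifier -> metadata location.
  // Single-threaded for simplicity; a real store enforces atomicity server-side.
  private final Map<String, String> rows = new HashMap<>();

  public void put(String identifier, String metadataLocation) {
    rows.put(identifier, metadataLocation);
  }

  public boolean contains(String identifier) {
    return rows.containsKey(identifier);
  }

  // Stand-in for the transactional write: all-or-nothing, but a failure only
  // reports that the transaction was cancelled, not which condition failed.
  private void transactRename(String from, String to) {
    if (!rows.containsKey(from) || rows.containsKey(to)) {
      throw new IllegalStateException("Transaction cancelled");
    }
    rows.put(to, rows.remove(from));
  }

  // The checks below exist only to produce a specific message. The transaction
  // remains the source of truth if another writer races in between.
  public void rename(String from, String to) {
    if (!rows.containsKey(from)) {
      throw new IllegalArgumentException(
          String.format("Cannot rename table %s to %s: %s does not exist", from, to, from));
    }
    if (rows.containsKey(to)) {
      throw new IllegalArgumentException(
          String.format("Cannot rename table %s to %s: %s already exists", from, to, to));
    }
    transactRename(from, to);
  }

  public static void main(String[] args) {
    RenamePrecheckSketch catalog = new RenamePrecheckSketch();
    catalog.put("db.a", "s3://bucket/a/metadata.json");
    catalog.put("db.b", "s3://bucket/b/metadata.json");
    try {
      catalog.rename("db.a", "db.b");
    } catch (IllegalArgumentException e) {
      System.out.println(e.getMessage());
    }
    catalog.rename("db.a", "db.c");
    System.out.println(catalog.contains("db.c"));
  }
}
```

The cost is two extra reads per rename, which seems acceptable for an operation that is rarely called.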






[GitHub] [iceberg] jackye1995 commented on a change in pull request #2688: AWS: add DynamoDb catalog

Posted by GitBox <gi...@apache.org>.
jackye1995 commented on a change in pull request #2688:
URL: https://github.com/apache/iceberg/pull/2688#discussion_r654960624



##########
File path: aws/src/main/java/org/apache/iceberg/aws/dynamodb/DynamoDbCatalog.java
##########
@@ -0,0 +1,613 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.aws.dynamodb;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.BaseMetastoreCatalog;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.aws.AwsClientFactories;
+import org.apache.iceberg.aws.AwsProperties;
+import org.apache.iceberg.aws.s3.S3FileIO;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.SupportsNamespaces;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
+import org.apache.iceberg.exceptions.NoSuchNamespaceException;
+import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.base.Joiner;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.Tasks;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
+import software.amazon.awssdk.services.dynamodb.model.AttributeDefinition;
+import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
+import software.amazon.awssdk.services.dynamodb.model.BillingMode;
+import software.amazon.awssdk.services.dynamodb.model.ConditionalCheckFailedException;
+import software.amazon.awssdk.services.dynamodb.model.CreateTableRequest;
+import software.amazon.awssdk.services.dynamodb.model.Delete;
+import software.amazon.awssdk.services.dynamodb.model.DeleteItemRequest;
+import software.amazon.awssdk.services.dynamodb.model.DescribeTableRequest;
+import software.amazon.awssdk.services.dynamodb.model.DescribeTableResponse;
+import software.amazon.awssdk.services.dynamodb.model.GetItemRequest;
+import software.amazon.awssdk.services.dynamodb.model.GetItemResponse;
+import software.amazon.awssdk.services.dynamodb.model.GlobalSecondaryIndex;
+import software.amazon.awssdk.services.dynamodb.model.KeySchemaElement;
+import software.amazon.awssdk.services.dynamodb.model.KeyType;
+import software.amazon.awssdk.services.dynamodb.model.Projection;
+import software.amazon.awssdk.services.dynamodb.model.ProjectionType;
+import software.amazon.awssdk.services.dynamodb.model.Put;
+import software.amazon.awssdk.services.dynamodb.model.PutItemRequest;
+import software.amazon.awssdk.services.dynamodb.model.QueryRequest;
+import software.amazon.awssdk.services.dynamodb.model.QueryResponse;
+import software.amazon.awssdk.services.dynamodb.model.ResourceNotFoundException;
+import software.amazon.awssdk.services.dynamodb.model.ScalarAttributeType;
+import software.amazon.awssdk.services.dynamodb.model.TableStatus;
+import software.amazon.awssdk.services.dynamodb.model.TransactWriteItem;
+import software.amazon.awssdk.services.dynamodb.model.TransactWriteItemsRequest;
+import software.amazon.awssdk.services.dynamodb.model.UpdateItemRequest;
+
+/**
+ * DynamoDB implementation of Iceberg catalog
+ */
+public class DynamoDbCatalog extends BaseMetastoreCatalog implements Closeable, SupportsNamespaces, Configurable {
+
+  private static final Logger LOG = LoggerFactory.getLogger(DynamoDbCatalog.class);
+  private static final int CATALOG_TABLE_CREATION_WAIT_ATTEMPTS_MAX = 5;
+  static final Joiner COMMA = Joiner.on(',');
+
+  private static final String GSI_NAMESPACE_IDENTIFIER = "namespace-identifier";
+  private static final String COL_IDENTIFIER = "identifier";
+  private static final String COL_IDENTIFIER_NAMESPACE = "NAMESPACE";
+  private static final String COL_NAMESPACE = "namespace";
+  private static final String PROPERTY_COL_PREFIX = "p.";
+  private static final String PROPERTY_DEFAULT_LOCATION = "default_location";
+  private static final String COL_CREATED_AT = "created_at";
+  private static final String COL_UPDATED_AT = "updated_at";
+
+  // field used for optimistic locking
+  static final String COL_VERSION = "v";
+
+  private DynamoDbClient dynamo;
+  private Configuration hadoopConf;
+  private String catalogName;
+  private String warehousePath;
+  private AwsProperties awsProperties;
+  private FileIO fileIO;
+
+  public DynamoDbCatalog() {
+  }
+
+  @Override
+  public void initialize(String name, Map<String, String> properties) {
+    initialize(
+        name,
+        properties.get(CatalogProperties.WAREHOUSE_LOCATION),
+        new AwsProperties(properties),
+        AwsClientFactories.from(properties).dynamo(),
+        initializeFileIO(properties));
+  }
+
+  @VisibleForTesting
+  void initialize(String name, String path, AwsProperties properties, DynamoDbClient client, FileIO io) {
+    this.catalogName = name;
+    this.awsProperties = properties;
+    this.warehousePath = cleanWarehousePath(path);
+    this.dynamo = client;
+    this.fileIO = io;
+    ensureCatalogTableExistsOrCreate();
+  }
+
+  @Override
+  public String name() {
+    return catalogName;
+  }
+
+  @Override
+  protected TableOperations newTableOps(TableIdentifier tableIdentifier) {
+    validateTableIdentifier(tableIdentifier);
+    return new DynamoDbTableOperations(dynamo, awsProperties, catalogName, fileIO, tableIdentifier);
+  }
+
+  @Override
+  protected String defaultWarehouseLocation(TableIdentifier tableIdentifier) {
+    validateTableIdentifier(tableIdentifier);
+    GetItemResponse response = dynamo.getItem(GetItemRequest.builder()
+        .tableName(awsProperties.dynamoDbTableName())
+        .consistentRead(true)
+        .key(namespacePrimaryKey(tableIdentifier.namespace()))
+        .build());
+
+    if (!response.hasItem()) {
+      throw new NoSuchNamespaceException("Cannot find default warehouse location: namespace %s does not exist",
+          tableIdentifier.namespace());
+    }
+
+    String defaultLocationCol = toPropertyCol(PROPERTY_DEFAULT_LOCATION);
+    if (response.item().containsKey(defaultLocationCol)) {
+      return String.format("%s/%s", response.item().get(defaultLocationCol).s(), tableIdentifier.name());
+    } else {
+      return String.format("%s/%s.db/%s", warehousePath, tableIdentifier.namespace(), tableIdentifier.name());
+    }
+  }
+
+  @Override
+  public void createNamespace(Namespace namespace, Map<String, String> metadata) {
+    validateNamespace(namespace);
+    Map<String, AttributeValue> values = namespacePrimaryKey(namespace);
+    setNewCatalogEntryMetadata(values);
+    metadata.forEach((key, value) -> values.put(toPropertyCol(key), AttributeValue.builder().s(value).build()));
+
+    try {
+      dynamo.putItem(PutItemRequest.builder()
+          .tableName(awsProperties.dynamoDbTableName())
+          .conditionExpression("attribute_not_exists(" + DynamoDbCatalog.COL_VERSION + ")")
+          .item(values)
+          .build());
+    } catch (ConditionalCheckFailedException e) {
+      throw new AlreadyExistsException("Cannot create namespace %s: already exists", namespace);
+    }
+  }
+
+  @Override
+  public List<Namespace> listNamespaces(Namespace namespace) throws NoSuchNamespaceException {
+    validateNamespace(namespace);
+    List<Namespace> namespaces = Lists.newArrayList();
+    Map<String, AttributeValue> lastEvaluatedKey = null;
+    String condition = COL_IDENTIFIER + " = :identifier";
+    Map<String, AttributeValue> conditionValues = Maps.newHashMap();
+    conditionValues.put(":identifier", AttributeValue.builder().s(COL_IDENTIFIER_NAMESPACE).build());
+    if (!namespace.isEmpty()) {
+      condition += " AND " + "begins_with(" + COL_NAMESPACE + ",:ns)";
+      conditionValues.put(":ns", AttributeValue.builder().s(namespace.toString()).build());
+    }
+
+    do {
+      QueryResponse response = dynamo.query(QueryRequest.builder()
+          .tableName(awsProperties.dynamoDbTableName())
+          .consistentRead(true)
+          .keyConditionExpression(condition)
+          .expressionAttributeValues(conditionValues)
+          .exclusiveStartKey(lastEvaluatedKey)
+          .build());
+
+      if (response.hasItems()) {
+        for (Map<String, AttributeValue> item : response.items()) {
+          String ns = item.get(COL_NAMESPACE).s();
+          namespaces.add(Namespace.of(ns.split("\\.")));
+        }
+      }
+
+      lastEvaluatedKey = response.lastEvaluatedKey();
+    } while (!lastEvaluatedKey.isEmpty());
+
+    return namespaces;
+  }
+
+  @Override
+  public Map<String, String> loadNamespaceMetadata(Namespace namespace) throws NoSuchNamespaceException {
+    validateNamespace(namespace);
+    GetItemResponse response = dynamo.getItem(GetItemRequest.builder()
+        .tableName(awsProperties.dynamoDbTableName())
+        .consistentRead(true)
+        .key(namespacePrimaryKey(namespace))
+        .build());
+
+    if (!response.hasItem()) {
+      throw new NoSuchNamespaceException("Cannot find namespace %s", namespace);
+    }
+
+    return response.item().entrySet().stream()
+        .filter(e -> isProperty(e.getKey()))
+        .collect(Collectors.toMap(e -> toPropertyKey(e.getKey()), e -> e.getValue().s()));
+  }
+
+  @Override
+  public boolean dropNamespace(Namespace namespace) throws NamespaceNotEmptyException {
+    validateNamespace(namespace);
+    if (!listTables(namespace).isEmpty()) {
+      throw new NamespaceNotEmptyException("Cannot delete non-empty namespace %s", namespace);
+    }
+
+    try {
+      dynamo.deleteItem(DeleteItemRequest.builder()
+          .tableName(awsProperties.dynamoDbTableName())
+          .key(namespacePrimaryKey(namespace))
+          .conditionExpression("attribute_exists(" + COL_NAMESPACE + ")")
+          .build());
+      return true;
+    } catch (ConditionalCheckFailedException e) {
+      return false;
+    }
+  }
+
+  @Override
+  public boolean setProperties(Namespace namespace, Map<String, String> properties) throws NoSuchNamespaceException {
+    List<String> updateParts = Lists.newArrayList();
+    Map<String, String> attributeNames = Maps.newHashMap();
+    Map<String, AttributeValue> attributeValues = Maps.newHashMap();
+    int idx = 0;
+    for (Map.Entry<String, String> property : properties.entrySet()) {
+      String attributeValue = ":v" + idx;
+      String attributeKey = "#k" + idx;
+      idx++;
+      updateParts.add(attributeKey + " = " + attributeValue);
+      attributeNames.put(attributeKey, toPropertyCol(property.getKey()));
+      attributeValues.put(attributeValue, AttributeValue.builder().s(property.getValue()).build());
+    }
+
+    updateCatalogEntryMetadata(updateParts, attributeValues);
+    String updateExpression = "SET " + COMMA.join(updateParts);
+    return updateProperties(namespace, updateExpression, attributeValues, attributeNames);
+  }
+
+  @Override
+  public boolean removeProperties(Namespace namespace, Set<String> properties) throws NoSuchNamespaceException {
+    List<String> removeParts = Lists.newArrayList();
+    Map<String, String> attributeNames = Maps.newHashMap();
+    Map<String, AttributeValue> attributeValues = Maps.newHashMap();
+    int idx = 0;
+    for (String property : properties) {
+      String attributeKey = "#k" + idx;
+      idx++;
+      removeParts.add(attributeKey);
+      attributeNames.put(attributeKey, toPropertyCol(property));
+    }
+
+    List<String> updateParts = Lists.newArrayList();
+    updateCatalogEntryMetadata(updateParts, attributeValues);
+    String updateExpression = "REMOVE " + COMMA.join(removeParts) + " SET " + COMMA.join(updateParts);
+    return updateProperties(namespace, updateExpression, attributeValues, attributeNames);
+  }
+
+  @Override
+  public List<TableIdentifier> listTables(Namespace namespace) {
+    List<TableIdentifier> identifiers = Lists.newArrayList();
+    Map<String, AttributeValue> lastEvaluatedKey = null;
+    String condition = COL_NAMESPACE + " = :ns";
+    Map<String, AttributeValue> conditionValues = ImmutableMap.of(
+        ":ns", AttributeValue.builder().s(namespace.toString()).build());
+    do {
+      QueryResponse response = dynamo.query(QueryRequest.builder()
+          .tableName(awsProperties.dynamoDbTableName())
+          .indexName(GSI_NAMESPACE_IDENTIFIER)
+          .keyConditionExpression(condition)
+          .expressionAttributeValues(conditionValues)
+          .exclusiveStartKey(lastEvaluatedKey)
+          .build());
+
+      if (response.hasItems()) {
+        for (Map<String, AttributeValue> item : response.items()) {
+          String identifier = item.get(COL_IDENTIFIER).s();
+          if (!COL_IDENTIFIER_NAMESPACE.equals(identifier)) {
+            identifiers.add(TableIdentifier.of(identifier.split("\\.")));
+          }
+        }
+      }
+
+      lastEvaluatedKey = response.lastEvaluatedKey();
+    } while (!lastEvaluatedKey.isEmpty());
+    return identifiers;
+  }
+
+  @Override
+  public boolean dropTable(TableIdentifier identifier, boolean purge) {
+    Map<String, AttributeValue> key = tablePrimaryKey(identifier);
+    try {
+      GetItemResponse response = dynamo.getItem(GetItemRequest.builder()
+          .tableName(awsProperties.dynamoDbTableName())
+          .consistentRead(true)
+          .key(key)
+          .build());
+
+      if (!response.hasItem()) {
+        throw new NoSuchTableException("Cannot find table %s to drop", identifier);
+      }
+
+      TableOperations ops = newTableOps(identifier);
+      TableMetadata lastMetadata = ops.current();
+      dynamo.deleteItem(DeleteItemRequest.builder()
+          .tableName(awsProperties.dynamoDbTableName())
+          .key(tablePrimaryKey(identifier))
+          .conditionExpression(COL_VERSION + " = :v")
+          .expressionAttributeValues(ImmutableMap.of(":v", response.item().get(COL_VERSION)))
+          .build());
+      LOG.info("Successfully dropped table {} from DynamoDb catalog", identifier);
+
+      if (purge && lastMetadata != null) {
+        CatalogUtil.dropTableData(ops.io(), lastMetadata);
+        LOG.info("Table {} data purged", identifier);
+      }
+
+      LOG.info("Dropped table: {}", identifier);
+      return true;
+    } catch (ConditionalCheckFailedException e) {
+      LOG.error("Cannot complete drop table operation for {}: commit conflict", identifier, e);
+      return false;
+    } catch (Exception e) {
+      LOG.error("Cannot complete drop table operation for {}: unexpected exception", identifier, e);
+      throw e;
+    }
+  }
+
+  @Override
+  public void renameTable(TableIdentifier from, TableIdentifier to) {
+    Map<String, AttributeValue> fromKey = tablePrimaryKey(from);
+    Map<String, AttributeValue> toKey = tablePrimaryKey(to);
+
+    GetItemResponse fromResponse = dynamo.getItem(GetItemRequest.builder()
+        .tableName(awsProperties.dynamoDbTableName())
+        .consistentRead(true)
+        .key(fromKey)
+        .build());
+
+    if (!fromResponse.hasItem()) {
+      throw new NoSuchTableException("Cannot rename table %s to %s: %s does not exist", from, to, from);
+    }
+
+    GetItemResponse toResponse = dynamo.getItem(GetItemRequest.builder()
+        .tableName(awsProperties.dynamoDbTableName())
+        .consistentRead(true)
+        .key(toKey)
+        .build());
+
+    if (toResponse.hasItem()) {
+      throw new AlreadyExistsException("Cannot rename table %s to %s: %s already exists", from, to, to);
+    }
+
+    fromResponse.item().entrySet().stream()
+        .filter(e -> isProperty(e.getKey()))
+        .forEach(e -> toKey.put(e.getKey(), e.getValue()));
+
+    setNewCatalogEntryMetadata(toKey);
+
+    dynamo.transactWriteItems(TransactWriteItemsRequest.builder()
+        .transactItems(
+            TransactWriteItem.builder()
+                .delete(Delete.builder()
+                    .tableName(awsProperties.dynamoDbTableName())
+                    .key(fromKey)
+                    .conditionExpression(COL_VERSION + " = :v")
+                    .expressionAttributeValues(ImmutableMap.of(":v", fromResponse.item().get(COL_VERSION)))
+                    .build())
+                .build(),
+            TransactWriteItem.builder()
+                .put(Put.builder()
+                    .tableName(awsProperties.dynamoDbTableName())
+                    .item(toKey)
+                    .conditionExpression("attribute_not_exists(" + COL_VERSION + ")")
+                    .build())
+                .build())
+            .build());
+
+    LOG.info("Successfully renamed table from {} to {}", from, to);
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    hadoopConf = conf;
+  }
+
+  @Override
+  public Configuration getConf() {
+    return hadoopConf;
+  }
+
+  @Override
+  public void close() throws IOException {
+    dynamo.close();
+  }
+
+  /**
+   * The property used to set a default location for tables in a namespace.
+   * Call {@link #setProperties(Namespace, Map)} to set a path value using this property for a namespace,
+   * then all tables in the namespace will have default table root path under that given path.
+   * @return default location property key
+   */
+  public static String defaultLocationProperty() {
+    return PROPERTY_DEFAULT_LOCATION;
+  }
+
+  static String toPropertyCol(String propertyKey) {
+    return PROPERTY_COL_PREFIX + propertyKey;
+  }
+
+  static boolean isProperty(String dynamoCol) {
+    return dynamoCol.startsWith(PROPERTY_COL_PREFIX);
+  }
+
+  static String toPropertyKey(String propertyCol) {
+    return propertyCol.substring(PROPERTY_COL_PREFIX.length());
+  }
+
+  static Map<String, AttributeValue> namespacePrimaryKey(Namespace namespace) {
+    Map<String, AttributeValue> key = Maps.newHashMap();
+    key.put(COL_IDENTIFIER, AttributeValue.builder().s(COL_IDENTIFIER_NAMESPACE).build());
+    key.put(COL_NAMESPACE, AttributeValue.builder().s(namespace.toString()).build());
+    return key;
+  }
+
+  static Map<String, AttributeValue> tablePrimaryKey(TableIdentifier identifier) {
+    Map<String, AttributeValue> key = Maps.newHashMap();
+    key.put(COL_IDENTIFIER, AttributeValue.builder().s(identifier.toString()).build());
+    key.put(COL_NAMESPACE, AttributeValue.builder().s(identifier.namespace().toString()).build());
+    return key;
+  }
+
+  static void setNewCatalogEntryMetadata(Map<String, AttributeValue> values) {
+    String current = Long.toString(System.currentTimeMillis());
+    values.put(COL_CREATED_AT, AttributeValue.builder().n(current).build());
+    values.put(COL_UPDATED_AT, AttributeValue.builder().n(current).build());
+    values.put(COL_VERSION, AttributeValue.builder().s(UUID.randomUUID().toString()).build());
+  }
+
+  static void updateCatalogEntryMetadata(List<String> updateParts, Map<String, AttributeValue> attributeValues) {
+    updateParts.add(COL_UPDATED_AT + " = :uat");
+    attributeValues.put(":uat", AttributeValue.builder().n(Long.toString(System.currentTimeMillis())).build());
+    updateParts.add(COL_VERSION + " = :uv");
+    attributeValues.put(":uv", AttributeValue.builder().s(UUID.randomUUID().toString()).build());
+  }
+
+  private FileIO initializeFileIO(Map<String, String> properties) {
+    String fileIOImpl = properties.get(CatalogProperties.FILE_IO_IMPL);
+    if (fileIOImpl == null) {
+      FileIO io = new S3FileIO();
+      io.initialize(properties);
+      return io;
+    } else {
+      return CatalogUtil.loadFileIO(fileIOImpl, properties, hadoopConf);
+    }
+  }
+
+  private String cleanWarehousePath(String path) {
+    Preconditions.checkArgument(path != null && path.length() > 0,
+        "Cannot initialize DynamoDbCatalog because warehousePath must not be null");
+    int len = path.length();
+    if (path.charAt(len - 1) == '/') {
+      return path.substring(0, len - 1);
+    } else {
+      return path;
+    }
+  }
+
+  private void validateNamespace(Namespace namespace) {
+    for (String level : namespace.levels()) {
+      ValidationException.check(level != null && !level.isEmpty(),
+          "Namespace level must not be empty: %s", namespace);
+      ValidationException.check(!level.contains("."),
+          "Namespace level must not contain dot, but found %s in %s", level, namespace);
+    }
+  }
+
+  private void validateTableIdentifier(TableIdentifier identifier) {
+    validateNamespace(identifier.namespace());
+    ValidationException.check(identifier.hasNamespace(),
+        "Table namespace must not be empty: %s", identifier);
+    String tableName = identifier.name();
+    ValidationException.check(!tableName.contains("."),
+        "Table name must not contain dot: %s", tableName);
+  }
+
+  private boolean dynamoDbTableExists(String tableName) {
+    try {
+      dynamo.describeTable(DescribeTableRequest.builder()
+          .tableName(tableName)
+          .build());
+      return true;
+    } catch (ResourceNotFoundException e) {
+      return false;
+    }
+  }
+
+  private void ensureCatalogTableExistsOrCreate() {
+
+    if (dynamoDbTableExists(awsProperties.dynamoDbTableName())) {
+      return;
+    }
+
+    LOG.info("DynamoDb catalog table {} not found, trying to create", awsProperties.dynamoDbTableName());
+    dynamo.createTable(CreateTableRequest.builder()
+        .tableName(awsProperties.dynamoDbTableName())
+        .keySchema(
+            KeySchemaElement.builder().attributeName(COL_IDENTIFIER).keyType(KeyType.HASH).build(),
+            KeySchemaElement.builder().attributeName(COL_NAMESPACE).keyType(KeyType.RANGE).build())
+        .attributeDefinitions(
+            AttributeDefinition.builder().attributeName(COL_IDENTIFIER).attributeType(ScalarAttributeType.S).build(),
+            AttributeDefinition.builder().attributeName(COL_NAMESPACE).attributeType(ScalarAttributeType.S).build())
+        .globalSecondaryIndexes(GlobalSecondaryIndex.builder()
+            .indexName(GSI_NAMESPACE_IDENTIFIER)
+            .keySchema(
+                KeySchemaElement.builder().attributeName(COL_NAMESPACE).keyType(KeyType.HASH).build(),
+                KeySchemaElement.builder().attributeName(COL_IDENTIFIER).keyType(KeyType.RANGE).build())
+            .projection(Projection.builder().projectionType(ProjectionType.KEYS_ONLY).build())
+            .build())
+        .billingMode(BillingMode.PAY_PER_REQUEST)
+        .build());
+
+    Tasks.foreach(awsProperties.dynamoDbTableName())

Review comment:
       Table provisioning usually takes just a few seconds. I will add a comment to explain the wait.






[GitHub] [iceberg] danielcweeks commented on a change in pull request #2688: AWS: add DynamoDb catalog

Posted by GitBox <gi...@apache.org>.
danielcweeks commented on a change in pull request #2688:
URL: https://github.com/apache/iceberg/pull/2688#discussion_r654834181



##########
File path: aws/src/main/java/org/apache/iceberg/aws/dynamodb/DynamoDbCatalog.java
##########
@@ -0,0 +1,613 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.aws.dynamodb;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.BaseMetastoreCatalog;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.aws.AwsClientFactories;
+import org.apache.iceberg.aws.AwsProperties;
+import org.apache.iceberg.aws.s3.S3FileIO;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.SupportsNamespaces;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
+import org.apache.iceberg.exceptions.NoSuchNamespaceException;
+import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.base.Joiner;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.Tasks;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
+import software.amazon.awssdk.services.dynamodb.model.AttributeDefinition;
+import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
+import software.amazon.awssdk.services.dynamodb.model.BillingMode;
+import software.amazon.awssdk.services.dynamodb.model.ConditionalCheckFailedException;
+import software.amazon.awssdk.services.dynamodb.model.CreateTableRequest;
+import software.amazon.awssdk.services.dynamodb.model.Delete;
+import software.amazon.awssdk.services.dynamodb.model.DeleteItemRequest;
+import software.amazon.awssdk.services.dynamodb.model.DescribeTableRequest;
+import software.amazon.awssdk.services.dynamodb.model.DescribeTableResponse;
+import software.amazon.awssdk.services.dynamodb.model.GetItemRequest;
+import software.amazon.awssdk.services.dynamodb.model.GetItemResponse;
+import software.amazon.awssdk.services.dynamodb.model.GlobalSecondaryIndex;
+import software.amazon.awssdk.services.dynamodb.model.KeySchemaElement;
+import software.amazon.awssdk.services.dynamodb.model.KeyType;
+import software.amazon.awssdk.services.dynamodb.model.Projection;
+import software.amazon.awssdk.services.dynamodb.model.ProjectionType;
+import software.amazon.awssdk.services.dynamodb.model.Put;
+import software.amazon.awssdk.services.dynamodb.model.PutItemRequest;
+import software.amazon.awssdk.services.dynamodb.model.QueryRequest;
+import software.amazon.awssdk.services.dynamodb.model.QueryResponse;
+import software.amazon.awssdk.services.dynamodb.model.ResourceNotFoundException;
+import software.amazon.awssdk.services.dynamodb.model.ScalarAttributeType;
+import software.amazon.awssdk.services.dynamodb.model.TableStatus;
+import software.amazon.awssdk.services.dynamodb.model.TransactWriteItem;
+import software.amazon.awssdk.services.dynamodb.model.TransactWriteItemsRequest;
+import software.amazon.awssdk.services.dynamodb.model.UpdateItemRequest;
+
+/**
+ * DynamoDB implementation of Iceberg catalog
+ */
+public class DynamoDbCatalog extends BaseMetastoreCatalog implements Closeable, SupportsNamespaces, Configurable {
+
+  private static final Logger LOG = LoggerFactory.getLogger(DynamoDbCatalog.class);
+  private static final int CATALOG_TABLE_CREATION_WAIT_ATTEMPTS_MAX = 5;
+  static final Joiner COMMA = Joiner.on(',');
+
+  private static final String GSI_NAMESPACE_IDENTIFIER = "namespace-identifier";
+  private static final String COL_IDENTIFIER = "identifier";
+  private static final String COL_IDENTIFIER_NAMESPACE = "NAMESPACE";
+  private static final String COL_NAMESPACE = "namespace";
+  private static final String PROPERTY_COL_PREFIX = "p.";
+  private static final String PROPERTY_DEFAULT_LOCATION = "default_location";
+  private static final String COL_CREATED_AT = "created_at";
+  private static final String COL_UPDATED_AT = "updated_at";
+
+  // field used for optimistic locking
+  static final String COL_VERSION = "v";
+
+  private DynamoDbClient dynamo;
+  private Configuration hadoopConf;
+  private String catalogName;
+  private String warehousePath;
+  private AwsProperties awsProperties;
+  private FileIO fileIO;
+
+  public DynamoDbCatalog() {
+  }
+
+  @Override
+  public void initialize(String name, Map<String, String> properties) {
+    initialize(
+        name,
+        properties.get(CatalogProperties.WAREHOUSE_LOCATION),
+        new AwsProperties(properties),
+        AwsClientFactories.from(properties).dynamo(),
+        initializeFileIO(properties));
+  }
+
+  @VisibleForTesting
+  void initialize(String name, String path, AwsProperties properties, DynamoDbClient client, FileIO io) {
+    this.catalogName = name;
+    this.awsProperties = properties;
+    this.warehousePath = cleanWarehousePath(path);
+    this.dynamo = client;
+    this.fileIO = io;
+    ensureCatalogTableExistsOrCreate();
+  }
+
+  @Override
+  public String name() {
+    return catalogName;
+  }
+
+  @Override
+  protected TableOperations newTableOps(TableIdentifier tableIdentifier) {
+    validateTableIdentifier(tableIdentifier);
+    return new DynamoDbTableOperations(dynamo, awsProperties, catalogName, fileIO, tableIdentifier);
+  }
+
+  @Override
+  protected String defaultWarehouseLocation(TableIdentifier tableIdentifier) {
+    validateTableIdentifier(tableIdentifier);
+    GetItemResponse response = dynamo.getItem(GetItemRequest.builder()
+        .tableName(awsProperties.dynamoDbTableName())
+        .consistentRead(true)
+        .key(namespacePrimaryKey(tableIdentifier.namespace()))
+        .build());
+
+    if (!response.hasItem()) {
+      throw new NoSuchNamespaceException("Cannot find default warehouse location: namespace %s does not exist",
+          tableIdentifier.namespace());
+    }
+
+    String defaultLocationCol = toPropertyCol(PROPERTY_DEFAULT_LOCATION);
+    if (response.item().containsKey(defaultLocationCol)) {
+      return String.format("%s/%s", response.item().get(defaultLocationCol), tableIdentifier.name());
+    } else {
+      return String.format("%s/%s.db/%s", warehousePath, tableIdentifier.namespace(), tableIdentifier.name());
+    }
+  }
+
+  @Override
+  public void createNamespace(Namespace namespace, Map<String, String> metadata) {
+    validateNamespace(namespace);
+    Map<String, AttributeValue> values = namespacePrimaryKey(namespace);
+    setNewCatalogEntryMetadata(values);
+    metadata.forEach((key, value) -> values.put(toPropertyCol(key), AttributeValue.builder().s(value).build()));
+
+    try {
+      dynamo.putItem(PutItemRequest.builder()
+          .tableName(awsProperties.dynamoDbTableName())
+          .conditionExpression("attribute_not_exists(" + DynamoDbCatalog.COL_VERSION + ")")
+          .item(values)
+          .build());
+    } catch (ConditionalCheckFailedException e) {
+      throw new AlreadyExistsException("Cannot create namespace %s: already exists", namespace);
+    }
+  }
+
+  @Override
+  public List<Namespace> listNamespaces(Namespace namespace) throws NoSuchNamespaceException {
+    validateNamespace(namespace);
+    List<Namespace> namespaces = Lists.newArrayList();
+    Map<String, AttributeValue> lastEvaluatedKey = null;
+    String condition = COL_IDENTIFIER + " = :identifier";
+    Map<String, AttributeValue> conditionValues = Maps.newHashMap();
+    conditionValues.put(":identifier", AttributeValue.builder().s(COL_IDENTIFIER_NAMESPACE).build());
+    if (!namespace.isEmpty()) {
+      condition += " AND " + "begins_with(" + COL_NAMESPACE + ",:ns)";
+      conditionValues.put(":ns", AttributeValue.builder().s(namespace.toString()).build());
+    }
+
+    do {
+      QueryResponse response = dynamo.query(QueryRequest.builder()
+          .tableName(awsProperties.dynamoDbTableName())
+          .consistentRead(true)
+          .keyConditionExpression(condition)
+          .expressionAttributeValues(conditionValues)
+          .exclusiveStartKey(lastEvaluatedKey)
+          .build());
+
+      if (response.hasItems()) {
+        for (Map<String, AttributeValue> item : response.items()) {
+          String ns = item.get(COL_NAMESPACE).s();
+          namespaces.add(Namespace.of(ns.split("\\.")));
+        }
+      }
+
+      lastEvaluatedKey = response.lastEvaluatedKey();
+    } while (!lastEvaluatedKey.isEmpty());
+
+    return namespaces;
+  }
+
+  @Override
+  public Map<String, String> loadNamespaceMetadata(Namespace namespace) throws NoSuchNamespaceException {
+    validateNamespace(namespace);
+    GetItemResponse response = dynamo.getItem(GetItemRequest.builder()
+        .tableName(awsProperties.dynamoDbTableName())
+        .consistentRead(true)
+        .key(namespacePrimaryKey(namespace))
+        .build());
+
+    if (!response.hasItem()) {
+      throw new NoSuchNamespaceException("Cannot find namespace %s", namespace);
+    }
+
+    return response.item().entrySet().stream()
+        .filter(e -> isProperty(e.getKey()))
+        .collect(Collectors.toMap(e -> toPropertyKey(e.getKey()), e -> e.getValue().s()));
+  }
+
+  @Override
+  public boolean dropNamespace(Namespace namespace) throws NamespaceNotEmptyException {
+    validateNamespace(namespace);
+    if (!listTables(namespace).isEmpty()) {
+      throw new NamespaceNotEmptyException("Cannot delete non-empty namespace %s", namespace);
+    }
+
+    try {
+      dynamo.deleteItem(DeleteItemRequest.builder()
+          .tableName(awsProperties.dynamoDbTableName())
+          .key(namespacePrimaryKey(namespace))
+          .conditionExpression("attribute_exists(" + COL_NAMESPACE + ")")
+          .build());
+      return true;
+    } catch (ConditionalCheckFailedException e) {
+      return false;
+    }
+  }
+
+  @Override
+  public boolean setProperties(Namespace namespace, Map<String, String> properties) throws NoSuchNamespaceException {
+    List<String> updateParts = Lists.newArrayList();
+    Map<String, String> attributeNames = Maps.newHashMap();
+    Map<String, AttributeValue> attributeValues = Maps.newHashMap();
+    int idx = 0;
+    for (Map.Entry<String, String> property : properties.entrySet()) {
+      String attributeValue = ":v" + idx;
+      String attributeKey = "#k" + idx;
+      idx++;
+      updateParts.add(attributeKey + " = " + attributeValue);
+      attributeNames.put(attributeKey, toPropertyCol(property.getKey()));
+      attributeValues.put(attributeValue, AttributeValue.builder().s(property.getValue()).build());
+    }
+
+    updateCatalogEntryMetadata(updateParts, attributeValues);
+    String updateExpression = "SET " + COMMA.join(updateParts);
+    return updateProperties(namespace, updateExpression, attributeValues, attributeNames);
+  }
+
+  @Override
+  public boolean removeProperties(Namespace namespace, Set<String> properties) throws NoSuchNamespaceException {
+    List<String> removeParts = Lists.newArrayList(properties.iterator());
+    Map<String, String> attributeNames = Maps.newHashMap();
+    Map<String, AttributeValue> attributeValues = Maps.newHashMap();
+    int idx = 0;
+    for (String property : properties) {
+      String attributeKey = "#k" + idx;
+      idx++;
+      removeParts.add(attributeKey);
+      attributeNames.put(attributeKey, toPropertyCol(property));
+    }
+
+    List<String> updateParts = Lists.newArrayList();
+    updateCatalogEntryMetadata(updateParts, attributeValues);
+    String updateExpression = "REMOVE " + COMMA.join(removeParts) + " SET " + COMMA.join(updateParts);
+    return updateProperties(namespace, updateExpression, attributeValues, attributeNames);
+  }
+
+  @Override
+  public List<TableIdentifier> listTables(Namespace namespace) {
+    List<TableIdentifier> identifiers = Lists.newArrayList();
+    Map<String, AttributeValue> lastEvaluatedKey = null;
+    String condition = COL_NAMESPACE + " = :ns";
+    Map<String, AttributeValue> conditionValues = ImmutableMap.of(
+        ":ns", AttributeValue.builder().s(namespace.toString()).build());
+    do {
+      QueryResponse response = dynamo.query(QueryRequest.builder()
+          .tableName(awsProperties.dynamoDbTableName())
+          .indexName(GSI_NAMESPACE_IDENTIFIER)
+          .keyConditionExpression(condition)
+          .expressionAttributeValues(conditionValues)
+          .exclusiveStartKey(lastEvaluatedKey)
+          .build());
+
+      if (response.hasItems()) {
+        for (Map<String, AttributeValue> item : response.items()) {
+          String identifier = item.get(COL_IDENTIFIER).s();
+          if (!COL_IDENTIFIER_NAMESPACE.equals(identifier)) {
+            identifiers.add(TableIdentifier.of(identifier.split("\\.")));
+          }
+        }
+      }
+
+      lastEvaluatedKey = response.lastEvaluatedKey();
+    } while (!lastEvaluatedKey.isEmpty());
+    return identifiers;
+  }
+
+  @Override
+  public boolean dropTable(TableIdentifier identifier, boolean purge) {
+    Map<String, AttributeValue> key = tablePrimaryKey(identifier);
+    try {
+      GetItemResponse response = dynamo.getItem(GetItemRequest.builder()
+          .tableName(awsProperties.dynamoDbTableName())
+          .consistentRead(true)
+          .key(key)
+          .build());
+
+      if (!response.hasItem()) {
+        throw new NoSuchTableException("Cannot find table %s to drop", identifier);
+      }
+
+      TableOperations ops = newTableOps(identifier);
+      TableMetadata lastMetadata = ops.current();
+      dynamo.deleteItem(DeleteItemRequest.builder()
+          .tableName(awsProperties.dynamoDbTableName())
+          .key(tablePrimaryKey(identifier))
+          .conditionExpression(COL_VERSION + " = :v")
+          .expressionAttributeValues(ImmutableMap.of(":v", response.item().get(COL_VERSION)))
+          .build());
+      LOG.info("Successfully dropped table {} from DynamoDb catalog", identifier);
+
+      if (purge && lastMetadata != null) {
+        CatalogUtil.dropTableData(ops.io(), lastMetadata);
+        LOG.info("Table {} data purged", identifier);
+      }
+
+      LOG.info("Dropped table: {}", identifier);
+      return true;
+    } catch (ConditionalCheckFailedException e) {
+      LOG.error("Cannot complete drop table operation for {}: commit conflict", identifier, e);
+      return false;
+    } catch (Exception e) {
+      LOG.error("Cannot complete drop table operation for {}: unexpected exception", identifier, e);
+      throw e;
+    }
+  }
+
+  @Override
+  public void renameTable(TableIdentifier from, TableIdentifier to) {
+    Map<String, AttributeValue> fromKey = tablePrimaryKey(from);
+    Map<String, AttributeValue> toKey = tablePrimaryKey(to);
+
+    GetItemResponse fromResponse = dynamo.getItem(GetItemRequest.builder()
+        .tableName(awsProperties.dynamoDbTableName())
+        .consistentRead(true)
+        .key(fromKey)
+        .build());
+
+    if (!fromResponse.hasItem()) {
+      throw new NoSuchTableException("Cannot rename table %s to %s: %s does not exist", from, to, from);
+    }
+
+    GetItemResponse toResponse = dynamo.getItem(GetItemRequest.builder()
+        .tableName(awsProperties.dynamoDbTableName())
+        .consistentRead(true)
+        .key(toKey)
+        .build());
+
+    if (toResponse.hasItem()) {
+      throw new AlreadyExistsException("Cannot rename table %s to %s: %s already exists", from, to, to);
+    }
+
+    fromResponse.item().entrySet().stream()
+        .filter(e -> isProperty(e.getKey()))
+        .forEach(e -> toKey.put(e.getKey(), e.getValue()));
+
+    setNewCatalogEntryMetadata(toKey);
+
+    dynamo.transactWriteItems(TransactWriteItemsRequest.builder()
+        .transactItems(
+            TransactWriteItem.builder()
+                .delete(Delete.builder()
+                    .tableName(awsProperties.dynamoDbTableName())
+                    .key(fromKey)
+                    .conditionExpression(COL_VERSION + " = :v")
+                    .expressionAttributeValues(ImmutableMap.of(":v", fromResponse.item().get(COL_VERSION)))
+                    .build())
+                .build(),
+            TransactWriteItem.builder()
+                .put(Put.builder()
+                    .tableName(awsProperties.dynamoDbTableName())
+                    .item(toKey)
+                    .conditionExpression("attribute_not_exists(" + COL_VERSION + ")")
+                    .build())
+                .build())
+            .build());
+
+    LOG.info("Successfully renamed table from {} to {}", from, to);
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    hadoopConf = conf;
+  }
+
+  @Override
+  public Configuration getConf() {
+    return hadoopConf;
+  }
+
+  @Override
+  public void close() throws IOException {
+    dynamo.close();
+  }
+
+  /**
+   * The property used to set a default location for tables in a namespace.
+   * Call {@link #setProperties(Namespace, Map)} to set a path value using this property for a namespace,
+   * then all tables in the namespace will have default table root path under that given path.
+   * @return default location property key
+   */
+  public static String defaultLocationProperty() {
+    return PROPERTY_DEFAULT_LOCATION;
+  }
+
+  static String toPropertyCol(String propertyKey) {
+    return PROPERTY_COL_PREFIX + propertyKey;
+  }
+
+  static boolean isProperty(String dynamoCol) {
+    return dynamoCol.startsWith(PROPERTY_COL_PREFIX);
+  }
+
+  static String toPropertyKey(String propertyCol) {
+    return propertyCol.substring(PROPERTY_COL_PREFIX.length());
+  }
+
+  static Map<String, AttributeValue> namespacePrimaryKey(Namespace namespace) {
+    Map<String, AttributeValue> key = Maps.newHashMap();
+    key.put(COL_IDENTIFIER, AttributeValue.builder().s(COL_IDENTIFIER_NAMESPACE).build());
+    key.put(COL_NAMESPACE, AttributeValue.builder().s(namespace.toString()).build());
+    return key;
+  }
+
+  static Map<String, AttributeValue> tablePrimaryKey(TableIdentifier identifier) {
+    Map<String, AttributeValue> key = Maps.newHashMap();
+    key.put(COL_IDENTIFIER, AttributeValue.builder().s(identifier.toString()).build());
+    key.put(COL_NAMESPACE, AttributeValue.builder().s(identifier.namespace().toString()).build());
+    return key;
+  }
+
+  static void setNewCatalogEntryMetadata(Map<String, AttributeValue> values) {
+    String current = Long.toString(System.currentTimeMillis());
+    values.put(COL_CREATED_AT, AttributeValue.builder().n(current).build());
+    values.put(COL_UPDATED_AT, AttributeValue.builder().n(current).build());
+    values.put(COL_VERSION, AttributeValue.builder().s(UUID.randomUUID().toString()).build());
+  }
+
+  static void updateCatalogEntryMetadata(List<String> updateParts, Map<String, AttributeValue> attributeValues) {
+    updateParts.add(COL_UPDATED_AT + " = :uat");
+    attributeValues.put(":uat", AttributeValue.builder().n(Long.toString(System.currentTimeMillis())).build());
+    updateParts.add(COL_VERSION + " = :uv");
+    attributeValues.put(":uv", AttributeValue.builder().s(UUID.randomUUID().toString()).build());
+  }
+
+  private FileIO initializeFileIO(Map<String, String> properties) {
+    String fileIOImpl = properties.get(CatalogProperties.FILE_IO_IMPL);
+    if (fileIOImpl == null) {
+      FileIO io = new S3FileIO();
+      io.initialize(properties);
+      return io;
+    } else {
+      return CatalogUtil.loadFileIO(fileIOImpl, properties, hadoopConf);
+    }
+  }
+
+  private String cleanWarehousePath(String path) {
+    Preconditions.checkArgument(path != null && path.length() > 0,
+        "Cannot initialize DynamoDbCatalog because warehousePath must not be null");
+    int len = path.length();
+    if (path.charAt(len - 1) == '/') {
+      return path.substring(0, len - 1);
+    } else {
+      return path;
+    }
+  }
+
+  private void validateNamespace(Namespace namespace) {
+    for (String level : namespace.levels()) {
+      ValidationException.check(level != null && !level.isEmpty(),
+          "Namespace level must not be empty: %s", namespace);
+      ValidationException.check(!level.contains("."),
+          "Namespace level must not contain dot, but found %s in %s", level, namespace);
+    }
+  }
+
+  private void validateTableIdentifier(TableIdentifier identifier) {
+    validateNamespace(identifier.namespace());
+    ValidationException.check(identifier.hasNamespace(),
+        "Table namespace must not be empty: %s", identifier);
+    String tableName = identifier.name();
+    ValidationException.check(!tableName.contains("."),
+        "Table name must not contain dot: %s", tableName);
+  }
+
+  private boolean dynamoDbTableExists(String tableName) {
+    try {
+      dynamo.describeTable(DescribeTableRequest.builder()
+          .tableName(tableName)
+          .build());
+      return true;
+    } catch (ResourceNotFoundException e) {
+      return false;
+    }
+  }
+
+  private void ensureCatalogTableExistsOrCreate() {
+
+    if (dynamoDbTableExists(awsProperties.dynamoDbTableName())) {
+      return;
+    }
+
+    LOG.info("DynamoDb catalog table {} not found, trying to create", awsProperties.dynamoDbTableName());
+    dynamo.createTable(CreateTableRequest.builder()
+        .tableName(awsProperties.dynamoDbTableName())
+        .keySchema(
+            KeySchemaElement.builder().attributeName(COL_IDENTIFIER).keyType(KeyType.HASH).build(),
+            KeySchemaElement.builder().attributeName(COL_NAMESPACE).keyType(KeyType.RANGE).build())
+        .attributeDefinitions(
+            AttributeDefinition.builder().attributeName(COL_IDENTIFIER).attributeType(ScalarAttributeType.S).build(),
+            AttributeDefinition.builder().attributeName(COL_NAMESPACE).attributeType(ScalarAttributeType.S).build())
+        .globalSecondaryIndexes(GlobalSecondaryIndex.builder()
+            .indexName(GSI_NAMESPACE_IDENTIFIER)
+            .keySchema(
+                KeySchemaElement.builder().attributeName(COL_NAMESPACE).keyType(KeyType.HASH).build(),
+                KeySchemaElement.builder().attributeName(COL_IDENTIFIER).keyType(KeyType.RANGE).build())
+            .projection(Projection.builder().projectionType(ProjectionType.KEYS_ONLY).build())
+            .build())
+        .billingMode(BillingMode.PAY_PER_REQUEST)
+        .build());
+
+    Tasks.foreach(awsProperties.dynamoDbTableName())

Review comment:
       I assume this waits for DynamoDB to finish provisioning the table (I'm not sure how long that typically takes). It would be good to add a code comment explaining why this check is necessary, for future reference.
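
To make the suggestion concrete, here is a minimal sketch of the kind of wait loop the comment refers to. It is not the PR's code: the PR drives the wait through `Tasks.foreach(...)`, and the rest of that call is not shown in this excerpt. `TableStatusPoller`, `waitUntilActive`, and the `Supplier<String>` status source are hypothetical names used only for illustration.

```java
import java.util.function.Supplier;

// Hypothetical helper, not part of the PR: polls a status source until the
// table reports ACTIVE, because a newly created table cannot serve requests
// while it is still in the CREATING state.
final class TableStatusPoller {
  private TableStatusPoller() {}

  // Returns true once the supplier reports "ACTIVE", false if attempts run out.
  // In real code the supplier would wrap a DescribeTable call (assumption).
  static boolean waitUntilActive(Supplier<String> status, int maxAttempts, long initialSleepMs) {
    long sleepMs = initialSleepMs;
    for (int attempt = 1; attempt <= maxAttempts; attempt++) {
      if ("ACTIVE".equals(status.get())) {
        return true;
      }
      if (attempt < maxAttempts) {
        try {
          Thread.sleep(sleepMs);
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt(); // preserve the interrupt for callers
          return false;
        }
        sleepMs *= 2; // exponential backoff between checks
      }
    }
    return false;
  }
}
```

With the AWS SDK v2 client, the supplier could plausibly be something like `() -> dynamo.describeTable(request).table().tableStatusAsString()`, with `maxAttempts` set from `CATALOG_TABLE_CREATION_WAIT_ATTEMPTS_MAX`. A short comment on the loop stating that it waits for the table to become ACTIVE would address the request above.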






[GitHub] [iceberg] danielcweeks commented on a change in pull request #2688: AWS: add DynamoDb catalog

Posted by GitBox <gi...@apache.org>.
danielcweeks commented on a change in pull request #2688:
URL: https://github.com/apache/iceberg/pull/2688#discussion_r654828829



##########
File path: aws/src/main/java/org/apache/iceberg/aws/dynamodb/DynamoDbCatalog.java
##########
@@ -0,0 +1,613 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.aws.dynamodb;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.BaseMetastoreCatalog;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.aws.AwsClientFactories;
+import org.apache.iceberg.aws.AwsProperties;
+import org.apache.iceberg.aws.s3.S3FileIO;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.SupportsNamespaces;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
+import org.apache.iceberg.exceptions.NoSuchNamespaceException;
+import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.base.Joiner;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.Tasks;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
+import software.amazon.awssdk.services.dynamodb.model.AttributeDefinition;
+import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
+import software.amazon.awssdk.services.dynamodb.model.BillingMode;
+import software.amazon.awssdk.services.dynamodb.model.ConditionalCheckFailedException;
+import software.amazon.awssdk.services.dynamodb.model.CreateTableRequest;
+import software.amazon.awssdk.services.dynamodb.model.Delete;
+import software.amazon.awssdk.services.dynamodb.model.DeleteItemRequest;
+import software.amazon.awssdk.services.dynamodb.model.DescribeTableRequest;
+import software.amazon.awssdk.services.dynamodb.model.DescribeTableResponse;
+import software.amazon.awssdk.services.dynamodb.model.GetItemRequest;
+import software.amazon.awssdk.services.dynamodb.model.GetItemResponse;
+import software.amazon.awssdk.services.dynamodb.model.GlobalSecondaryIndex;
+import software.amazon.awssdk.services.dynamodb.model.KeySchemaElement;
+import software.amazon.awssdk.services.dynamodb.model.KeyType;
+import software.amazon.awssdk.services.dynamodb.model.Projection;
+import software.amazon.awssdk.services.dynamodb.model.ProjectionType;
+import software.amazon.awssdk.services.dynamodb.model.Put;
+import software.amazon.awssdk.services.dynamodb.model.PutItemRequest;
+import software.amazon.awssdk.services.dynamodb.model.QueryRequest;
+import software.amazon.awssdk.services.dynamodb.model.QueryResponse;
+import software.amazon.awssdk.services.dynamodb.model.ResourceNotFoundException;
+import software.amazon.awssdk.services.dynamodb.model.ScalarAttributeType;
+import software.amazon.awssdk.services.dynamodb.model.TableStatus;
+import software.amazon.awssdk.services.dynamodb.model.TransactWriteItem;
+import software.amazon.awssdk.services.dynamodb.model.TransactWriteItemsRequest;
+import software.amazon.awssdk.services.dynamodb.model.UpdateItemRequest;
+
+/**
+ * DynamoDB implementation of Iceberg catalog
+ */
+public class DynamoDbCatalog extends BaseMetastoreCatalog implements Closeable, SupportsNamespaces, Configurable {
+
+  private static final Logger LOG = LoggerFactory.getLogger(DynamoDbCatalog.class);
+  private static final int CATALOG_TABLE_CREATION_WAIT_ATTEMPTS_MAX = 5;
+  static final Joiner COMMA = Joiner.on(',');
+
+  private static final String GSI_NAMESPACE_IDENTIFIER = "namespace-identifier";
+  private static final String COL_IDENTIFIER = "identifier";
+  private static final String COL_IDENTIFIER_NAMESPACE = "NAMESPACE";
+  private static final String COL_NAMESPACE = "namespace";
+  private static final String PROPERTY_COL_PREFIX = "p.";
+  private static final String PROPERTY_DEFAULT_LOCATION = "default_location";
+  private static final String COL_CREATED_AT = "created_at";
+  private static final String COL_UPDATED_AT = "updated_at";
+
+  // field used for optimistic locking
+  static final String COL_VERSION = "v";
+
+  private DynamoDbClient dynamo;
+  private Configuration hadoopConf;
+  private String catalogName;
+  private String warehousePath;
+  private AwsProperties awsProperties;
+  private FileIO fileIO;
+
+  public DynamoDbCatalog() {
+  }
+
+  @Override
+  public void initialize(String name, Map<String, String> properties) {
+    initialize(
+        name,
+        properties.get(CatalogProperties.WAREHOUSE_LOCATION),
+        new AwsProperties(properties),
+        AwsClientFactories.from(properties).dynamo(),
+        initializeFileIO(properties));
+  }
+
+  @VisibleForTesting
+  void initialize(String name, String path, AwsProperties properties, DynamoDbClient client, FileIO io) {
+    this.catalogName = name;
+    this.awsProperties = properties;
+    this.warehousePath = cleanWarehousePath(path);
+    this.dynamo = client;
+    this.fileIO = io;
+    ensureCatalogTableExistsOrCreate();
+  }
+
+  @Override
+  public String name() {
+    return catalogName;
+  }
+
+  @Override
+  protected TableOperations newTableOps(TableIdentifier tableIdentifier) {
+    validateTableIdentifier(tableIdentifier);
+    return new DynamoDbTableOperations(dynamo, awsProperties, catalogName, fileIO, tableIdentifier);
+  }
+
+  @Override
+  protected String defaultWarehouseLocation(TableIdentifier tableIdentifier) {
+    validateTableIdentifier(tableIdentifier);
+    GetItemResponse response = dynamo.getItem(GetItemRequest.builder()
+        .tableName(awsProperties.dynamoDbTableName())
+        .consistentRead(true)
+        .key(namespacePrimaryKey(tableIdentifier.namespace()))
+        .build());
+
+    if (!response.hasItem()) {
+      throw new NoSuchNamespaceException("Cannot find default warehouse location: namespace %s does not exist",
+          tableIdentifier.namespace());
+    }
+
+    String defaultLocationCol = toPropertyCol(PROPERTY_DEFAULT_LOCATION);
+    if (response.item().containsKey(defaultLocationCol)) {
+      return String.format("%s/%s", response.item().get(defaultLocationCol), tableIdentifier.name());
+    } else {
+      return String.format("%s/%s.db/%s", warehousePath, tableIdentifier.namespace(), tableIdentifier.name());
+    }
+  }
+
+  @Override
+  public void createNamespace(Namespace namespace, Map<String, String> metadata) {
+    validateNamespace(namespace);
+    Map<String, AttributeValue> values = namespacePrimaryKey(namespace);
+    setNewCatalogEntryMetadata(values);
+    metadata.forEach((key, value) -> values.put(toPropertyCol(key), AttributeValue.builder().s(value).build()));
+
+    try {
+      dynamo.putItem(PutItemRequest.builder()
+          .tableName(awsProperties.dynamoDbTableName())
+          .conditionExpression("attribute_not_exists(" + DynamoDbCatalog.COL_VERSION + ")")
+          .item(values)
+          .build());
+    } catch (ConditionalCheckFailedException e) {
+      throw new AlreadyExistsException("Cannot create namespace %s: already exists", namespace);
+    }
+  }
+
+  @Override
+  public List<Namespace> listNamespaces(Namespace namespace) throws NoSuchNamespaceException {
+    validateNamespace(namespace);
+    List<Namespace> namespaces = Lists.newArrayList();
+    Map<String, AttributeValue> lastEvaluatedKey = null;
+    String condition = COL_IDENTIFIER + " = :identifier";
+    Map<String, AttributeValue> conditionValues = Maps.newHashMap();
+    conditionValues.put(":identifier", AttributeValue.builder().s(COL_IDENTIFIER_NAMESPACE).build());
+    if (!namespace.isEmpty()) {
+      condition += " AND " + "begins_with(" + COL_NAMESPACE + ",:ns)";
+      conditionValues.put(":ns", AttributeValue.builder().s(namespace.toString()).build());
+    }
+
+    do {
+      QueryResponse response = dynamo.query(QueryRequest.builder()
+          .tableName(awsProperties.dynamoDbTableName())
+          .consistentRead(true)
+          .keyConditionExpression(condition)
+          .expressionAttributeValues(conditionValues)
+          .exclusiveStartKey(lastEvaluatedKey)
+          .build());
+
+      if (response.hasItems()) {
+        for (Map<String, AttributeValue> item : response.items()) {
+          String ns = item.get(COL_NAMESPACE).s();
+          namespaces.add(Namespace.of(ns.split("\\.")));
+        }
+      }
+
+      lastEvaluatedKey = response.lastEvaluatedKey();
+    } while (!lastEvaluatedKey.isEmpty());
+
+    return namespaces;
+  }
+
+  @Override
+  public Map<String, String> loadNamespaceMetadata(Namespace namespace) throws NoSuchNamespaceException {
+    validateNamespace(namespace);
+    GetItemResponse response = dynamo.getItem(GetItemRequest.builder()
+        .tableName(awsProperties.dynamoDbTableName())
+        .consistentRead(true)
+        .key(namespacePrimaryKey(namespace))
+        .build());
+
+    if (!response.hasItem()) {
+      throw new NoSuchNamespaceException("Cannot find namespace %s", namespace);
+    }
+
+    return response.item().entrySet().stream()
+        .filter(e -> isProperty(e.getKey()))
+        .collect(Collectors.toMap(e -> toPropertyKey(e.getKey()), e -> e.getValue().s()));
+  }
+
+  @Override
+  public boolean dropNamespace(Namespace namespace) throws NamespaceNotEmptyException {
+    validateNamespace(namespace);
+    if (!listTables(namespace).isEmpty()) {
+      throw new NamespaceNotEmptyException("Cannot delete non-empty namespace %s", namespace);
+    }
+
+    try {
+      dynamo.deleteItem(DeleteItemRequest.builder()
+          .tableName(awsProperties.dynamoDbTableName())
+          .key(namespacePrimaryKey(namespace))
+          .conditionExpression("attribute_exists(" + COL_NAMESPACE + ")")
+          .build());
+      return true;
+    } catch (ConditionalCheckFailedException e) {
+      return false;
+    }
+  }
+
+  @Override
+  public boolean setProperties(Namespace namespace, Map<String, String> properties) throws NoSuchNamespaceException {
+    List<String> updateParts = Lists.newArrayList();
+    Map<String, String> attributeNames = Maps.newHashMap();
+    Map<String, AttributeValue> attributeValues = Maps.newHashMap();
+    int idx = 0;
+    for (Map.Entry<String, String> property : properties.entrySet()) {
+      String attributeValue = ":v" + idx;
+      String attributeKey = "#k" + idx;
+      idx++;
+      updateParts.add(attributeKey + " = " + attributeValue);
+      attributeNames.put(attributeKey, toPropertyCol(property.getKey()));
+      attributeValues.put(attributeValue, AttributeValue.builder().s(property.getValue()).build());
+    }
+
+    updateCatalogEntryMetadata(updateParts, attributeValues);
+    String updateExpression = "SET " + COMMA.join(updateParts);
+    return updateProperties(namespace, updateExpression, attributeValues, attributeNames);
+  }
+
+  @Override
+  public boolean removeProperties(Namespace namespace, Set<String> properties) throws NoSuchNamespaceException {
+    List<String> removeParts = Lists.newArrayList();
+    Map<String, String> attributeNames = Maps.newHashMap();
+    Map<String, AttributeValue> attributeValues = Maps.newHashMap();
+    int idx = 0;
+    for (String property : properties) {
+      String attributeKey = "#k" + idx;
+      idx++;
+      removeParts.add(attributeKey);
+      attributeNames.put(attributeKey, toPropertyCol(property));
+    }
+
+    List<String> updateParts = Lists.newArrayList();
+    updateCatalogEntryMetadata(updateParts, attributeValues);
+    String updateExpression = "REMOVE " + COMMA.join(removeParts) + " SET " + COMMA.join(updateParts);
+    return updateProperties(namespace, updateExpression, attributeValues, attributeNames);
+  }
+
+  @Override
+  public List<TableIdentifier> listTables(Namespace namespace) {
+    List<TableIdentifier> identifiers = Lists.newArrayList();
+    Map<String, AttributeValue> lastEvaluatedKey = null;
+    String condition = COL_NAMESPACE + " = :ns";
+    Map<String, AttributeValue> conditionValues = ImmutableMap.of(
+        ":ns", AttributeValue.builder().s(namespace.toString()).build());
+    do {
+      QueryResponse response = dynamo.query(QueryRequest.builder()
+          .tableName(awsProperties.dynamoDbTableName())
+          .indexName(GSI_NAMESPACE_IDENTIFIER)
+          .keyConditionExpression(condition)
+          .expressionAttributeValues(conditionValues)
+          .exclusiveStartKey(lastEvaluatedKey)
+          .build());
+
+      if (response.hasItems()) {
+        for (Map<String, AttributeValue> item : response.items()) {
+          String identifier = item.get(COL_IDENTIFIER).s();
+          if (!COL_IDENTIFIER_NAMESPACE.equals(identifier)) {
+            identifiers.add(TableIdentifier.of(identifier.split("\\.")));
+          }
+        }
+      }
+
+      lastEvaluatedKey = response.lastEvaluatedKey();
+    } while (!lastEvaluatedKey.isEmpty());
+    return identifiers;
+  }
+
+  @Override
+  public boolean dropTable(TableIdentifier identifier, boolean purge) {
+    Map<String, AttributeValue> key = tablePrimaryKey(identifier);
+    try {
+      GetItemResponse response = dynamo.getItem(GetItemRequest.builder()
+          .tableName(awsProperties.dynamoDbTableName())
+          .consistentRead(true)
+          .key(key)
+          .build());
+
+      if (!response.hasItem()) {
+        throw new NoSuchTableException("Cannot find table %s to drop", identifier);
+      }
+
+      TableOperations ops = newTableOps(identifier);
+      TableMetadata lastMetadata = ops.current();
+      dynamo.deleteItem(DeleteItemRequest.builder()
+          .tableName(awsProperties.dynamoDbTableName())
+          .key(tablePrimaryKey(identifier))
+          .conditionExpression(COL_VERSION + " = :v")
+          .expressionAttributeValues(ImmutableMap.of(":v", response.item().get(COL_VERSION)))
+          .build());
+      LOG.info("Successfully dropped table {} from DynamoDb catalog", identifier);
+
+      if (purge && lastMetadata != null) {
+        CatalogUtil.dropTableData(ops.io(), lastMetadata);
+        LOG.info("Table {} data purged", identifier);
+      }
+
+      LOG.info("Dropped table: {}", identifier);
+      return true;
+    } catch (ConditionalCheckFailedException e) {
+      LOG.error("Cannot complete drop table operation for {}: commit conflict", identifier, e);
+      return false;
+    } catch (Exception e) {
+      LOG.error("Cannot complete drop table operation for {}: unexpected exception", identifier, e);
+      throw e;
+    }
+  }
+
+  @Override
+  public void renameTable(TableIdentifier from, TableIdentifier to) {
+    Map<String, AttributeValue> fromKey = tablePrimaryKey(from);
+    Map<String, AttributeValue> toKey = tablePrimaryKey(to);
+
+    GetItemResponse fromResponse = dynamo.getItem(GetItemRequest.builder()
+        .tableName(awsProperties.dynamoDbTableName())
+        .consistentRead(true)
+        .key(fromKey)
+        .build());
+
+    if (!fromResponse.hasItem()) {
+      throw new NoSuchTableException("Cannot rename table %s to %s: %s does not exist", from, to, from);
+    }
+
+    GetItemResponse toResponse = dynamo.getItem(GetItemRequest.builder()

Review comment:
       This check seems unnecessary if the transaction already handles the same condition.
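
The point can be illustrated with an in-memory stand-in. This is an analogy only, not the DynamoDB API and not the PR's transaction code (which is not shown in this excerpt). `InMemoryCatalogSketch` and its methods are hypothetical. The `synchronized` block plays the role of the transaction, and the two `if` checks play the role of condition expressions on the transaction's items.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory stand-in for the catalog table (illustration only).
final class InMemoryCatalogSketch {
  private final Map<String, String> rows = new HashMap<>();

  synchronized void create(String identifier, String metadataLocation) {
    rows.put(identifier, metadataLocation);
  }

  synchronized boolean exists(String identifier) {
    return rows.containsKey(identifier);
  }

  // Atomic rename: both conditions are evaluated in the same critical section
  // as the writes, so a separate read before calling rename adds no safety.
  synchronized void rename(String from, String to) {
    if (!rows.containsKey(from)) {
      throw new IllegalStateException("Source does not exist: " + from);
    }
    if (rows.containsKey(to)) {
      throw new IllegalStateException("Destination already exists: " + to);
    }
    rows.put(to, rows.remove(from));
  }
}
```

A read before the transaction can still be useful for producing a more specific error message, but it cannot replace the in-transaction condition, because another writer may create the destination between the read and the write.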






[GitHub] [iceberg] jackye1995 commented on a change in pull request #2688: AWS: add DynamoDb catalog

Posted by GitBox <gi...@apache.org>.
jackye1995 commented on a change in pull request #2688:
URL: https://github.com/apache/iceberg/pull/2688#discussion_r654833719



##########
File path: aws/src/main/java/org/apache/iceberg/aws/dynamodb/DynamoDbCatalog.java
##########
@@ -0,0 +1,613 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.aws.dynamodb;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.BaseMetastoreCatalog;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.aws.AwsClientFactories;
+import org.apache.iceberg.aws.AwsProperties;
+import org.apache.iceberg.aws.s3.S3FileIO;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.SupportsNamespaces;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
+import org.apache.iceberg.exceptions.NoSuchNamespaceException;
+import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.base.Joiner;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.Tasks;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
+import software.amazon.awssdk.services.dynamodb.model.AttributeDefinition;
+import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
+import software.amazon.awssdk.services.dynamodb.model.BillingMode;
+import software.amazon.awssdk.services.dynamodb.model.ConditionalCheckFailedException;
+import software.amazon.awssdk.services.dynamodb.model.CreateTableRequest;
+import software.amazon.awssdk.services.dynamodb.model.Delete;
+import software.amazon.awssdk.services.dynamodb.model.DeleteItemRequest;
+import software.amazon.awssdk.services.dynamodb.model.DescribeTableRequest;
+import software.amazon.awssdk.services.dynamodb.model.DescribeTableResponse;
+import software.amazon.awssdk.services.dynamodb.model.GetItemRequest;
+import software.amazon.awssdk.services.dynamodb.model.GetItemResponse;
+import software.amazon.awssdk.services.dynamodb.model.GlobalSecondaryIndex;
+import software.amazon.awssdk.services.dynamodb.model.KeySchemaElement;
+import software.amazon.awssdk.services.dynamodb.model.KeyType;
+import software.amazon.awssdk.services.dynamodb.model.Projection;
+import software.amazon.awssdk.services.dynamodb.model.ProjectionType;
+import software.amazon.awssdk.services.dynamodb.model.Put;
+import software.amazon.awssdk.services.dynamodb.model.PutItemRequest;
+import software.amazon.awssdk.services.dynamodb.model.QueryRequest;
+import software.amazon.awssdk.services.dynamodb.model.QueryResponse;
+import software.amazon.awssdk.services.dynamodb.model.ResourceNotFoundException;
+import software.amazon.awssdk.services.dynamodb.model.ScalarAttributeType;
+import software.amazon.awssdk.services.dynamodb.model.TableStatus;
+import software.amazon.awssdk.services.dynamodb.model.TransactWriteItem;
+import software.amazon.awssdk.services.dynamodb.model.TransactWriteItemsRequest;
+import software.amazon.awssdk.services.dynamodb.model.UpdateItemRequest;
+
+/**
+ * DynamoDB implementation of Iceberg catalog
+ */
+public class DynamoDbCatalog extends BaseMetastoreCatalog implements Closeable, SupportsNamespaces, Configurable {
+
+  private static final Logger LOG = LoggerFactory.getLogger(DynamoDbCatalog.class);
+  private static final int CATALOG_TABLE_CREATION_WAIT_ATTEMPTS_MAX = 5;
+  static final Joiner COMMA = Joiner.on(',');
+
+  private static final String GSI_NAMESPACE_IDENTIFIER = "namespace-identifier";
+  private static final String COL_IDENTIFIER = "identifier";
+  private static final String COL_IDENTIFIER_NAMESPACE = "NAMESPACE";
+  private static final String COL_NAMESPACE = "namespace";
+  private static final String PROPERTY_COL_PREFIX = "p.";
+  private static final String PROPERTY_DEFAULT_LOCATION = "default_location";
+  private static final String COL_CREATED_AT = "created_at";
+  private static final String COL_UPDATED_AT = "updated_at";
+
+  // field used for optimistic locking
+  static final String COL_VERSION = "v";
+
+  private DynamoDbClient dynamo;
+  private Configuration hadoopConf;
+  private String catalogName;
+  private String warehousePath;
+  private AwsProperties awsProperties;
+  private FileIO fileIO;
+
+  public DynamoDbCatalog() {
+  }
+
+  @Override
+  public void initialize(String name, Map<String, String> properties) {
+    initialize(
+        name,
+        properties.get(CatalogProperties.WAREHOUSE_LOCATION),
+        new AwsProperties(properties),
+        AwsClientFactories.from(properties).dynamo(),
+        initializeFileIO(properties));
+  }
+
+  @VisibleForTesting
+  void initialize(String name, String path, AwsProperties properties, DynamoDbClient client, FileIO io) {
+    this.catalogName = name;
+    this.awsProperties = properties;
+    this.warehousePath = cleanWarehousePath(path);
+    this.dynamo = client;
+    this.fileIO = io;
+    ensureCatalogTableExistsOrCreate();
+  }
+
+  @Override
+  public String name() {
+    return catalogName;
+  }
+
+  @Override
+  protected TableOperations newTableOps(TableIdentifier tableIdentifier) {
+    validateTableIdentifier(tableIdentifier);
+    return new DynamoDbTableOperations(dynamo, awsProperties, catalogName, fileIO, tableIdentifier);
+  }
+
+  @Override
+  protected String defaultWarehouseLocation(TableIdentifier tableIdentifier) {
+    validateTableIdentifier(tableIdentifier);
+    GetItemResponse response = dynamo.getItem(GetItemRequest.builder()
+        .tableName(awsProperties.dynamoDbTableName())
+        .consistentRead(true)
+        .key(namespacePrimaryKey(tableIdentifier.namespace()))
+        .build());
+
+    if (!response.hasItem()) {
+      throw new NoSuchNamespaceException("Cannot find default warehouse location: namespace %s does not exist",
+          tableIdentifier.namespace());
+    }
+
+    String defaultLocationCol = toPropertyCol(PROPERTY_DEFAULT_LOCATION);
+    if (response.item().containsKey(defaultLocationCol)) {
+      return String.format("%s/%s", response.item().get(defaultLocationCol), tableIdentifier.name());
+    } else {
+      return String.format("%s/%s.db/%s", warehousePath, tableIdentifier.namespace(), tableIdentifier.name());
+    }
+  }
+
+  @Override
+  public void createNamespace(Namespace namespace, Map<String, String> metadata) {
+    validateNamespace(namespace);
+    Map<String, AttributeValue> values = namespacePrimaryKey(namespace);
+    setNewCatalogEntryMetadata(values);
+    metadata.forEach((key, value) -> values.put(toPropertyCol(key), AttributeValue.builder().s(value).build()));
+
+    try {
+      dynamo.putItem(PutItemRequest.builder()
+          .tableName(awsProperties.dynamoDbTableName())
+          .conditionExpression("attribute_not_exists(" + DynamoDbCatalog.COL_VERSION + ")")
+          .item(values)
+          .build());
+    } catch (ConditionalCheckFailedException e) {
+      throw new AlreadyExistsException("Cannot create namespace %s: already exists", namespace);
+    }
+  }
+
+  @Override
+  public List<Namespace> listNamespaces(Namespace namespace) throws NoSuchNamespaceException {
+    validateNamespace(namespace);
+    List<Namespace> namespaces = Lists.newArrayList();
+    Map<String, AttributeValue> lastEvaluatedKey = null;
+    String condition = COL_IDENTIFIER + " = :identifier";
+    Map<String, AttributeValue> conditionValues = Maps.newHashMap();
+    conditionValues.put(":identifier", AttributeValue.builder().s(COL_IDENTIFIER_NAMESPACE).build());
+    if (!namespace.isEmpty()) {
+      condition += " AND " + "begins_with(" + COL_NAMESPACE + ",:ns)";
+      conditionValues.put(":ns", AttributeValue.builder().s(namespace.toString()).build());
+    }
+
+    do {
+      QueryResponse response = dynamo.query(QueryRequest.builder()
+          .tableName(awsProperties.dynamoDbTableName())
+          .consistentRead(true)
+          .keyConditionExpression(condition)
+          .expressionAttributeValues(conditionValues)
+          .exclusiveStartKey(lastEvaluatedKey)
+          .build());
+
+      if (response.hasItems()) {
+        for (Map<String, AttributeValue> item : response.items()) {
+          String ns = item.get(COL_NAMESPACE).s();
+          namespaces.add(Namespace.of(ns.split("\\.")));
+        }
+      }
+
+      lastEvaluatedKey = response.lastEvaluatedKey();
+    } while (!lastEvaluatedKey.isEmpty());
+
+    return namespaces;
+  }
+
+  @Override
+  public Map<String, String> loadNamespaceMetadata(Namespace namespace) throws NoSuchNamespaceException {
+    validateNamespace(namespace);
+    GetItemResponse response = dynamo.getItem(GetItemRequest.builder()
+        .tableName(awsProperties.dynamoDbTableName())
+        .consistentRead(true)
+        .key(namespacePrimaryKey(namespace))
+        .build());
+
+    if (!response.hasItem()) {
+      throw new NoSuchNamespaceException("Cannot find namespace %s", namespace);
+    }
+
+    return response.item().entrySet().stream()
+        .filter(e -> isProperty(e.getKey()))
+        .collect(Collectors.toMap(e -> toPropertyKey(e.getKey()), e -> e.getValue().s()));
+  }
+
+  @Override
+  public boolean dropNamespace(Namespace namespace) throws NamespaceNotEmptyException {
+    validateNamespace(namespace);
+    if (!listTables(namespace).isEmpty()) {
+      throw new NamespaceNotEmptyException("Cannot delete non-empty namespace %s", namespace);
+    }
+
+    try {
+      dynamo.deleteItem(DeleteItemRequest.builder()
+          .tableName(awsProperties.dynamoDbTableName())
+          .key(namespacePrimaryKey(namespace))
+          .conditionExpression("attribute_exists(" + COL_NAMESPACE + ")")
+          .build());
+      return true;
+    } catch (ConditionalCheckFailedException e) {
+      return false;
+    }
+  }
+
+  @Override
+  public boolean setProperties(Namespace namespace, Map<String, String> properties) throws NoSuchNamespaceException {
+    List<String> updateParts = Lists.newArrayList();
+    Map<String, String> attributeNames = Maps.newHashMap();
+    Map<String, AttributeValue> attributeValues = Maps.newHashMap();
+    int idx = 0;
+    for (Map.Entry<String, String> property : properties.entrySet()) {
+      String attributeValue = ":v" + idx;
+      String attributeKey = "#k" + idx;
+      idx++;
+      updateParts.add(attributeKey + " = " + attributeValue);
+      attributeNames.put(attributeKey, toPropertyCol(property.getKey()));
+      attributeValues.put(attributeValue, AttributeValue.builder().s(property.getValue()).build());
+    }
+
+    updateCatalogEntryMetadata(updateParts, attributeValues);
+    String updateExpression = "SET " + COMMA.join(updateParts);
+    return updateProperties(namespace, updateExpression, attributeValues, attributeNames);
+  }
+
+  @Override
+  public boolean removeProperties(Namespace namespace, Set<String> properties) throws NoSuchNamespaceException {
+    List<String> removeParts = Lists.newArrayList(properties.iterator());
+    Map<String, String> attributeNames = Maps.newHashMap();
+    Map<String, AttributeValue> attributeValues = Maps.newHashMap();
+    int idx = 0;
+    for (String property : properties) {
+      String attributeKey = "#k" + idx;
+      idx++;
+      removeParts.add(attributeKey);
+      attributeNames.put(attributeKey, toPropertyCol(property));
+    }
+
+    List<String> updateParts = Lists.newArrayList();
+    updateCatalogEntryMetadata(updateParts, attributeValues);
+    String updateExpression = "REMOVE " + COMMA.join(removeParts) + " SET " + COMMA.join(updateParts);
+    return updateProperties(namespace, updateExpression, attributeValues, attributeNames);
+  }
+
+  @Override
+  public List<TableIdentifier> listTables(Namespace namespace) {
+    List<TableIdentifier> identifiers = Lists.newArrayList();
+    Map<String, AttributeValue> lastEvaluatedKey;
+    String condition = COL_NAMESPACE + " = :ns";
+    Map<String, AttributeValue> conditionValues = ImmutableMap.of(
+        ":ns", AttributeValue.builder().s(namespace.toString()).build());
+    do {
+      QueryResponse response = dynamo.query(QueryRequest.builder()
+          .tableName(awsProperties.dynamoDbTableName())
+          .indexName(GSI_NAMESPACE_IDENTIFIER)
+          .keyConditionExpression(condition)
+          .expressionAttributeValues(conditionValues)
+          .build());
+
+      if (response.hasItems()) {
+        for (Map<String, AttributeValue> item : response.items()) {
+          String identifier = item.get(COL_IDENTIFIER).s();
+          if (!COL_IDENTIFIER_NAMESPACE.equals(identifier)) {
+            identifiers.add(TableIdentifier.of(identifier.split("\\.")));
+          }
+        }
+      }
+
+      lastEvaluatedKey = response.lastEvaluatedKey();
+    } while (!lastEvaluatedKey.isEmpty());
+    return identifiers;
+  }
+
+  @Override
+  public boolean dropTable(TableIdentifier identifier, boolean purge) {
+    Map<String, AttributeValue> key = tablePrimaryKey(identifier);
+    try {
+      GetItemResponse response = dynamo.getItem(GetItemRequest.builder()
+          .tableName(awsProperties.dynamoDbTableName())
+          .consistentRead(true)
+          .key(key)
+          .build());
+
+      if (!response.hasItem()) {
+        throw new NoSuchTableException("Cannot find table %s to drop", identifier);
+      }
+
+      TableOperations ops = newTableOps(identifier);
+      TableMetadata lastMetadata = ops.current();
+      dynamo.deleteItem(DeleteItemRequest.builder()
+          .tableName(awsProperties.dynamoDbTableName())
+          .key(tablePrimaryKey(identifier))
+          .conditionExpression(COL_VERSION + " = :v")
+          .expressionAttributeValues(ImmutableMap.of(":v", response.item().get(COL_VERSION)))
+          .build());
+      LOG.info("Successfully dropped table {} from DynamoDb catalog", identifier);
+
+      if (purge && lastMetadata != null) {
+        CatalogUtil.dropTableData(ops.io(), lastMetadata);
+        LOG.info("Table {} data purged", identifier);
+      }
+
+      LOG.info("Dropped table: {}", identifier);
+      return true;
+    } catch (ConditionalCheckFailedException e) {
+      LOG.error("Cannot complete drop table operation for {}: commit conflict", identifier, e);
+      return false;
+    } catch (Exception e) {
+      LOG.error("Cannot complete drop table operation for {}: unexpected exception", identifier, e);
+      throw e;
+    }
+  }
+
+  @Override
+  public void renameTable(TableIdentifier from, TableIdentifier to) {
+    Map<String, AttributeValue> fromKey = tablePrimaryKey(from);
+    Map<String, AttributeValue> toKey = tablePrimaryKey(to);
+
+    GetItemResponse fromResponse = dynamo.getItem(GetItemRequest.builder()
+        .tableName(awsProperties.dynamoDbTableName())
+        .consistentRead(true)
+        .key(fromKey)
+        .build());
+
+    if (!fromResponse.hasItem()) {
+      throw new NoSuchTableException("Cannot rename table %s to %s: %s does not exist", from, to, from);
+    }
+
+    GetItemResponse toResponse = dynamo.getItem(GetItemRequest.builder()

Review comment:
       Similar to your comment on the version field: a get is necessary here to read the latest version, which is then used for optimistic locking.
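
The read-then-conditional-write pattern described above can be sketched with an in-memory stand-in. This is only an illustration under assumptions: `OptimisticLockSketch` and its method names are hypothetical, and a `ConcurrentHashMap` plays the role of the catalog table. It is not the DynamoDB SDK and not the code in this PR.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory model: identifier -> version string.
public class OptimisticLockSketch {
  private final Map<String, String> versions = new ConcurrentHashMap<>();

  // Every write stamps the row with a fresh random version.
  public String put(String identifier) {
    String version = UUID.randomUUID().toString();
    versions.put(identifier, version);
    return version;
  }

  // Step 1: the "get" that reads the version currently stored (null if absent).
  public String readVersion(String identifier) {
    return versions.get(identifier);
  }

  // Step 2: delete only if the stored version still equals the one read in step 1.
  // Map.remove(key, value) is atomic on ConcurrentHashMap, so it acts like a
  // conditional delete: it returns false if another writer changed the version.
  public boolean deleteIfVersionMatches(String identifier, String expectedVersion) {
    return expectedVersion != null && versions.remove(identifier, expectedVersion);
  }
}
```

Without the initial read there would be no expected version to put in the condition, which is the point the comment makes.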




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] danielcweeks merged pull request #2688: AWS: add DynamoDb catalog

Posted by GitBox <gi...@apache.org>.
danielcweeks merged pull request #2688:
URL: https://github.com/apache/iceberg/pull/2688


   




[GitHub] [iceberg] danielcweeks commented on a change in pull request #2688: AWS: add DynamoDb catalog

Posted by GitBox <gi...@apache.org>.
danielcweeks commented on a change in pull request #2688:
URL: https://github.com/apache/iceberg/pull/2688#discussion_r654838643



##########
File path: aws/src/main/java/org/apache/iceberg/aws/dynamodb/DynamoDbCatalog.java
##########
@@ -0,0 +1,613 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.aws.dynamodb;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.BaseMetastoreCatalog;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.aws.AwsClientFactories;
+import org.apache.iceberg.aws.AwsProperties;
+import org.apache.iceberg.aws.s3.S3FileIO;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.SupportsNamespaces;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
+import org.apache.iceberg.exceptions.NoSuchNamespaceException;
+import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.base.Joiner;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.Tasks;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
+import software.amazon.awssdk.services.dynamodb.model.AttributeDefinition;
+import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
+import software.amazon.awssdk.services.dynamodb.model.BillingMode;
+import software.amazon.awssdk.services.dynamodb.model.ConditionalCheckFailedException;
+import software.amazon.awssdk.services.dynamodb.model.CreateTableRequest;
+import software.amazon.awssdk.services.dynamodb.model.Delete;
+import software.amazon.awssdk.services.dynamodb.model.DeleteItemRequest;
+import software.amazon.awssdk.services.dynamodb.model.DescribeTableRequest;
+import software.amazon.awssdk.services.dynamodb.model.DescribeTableResponse;
+import software.amazon.awssdk.services.dynamodb.model.GetItemRequest;
+import software.amazon.awssdk.services.dynamodb.model.GetItemResponse;
+import software.amazon.awssdk.services.dynamodb.model.GlobalSecondaryIndex;
+import software.amazon.awssdk.services.dynamodb.model.KeySchemaElement;
+import software.amazon.awssdk.services.dynamodb.model.KeyType;
+import software.amazon.awssdk.services.dynamodb.model.Projection;
+import software.amazon.awssdk.services.dynamodb.model.ProjectionType;
+import software.amazon.awssdk.services.dynamodb.model.Put;
+import software.amazon.awssdk.services.dynamodb.model.PutItemRequest;
+import software.amazon.awssdk.services.dynamodb.model.QueryRequest;
+import software.amazon.awssdk.services.dynamodb.model.QueryResponse;
+import software.amazon.awssdk.services.dynamodb.model.ResourceNotFoundException;
+import software.amazon.awssdk.services.dynamodb.model.ScalarAttributeType;
+import software.amazon.awssdk.services.dynamodb.model.TableStatus;
+import software.amazon.awssdk.services.dynamodb.model.TransactWriteItem;
+import software.amazon.awssdk.services.dynamodb.model.TransactWriteItemsRequest;
+import software.amazon.awssdk.services.dynamodb.model.UpdateItemRequest;
+
+/**
+ * DynamoDB implementation of Iceberg catalog
+ */
+public class DynamoDbCatalog extends BaseMetastoreCatalog implements Closeable, SupportsNamespaces, Configurable {
+
+  private static final Logger LOG = LoggerFactory.getLogger(DynamoDbCatalog.class);
+  private static final int CATALOG_TABLE_CREATION_WAIT_ATTEMPTS_MAX = 5;
+  static final Joiner COMMA = Joiner.on(',');
+
+  private static final String GSI_NAMESPACE_IDENTIFIER = "namespace-identifier";
+  private static final String COL_IDENTIFIER = "identifier";
+  private static final String COL_IDENTIFIER_NAMESPACE = "NAMESPACE";
+  private static final String COL_NAMESPACE = "namespace";
+  private static final String PROPERTY_COL_PREFIX = "p.";
+  private static final String PROPERTY_DEFAULT_LOCATION = "default_location";
+  private static final String COL_CREATED_AT = "created_at";
+  private static final String COL_UPDATED_AT = "updated_at";
+
+  // field used for optimistic locking
+  static final String COL_VERSION = "v";
+
+  private DynamoDbClient dynamo;
+  private Configuration hadoopConf;
+  private String catalogName;
+  private String warehousePath;
+  private AwsProperties awsProperties;
+  private FileIO fileIO;
+
+  public DynamoDbCatalog() {
+  }
+
+  @Override
+  public void initialize(String name, Map<String, String> properties) {
+    initialize(
+        name,
+        properties.get(CatalogProperties.WAREHOUSE_LOCATION),
+        new AwsProperties(properties),
+        AwsClientFactories.from(properties).dynamo(),
+        initializeFileIO(properties));
+  }
+
+  @VisibleForTesting
+  void initialize(String name, String path, AwsProperties properties, DynamoDbClient client, FileIO io) {
+    this.catalogName = name;
+    this.awsProperties = properties;
+    this.warehousePath = cleanWarehousePath(path);
+    this.dynamo = client;
+    this.fileIO = io;
+    ensureCatalogTableExistsOrCreate();
+  }
+
+  @Override
+  public String name() {
+    return catalogName;
+  }
+
+  @Override
+  protected TableOperations newTableOps(TableIdentifier tableIdentifier) {
+    validateTableIdentifier(tableIdentifier);
+    return new DynamoDbTableOperations(dynamo, awsProperties, catalogName, fileIO, tableIdentifier);
+  }
+
+  @Override
+  protected String defaultWarehouseLocation(TableIdentifier tableIdentifier) {
+    validateTableIdentifier(tableIdentifier);
+    GetItemResponse response = dynamo.getItem(GetItemRequest.builder()
+        .tableName(awsProperties.dynamoDbTableName())
+        .consistentRead(true)
+        .key(namespacePrimaryKey(tableIdentifier.namespace()))
+        .build());
+
+    if (!response.hasItem()) {
+      throw new NoSuchNamespaceException("Cannot find default warehouse location: namespace %s does not exist",
+          tableIdentifier.namespace());
+    }
+
+    String defaultLocationCol = toPropertyCol(PROPERTY_DEFAULT_LOCATION);
+    if (response.item().containsKey(defaultLocationCol)) {
+      return String.format("%s/%s", response.item().get(defaultLocationCol), tableIdentifier.name());
+    } else {
+      return String.format("%s/%s.db/%s", warehousePath, tableIdentifier.namespace(), tableIdentifier.name());
+    }
+  }
+
+  @Override
+  public void createNamespace(Namespace namespace, Map<String, String> metadata) {
+    validateNamespace(namespace);
+    Map<String, AttributeValue> values = namespacePrimaryKey(namespace);
+    setNewCatalogEntryMetadata(values);
+    metadata.forEach((key, value) -> values.put(toPropertyCol(key), AttributeValue.builder().s(value).build()));
+
+    try {
+      dynamo.putItem(PutItemRequest.builder()
+          .tableName(awsProperties.dynamoDbTableName())
+          .conditionExpression("attribute_not_exists(" + DynamoDbCatalog.COL_VERSION + ")")
+          .item(values)
+          .build());
+    } catch (ConditionalCheckFailedException e) {
+      throw new AlreadyExistsException("Cannot create namespace %s: already exists", namespace);
+    }
+  }
+
+  @Override
+  public List<Namespace> listNamespaces(Namespace namespace) throws NoSuchNamespaceException {
+    validateNamespace(namespace);
+    List<Namespace> namespaces = Lists.newArrayList();
+    Map<String, AttributeValue> lastEvaluatedKey = null;
+    String condition = COL_IDENTIFIER + " = :identifier";
+    Map<String, AttributeValue> conditionValues = Maps.newHashMap();
+    conditionValues.put(":identifier", AttributeValue.builder().s(COL_IDENTIFIER_NAMESPACE).build());
+    if (!namespace.isEmpty()) {
+      condition += " AND " + "begins_with(" + COL_NAMESPACE + ",:ns)";
+      conditionValues.put(":ns", AttributeValue.builder().s(namespace.toString()).build());
+    }
+
+    do {
+      QueryResponse response = dynamo.query(QueryRequest.builder()
+          .tableName(awsProperties.dynamoDbTableName())
+          .consistentRead(true)
+          .keyConditionExpression(condition)
+          .expressionAttributeValues(conditionValues)
+          .exclusiveStartKey(lastEvaluatedKey)
+          .build());
+
+      if (response.hasItems()) {
+        for (Map<String, AttributeValue> item : response.items()) {
+          String ns = item.get(COL_NAMESPACE).s();
+          namespaces.add(Namespace.of(ns.split("\\.")));
+        }
+      }
+
+      lastEvaluatedKey = response.lastEvaluatedKey();
+    } while (!lastEvaluatedKey.isEmpty());
+
+    return namespaces;
+  }
+
+  @Override
+  public Map<String, String> loadNamespaceMetadata(Namespace namespace) throws NoSuchNamespaceException {
+    validateNamespace(namespace);
+    GetItemResponse response = dynamo.getItem(GetItemRequest.builder()
+        .tableName(awsProperties.dynamoDbTableName())
+        .consistentRead(true)
+        .key(namespacePrimaryKey(namespace))
+        .build());
+
+    if (!response.hasItem()) {
+      throw new NoSuchNamespaceException("Cannot find namespace %s", namespace);
+    }
+
+    return response.item().entrySet().stream()
+        .filter(e -> isProperty(e.getKey()))
+        .collect(Collectors.toMap(e -> toPropertyKey(e.getKey()), e -> e.getValue().s()));
+  }
+
+  @Override
+  public boolean dropNamespace(Namespace namespace) throws NamespaceNotEmptyException {
+    validateNamespace(namespace);
+    if (!listTables(namespace).isEmpty()) {
+      throw new NamespaceNotEmptyException("Cannot delete non-empty namespace %s", namespace);
+    }
+
+    try {
+      dynamo.deleteItem(DeleteItemRequest.builder()
+          .tableName(awsProperties.dynamoDbTableName())
+          .key(namespacePrimaryKey(namespace))
+          .conditionExpression("attribute_exists(" + namespace + ")")
+          .build());
+      return true;
+    } catch (ConditionalCheckFailedException e) {
+      return false;
+    }
+  }
+
+  @Override
+  public boolean setProperties(Namespace namespace, Map<String, String> properties) throws NoSuchNamespaceException {
+    List<String> updateParts = Lists.newArrayList();
+    Map<String, String> attributeNames = Maps.newHashMap();
+    Map<String, AttributeValue> attributeValues = Maps.newHashMap();
+    int idx = 0;
+    for (Map.Entry<String, String> property : properties.entrySet()) {
+      String attributeValue = ":v" + idx;
+      String attributeKey = "#k" + idx;
+      idx++;
+      updateParts.add(attributeKey + " = " + attributeValue);
+      attributeNames.put(attributeKey, toPropertyCol(property.getKey()));
+      attributeValues.put(attributeValue, AttributeValue.builder().s(property.getValue()).build());
+    }
+
+    updateCatalogEntryMetadata(updateParts, attributeValues);
+    String updateExpression = "SET " + COMMA.join(updateParts);
+    return updateProperties(namespace, updateExpression, attributeValues, attributeNames);
+  }
+
+  @Override
+  public boolean removeProperties(Namespace namespace, Set<String> properties) throws NoSuchNamespaceException {
+    List<String> removeParts = Lists.newArrayList(properties.iterator());
+    Map<String, String> attributeNames = Maps.newHashMap();
+    Map<String, AttributeValue> attributeValues = Maps.newHashMap();
+    int idx = 0;
+    for (String property : properties) {
+      String attributeKey = "#k" + idx;
+      idx++;
+      removeParts.add(attributeKey);
+      attributeNames.put(attributeKey, toPropertyCol(property));
+    }
+
+    List<String> updateParts = Lists.newArrayList();
+    updateCatalogEntryMetadata(updateParts, attributeValues);
+    String updateExpression = "REMOVE " + COMMA.join(removeParts) + " SET " + COMMA.join(updateParts);
+    return updateProperties(namespace, updateExpression, attributeValues, attributeNames);
+  }
+
+  @Override
+  public List<TableIdentifier> listTables(Namespace namespace) {
+    List<TableIdentifier> identifiers = Lists.newArrayList();
+    Map<String, AttributeValue> lastEvaluatedKey;
+    String condition = COL_NAMESPACE + " = :ns";
+    Map<String, AttributeValue> conditionValues = ImmutableMap.of(
+        ":ns", AttributeValue.builder().s(namespace.toString()).build());
+    do {
+      QueryResponse response = dynamo.query(QueryRequest.builder()
+          .tableName(awsProperties.dynamoDbTableName())
+          .indexName(GSI_NAMESPACE_IDENTIFIER)
+          .keyConditionExpression(condition)
+          .expressionAttributeValues(conditionValues)
+          .build());
+
+      if (response.hasItems()) {
+        for (Map<String, AttributeValue> item : response.items()) {
+          String identifier = item.get(COL_IDENTIFIER).s();
+          if (!COL_IDENTIFIER_NAMESPACE.equals(identifier)) {
+            identifiers.add(TableIdentifier.of(identifier.split("\\.")));
+          }
+        }
+      }
+
+      lastEvaluatedKey = response.lastEvaluatedKey();
+    } while (!lastEvaluatedKey.isEmpty());
+    return identifiers;
+  }
+
+  @Override
+  public boolean dropTable(TableIdentifier identifier, boolean purge) {
+    Map<String, AttributeValue> key = tablePrimaryKey(identifier);
+    try {
+      GetItemResponse response = dynamo.getItem(GetItemRequest.builder()
+          .tableName(awsProperties.dynamoDbTableName())
+          .consistentRead(true)
+          .key(key)
+          .build());
+
+      if (!response.hasItem()) {
+        throw new NoSuchTableException("Cannot find table %s to drop", identifier);
+      }
+
+      TableOperations ops = newTableOps(identifier);
+      TableMetadata lastMetadata = ops.current();
+      dynamo.deleteItem(DeleteItemRequest.builder()
+          .tableName(awsProperties.dynamoDbTableName())
+          .key(tablePrimaryKey(identifier))
+          .conditionExpression(COL_VERSION + " = :v")
+          .expressionAttributeValues(ImmutableMap.of(":v", response.item().get(COL_VERSION)))
+          .build());
+      LOG.info("Successfully dropped table {} from DynamoDb catalog", identifier);
+
+      if (purge && lastMetadata != null) {
+        CatalogUtil.dropTableData(ops.io(), lastMetadata);
+        LOG.info("Table {} data purged", identifier);
+      }
+
+      LOG.info("Dropped table: {}", identifier);
+      return true;
+    } catch (ConditionalCheckFailedException e) {
+      LOG.error("Cannot complete drop table operation for {}: commit conflict", identifier, e);
+      return false;
+    } catch (Exception e) {
+      LOG.error("Cannot complete drop table operation for {}: unexpected exception", identifier, e);
+      throw e;
+    }
+  }
+
+  @Override
+  public void renameTable(TableIdentifier from, TableIdentifier to) {
+    Map<String, AttributeValue> fromKey = tablePrimaryKey(from);
+    Map<String, AttributeValue> toKey = tablePrimaryKey(to);
+
+    GetItemResponse fromResponse = dynamo.getItem(GetItemRequest.builder()
+        .tableName(awsProperties.dynamoDbTableName())
+        .consistentRead(true)
+        .key(fromKey)
+        .build());
+
+    if (!fromResponse.hasItem()) {
+      throw new NoSuchTableException("Cannot rename table %s to %s: %s does not exist", from, to, from);
+    }
+
+    GetItemResponse toResponse = dynamo.getItem(GetItemRequest.builder()

Review comment:
       The `toResponse` is only used for the first existence check, but that's also done by the put conditional expression in the transaction.  We shouldn't need that.
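
To illustrate why a separate existence read of the destination adds no safety, here is a minimal sketch assuming a hypothetical in-memory `RenameSketch` class. The `synchronized` method stands in for the atomicity of a transaction; it is not the DynamoDB API and not the PR's implementation.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory model: identifier -> metadata location.
public class RenameSketch {
  private final Map<String, String> rows = new HashMap<>();

  public synchronized void create(String identifier, String location) {
    rows.put(identifier, location);
  }

  public synchronized String location(String identifier) {
    return rows.get(identifier);
  }

  // Both conditions are evaluated inside the atomic step:
  //   - the source row must exist (analogous to the condition on the delete)
  //   - the destination row must not exist (analogous to the condition on the put)
  // A pre-read of the destination before this call could be stale by the time
  // the rename runs, so correctness rests on the in-transaction check alone.
  public synchronized boolean rename(String from, String to) {
    if (!rows.containsKey(from) || rows.containsKey(to)) {
      return false;
    }
    rows.put(to, rows.remove(from));
    return true;
  }
}
```

A caller can turn a `false` result into the appropriate exception, so the destination pre-check only saves a failed transaction attempt rather than providing any guarantee.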






[GitHub] [iceberg] danielcweeks commented on a change in pull request #2688: AWS: add DynamoDb catalog

Posted by GitBox <gi...@apache.org>.
danielcweeks commented on a change in pull request #2688:
URL: https://github.com/apache/iceberg/pull/2688#discussion_r654828636



##########
File path: aws/src/main/java/org/apache/iceberg/aws/dynamodb/DynamoDbCatalog.java
##########
@@ -0,0 +1,613 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.aws.dynamodb;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.BaseMetastoreCatalog;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.aws.AwsClientFactories;
+import org.apache.iceberg.aws.AwsProperties;
+import org.apache.iceberg.aws.s3.S3FileIO;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.SupportsNamespaces;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
+import org.apache.iceberg.exceptions.NoSuchNamespaceException;
+import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.base.Joiner;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.Tasks;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
+import software.amazon.awssdk.services.dynamodb.model.AttributeDefinition;
+import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
+import software.amazon.awssdk.services.dynamodb.model.BillingMode;
+import software.amazon.awssdk.services.dynamodb.model.ConditionalCheckFailedException;
+import software.amazon.awssdk.services.dynamodb.model.CreateTableRequest;
+import software.amazon.awssdk.services.dynamodb.model.Delete;
+import software.amazon.awssdk.services.dynamodb.model.DeleteItemRequest;
+import software.amazon.awssdk.services.dynamodb.model.DescribeTableRequest;
+import software.amazon.awssdk.services.dynamodb.model.DescribeTableResponse;
+import software.amazon.awssdk.services.dynamodb.model.GetItemRequest;
+import software.amazon.awssdk.services.dynamodb.model.GetItemResponse;
+import software.amazon.awssdk.services.dynamodb.model.GlobalSecondaryIndex;
+import software.amazon.awssdk.services.dynamodb.model.KeySchemaElement;
+import software.amazon.awssdk.services.dynamodb.model.KeyType;
+import software.amazon.awssdk.services.dynamodb.model.Projection;
+import software.amazon.awssdk.services.dynamodb.model.ProjectionType;
+import software.amazon.awssdk.services.dynamodb.model.Put;
+import software.amazon.awssdk.services.dynamodb.model.PutItemRequest;
+import software.amazon.awssdk.services.dynamodb.model.QueryRequest;
+import software.amazon.awssdk.services.dynamodb.model.QueryResponse;
+import software.amazon.awssdk.services.dynamodb.model.ResourceNotFoundException;
+import software.amazon.awssdk.services.dynamodb.model.ScalarAttributeType;
+import software.amazon.awssdk.services.dynamodb.model.TableStatus;
+import software.amazon.awssdk.services.dynamodb.model.TransactWriteItem;
+import software.amazon.awssdk.services.dynamodb.model.TransactWriteItemsRequest;
+import software.amazon.awssdk.services.dynamodb.model.UpdateItemRequest;
+
+/**
+ * DynamoDB implementation of Iceberg catalog
+ */
+public class DynamoDbCatalog extends BaseMetastoreCatalog implements Closeable, SupportsNamespaces, Configurable {
+
+  private static final Logger LOG = LoggerFactory.getLogger(DynamoDbCatalog.class);
+  private static final int CATALOG_TABLE_CREATION_WAIT_ATTEMPTS_MAX = 5;
+  static final Joiner COMMA = Joiner.on(',');
+
+  private static final String GSI_NAMESPACE_IDENTIFIER = "namespace-identifier";
+  private static final String COL_IDENTIFIER = "identifier";
+  private static final String COL_IDENTIFIER_NAMESPACE = "NAMESPACE";
+  private static final String COL_NAMESPACE = "namespace";
+  private static final String PROPERTY_COL_PREFIX = "p.";

Review comment:
       The main concern here is that table properties can get rather messy: anyone can add values, and there's really no restriction on key names.  This would in turn make the catalog table schema rather messy, since it would be the union of all of the table property key names.
   
   I feel like since we don't have any use cases specifically called out for secondary indexes, we should use a map to reflect the table properties.  If we identify cases where we want additional secondary indexes, we could come up with a way to "promote" certain table properties, but I don't want to speculate too much until we understand what those cases really are.
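
The difference between the two layouts can be sketched with plain Java maps. This is an illustration under assumptions: `PropertyLayoutSketch`, the `properties` column name, and the sample keys are hypothetical. Only the `p.` prefix and the `identifier` column come from the PR.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

public class PropertyLayoutSketch {
  static final String PREFIX = "p.";

  // Flattened layout: each property becomes its own top-level column "p.<key>".
  public static Map<String, Object> flattened(String identifier, Map<String, String> props) {
    Map<String, Object> row = new LinkedHashMap<>();
    row.put("identifier", identifier);
    props.forEach((k, v) -> row.put(PREFIX + k, v));
    return row;
  }

  // Nested layout: all properties live under a single map-typed column.
  public static Map<String, Object> nested(String identifier, Map<String, String> props) {
    Map<String, Object> row = new LinkedHashMap<>();
    row.put("identifier", identifier);
    row.put("properties", new LinkedHashMap<String, String>(props));
    return row;
  }

  // The effective table "schema" is the union of top-level column names across rows.
  public static Set<String> topLevelColumns(Iterable<Map<String, Object>> rows) {
    Set<String> columns = new TreeSet<>();
    for (Map<String, Object> row : rows) {
      columns.addAll(row.keySet());
    }
    return columns;
  }
}
```

With the flattened layout, the set returned by `topLevelColumns` grows with every distinct property key any table uses. With the nested layout it stays fixed, at the cost that individual properties are no longer top-level attributes that a secondary index could use as keys.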






[GitHub] [iceberg] jackye1995 commented on pull request #2688: AWS: add DynamoDb catalog

Posted by GitBox <gi...@apache.org>.
jackye1995 commented on pull request #2688:
URL: https://github.com/apache/iceberg/pull/2688#issuecomment-863557341


   @danielcweeks any additional comments?




[GitHub] [iceberg] danielcweeks commented on a change in pull request #2688: AWS: add DynamoDb catalog

Posted by GitBox <gi...@apache.org>.
danielcweeks commented on a change in pull request #2688:
URL: https://github.com/apache/iceberg/pull/2688#discussion_r648502400



##########
File path: aws/src/main/java/org/apache/iceberg/aws/dynamodb/DynamoDbCatalog.java
##########
@@ -0,0 +1,613 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.aws.dynamodb;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.BaseMetastoreCatalog;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.aws.AwsClientFactories;
+import org.apache.iceberg.aws.AwsProperties;
+import org.apache.iceberg.aws.s3.S3FileIO;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.SupportsNamespaces;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
+import org.apache.iceberg.exceptions.NoSuchNamespaceException;
+import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.base.Joiner;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.Tasks;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
+import software.amazon.awssdk.services.dynamodb.model.AttributeDefinition;
+import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
+import software.amazon.awssdk.services.dynamodb.model.BillingMode;
+import software.amazon.awssdk.services.dynamodb.model.ConditionalCheckFailedException;
+import software.amazon.awssdk.services.dynamodb.model.CreateTableRequest;
+import software.amazon.awssdk.services.dynamodb.model.Delete;
+import software.amazon.awssdk.services.dynamodb.model.DeleteItemRequest;
+import software.amazon.awssdk.services.dynamodb.model.DescribeTableRequest;
+import software.amazon.awssdk.services.dynamodb.model.DescribeTableResponse;
+import software.amazon.awssdk.services.dynamodb.model.GetItemRequest;
+import software.amazon.awssdk.services.dynamodb.model.GetItemResponse;
+import software.amazon.awssdk.services.dynamodb.model.GlobalSecondaryIndex;
+import software.amazon.awssdk.services.dynamodb.model.KeySchemaElement;
+import software.amazon.awssdk.services.dynamodb.model.KeyType;
+import software.amazon.awssdk.services.dynamodb.model.Projection;
+import software.amazon.awssdk.services.dynamodb.model.ProjectionType;
+import software.amazon.awssdk.services.dynamodb.model.Put;
+import software.amazon.awssdk.services.dynamodb.model.PutItemRequest;
+import software.amazon.awssdk.services.dynamodb.model.QueryRequest;
+import software.amazon.awssdk.services.dynamodb.model.QueryResponse;
+import software.amazon.awssdk.services.dynamodb.model.ResourceNotFoundException;
+import software.amazon.awssdk.services.dynamodb.model.ScalarAttributeType;
+import software.amazon.awssdk.services.dynamodb.model.TableStatus;
+import software.amazon.awssdk.services.dynamodb.model.TransactWriteItem;
+import software.amazon.awssdk.services.dynamodb.model.TransactWriteItemsRequest;
+import software.amazon.awssdk.services.dynamodb.model.UpdateItemRequest;
+
+/**
+ * DynamoDB implementation of Iceberg catalog
+ */
+public class DynamoDbCatalog extends BaseMetastoreCatalog implements Closeable, SupportsNamespaces, Configurable {
+
+  private static final Logger LOG = LoggerFactory.getLogger(DynamoDbCatalog.class);
+  private static final int CATALOG_TABLE_CREATION_WAIT_ATTEMPTS_MAX = 5;
+  static final Joiner COMMA = Joiner.on(',');
+
+  private static final String GSI_NAMESPACE_IDENTIFIER = "namespace-identifier";
+  private static final String COL_IDENTIFIER = "identifier";
+  private static final String COL_IDENTIFIER_NAMESPACE = "NAMESPACE";
+  private static final String COL_NAMESPACE = "namespace";
+  private static final String PROPERTY_COL_PREFIX = "p.";

Review comment:
       @jackye1995 is there a reason we wouldn't just use a map in DynamoDB for these properties (as opposed to creating a prefixed column)?
   
   Seems like we might want to split out some of the well-known values into top-level properties (like location), with general properties just stored as a map.  Thoughts?
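
   To make the two layouts in this question concrete, here is a minimal sketch in plain Java (no AWS SDK) of how one properties map could be stored either as flattened `p.`-prefixed columns or as a single nested map column. The helper names `toPropertyCol`, `isProperty` and `toPropertyKey` appear in the PR, but their bodies here are assumptions, and `PropertyLayouts`, `flatten`, `nest`, `unflatten` and the `properties` column name are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration class; not part of the PR.
final class PropertyLayouts {
  static final String PROPERTY_COL_PREFIX = "p.";

  // Assumed behavior of the PR's helpers: add, detect, and strip the prefix.
  static String toPropertyCol(String key) {
    return PROPERTY_COL_PREFIX + key;
  }

  static boolean isProperty(String col) {
    return col.startsWith(PROPERTY_COL_PREFIX);
  }

  static String toPropertyKey(String col) {
    return col.substring(PROPERTY_COL_PREFIX.length());
  }

  // Flattened layout: every property becomes its own top-level column.
  static Map<String, Object> flatten(Map<String, String> properties) {
    Map<String, Object> row = new LinkedHashMap<>();
    properties.forEach((k, v) -> row.put(toPropertyCol(k), v));
    return row;
  }

  // Nested layout: all properties live under one map-typed column.
  static Map<String, Object> nest(Map<String, String> properties) {
    Map<String, Object> row = new LinkedHashMap<>();
    row.put("properties", new LinkedHashMap<>(properties));
    return row;
  }

  // Reads the flattened layout back by keeping only prefixed columns.
  static Map<String, String> unflatten(Map<String, Object> row) {
    Map<String, String> properties = new LinkedHashMap<>();
    row.forEach((col, value) -> {
      if (isProperty(col)) {
        properties.put(toPropertyKey(col), (String) value);
      }
    });
    return properties;
  }
}
```

   In the flattened layout the row grows by one column per property, while the nested layout always adds exactly one column regardless of how many properties are set.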





[GitHub] [iceberg] danielcweeks commented on a change in pull request #2688: AWS: add DynamoDb catalog

Posted by GitBox <gi...@apache.org>.
danielcweeks commented on a change in pull request #2688:
URL: https://github.com/apache/iceberg/pull/2688#discussion_r654829416



##########
File path: aws/src/main/java/org/apache/iceberg/aws/dynamodb/DynamoDbCatalog.java
##########
@@ -0,0 +1,613 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.aws.dynamodb;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.BaseMetastoreCatalog;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.aws.AwsClientFactories;
+import org.apache.iceberg.aws.AwsProperties;
+import org.apache.iceberg.aws.s3.S3FileIO;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.SupportsNamespaces;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
+import org.apache.iceberg.exceptions.NoSuchNamespaceException;
+import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.base.Joiner;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.Tasks;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
+import software.amazon.awssdk.services.dynamodb.model.AttributeDefinition;
+import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
+import software.amazon.awssdk.services.dynamodb.model.BillingMode;
+import software.amazon.awssdk.services.dynamodb.model.ConditionalCheckFailedException;
+import software.amazon.awssdk.services.dynamodb.model.CreateTableRequest;
+import software.amazon.awssdk.services.dynamodb.model.Delete;
+import software.amazon.awssdk.services.dynamodb.model.DeleteItemRequest;
+import software.amazon.awssdk.services.dynamodb.model.DescribeTableRequest;
+import software.amazon.awssdk.services.dynamodb.model.DescribeTableResponse;
+import software.amazon.awssdk.services.dynamodb.model.GetItemRequest;
+import software.amazon.awssdk.services.dynamodb.model.GetItemResponse;
+import software.amazon.awssdk.services.dynamodb.model.GlobalSecondaryIndex;
+import software.amazon.awssdk.services.dynamodb.model.KeySchemaElement;
+import software.amazon.awssdk.services.dynamodb.model.KeyType;
+import software.amazon.awssdk.services.dynamodb.model.Projection;
+import software.amazon.awssdk.services.dynamodb.model.ProjectionType;
+import software.amazon.awssdk.services.dynamodb.model.Put;
+import software.amazon.awssdk.services.dynamodb.model.PutItemRequest;
+import software.amazon.awssdk.services.dynamodb.model.QueryRequest;
+import software.amazon.awssdk.services.dynamodb.model.QueryResponse;
+import software.amazon.awssdk.services.dynamodb.model.ResourceNotFoundException;
+import software.amazon.awssdk.services.dynamodb.model.ScalarAttributeType;
+import software.amazon.awssdk.services.dynamodb.model.TableStatus;
+import software.amazon.awssdk.services.dynamodb.model.TransactWriteItem;
+import software.amazon.awssdk.services.dynamodb.model.TransactWriteItemsRequest;
+import software.amazon.awssdk.services.dynamodb.model.UpdateItemRequest;
+
+/**
+ * DynamoDB implementation of Iceberg catalog
+ */
+public class DynamoDbCatalog extends BaseMetastoreCatalog implements Closeable, SupportsNamespaces, Configurable {
+
+  private static final Logger LOG = LoggerFactory.getLogger(DynamoDbCatalog.class);
+  private static final int CATALOG_TABLE_CREATION_WAIT_ATTEMPTS_MAX = 5;
+  static final Joiner COMMA = Joiner.on(',');
+
+  private static final String GSI_NAMESPACE_IDENTIFIER = "namespace-identifier";
+  private static final String COL_IDENTIFIER = "identifier";
+  private static final String COL_IDENTIFIER_NAMESPACE = "NAMESPACE";
+  private static final String COL_NAMESPACE = "namespace";
+  private static final String PROPERTY_COL_PREFIX = "p.";
+  private static final String PROPERTY_DEFAULT_LOCATION = "default_location";
+  private static final String COL_CREATED_AT = "created_at";
+  private static final String COL_UPDATED_AT = "updated_at";
+
+  // field used for optimistic locking
+  static final String COL_VERSION = "v";

Review comment:
       I'm not actually sure this is necessary, since it seems that DynamoDB has options for checking for existence with `if_not_exists` and `attribute_not_exists`.  I'm not sure if there is some edge case (I'm not terribly familiar with the conditionals), but it would make this a whole lot more readable if the conditions were more like `attribute_not_exists(identifier)`.
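
   As a rough sketch of the two kinds of condition being discussed, the plain-Java model below simulates conditional writes against an in-memory map instead of DynamoDB. It only illustrates the semantics: `create` behaves like a put guarded by `attribute_not_exists(...)`, while `commit` behaves like an update guarded by a version equality check on the `v` column. The class `ConditionalStore` and its methods are hypothetical and are not part of the PR or the AWS SDK.

```java
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical in-memory model of conditional writes; not the PR's code.
final class ConditionalStore {
  // identifier -> current version string (stands in for the "v" column)
  private final ConcurrentMap<String, String> versions = new ConcurrentHashMap<>();

  // Like a put with attribute_not_exists: succeeds only if no row exists yet.
  // Returns the new version, or null if the row already exists.
  String create(String identifier) {
    String version = UUID.randomUUID().toString();
    return versions.putIfAbsent(identifier, version) == null ? version : null;
  }

  // Like an update with a "v = :expected" condition: succeeds only if nobody
  // else committed since the caller read expectedVersion (must be non-null).
  // Returns the new version, or null if the check failed.
  String commit(String identifier, String expectedVersion) {
    String next = UUID.randomUUID().toString();
    return versions.replace(identifier, expectedVersion, next) ? next : null;
  }
}
```

   In this model an existence check is enough to reject a duplicate create, but it cannot tell apart two writers updating a row that already exists, which appears to be the case the version field is meant to cover.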





[GitHub] [iceberg] jackye1995 commented on a change in pull request #2688: AWS: add DynamoDb catalog

Posted by GitBox <gi...@apache.org>.
jackye1995 commented on a change in pull request #2688:
URL: https://github.com/apache/iceberg/pull/2688#discussion_r654960523



##########
File path: aws/src/main/java/org/apache/iceberg/aws/dynamodb/DynamoDbCatalog.java
##########
@@ -0,0 +1,613 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.aws.dynamodb;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.BaseMetastoreCatalog;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.aws.AwsClientFactories;
+import org.apache.iceberg.aws.AwsProperties;
+import org.apache.iceberg.aws.s3.S3FileIO;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.SupportsNamespaces;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
+import org.apache.iceberg.exceptions.NoSuchNamespaceException;
+import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.base.Joiner;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.Tasks;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
+import software.amazon.awssdk.services.dynamodb.model.AttributeDefinition;
+import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
+import software.amazon.awssdk.services.dynamodb.model.BillingMode;
+import software.amazon.awssdk.services.dynamodb.model.ConditionalCheckFailedException;
+import software.amazon.awssdk.services.dynamodb.model.CreateTableRequest;
+import software.amazon.awssdk.services.dynamodb.model.Delete;
+import software.amazon.awssdk.services.dynamodb.model.DeleteItemRequest;
+import software.amazon.awssdk.services.dynamodb.model.DescribeTableRequest;
+import software.amazon.awssdk.services.dynamodb.model.DescribeTableResponse;
+import software.amazon.awssdk.services.dynamodb.model.GetItemRequest;
+import software.amazon.awssdk.services.dynamodb.model.GetItemResponse;
+import software.amazon.awssdk.services.dynamodb.model.GlobalSecondaryIndex;
+import software.amazon.awssdk.services.dynamodb.model.KeySchemaElement;
+import software.amazon.awssdk.services.dynamodb.model.KeyType;
+import software.amazon.awssdk.services.dynamodb.model.Projection;
+import software.amazon.awssdk.services.dynamodb.model.ProjectionType;
+import software.amazon.awssdk.services.dynamodb.model.Put;
+import software.amazon.awssdk.services.dynamodb.model.PutItemRequest;
+import software.amazon.awssdk.services.dynamodb.model.QueryRequest;
+import software.amazon.awssdk.services.dynamodb.model.QueryResponse;
+import software.amazon.awssdk.services.dynamodb.model.ResourceNotFoundException;
+import software.amazon.awssdk.services.dynamodb.model.ScalarAttributeType;
+import software.amazon.awssdk.services.dynamodb.model.TableStatus;
+import software.amazon.awssdk.services.dynamodb.model.TransactWriteItem;
+import software.amazon.awssdk.services.dynamodb.model.TransactWriteItemsRequest;
+import software.amazon.awssdk.services.dynamodb.model.UpdateItemRequest;
+
+/**
+ * DynamoDB implementation of Iceberg catalog
+ */
+public class DynamoDbCatalog extends BaseMetastoreCatalog implements Closeable, SupportsNamespaces, Configurable {
+
+  private static final Logger LOG = LoggerFactory.getLogger(DynamoDbCatalog.class);
+  private static final int CATALOG_TABLE_CREATION_WAIT_ATTEMPTS_MAX = 5;
+  static final Joiner COMMA = Joiner.on(',');
+
+  private static final String GSI_NAMESPACE_IDENTIFIER = "namespace-identifier";
+  private static final String COL_IDENTIFIER = "identifier";
+  private static final String COL_IDENTIFIER_NAMESPACE = "NAMESPACE";
+  private static final String COL_NAMESPACE = "namespace";
+  private static final String PROPERTY_COL_PREFIX = "p.";

Review comment:
       > It's been a while since I've used DynamoDB, but it used to be that the console viewer would show all columns
   
   I see, yes, in the console you still see all the available columns (it is based on the rows it scans in the first batch). The number of columns shown will be proportional to the number of properties that users choose to add to the tables.
   
   > there's nothing specific that iceberg supports with the secondary index so it's somewhat external to the catalog implementation. I'm not fundamentally opposed, just trying to make sure we have good justification for taking this approach
   
   Yes, totally agree. I think this is actually the most important reason we chose this approach: it gives users much more flexibility to extend this single solution, without us needing to add DynamoDB-specific logic, such as creating new GSIs, to the open source code in the future. That would be much harder to do, because it would require auto-migration logic for existing users, which could impact production performance. In my view, the functional benefit outweighs the cost of the UI showing a few more columns.
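
   As a hedged illustration of the flexibility argument: DynamoDB index keys must be top-level scalar attributes, so a flattened `p.`-prefixed column can serve as the key of a user-created GSI, while an entry inside a nested map cannot. The plain-Java sketch below only models that idea with an in-memory index; `SecondaryIndexSketch`, `indexBy` and the `p.owner` property are hypothetical examples and not part of the PR.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model of a secondary index over a top-level column; not the PR's code.
final class SecondaryIndexSketch {
  // Groups row identifiers by the value of one top-level column.
  // Rows without that column are skipped, similar to a sparse index.
  static Map<String, List<String>> indexBy(List<Map<String, String>> rows, String column) {
    Map<String, List<String>> index = new TreeMap<>();
    for (Map<String, String> row : rows) {
      String key = row.get(column);
      if (key != null) {
        index.computeIfAbsent(key, k -> new ArrayList<>()).add(row.get("identifier"));
      }
    }
    return index;
  }
}
```

   For example, indexing catalog rows by a `p.owner` column would let a user look up all tables for one owner without any change to the catalog code itself.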
   





[GitHub] [iceberg] danielcweeks commented on a change in pull request #2688: AWS: add DynamoDb catalog

Posted by GitBox <gi...@apache.org>.
danielcweeks commented on a change in pull request #2688:
URL: https://github.com/apache/iceberg/pull/2688#discussion_r654828829



##########
File path: aws/src/main/java/org/apache/iceberg/aws/dynamodb/DynamoDbCatalog.java
##########
@@ -0,0 +1,613 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.aws.dynamodb;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.BaseMetastoreCatalog;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.aws.AwsClientFactories;
+import org.apache.iceberg.aws.AwsProperties;
+import org.apache.iceberg.aws.s3.S3FileIO;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.SupportsNamespaces;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
+import org.apache.iceberg.exceptions.NoSuchNamespaceException;
+import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.base.Joiner;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.Tasks;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
+import software.amazon.awssdk.services.dynamodb.model.AttributeDefinition;
+import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
+import software.amazon.awssdk.services.dynamodb.model.BillingMode;
+import software.amazon.awssdk.services.dynamodb.model.ConditionalCheckFailedException;
+import software.amazon.awssdk.services.dynamodb.model.CreateTableRequest;
+import software.amazon.awssdk.services.dynamodb.model.Delete;
+import software.amazon.awssdk.services.dynamodb.model.DeleteItemRequest;
+import software.amazon.awssdk.services.dynamodb.model.DescribeTableRequest;
+import software.amazon.awssdk.services.dynamodb.model.DescribeTableResponse;
+import software.amazon.awssdk.services.dynamodb.model.GetItemRequest;
+import software.amazon.awssdk.services.dynamodb.model.GetItemResponse;
+import software.amazon.awssdk.services.dynamodb.model.GlobalSecondaryIndex;
+import software.amazon.awssdk.services.dynamodb.model.KeySchemaElement;
+import software.amazon.awssdk.services.dynamodb.model.KeyType;
+import software.amazon.awssdk.services.dynamodb.model.Projection;
+import software.amazon.awssdk.services.dynamodb.model.ProjectionType;
+import software.amazon.awssdk.services.dynamodb.model.Put;
+import software.amazon.awssdk.services.dynamodb.model.PutItemRequest;
+import software.amazon.awssdk.services.dynamodb.model.QueryRequest;
+import software.amazon.awssdk.services.dynamodb.model.QueryResponse;
+import software.amazon.awssdk.services.dynamodb.model.ResourceNotFoundException;
+import software.amazon.awssdk.services.dynamodb.model.ScalarAttributeType;
+import software.amazon.awssdk.services.dynamodb.model.TableStatus;
+import software.amazon.awssdk.services.dynamodb.model.TransactWriteItem;
+import software.amazon.awssdk.services.dynamodb.model.TransactWriteItemsRequest;
+import software.amazon.awssdk.services.dynamodb.model.UpdateItemRequest;
+
+/**
+ * DynamoDB implementation of Iceberg catalog
+ */
+public class DynamoDbCatalog extends BaseMetastoreCatalog implements Closeable, SupportsNamespaces, Configurable {
+
+  private static final Logger LOG = LoggerFactory.getLogger(DynamoDbCatalog.class);
+  private static final int CATALOG_TABLE_CREATION_WAIT_ATTEMPTS_MAX = 5;
+  static final Joiner COMMA = Joiner.on(',');
+
+  private static final String GSI_NAMESPACE_IDENTIFIER = "namespace-identifier";
+  private static final String COL_IDENTIFIER = "identifier";
+  private static final String COL_IDENTIFIER_NAMESPACE = "NAMESPACE";
+  private static final String COL_NAMESPACE = "namespace";
+  private static final String PROPERTY_COL_PREFIX = "p.";
+  private static final String PROPERTY_DEFAULT_LOCATION = "default_location";
+  private static final String COL_CREATED_AT = "created_at";
+  private static final String COL_UPDATED_AT = "updated_at";
+
+  // field used for optimistic locking
+  static final String COL_VERSION = "v";
+
+  private DynamoDbClient dynamo;
+  private Configuration hadoopConf;
+  private String catalogName;
+  private String warehousePath;
+  private AwsProperties awsProperties;
+  private FileIO fileIO;
+
+  public DynamoDbCatalog() {
+  }
+
+  @Override
+  public void initialize(String name, Map<String, String> properties) {
+    initialize(
+        name,
+        properties.get(CatalogProperties.WAREHOUSE_LOCATION),
+        new AwsProperties(properties),
+        AwsClientFactories.from(properties).dynamo(),
+        initializeFileIO(properties));
+  }
+
+  @VisibleForTesting
+  void initialize(String name, String path, AwsProperties properties, DynamoDbClient client, FileIO io) {
+    this.catalogName = name;
+    this.awsProperties = properties;
+    this.warehousePath = cleanWarehousePath(path);
+    this.dynamo = client;
+    this.fileIO = io;
+    ensureCatalogTableExistsOrCreate();
+  }
+
+  @Override
+  public String name() {
+    return catalogName;
+  }
+
+  @Override
+  protected TableOperations newTableOps(TableIdentifier tableIdentifier) {
+    validateTableIdentifier(tableIdentifier);
+    return new DynamoDbTableOperations(dynamo, awsProperties, catalogName, fileIO, tableIdentifier);
+  }
+
+  @Override
+  protected String defaultWarehouseLocation(TableIdentifier tableIdentifier) {
+    validateTableIdentifier(tableIdentifier);
+    GetItemResponse response = dynamo.getItem(GetItemRequest.builder()
+        .tableName(awsProperties.dynamoDbTableName())
+        .consistentRead(true)
+        .key(namespacePrimaryKey(tableIdentifier.namespace()))
+        .build());
+
+    if (!response.hasItem()) {
+      throw new NoSuchNamespaceException("Cannot find default warehouse location: namespace %s does not exist",
+          tableIdentifier.namespace());
+    }
+
+    String defaultLocationCol = toPropertyCol(PROPERTY_DEFAULT_LOCATION);
+    if (response.item().containsKey(defaultLocationCol)) {
+      return String.format("%s/%s", response.item().get(defaultLocationCol), tableIdentifier.name());
+    } else {
+      return String.format("%s/%s.db/%s", warehousePath, tableIdentifier.namespace(), tableIdentifier.name());
+    }
+  }
+
+  @Override
+  public void createNamespace(Namespace namespace, Map<String, String> metadata) {
+    validateNamespace(namespace);
+    Map<String, AttributeValue> values = namespacePrimaryKey(namespace);
+    setNewCatalogEntryMetadata(values);
+    metadata.forEach((key, value) -> values.put(toPropertyCol(key), AttributeValue.builder().s(value).build()));
+
+    try {
+      dynamo.putItem(PutItemRequest.builder()
+          .tableName(awsProperties.dynamoDbTableName())
+          .conditionExpression("attribute_not_exists(" + DynamoDbCatalog.COL_VERSION + ")")
+          .item(values)
+          .build());
+    } catch (ConditionalCheckFailedException e) {
+      throw new AlreadyExistsException("Cannot create namespace %s: already exists", namespace);
+    }
+  }
+
+  @Override
+  public List<Namespace> listNamespaces(Namespace namespace) throws NoSuchNamespaceException {
+    validateNamespace(namespace);
+    List<Namespace> namespaces = Lists.newArrayList();
+    Map<String, AttributeValue> lastEvaluatedKey = null;
+    String condition = COL_IDENTIFIER + " = :identifier";
+    Map<String, AttributeValue> conditionValues = Maps.newHashMap();
+    conditionValues.put(":identifier", AttributeValue.builder().s(COL_IDENTIFIER_NAMESPACE).build());
+    if (!namespace.isEmpty()) {
+      condition += " AND " + "begins_with(" + COL_NAMESPACE + ",:ns)";
+      conditionValues.put(":ns", AttributeValue.builder().s(namespace.toString()).build());
+    }
+
+    do {
+      QueryResponse response = dynamo.query(QueryRequest.builder()
+          .tableName(awsProperties.dynamoDbTableName())
+          .consistentRead(true)
+          .keyConditionExpression(condition)
+          .expressionAttributeValues(conditionValues)
+          .exclusiveStartKey(lastEvaluatedKey)
+          .build());
+
+      if (response.hasItems()) {
+        for (Map<String, AttributeValue> item : response.items()) {
+          String ns = item.get(COL_NAMESPACE).s();
+          namespaces.add(Namespace.of(ns.split("\\.")));
+        }
+      }
+
+      lastEvaluatedKey = response.lastEvaluatedKey();
+    } while (!lastEvaluatedKey.isEmpty());
+
+    return namespaces;
+  }
+
+  @Override
+  public Map<String, String> loadNamespaceMetadata(Namespace namespace) throws NoSuchNamespaceException {
+    validateNamespace(namespace);
+    GetItemResponse response = dynamo.getItem(GetItemRequest.builder()
+        .tableName(awsProperties.dynamoDbTableName())
+        .consistentRead(true)
+        .key(namespacePrimaryKey(namespace))
+        .build());
+
+    if (!response.hasItem()) {
+      throw new NoSuchNamespaceException("Cannot find namespace %s", namespace);
+    }
+
+    return response.item().entrySet().stream()
+        .filter(e -> isProperty(e.getKey()))
+        .collect(Collectors.toMap(e -> toPropertyKey(e.getKey()), e -> e.getValue().s()));
+  }
+
+  @Override
+  public boolean dropNamespace(Namespace namespace) throws NamespaceNotEmptyException {
+    validateNamespace(namespace);
+    if (!listTables(namespace).isEmpty()) {
+      throw new NamespaceNotEmptyException("Cannot delete non-empty namespace %s", namespace);
+    }
+
+    try {
+      dynamo.deleteItem(DeleteItemRequest.builder()
+          .tableName(awsProperties.dynamoDbTableName())
+          .key(namespacePrimaryKey(namespace))
+          .conditionExpression("attribute_exists(" + namespace + ")")
+          .build());
+      return true;
+    } catch (ConditionalCheckFailedException e) {
+      return false;
+    }
+  }
+
+  @Override
+  public boolean setProperties(Namespace namespace, Map<String, String> properties) throws NoSuchNamespaceException {
+    List<String> updateParts = Lists.newArrayList();
+    Map<String, String> attributeNames = Maps.newHashMap();
+    Map<String, AttributeValue> attributeValues = Maps.newHashMap();
+    int idx = 0;
+    for (Map.Entry<String, String> property : properties.entrySet()) {
+      String attributeValue = ":v" + idx;
+      String attributeKey = "#k" + idx;
+      idx++;
+      updateParts.add(attributeKey + " = " + attributeValue);
+      attributeNames.put(attributeKey, toPropertyCol(property.getKey()));
+      attributeValues.put(attributeValue, AttributeValue.builder().s(property.getValue()).build());
+    }
+
+    updateCatalogEntryMetadata(updateParts, attributeValues);
+    String updateExpression = "SET " + COMMA.join(updateParts);
+    return updateProperties(namespace, updateExpression, attributeValues, attributeNames);
+  }
+
+  @Override
+  public boolean removeProperties(Namespace namespace, Set<String> properties) throws NoSuchNamespaceException {
+    List<String> removeParts = Lists.newArrayList(properties.iterator());
+    Map<String, String> attributeNames = Maps.newHashMap();
+    Map<String, AttributeValue> attributeValues = Maps.newHashMap();
+    int idx = 0;
+    for (String property : properties) {
+      String attributeKey = "#k" + idx;
+      idx++;
+      removeParts.add(attributeKey);
+      attributeNames.put(attributeKey, toPropertyCol(property));
+    }
+
+    List<String> updateParts = Lists.newArrayList();
+    updateCatalogEntryMetadata(updateParts, attributeValues);
+    String updateExpression = "REMOVE " + COMMA.join(removeParts) + " SET " + COMMA.join(updateParts);
+    return updateProperties(namespace, updateExpression, attributeValues, attributeNames);
+  }
+
+  @Override
+  public List<TableIdentifier> listTables(Namespace namespace) {
+    List<TableIdentifier> identifiers = Lists.newArrayList();
+    Map<String, AttributeValue> lastEvaluatedKey;
+    String condition = COL_NAMESPACE + " = :ns";
+    Map<String, AttributeValue> conditionValues = ImmutableMap.of(
+        ":ns", AttributeValue.builder().s(namespace.toString()).build());
+    do {
+      QueryResponse response = dynamo.query(QueryRequest.builder()
+          .tableName(awsProperties.dynamoDbTableName())
+          .indexName(GSI_NAMESPACE_IDENTIFIER)
+          .keyConditionExpression(condition)
+          .expressionAttributeValues(conditionValues)
+          .build());
+
+      if (response.hasItems()) {
+        for (Map<String, AttributeValue> item : response.items()) {
+          String identifier = item.get(COL_IDENTIFIER).s();
+          if (!COL_IDENTIFIER_NAMESPACE.equals(identifier)) {
+            identifiers.add(TableIdentifier.of(identifier.split("\\.")));
+          }
+        }
+      }
+
+      lastEvaluatedKey = response.lastEvaluatedKey();
+    } while (!lastEvaluatedKey.isEmpty());
+    return identifiers;
+  }
+
+  @Override
+  public boolean dropTable(TableIdentifier identifier, boolean purge) {
+    Map<String, AttributeValue> key = tablePrimaryKey(identifier);
+    try {
+      GetItemResponse response = dynamo.getItem(GetItemRequest.builder()
+          .tableName(awsProperties.dynamoDbTableName())
+          .consistentRead(true)
+          .key(key)
+          .build());
+
+      if (!response.hasItem()) {
+        throw new NoSuchTableException("Cannot find table %s to drop", identifier);
+      }
+
+      TableOperations ops = newTableOps(identifier);
+      TableMetadata lastMetadata = ops.current();
+      dynamo.deleteItem(DeleteItemRequest.builder()
+          .tableName(awsProperties.dynamoDbTableName())
+          .key(tablePrimaryKey(identifier))
+          .conditionExpression(COL_VERSION + " = :v")
+          .expressionAttributeValues(ImmutableMap.of(":v", response.item().get(COL_VERSION)))
+          .build());
+      LOG.info("Successfully dropped table {} from DynamoDb catalog", identifier);
+
+      if (purge && lastMetadata != null) {
+        CatalogUtil.dropTableData(ops.io(), lastMetadata);
+        LOG.info("Table {} data purged", identifier);
+      }
+
+      LOG.info("Dropped table: {}", identifier);
+      return true;
+    } catch (ConditionalCheckFailedException e) {
+      LOG.error("Cannot complete drop table operation for {}: commit conflict", identifier, e);
+      return false;
+    } catch (Exception e) {
+      LOG.error("Cannot complete drop table operation for {}: unexpected exception", identifier, e);
+      throw e;
+    }
+  }
+
+  @Override
+  public void renameTable(TableIdentifier from, TableIdentifier to) {
+    Map<String, AttributeValue> fromKey = tablePrimaryKey(from);
+    Map<String, AttributeValue> toKey = tablePrimaryKey(to);
+
+    GetItemResponse fromResponse = dynamo.getItem(GetItemRequest.builder()
+        .tableName(awsProperties.dynamoDbTableName())
+        .consistentRead(true)
+        .key(fromKey)
+        .build());
+
+    if (!fromResponse.hasItem()) {
+      throw new NoSuchTableException("Cannot rename table %s to %s: %s does not exist", from, to, from);
+    }
+
+    GetItemResponse toResponse = dynamo.getItem(GetItemRequest.builder()

Review comment:
       ~This seems like an unnecessary check if it's also being handled by the transaction~.  I missed where we're setting this to a UUID.
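
For context on the UUID mentioned above: the PR description says the version column holds a UUID string used for optimistic locking. The following is a minimal in-memory sketch of that idea, not the PR's code and not the DynamoDB API. The class `VersionedStore` and its methods are hypothetical names chosen for illustration; a `ConcurrentHashMap` stands in for the catalog table.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical in-memory stand-in for a catalog row store; not the DynamoDB API. */
public class VersionedStore {
  /** A row holding a metadata location and a UUID version token. */
  public static final class Row {
    final String metadataLocation;
    final String version;

    Row(String metadataLocation, String version) {
      this.metadataLocation = metadataLocation;
      this.version = version;
    }
  }

  private final Map<String, Row> rows = new ConcurrentHashMap<>();

  /** Creates the row only if the key is absent; returns the new version, or null if it exists. */
  public String create(String key, String metadataLocation) {
    String version = UUID.randomUUID().toString();
    Row previous = rows.putIfAbsent(key, new Row(metadataLocation, version));
    return previous == null ? version : null;
  }

  /** Swaps in a new location only if the caller's version still matches; null means conflict. */
  public String commit(String key, String expectedVersion, String newLocation) {
    Row current = rows.get(key);
    if (current == null || !current.version.equals(expectedVersion)) {
      return null; // another writer committed first, or the row is gone
    }
    String nextVersion = UUID.randomUUID().toString();
    // replace(key, old, new) succeeds only if the stored row is still the one we read
    boolean swapped = rows.replace(key, current, new Row(newLocation, nextVersion));
    return swapped ? nextVersion : null;
  }
}
```

A fresh random token per commit means two writers that commit within the same millisecond still produce different versions, which is the benefit the PR description cites over comparing `updated_at`.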






[GitHub] [iceberg] danielcweeks commented on a change in pull request #2688: AWS: add DynamoDb catalog

Posted by GitBox <gi...@apache.org>.
danielcweeks commented on a change in pull request #2688:
URL: https://github.com/apache/iceberg/pull/2688#discussion_r654835681



##########
File path: aws/src/main/java/org/apache/iceberg/aws/dynamodb/DynamoDbCatalog.java
##########
@@ -0,0 +1,613 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.aws.dynamodb;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.BaseMetastoreCatalog;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.aws.AwsClientFactories;
+import org.apache.iceberg.aws.AwsProperties;
+import org.apache.iceberg.aws.s3.S3FileIO;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.SupportsNamespaces;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
+import org.apache.iceberg.exceptions.NoSuchNamespaceException;
+import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.base.Joiner;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.Tasks;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
+import software.amazon.awssdk.services.dynamodb.model.AttributeDefinition;
+import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
+import software.amazon.awssdk.services.dynamodb.model.BillingMode;
+import software.amazon.awssdk.services.dynamodb.model.ConditionalCheckFailedException;
+import software.amazon.awssdk.services.dynamodb.model.CreateTableRequest;
+import software.amazon.awssdk.services.dynamodb.model.Delete;
+import software.amazon.awssdk.services.dynamodb.model.DeleteItemRequest;
+import software.amazon.awssdk.services.dynamodb.model.DescribeTableRequest;
+import software.amazon.awssdk.services.dynamodb.model.DescribeTableResponse;
+import software.amazon.awssdk.services.dynamodb.model.GetItemRequest;
+import software.amazon.awssdk.services.dynamodb.model.GetItemResponse;
+import software.amazon.awssdk.services.dynamodb.model.GlobalSecondaryIndex;
+import software.amazon.awssdk.services.dynamodb.model.KeySchemaElement;
+import software.amazon.awssdk.services.dynamodb.model.KeyType;
+import software.amazon.awssdk.services.dynamodb.model.Projection;
+import software.amazon.awssdk.services.dynamodb.model.ProjectionType;
+import software.amazon.awssdk.services.dynamodb.model.Put;
+import software.amazon.awssdk.services.dynamodb.model.PutItemRequest;
+import software.amazon.awssdk.services.dynamodb.model.QueryRequest;
+import software.amazon.awssdk.services.dynamodb.model.QueryResponse;
+import software.amazon.awssdk.services.dynamodb.model.ResourceNotFoundException;
+import software.amazon.awssdk.services.dynamodb.model.ScalarAttributeType;
+import software.amazon.awssdk.services.dynamodb.model.TableStatus;
+import software.amazon.awssdk.services.dynamodb.model.TransactWriteItem;
+import software.amazon.awssdk.services.dynamodb.model.TransactWriteItemsRequest;
+import software.amazon.awssdk.services.dynamodb.model.UpdateItemRequest;
+
+/**
+ * DynamoDB implementation of Iceberg catalog
+ */
+public class DynamoDbCatalog extends BaseMetastoreCatalog implements Closeable, SupportsNamespaces, Configurable {
+
+  private static final Logger LOG = LoggerFactory.getLogger(DynamoDbCatalog.class);
+  private static final int CATALOG_TABLE_CREATION_WAIT_ATTEMPTS_MAX = 5;
+  static final Joiner COMMA = Joiner.on(',');
+
+  private static final String GSI_NAMESPACE_IDENTIFIER = "namespace-identifier";
+  private static final String COL_IDENTIFIER = "identifier";
+  private static final String COL_IDENTIFIER_NAMESPACE = "NAMESPACE";
+  private static final String COL_NAMESPACE = "namespace";
+  private static final String PROPERTY_COL_PREFIX = "p.";
+  private static final String PROPERTY_DEFAULT_LOCATION = "default_location";
+  private static final String COL_CREATED_AT = "created_at";
+  private static final String COL_UPDATED_AT = "updated_at";
+
+  // field used for optimistic locking
+  static final String COL_VERSION = "v";
+
+  private DynamoDbClient dynamo;
+  private Configuration hadoopConf;
+  private String catalogName;
+  private String warehousePath;
+  private AwsProperties awsProperties;
+  private FileIO fileIO;
+
+  public DynamoDbCatalog() {
+  }
+
+  @Override
+  public void initialize(String name, Map<String, String> properties) {
+    initialize(
+        name,
+        properties.get(CatalogProperties.WAREHOUSE_LOCATION),
+        new AwsProperties(properties),
+        AwsClientFactories.from(properties).dynamo(),
+        initializeFileIO(properties));
+  }
+
+  @VisibleForTesting
+  void initialize(String name, String path, AwsProperties properties, DynamoDbClient client, FileIO io) {
+    this.catalogName = name;
+    this.awsProperties = properties;
+    this.warehousePath = cleanWarehousePath(path);
+    this.dynamo = client;
+    this.fileIO = io;
+    ensureCatalogTableExistsOrCreate();
+  }
+
+  @Override
+  public String name() {
+    return catalogName;
+  }
+
+  @Override
+  protected TableOperations newTableOps(TableIdentifier tableIdentifier) {
+    validateTableIdentifier(tableIdentifier);
+    return new DynamoDbTableOperations(dynamo, awsProperties, catalogName, fileIO, tableIdentifier);
+  }
+
+  @Override
+  protected String defaultWarehouseLocation(TableIdentifier tableIdentifier) {
+    validateTableIdentifier(tableIdentifier);
+    GetItemResponse response = dynamo.getItem(GetItemRequest.builder()
+        .tableName(awsProperties.dynamoDbTableName())
+        .consistentRead(true)
+        .key(namespacePrimaryKey(tableIdentifier.namespace()))
+        .build());
+
+    if (!response.hasItem()) {
+      throw new NoSuchNamespaceException("Cannot find default warehouse location: namespace %s does not exist",
+          tableIdentifier.namespace());
+    }
+
+    String defaultLocationCol = toPropertyCol(PROPERTY_DEFAULT_LOCATION);
+    if (response.item().containsKey(defaultLocationCol)) {
+      return String.format("%s/%s", response.item().get(defaultLocationCol), tableIdentifier.name());
+    } else {
+      return String.format("%s/%s.db/%s", warehousePath, tableIdentifier.namespace(), tableIdentifier.name());
+    }
+  }
+
+  @Override
+  public void createNamespace(Namespace namespace, Map<String, String> metadata) {
+    validateNamespace(namespace);
+    Map<String, AttributeValue> values = namespacePrimaryKey(namespace);
+    setNewCatalogEntryMetadata(values);
+    metadata.forEach((key, value) -> values.put(toPropertyCol(key), AttributeValue.builder().s(value).build()));
+
+    try {
+      dynamo.putItem(PutItemRequest.builder()
+          .tableName(awsProperties.dynamoDbTableName())
+          .conditionExpression("attribute_not_exists(" + DynamoDbCatalog.COL_VERSION + ")")
+          .item(values)
+          .build());
+    } catch (ConditionalCheckFailedException e) {
+      throw new AlreadyExistsException("Cannot create namespace %s: already exists", namespace);
+    }
+  }
+
+  @Override
+  public List<Namespace> listNamespaces(Namespace namespace) throws NoSuchNamespaceException {
+    validateNamespace(namespace);
+    List<Namespace> namespaces = Lists.newArrayList();
+    Map<String, AttributeValue> lastEvaluatedKey = null;
+    String condition = COL_IDENTIFIER + " = :identifier";
+    Map<String, AttributeValue> conditionValues = Maps.newHashMap();
+    conditionValues.put(":identifier", AttributeValue.builder().s(COL_IDENTIFIER_NAMESPACE).build());
+    if (!namespace.isEmpty()) {
+      condition += " AND " + "begins_with(" + COL_NAMESPACE + ",:ns)";
+      conditionValues.put(":ns", AttributeValue.builder().s(namespace.toString()).build());
+    }
+
+    do {
+      QueryResponse response = dynamo.query(QueryRequest.builder()
+          .tableName(awsProperties.dynamoDbTableName())
+          .consistentRead(true)
+          .keyConditionExpression(condition)
+          .expressionAttributeValues(conditionValues)
+          .exclusiveStartKey(lastEvaluatedKey)
+          .build());
+
+      if (response.hasItems()) {
+        for (Map<String, AttributeValue> item : response.items()) {
+          String ns = item.get(COL_NAMESPACE).s();
+          namespaces.add(Namespace.of(ns.split("\\.")));
+        }
+      }
+
+      lastEvaluatedKey = response.lastEvaluatedKey();
+    } while (!lastEvaluatedKey.isEmpty());
+
+    return namespaces;
+  }
+
+  @Override
+  public Map<String, String> loadNamespaceMetadata(Namespace namespace) throws NoSuchNamespaceException {
+    validateNamespace(namespace);
+    GetItemResponse response = dynamo.getItem(GetItemRequest.builder()
+        .tableName(awsProperties.dynamoDbTableName())
+        .consistentRead(true)
+        .key(namespacePrimaryKey(namespace))
+        .build());
+
+    if (!response.hasItem()) {
+      throw new NoSuchNamespaceException("Cannot find namespace %s", namespace);
+    }
+
+    return response.item().entrySet().stream()
+        .filter(e -> isProperty(e.getKey()))
+        .collect(Collectors.toMap(e -> toPropertyKey(e.getKey()), e -> e.getValue().s()));
+  }
+
+  @Override
+  public boolean dropNamespace(Namespace namespace) throws NamespaceNotEmptyException {
+    validateNamespace(namespace);
+    if (!listTables(namespace).isEmpty()) {
+      throw new NamespaceNotEmptyException("Cannot delete non-empty namespace %s", namespace);
+    }
+
+    try {
+      dynamo.deleteItem(DeleteItemRequest.builder()
+          .tableName(awsProperties.dynamoDbTableName())
+          .key(namespacePrimaryKey(namespace))
+          .conditionExpression("attribute_exists(" + namespace + ")")
+          .build());
+      return true;
+    } catch (ConditionalCheckFailedException e) {
+      return false;
+    }
+  }
+
+  @Override
+  public boolean setProperties(Namespace namespace, Map<String, String> properties) throws NoSuchNamespaceException {
+    List<String> updateParts = Lists.newArrayList();
+    Map<String, String> attributeNames = Maps.newHashMap();
+    Map<String, AttributeValue> attributeValues = Maps.newHashMap();
+    int idx = 0;
+    for (Map.Entry<String, String> property : properties.entrySet()) {
+      String attributeValue = ":v" + idx;
+      String attributeKey = "#k" + idx;
+      idx++;
+      updateParts.add(attributeKey + " = " + attributeValue);
+      attributeNames.put(attributeKey, toPropertyCol(property.getKey()));
+      attributeValues.put(attributeValue, AttributeValue.builder().s(property.getValue()).build());
+    }
+
+    updateCatalogEntryMetadata(updateParts, attributeValues);
+    String updateExpression = "SET " + COMMA.join(updateParts);
+    return updateProperties(namespace, updateExpression, attributeValues, attributeNames);
+  }
+
+  @Override
+  public boolean removeProperties(Namespace namespace, Set<String> properties) throws NoSuchNamespaceException {
+    List<String> removeParts = Lists.newArrayList(properties.iterator());
+    Map<String, String> attributeNames = Maps.newHashMap();
+    Map<String, AttributeValue> attributeValues = Maps.newHashMap();
+    int idx = 0;
+    for (String property : properties) {
+      String attributeKey = "#k" + idx;
+      idx++;
+      removeParts.add(attributeKey);
+      attributeNames.put(attributeKey, toPropertyCol(property));
+    }
+
+    List<String> updateParts = Lists.newArrayList();
+    updateCatalogEntryMetadata(updateParts, attributeValues);
+    String updateExpression = "REMOVE " + COMMA.join(removeParts) + " SET " + COMMA.join(updateParts);
+    return updateProperties(namespace, updateExpression, attributeValues, attributeNames);
+  }
+
+  @Override
+  public List<TableIdentifier> listTables(Namespace namespace) {
+    List<TableIdentifier> identifiers = Lists.newArrayList();
+    Map<String, AttributeValue> lastEvaluatedKey;
+    String condition = COL_NAMESPACE + " = :ns";
+    Map<String, AttributeValue> conditionValues = ImmutableMap.of(
+        ":ns", AttributeValue.builder().s(namespace.toString()).build());
+    do {
+      QueryResponse response = dynamo.query(QueryRequest.builder()
+          .tableName(awsProperties.dynamoDbTableName())
+          .indexName(GSI_NAMESPACE_IDENTIFIER)
+          .keyConditionExpression(condition)
+          .expressionAttributeValues(conditionValues)
+          .build());
+
+      if (response.hasItems()) {
+        for (Map<String, AttributeValue> item : response.items()) {
+          String identifier = item.get(COL_IDENTIFIER).s();
+          if (!COL_IDENTIFIER_NAMESPACE.equals(identifier)) {
+            identifiers.add(TableIdentifier.of(identifier.split("\\.")));
+          }
+        }
+      }
+
+      lastEvaluatedKey = response.lastEvaluatedKey();
+    } while (!lastEvaluatedKey.isEmpty());
+    return identifiers;
+  }
+
+  @Override
+  public boolean dropTable(TableIdentifier identifier, boolean purge) {
+    Map<String, AttributeValue> key = tablePrimaryKey(identifier);
+    try {
+      GetItemResponse response = dynamo.getItem(GetItemRequest.builder()
+          .tableName(awsProperties.dynamoDbTableName())
+          .consistentRead(true)
+          .key(key)
+          .build());
+
+      if (!response.hasItem()) {
+        throw new NoSuchTableException("Cannot find table %s to drop", identifier);
+      }
+
+      TableOperations ops = newTableOps(identifier);
+      TableMetadata lastMetadata = ops.current();
+      dynamo.deleteItem(DeleteItemRequest.builder()
+          .tableName(awsProperties.dynamoDbTableName())
+          .key(tablePrimaryKey(identifier))
+          .conditionExpression(COL_VERSION + " = :v")
+          .expressionAttributeValues(ImmutableMap.of(":v", response.item().get(COL_VERSION)))
+          .build());
+      LOG.info("Successfully dropped table {} from DynamoDb catalog", identifier);
+
+      if (purge && lastMetadata != null) {
+        CatalogUtil.dropTableData(ops.io(), lastMetadata);
+        LOG.info("Table {} data purged", identifier);
+      }
+
+      LOG.info("Dropped table: {}", identifier);
+      return true;
+    } catch (ConditionalCheckFailedException e) {
+      LOG.error("Cannot complete drop table operation for {}: commit conflict", identifier, e);
+      return false;
+    } catch (Exception e) {
+      LOG.error("Cannot complete drop table operation for {}: unexpected exception", identifier, e);
+      throw e;
+    }
+  }
+
+  @Override
+  public void renameTable(TableIdentifier from, TableIdentifier to) {
+    Map<String, AttributeValue> fromKey = tablePrimaryKey(from);
+    Map<String, AttributeValue> toKey = tablePrimaryKey(to);
+
+    GetItemResponse fromResponse = dynamo.getItem(GetItemRequest.builder()
+        .tableName(awsProperties.dynamoDbTableName())
+        .consistentRead(true)
+        .key(fromKey)
+        .build());
+
+    if (!fromResponse.hasItem()) {
+      throw new NoSuchTableException("Cannot rename table %s to %s: %s does not exist", from, to, from);
+    }
+
+    GetItemResponse toResponse = dynamo.getItem(GetItemRequest.builder()

Review comment:
       Wait, hold on, I updated the wrong comment here.  We need to do a get for the row identifying the table to be renamed, but I don't see why we need a get for the target row.  There are two gets here.  The first is necessary; the second shouldn't be, from what I can tell.
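
To illustrate the argument above, here is a minimal in-memory sketch, assuming the rename transaction itself carries a condition that the target row does not exist (in DynamoDB terms, something like `attribute_not_exists` on the target item). The transaction code is not shown in this excerpt, so that assumption is unverified here. The class `RenameSketch` and its methods are hypothetical and only model the behaviour; they are not the PR's code or the AWS SDK.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical in-memory model of a conditional, all-or-nothing rename. */
public class RenameSketch {
  // table key -> version token of the row
  private final Map<String, String> versions = new HashMap<>();

  public synchronized void put(String key, String version) {
    versions.put(key, version);
  }

  public synchronized boolean exists(String key) {
    return versions.containsKey(key);
  }

  /**
   * Applies delete(from) + put(to) as one unit. The source version comes from the
   * caller's earlier read of the source row; the target is only checked here, at
   * write time, so no separate read of the target row is needed beforehand.
   */
  public synchronized boolean rename(String from, String expectedFromVersion, String to) {
    boolean sourceUnchanged = expectedFromVersion.equals(versions.get(from));
    boolean targetAbsent = !versions.containsKey(to);
    if (!sourceUnchanged || !targetAbsent) {
      return false; // the whole unit is rejected and nothing is written
    }
    versions.put(to, versions.remove(from));
    return true;
  }
}
```

Under that assumption, a pre-read of the target row can only improve the error message; it cannot make the rename safer, because another writer could still create the target between the read and the write.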



