Posted to common-commits@hadoop.apache.org by fa...@apache.org on 2016/12/09 23:30:41 UTC

[2/2] hadoop git commit: HADOOP-13449 S3Guard: Implement DynamoDBMetadataStore. Contributed by Mingliang Liu

HADOOP-13449 S3Guard: Implement DynamoDBMetadataStore.  Contributed by Mingliang Liu


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/d354cd18
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/d354cd18
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/d354cd18

Branch: refs/heads/HADOOP-13345
Commit: d354cd182f9ff87660cbb61bb2a2448c8f30fd04
Parents: 881de1f
Author: Aaron Fabbri <fa...@apache.org>
Authored: Thu Dec 8 19:26:06 2016 -0800
Committer: Aaron Fabbri <fa...@apache.org>
Committed: Thu Dec 8 19:26:06 2016 -0800

----------------------------------------------------------------------
 .../src/main/resources/core-default.xml         |  42 ++
 hadoop-project/pom.xml                          |  18 +
 hadoop-tools/hadoop-aws/pom.xml                 |  32 ++
 .../hadoop/fs/s3a/DefaultS3ClientFactory.java   |  16 +-
 .../org/apache/hadoop/fs/s3a/S3AFileSystem.java |   3 +-
 .../apache/hadoop/fs/s3a/S3ClientFactory.java   |   4 +-
 .../fs/s3a/s3guard/DescendantsIterator.java     | 137 +++++
 .../fs/s3a/s3guard/DirListingMetadata.java      |   6 +-
 .../fs/s3a/s3guard/DynamoDBClientFactory.java   | 101 ++++
 .../fs/s3a/s3guard/DynamoDBMetadataStore.java   | 545 +++++++++++++++++++
 .../fs/s3a/s3guard/LocalMetadataStore.java      |   7 +
 .../hadoop/fs/s3a/s3guard/MetadataStore.java    |  13 +
 .../fs/s3a/s3guard/NullMetadataStore.java       |   5 +
 .../PathMetadataDynamoDBTranslation.java        | 209 +++++++
 .../apache/hadoop/fs/s3a/s3guard/S3Guard.java   |  54 ++
 .../hadoop/fs/s3a/AbstractS3AMockTest.java      |  10 +-
 .../hadoop/fs/s3a/ITestS3AEmptyDirectory.java   |  78 +++
 .../fs/s3a/s3guard/MetadataStoreTestBase.java   | 136 +++--
 .../s3a/s3guard/TestDynamoDBMetadataStore.java  | 325 +++++++++++
 .../fs/s3a/s3guard/TestLocalMetadataStore.java  |  24 +
 .../TestPathMetadataDynamoDBTranslation.java    | 219 ++++++++
 21 files changed, 1923 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/d354cd18/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
index 37b8f6d..7683e7d 100644
--- a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
+++ b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
@@ -1241,6 +1241,48 @@
 </property>
 
 <property>
+  <name>fs.s3a.s3guard.ddb.endpoint</name>
+  <value></value>
+  <description>
+    AWS DynamoDB endpoint to connect to. An up-to-date list is
+    provided in the AWS documentation on regions and endpoints. Without
+    this property, the AWS SDK looks up the regional endpoint
+    automatically from the S3 bucket's region.
+  </description>
+</property>
+
+<property>
+  <name>fs.s3a.s3guard.ddb.table</name>
+  <value></value>
+  <description>
+    The DynamoDB table name to operate on. Without this property, the name
+    of the respective S3 bucket is used.
+  </description>
+</property>
+
+<property>
+  <name>fs.s3a.s3guard.ddb.table.capacity.read</name>
+  <value>500</value>
+  <description>
+    Provisioned throughput requirements for read operations in terms of
+    capacity units for the DynamoDB table.  This config value is only used
+    when creating a new DynamoDB table; for existing tables you can manually
+    increase or decrease the provisioned read capacity as needed.
+    See the DynamoDB documentation for more information.
+  </description>
+</property>
+
+<property>
+  <name>fs.s3a.s3guard.ddb.table.capacity.write</name>
+  <value>100</value>
+  <description>
+    Provisioned throughput requirements for write operations in terms of
+    capacity units for the DynamoDB table.  Refer to the related config
+    fs.s3a.s3guard.ddb.table.capacity.read before use.
+  </description>
+</property>
+
+<property>
   <name>fs.AbstractFileSystem.s3a.impl</name>
   <value>org.apache.hadoop.fs.s3a.S3A</value>
   <description>The implementation class of the S3A AbstractFileSystem.</description>

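For illustration only (not part of this patch), a minimal sketch of setting
these keys programmatically; the table name and capacity values are
placeholders, and imports are elided:

    Configuration conf = new Configuration();
    // Optional: without a table name, the S3 bucket name is used.
    conf.set("fs.s3a.s3guard.ddb.table", "example-metadata-table");
    // Capacity settings only take effect when a new table is created.
    conf.setLong("fs.s3a.s3guard.ddb.table.capacity.read", 500);
    conf.setLong("fs.s3a.s3guard.ddb.table.capacity.write", 100);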
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d354cd18/hadoop-project/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-project/pom.xml b/hadoop-project/pom.xml
index 1ae60ed..d8cff61 100644
--- a/hadoop-project/pom.xml
+++ b/hadoop-project/pom.xml
@@ -756,6 +756,16 @@
       </dependency>
       <dependency>
         <groupId>com.amazonaws</groupId>
+        <artifactId>aws-java-sdk-dynamodb</artifactId>
+        <version>${aws-java-sdk.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>com.amazonaws</groupId>
+        <artifactId>DynamoDBLocal</artifactId>
+        <version>1.11.0</version>
+      </dependency>
+      <dependency>
+        <groupId>com.amazonaws</groupId>
         <artifactId>aws-java-sdk-sts</artifactId>
         <version>${aws-java-sdk.version}</version>
       </dependency>
@@ -1627,4 +1637,12 @@
       </build>
     </profile>
   </profiles>
+
+  <repositories>
+    <repository>
+      <id>dynamodblocal</id>
+      <name>AWS DynamoDB Local Release Repository</name>
+      <url>http://dynamodb-local.s3-website-us-west-2.amazonaws.com/release</url>
+    </repository>
+  </repositories>
 </project>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d354cd18/hadoop-tools/hadoop-aws/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/pom.xml b/hadoop-tools/hadoop-aws/pom.xml
index 1407661..b7b945d 100644
--- a/hadoop-tools/hadoop-aws/pom.xml
+++ b/hadoop-tools/hadoop-aws/pom.xml
@@ -310,6 +310,18 @@
               <outputFile>${project.basedir}/target/hadoop-tools-deps/${project.artifactId}.tools-optional.txt</outputFile>
             </configuration>
           </execution>
+          <execution>
+            <id>copy</id>
+            <phase>test-compile</phase>
+            <goals>
+              <goal>copy-dependencies</goal>
+            </goals>
+            <configuration>
+              <includeScope>test</includeScope>
+              <includeTypes>so,dll,dylib</includeTypes>
+              <outputDirectory>${project.build.directory}/native-libs</outputDirectory>
+            </configuration>
+          </execution>
         </executions>
       </plugin>
     </plugins>
@@ -354,6 +366,26 @@
     </dependency>
     <dependency>
       <groupId>com.amazonaws</groupId>
+      <artifactId>aws-java-sdk-dynamodb</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>com.amazonaws</groupId>
+      <artifactId>DynamoDBLocal</artifactId>
+      <scope>test</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>org.hamcrest</groupId>
+          <artifactId>hamcrest-core</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.eclipse.jetty</groupId>
+          <artifactId>jetty-http</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>com.amazonaws</groupId>
       <artifactId>aws-java-sdk-sts</artifactId>
       <scope>test</scope>
     </dependency>

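The copy-dependencies execution above stages DynamoDBLocal's sqlite4java
native libraries (so/dll/dylib) under target/native-libs. As a hedged sketch,
a test JVM would typically point the library loader at that directory before
starting the embedded server; the property name comes from sqlite4java, the
path from the outputDirectory configured above:

    // Assumption: directory matches the <outputDirectory> configured above.
    System.setProperty("sqlite4java.library.path", "target/native-libs");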
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d354cd18/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java
index a43a746..c411fdd 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java
@@ -50,12 +50,22 @@ public class DefaultS3ClientFactory extends Configured implements
     Configuration conf = getConf();
     AWSCredentialsProvider credentials =
         createAWSCredentialProviderSet(name, conf, uri);
-    ClientConfiguration awsConf = new ClientConfiguration();
+    final ClientConfiguration awsConf = createAwsConf(getConf());
+    AmazonS3 s3 = newAmazonS3Client(credentials, awsConf);
+    return createAmazonS3Client(s3, conf, credentials, awsConf);
+  }
+
+  /**
+   * Create a new {@link ClientConfiguration}.
+   * @param conf The Hadoop configuration
+   * @return new AWS client configuration
+   */
+  public static ClientConfiguration createAwsConf(Configuration conf) {
+    final ClientConfiguration awsConf = new ClientConfiguration();
     initConnectionSettings(conf, awsConf);
     initProxySupport(conf, awsConf);
     initUserAgent(conf, awsConf);
-    AmazonS3 s3 = newAmazonS3Client(credentials, awsConf);
-    return createAmazonS3Client(s3, conf, credentials, awsConf);
+    return awsConf;
   }
 
   /**

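Extracting createAwsConf lets other AWS clients share the same connection,
proxy, and user-agent settings as the S3 client. A minimal sketch of the
intended reuse (this is what the DynamoDB client factory below does;
credentials is assumed to be an AWSCredentialsProvider in scope):

    ClientConfiguration awsConf = DefaultS3ClientFactory.createAwsConf(conf);
    AmazonDynamoDBClient ddb = new AmazonDynamoDBClient(credentials, awsConf);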
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d354cd18/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
index cb08e57..bcc841c 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
@@ -362,8 +362,7 @@ public class S3AFileSystem extends FileSystem {
    * Returns the S3 client used by this filesystem.
    * @return AmazonS3Client
    */
-  @VisibleForTesting
-  AmazonS3 getAmazonS3Client() {
+  public AmazonS3 getAmazonS3Client() {
     return s3;
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d354cd18/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ClientFactory.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ClientFactory.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ClientFactory.java
index 5169840..387eb43 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ClientFactory.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ClientFactory.java
@@ -27,11 +27,11 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 
 /**
- * Factory for creation of S3 client instances to be used by {@link S3Store}.
+ * Factory for creation of {@link AmazonS3} client instances.
  */
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
-interface S3ClientFactory {
+public interface S3ClientFactory {
 
   /**
    * Creates a new {@link AmazonS3} client.  This method accepts the S3A file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d354cd18/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DescendantsIterator.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DescendantsIterator.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DescendantsIterator.java
new file mode 100644
index 0000000..afd3266
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DescendantsIterator.java
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.s3guard;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.NoSuchElementException;
+import java.util.Queue;
+
+import com.google.common.base.Preconditions;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+
+/**
+ * {@code DescendantsIterator} is a {@link RemoteIterator} that implements
+ * breadth-first traversal (BFS) of a path and all of its descendants.
+ * After visiting each path, that path's direct children are discovered by
+ * calling {@link MetadataStore#listChildren(Path)}.  Each iteration returns
+ * the next direct child and, if that child is a directory, also pushes it
+ * onto a queue so that its children are discovered later.
+ *
+ * For example, assume the consistent store contains metadata representing this
+ * file system structure:
+ *
+ * <pre>
+ * {@code
+ * /dir1
+ * |-- dir2
+ * |   |-- file1
+ * |   `-- file2
+ * `-- dir3
+ *     |-- dir4
+ *     |   `-- file3
+ *     |-- dir5
+ *     |   `-- file4
+ *     `-- dir6
+ * }
+ * </pre>
+ *
+ * Consider this code sample:
+ * <pre>
+ * {@code
+ * final PathMetadata dir1 = ms.get(new Path("/dir1"));
+ * for (DescendantsIterator descendants = new DescendantsIterator(ms, dir1);
+ *     descendants.hasNext(); ) {
+ *   final FileStatus status = descendants.next().getFileStatus();
+ *   System.out.printf("%s %s%n", status.isDirectory() ? 'D' : 'F',
+ *       status.getPath());
+ * }
+ * }
+ * </pre>
+ *
+ * The output is:
+ * <pre>
+ * {@code
+ * D /dir1
+ * D /dir1/dir2
+ * D /dir1/dir3
+ * F /dir1/dir2/file1
+ * F /dir1/dir2/file2
+ * D /dir1/dir3/dir4
+ * D /dir1/dir3/dir5
+ * F /dir1/dir3/dir4/file3
+ * F /dir1/dir3/dir5/file4
+ * D /dir1/dir3/dir6
+ * }
+ * </pre>
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+class DescendantsIterator implements RemoteIterator<PathMetadata> {
+
+  private final MetadataStore metadataStore;
+  private final Queue<PathMetadata> queue = new LinkedList<>();
+
+  /**
+   * Creates a new {@code DescendantsIterator}.
+   *
+   * @param ms the associated {@link MetadataStore}
+   * @param meta base path metadata for the iteration, whose path will be
+   *     the first one returned during iteration (unless it is the root)
+   * @throws IOException if the initial listing of the root's children fails
+   */
+  DescendantsIterator(MetadataStore ms, PathMetadata meta)
+      throws IOException {
+    Preconditions.checkNotNull(ms);
+    Preconditions.checkNotNull(meta);
+    this.metadataStore = ms;
+
+    final Path path = meta.getFileStatus().getPath();
+    if (path.isRoot()) {
+      final DirListingMetadata rootListing = ms.listChildren(path);
+      if (rootListing != null) {
+        queue.addAll(rootListing.getListing());
+      }
+    } else {
+      queue.add(meta);
+    }
+  }
+
+  @Override
+  public boolean hasNext() throws IOException {
+    return !queue.isEmpty();
+  }
+
+  @Override
+  public PathMetadata next() throws IOException {
+    if (!hasNext()) {
+      throw new NoSuchElementException("No more descendants.");
+    }
+    final PathMetadata next = queue.poll();
+    if (next.getFileStatus().isDirectory()) {
+      final Path path = next.getFileStatus().getPath();
+      queue.addAll(metadataStore.listChildren(path).getListing());
+    }
+    return next;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d354cd18/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DirListingMetadata.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DirListingMetadata.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DirListingMetadata.java
index c25ad3a..4a9df55 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DirListingMetadata.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DirListingMetadata.java
@@ -200,13 +200,15 @@ public class DirListingMetadata {
       Preconditions.checkNotNull(childUri.getHost(), "Expected non-null URI " +
           "host");
       Preconditions.checkArgument(
-          childUri.getHost().equals(parentUri.getHost()));
+          childUri.getHost().equals(parentUri.getHost()),
+          "childUri '" + childUri + "' and parentUri '" + parentUri
+              + "' should have the same host");
       Preconditions.checkNotNull(childUri.getScheme());
     }
     Preconditions.checkArgument(!childPath.isRoot(),
         "childPath cannot be the root path");
     Preconditions.checkArgument(childPath.getParent().equals(path),
-        "childPath must be a child of path");
+        "childPath '" + childPath + "' must be a child of path '" + path + "'");
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d354cd18/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBClientFactory.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBClientFactory.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBClientFactory.java
new file mode 100644
index 0000000..a06197f
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBClientFactory.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.s3guard;
+
+import java.io.IOException;
+import java.net.URI;
+
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient;
+import com.amazonaws.services.s3.model.Region;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.s3a.DefaultS3ClientFactory;
+
+import static org.apache.hadoop.fs.s3a.S3AUtils.createAWSCredentialProviderSet;
+
+/**
+ * Interface to create a DynamoDB client.
+ *
+ * Implementations must be configured (see {@link Configurable}) before use.
+ */
+interface DynamoDBClientFactory extends Configurable {
+  Logger LOG = LoggerFactory.getLogger(DynamoDBClientFactory.class);
+
+  /**
+   * Create a DynamoDB client in the same region as the S3 bucket.
+   *
+   * @param fsUri FileSystem URI after any login details have been stripped
+   * @param s3Region the s3 region
+   * @return a new DynamoDB client
+   * @throws IOException if any IO error happens
+   */
+  AmazonDynamoDBClient createDynamoDBClient(URI fsUri, String s3Region)
+      throws IOException;
+
+  /**
+   * The default implementation for creating an AmazonDynamoDBClient.
+   */
+  class DefaultDynamoDBClientFactory extends Configured
+      implements DynamoDBClientFactory {
+    @Override
+    public AmazonDynamoDBClient createDynamoDBClient(URI fsUri, String s3Region)
+        throws IOException {
+      assert getConf() != null : "Should have been configured before usage";
+      Region region;
+      try {
+        region = Region.fromValue(s3Region);
+      } catch (IllegalArgumentException e) {
+        final String msg = "Region '" + s3Region +
+            "' is invalid; it should be the same region as the S3 bucket";
+        LOG.error(msg);
+        throw new IllegalArgumentException(msg, e);
+      }
+      LOG.info("Creating DynamoDBClient for fsUri {} in region {}",
+          fsUri, region);
+
+      final Configuration conf = getConf();
+      final AWSCredentialsProvider credentials =
+          createAWSCredentialProviderSet(fsUri, conf, fsUri);
+      final ClientConfiguration awsConf =
+          DefaultS3ClientFactory.createAwsConf(conf);
+      AmazonDynamoDBClient ddb = new AmazonDynamoDBClient(credentials, awsConf);
+
+      ddb.withRegion(region.toAWSRegion());
+      final String endPoint = conf.get(S3Guard.S3GUARD_DDB_ENDPOINT_KEY);
+      if (StringUtils.isNotEmpty(endPoint)) {
+        try {
+          ddb.withEndpoint(endPoint);
+        } catch (IllegalArgumentException e) {
+          final String msg = "Incorrect DynamoDB endpoint: "  + endPoint;
+          LOG.error(msg, e);
+          throw new IllegalArgumentException(msg, e);
+        }
+      }
+      return ddb;
+    }
+  }
+
+}

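The factory is looked up through configuration, which keeps the DynamoDB
client pluggable for tests. A sketch mirroring the lookup performed by
DynamoDBMetadataStore (fsUri and s3Region are assumed to be in scope; the
S3Guard constants are package-private, so this runs in the same package):

    Class<? extends DynamoDBClientFactory> cls = conf.getClass(
        S3Guard.S3GUARD_DDB_CLIENT_FACTORY_IMPL,
        S3Guard.S3GUARD_DDB_CLIENT_FACTORY_IMPL_DEFAULT,
        DynamoDBClientFactory.class);
    AmazonDynamoDBClient ddb = ReflectionUtils.newInstance(cls, conf)
        .createDynamoDBClient(fsUri, s3Region);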
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d354cd18/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStore.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStore.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStore.java
new file mode 100644
index 0000000..07ee542
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStore.java
@@ -0,0 +1,545 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.s3guard;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+import com.amazonaws.AmazonClientException;
+import com.amazonaws.regions.Region;
+import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient;
+import com.amazonaws.services.dynamodbv2.document.BatchWriteItemOutcome;
+import com.amazonaws.services.dynamodbv2.document.DynamoDB;
+import com.amazonaws.services.dynamodbv2.document.Item;
+import com.amazonaws.services.dynamodbv2.document.ItemCollection;
+import com.amazonaws.services.dynamodbv2.document.QueryOutcome;
+import com.amazonaws.services.dynamodbv2.document.Table;
+import com.amazonaws.services.dynamodbv2.document.TableWriteItems;
+import com.amazonaws.services.dynamodbv2.document.spec.GetItemSpec;
+import com.amazonaws.services.dynamodbv2.document.spec.QuerySpec;
+import com.amazonaws.services.dynamodbv2.model.CreateTableRequest;
+import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughput;
+import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughputDescription;
+import com.amazonaws.services.dynamodbv2.model.ResourceInUseException;
+import com.amazonaws.services.dynamodbv2.model.ResourceNotFoundException;
+
+import com.amazonaws.services.dynamodbv2.model.WriteRequest;
+import com.amazonaws.services.s3.AmazonS3;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.Constants;
+import org.apache.hadoop.fs.s3a.S3AFileStatus;
+import org.apache.hadoop.fs.s3a.S3AFileSystem;
+import org.apache.hadoop.fs.s3a.S3ClientFactory;
+import org.apache.hadoop.util.ReflectionUtils;
+
+import static org.apache.hadoop.fs.s3a.S3AUtils.*;
+import static org.apache.hadoop.fs.s3a.s3guard.PathMetadataDynamoDBTranslation.*;
+
+/**
+ * DynamoDBMetadataStore is a {@link MetadataStore} that persists
+ * file system metadata to DynamoDB.
+ *
+ * The current implementation uses a schema consisting of a single table.  The
+ * name of the table can be configured by config key
+ * {@link S3Guard#S3GUARD_DDB_TABLE_NAME_KEY}.
+ * By default, it matches the name of the S3 bucket.  Each item in the table
+ * represents a single directory or file.  Its path is split into separate table
+ * attributes:
+ * <ul>
+ * <li> parent (absolute path of the parent). </li>
+ * <li> child (path of that specific child, relative to parent). </li>
+ * <li> optional boolean attribute tracking whether the path is a directory.
+ *      Absence or a false value indicates the path is a file. </li>
+ * <li> optional long attribute revealing modification time of file.
+ *      This attribute is meaningful only to file items.</li>
+ * <li> optional long attribute revealing file length.
+ *      This attribute is meaningful only to file items.</li>
+ * <li> optional long attribute revealing block size of the file.
+ *      This attribute is meaningful only to file items.</li>
+ * </ul>
+ *
+ * The DynamoDB partition key is the parent, and the range key is the child.
+ *
+ * Root is a special case.  It has no parent, so it cannot be split into
+ * separate parent and child attributes.  To avoid confusion in the DynamoDB
+ * table, we simply do not persist root and instead treat it as a special case
+ * path that always exists.
+ *
+ * For example, assume the consistent store contains metadata representing this
+ * file system structure:
+ *
+ * <pre>
+ * /dir1
+ * |-- dir2
+ * |   |-- file1
+ * |   `-- file2
+ * `-- dir3
+ *     |-- dir4
+ *     |   `-- file3
+ *     |-- dir5
+ *     |   `-- file4
+ *     `-- dir6
+ * </pre>
+ *
+ * This is persisted to a single DynamoDB table as:
+ *
+ * <pre>
+ * ==================================================================
+ * | parent          | child | is_dir | mod_time | len |     ...    |
+ * ==================================================================
+ * | /               | dir1  | true   |          |     |            |
+ * | /dir1           | dir2  | true   |          |     |            |
+ * | /dir1           | dir3  | true   |          |     |            |
+ * | /dir1/dir2      | file1 |        |   100    | 111 |            |
+ * | /dir1/dir2      | file2 |        |   200    | 222 |            |
+ * | /dir1/dir3      | dir4  | true   |          |     |            |
+ * | /dir1/dir3      | dir5  | true   |          |     |            |
+ * | /dir1/dir3/dir4 | file3 |        |   300    | 333 |            |
+ * | /dir1/dir3/dir5 | file4 |        |   400    | 444 |            |
+ * | /dir1/dir3      | dir6  | true   |          |     |            |
+ * ==================================================================
+ * </pre>
+ *
+ * This choice of schema is efficient for read access patterns.
+ * {@link #get(Path)} can be served from a single item lookup.
+ * {@link #listChildren(Path)} can be served from a query against all rows
+ * matching the parent (the partition key) and the returned list is guaranteed
+ * to be sorted by child (the range key).  Tracking whether or not a path is a
+ * directory helps prevent unnecessary queries during traversal of an entire
+ * sub-tree.
+ *
+ * Some mutating operations, notably {@link #deleteSubtree(Path)} and
+ * {@link #move(Collection, Collection)}, are less efficient with this schema.
+ * They require mutating multiple items in the DynamoDB table.
+ *
+ * All DynamoDB access is performed within the same AWS region as the S3
+ * bucket that hosts the S3A instance.  During initialization, the store
+ * checks the location of the S3 bucket and creates a DynamoDB client
+ * connected to the same region.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class DynamoDBMetadataStore implements MetadataStore {
+  public static final Logger LOG = LoggerFactory.getLogger(
+      DynamoDBMetadataStore.class);
+
+  private DynamoDB dynamoDB;
+  private String region;
+  private Table table;
+  private String tableName;
+  private S3AFileSystem s3afs;
+  private String username;
+
+  @Override
+  public void initialize(FileSystem fs) throws IOException {
+    Preconditions.checkArgument(fs instanceof S3AFileSystem,
+        "DynamoDBMetadataStore only supports S3A filesystem.");
+    s3afs = (S3AFileSystem) fs;
+    final String bucket = s3afs.getUri().getAuthority();
+    try {
+      region = s3afs.getAmazonS3Client().getBucketLocation(bucket);
+    } catch (AmazonClientException e) {
+      throw new IOException("Can not find location for bucket " + bucket, e);
+    }
+
+    username = s3afs.getUsername();
+
+    final Configuration conf = s3afs.getConf();
+    Class<? extends DynamoDBClientFactory> cls = conf.getClass(
+        S3Guard.S3GUARD_DDB_CLIENT_FACTORY_IMPL,
+        S3Guard.S3GUARD_DDB_CLIENT_FACTORY_IMPL_DEFAULT,
+        DynamoDBClientFactory.class);
+    AmazonDynamoDBClient dynamoDBClient = ReflectionUtils.newInstance(cls, conf)
+        .createDynamoDBClient(s3afs.getUri(), region);
+    dynamoDB = new DynamoDB(dynamoDBClient);
+
+    // use the bucket as the DynamoDB table name if not specified in config
+    tableName = conf.getTrimmed(S3Guard.S3GUARD_DDB_TABLE_NAME_KEY, bucket);
+
+    // create the table unless it's explicitly told not to do so
+    if (conf.getBoolean(S3Guard.S3GUARD_DDB_TABLE_CREATE_KEY, true)) {
+      createTable();
+    }
+  }
+
+  /**
+   * Performs one-time initialization of the metadata store via configuration.
+   *
+   * This initialization depends on the configuration object for the default
+   * S3AFileSystem URI, AWS credentials, the S3ClientFactory and
+   * DynamoDBClientFactory implementation classes, the DynamoDB endpoint, the
+   * metadata table name, etc. Generally you should use
+   * {@link #initialize(FileSystem)} instead, given an initialized S3 file
+   * system.
+   *
+   * @see #initialize(FileSystem)
+   * @throws IOException if there is an error
+   */
+  void initialize(Configuration conf) throws IOException {
+    final FileSystem defaultFs = FileSystem.get(conf);
+    Preconditions.checkArgument(defaultFs instanceof S3AFileSystem,
+        "DynamoDBMetadataStore only supports S3A filesystem.");
+    s3afs = (S3AFileSystem) defaultFs;
+
+    // the DynamoDB table name must be set in config here; it is also used as
+    // the S3 bucket name when looking up the region below
+    tableName = conf.getTrimmed(S3Guard.S3GUARD_DDB_TABLE_NAME_KEY);
+    Preconditions.checkNotNull(tableName, "No DynamoDB table name configured!");
+
+    final Class<? extends S3ClientFactory> clsS3 = conf.getClass(
+        Constants.S3_CLIENT_FACTORY_IMPL,
+        Constants.DEFAULT_S3_CLIENT_FACTORY_IMPL,
+        S3ClientFactory.class);
+    final S3ClientFactory factory = ReflectionUtils.newInstance(clsS3, conf);
+    AmazonS3 s3 = factory.createS3Client(s3afs.getUri(), s3afs.getUri());
+    try {
+      region = s3.getBucketLocation(tableName);
+    } catch (AmazonClientException e) {
+      throw new IOException("Can not find location for bucket " + tableName, e);
+    }
+
+    Class<? extends DynamoDBClientFactory> clsDdb = conf.getClass(
+        S3Guard.S3GUARD_DDB_CLIENT_FACTORY_IMPL,
+        S3Guard.S3GUARD_DDB_CLIENT_FACTORY_IMPL_DEFAULT,
+        DynamoDBClientFactory.class);
+    AmazonDynamoDBClient dynamoDBClient =
+        ReflectionUtils.newInstance(clsDdb, conf)
+            .createDynamoDBClient(s3afs.getUri(), region);
+    dynamoDB = new DynamoDB(dynamoDBClient);
+
+    createTable();
+  }
+
+  @Override
+  public void delete(Path path) throws IOException {
+    path = checkPath(path);
+    LOG.debug("Deleting from table {} in region {}: {}",
+        tableName, region, path);
+
+    // deleting a nonexistent item consumes 1 write capacity unit; skip it
+    if (path.isRoot()) {
+      LOG.debug("Skip deleting root directory as it does not exist in table");
+      return;
+    }
+
+    try {
+      table.deleteItem(pathToKey(path));
+    } catch (AmazonClientException e) {
+      throw translateException("delete", path, e);
+    }
+  }
+
+  @Override
+  public void deleteSubtree(Path path) throws IOException {
+    path = checkPath(path);
+    LOG.debug("Deleting subtree from table {} in region {}: {}",
+        tableName, region, path);
+
+    final PathMetadata meta = get(path);
+    if (meta == null) {
+      LOG.debug("Subtree path {} does not exist; this will be a no-op", path);
+      return;
+    }
+
+    for (DescendantsIterator desc = new DescendantsIterator(this, meta);
+         desc.hasNext();) {
+      delete(desc.next().getFileStatus().getPath());
+    }
+  }
+
+  @Override
+  public PathMetadata get(Path path) throws IOException {
+    path = checkPath(path);
+    LOG.debug("Get from table {} in region {}: {}", tableName, region, path);
+
+    try {
+      final PathMetadata meta;
+      if (path.isRoot()) {
+        // Root does not persist in the table
+        meta = new PathMetadata(new S3AFileStatus(true, path, username));
+      } else {
+        final GetItemSpec spec = new GetItemSpec()
+            .withPrimaryKey(pathToKey(path))
+            .withConsistentRead(true); // strictly consistent read
+        final Item item = table.getItem(spec);
+        meta = itemToPathMetadata(s3afs.getUri(), item, username);
+        LOG.debug("Get from table {} in region {} returning for {}: {}",
+            tableName, region, path, meta);
+      }
+
+      if (meta != null) {
+        final S3AFileStatus status = (S3AFileStatus) meta.getFileStatus();
+        // for a directory, query its direct children to set the isEmpty bit
+        if (status.isDirectory()) {
+          final QuerySpec spec = new QuerySpec()
+              .withHashKey(pathToParentKeyAttribute(path))
+              .withConsistentRead(true)
+              .withMaxResultSize(1); // limit 1
+          final ItemCollection<QueryOutcome> items = table.query(spec);
+          status.setIsEmptyDirectory(!(items.iterator().hasNext()));
+        }
+      }
+
+      return meta;
+    } catch (AmazonClientException e) {
+      throw translateException("get", path, e);
+    }
+  }
+
+  @Override
+  public DirListingMetadata listChildren(Path path) throws IOException {
+    path = checkPath(path);
+    LOG.debug("Listing table {} in region {}: {}", tableName, region, path);
+
+    try {
+      final QuerySpec spec = new QuerySpec()
+          .withHashKey(pathToParentKeyAttribute(path))
+          .withConsistentRead(true); // strictly consistent read
+      final ItemCollection<QueryOutcome> items = table.query(spec);
+
+      final List<PathMetadata> metas = new ArrayList<>();
+      for (Item item : items) {
+        metas.add(itemToPathMetadata(s3afs.getUri(), item, username));
+      }
+      LOG.trace("Listing table {} in region {} for {} returning {}",
+          tableName, region, path, metas);
+
+      return (metas.isEmpty() && get(path) == null)
+          ? null
+          : new DirListingMetadata(path, metas, false);
+    } catch (AmazonClientException e) {
+      throw translateException("listChildren", path, e);
+    }
+  }
+
+  @Override
+  public void move(Collection<Path> pathsToDelete,
+      Collection<PathMetadata> pathsToCreate) throws IOException {
+    final TableWriteItems writeItems = new TableWriteItems(tableName)
+        .withItemsToPut(pathMetadataToItem(pathsToCreate))
+        .withPrimaryKeysToDelete(pathToKey(pathsToDelete));
+    try {
+      BatchWriteItemOutcome res = dynamoDB.batchWriteItem(writeItems);
+
+      // Check for unprocessed keys in case of exceeding provisioned throughput
+      Map<String, List<WriteRequest>> unprocessed = res.getUnprocessedItems();
+      while (!unprocessed.isEmpty()) {
+        res = dynamoDB.batchWriteItemUnprocessed(unprocessed);
+        unprocessed = res.getUnprocessedItems();
+      }
+    } catch (AmazonClientException e) {
+      throw translateException("createTable", (String) null, e);
+    }
+  }
+
+  @Override
+  public void put(PathMetadata meta) throws IOException {
+    checkPathMetadata(meta);
+    LOG.debug("Saving to table {} in region {}: {}", tableName, region, meta);
+    innerPut(meta);
+
+    // put all ancestors that are not already present; as an optimization we
+    // stop at the first ancestor that already exists
+    Path path = meta.getFileStatus().getPath().getParent();
+    while (path != null && !path.isRoot()) {
+      final GetItemSpec spec = new GetItemSpec()
+          .withPrimaryKey(pathToKey(path))
+          .withConsistentRead(true); // strictly consistent read
+      final Item item = table.getItem(spec);
+      if (item == null) {
+        final S3AFileStatus status = new S3AFileStatus(false, path, username);
+        innerPut(new PathMetadata(status));
+        path = path.getParent();
+      } else {
+        break;
+      }
+    }
+  }
+
+  private void innerPut(PathMetadata meta) throws IOException {
+    final Path path = meta.getFileStatus().getPath();
+    if (path.isRoot()) {
+      LOG.debug("Root path / is not persisted");
+      return;
+    }
+
+    try {
+      table.putItem(pathMetadataToItem(meta));
+    } catch (AmazonClientException e) {
+      throw translateException("put", path, e);
+    }
+  }
+
+  @Override
+  public void put(DirListingMetadata meta) throws IOException {
+    LOG.debug("Saving to table {} in region {}: {}", tableName, region, meta);
+
+    for (PathMetadata pathMetadata : meta.getListing()) {
+      put(pathMetadata);
+    }
+  }
+
+  @Override
+  public void close() {
+    if (dynamoDB != null) {
+      LOG.info("Shutting down {}", this);
+      dynamoDB.shutdown();
+      dynamoDB = null;
+    }
+  }
+
+  @Override
+  public void destroy() throws IOException {
+    LOG.info("Deleting DynamoDB table {} in region {}", tableName, region);
+    try {
+      table.delete();
+      table.waitForDelete();
+    } catch (ResourceNotFoundException rnfe) {
+      LOG.info("ResourceNotFoundException while deleting DynamoDB table {} in "
+              + "region {}.  This may indicate that the table does not exist, "
+              + "or has been deleted by another concurrent thread or process.",
+          tableName, region);
+    } catch (InterruptedException ie) {
+      Thread.currentThread().interrupt();
+      LOG.warn("Interrupted while waiting for DynamoDB table {} being deleted",
+          tableName, ie);
+      throw new IOException("Table " + tableName + " in region " + region
+          + " has not been deleted");
+    } catch (AmazonClientException e) {
+      throw translateException("destroy", (String) null, e);
+    }
+  }
+
+  @Override
+  public String toString() {
+    return getClass().getSimpleName() + '{'
+        + "region=" + region
+        + ", tableName=" + tableName
+        + '}';
+  }
+
+  /**
+   * Create the table and wait for it to become active.
+   *
+   * If a table with the intended name already exists, then it logs the
+   * {@link ResourceInUseException} and uses that table.  The DynamoDB table
+   * creation API is asynchronous; this method waits for the table to become
+   * active after sending the creation request, so overall, this method is
+   * synchronous, and the table is guaranteed to exist after this method
+   * returns successfully.
+   */
+  void createTable() throws IOException {
+    final Configuration conf = s3afs.getConf();
+    final ProvisionedThroughput capacity = new ProvisionedThroughput(
+        conf.getLong(S3Guard.S3GUARD_DDB_TABLE_CAPACITY_READ_KEY,
+            S3Guard.S3GUARD_DDB_TABLE_CAPACITY_READ_DEFAULT),
+        conf.getLong(S3Guard.S3GUARD_DDB_TABLE_CAPACITY_WRITE_KEY,
+            S3Guard.S3GUARD_DDB_TABLE_CAPACITY_WRITE_DEFAULT));
+
+    try {
+      LOG.info("Creating DynamoDB table {} in region {}", tableName, region);
+      table = dynamoDB.createTable(new CreateTableRequest()
+          .withTableName(tableName)
+          .withKeySchema(keySchema())
+          .withAttributeDefinitions(attributeDefinitions())
+          .withProvisionedThroughput(capacity));
+    } catch (ResourceInUseException e) {
+      LOG.info("ResourceInUseException while creating DynamoDB table {} in "
+              + "region {}.  This may indicate that the table was created by "
+              + "another concurrent thread or process.",
+          tableName, region);
+      table = dynamoDB.getTable(tableName);
+    }
+
+    try {
+      table.waitForActive();
+    } catch (InterruptedException e) {
+      LOG.warn("Interrupted while waiting for DynamoDB table {} active",
+          tableName, e);
+      Thread.currentThread().interrupt();
+      throw new IOException("DynamoDB table '" + tableName + "' is not active "
+          + "yet in region " + region);
+    } catch (AmazonClientException e) {
+      throw translateException("createTable", (String) null, e);
+    }
+  }
+
+  /**
+   * Provision the table with given read and write capacity units.
+   */
+  void provisionTable(Long readCapacity, Long writeCapacity)
+      throws IOException {
+    final ProvisionedThroughput toProvision = new ProvisionedThroughput()
+        .withReadCapacityUnits(readCapacity)
+        .withWriteCapacityUnits(writeCapacity);
+    try {
+      final ProvisionedThroughputDescription p =
+          table.updateTable(toProvision).getProvisionedThroughput();
+      LOG.info("Provision table {} in region {}: readCapacityUnits={}, "
+              + "writeCapacityUnits={}",
+          tableName, region, p.getReadCapacityUnits(),
+          p.getWriteCapacityUnits());
+    } catch (AmazonClientException e) {
+      throw translateException("provisionTable", (String) null, e);
+    }
+  }
+
+  Table getTable() {
+    return table;
+  }
+
+  String getRegion() {
+    return region;
+  }
+
+  @VisibleForTesting
+  DynamoDB getDynamoDB() {
+    return dynamoDB;
+  }
+
+  /**
+   * Validates a path object and qualifies it if it is not already qualified.
+   */
+  private Path checkPath(Path path) {
+    Preconditions.checkNotNull(path);
+    Preconditions.checkArgument(path.isAbsolute(),
+        "Path '" + path + "' is not absolute!");
+    return path.makeQualified(s3afs.getUri(), null);
+  }
+
+  /**
+   * Validates a path meta-data object.
+   */
+  private static void checkPathMetadata(PathMetadata meta) {
+    Preconditions.checkNotNull(meta);
+    Preconditions.checkNotNull(meta.getFileStatus());
+    Preconditions.checkNotNull(meta.getFileStatus().getPath());
+  }
+
+}

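A hedged sketch of the store's lifecycle against an already initialized
S3AFileSystem named fs (paths illustrative):

    DynamoDBMetadataStore ms = new DynamoDBMetadataStore();
    ms.initialize(fs);                   // connects to the bucket's region
    PathMetadata meta = ms.get(new Path("/dir1/file1"));    // one item lookup
    DirListingMetadata dir = ms.listChildren(new Path("/dir1")); // one query
    ms.close();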
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d354cd18/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/LocalMetadataStore.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/LocalMetadataStore.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/LocalMetadataStore.java
index 63c931d..bf1fdd7 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/LocalMetadataStore.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/LocalMetadataStore.java
@@ -249,6 +249,13 @@ public class LocalMetadataStore implements MetadataStore {
 
   }
 
+  @Override
+  public void destroy() throws IOException {
+    if (dirHash != null) {
+      dirHash.clear();
+    }
+  }
+
   @VisibleForTesting
   static <T> void clearHashByAncestor(Path ancestor, Map<Path, T> hash) {
     for (Iterator<Map.Entry<Path, T>> it = hash.entrySet().iterator();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d354cd18/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/MetadataStore.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/MetadataStore.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/MetadataStore.java
index f261fc9..6d3c440 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/MetadataStore.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/MetadataStore.java
@@ -143,4 +143,17 @@ public interface MetadataStore extends Closeable {
    * @throws IOException if there is an error
    */
   void put(DirListingMetadata meta) throws IOException;
+
+  /**
+   * Destroy all resources associated with the metadata store.
+   *
+   * The destroyed resources can be DynamoDB tables, MySQL databases/tables, or
+   * HDFS directories. Any operation attempted after calling this method
+   * may fail.
+   *
+   * This operation is idempotent.
+   *
+   * @throws IOException if there is an error
+   */
+  void destroy() throws IOException;
 }

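A short sketch of the intended teardown contract, assuming a store ms
initialized as above:

    ms.destroy();   // deletes backing resources, e.g. the DynamoDB table
    ms.destroy();   // idempotent: a second call must not fail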
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d354cd18/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/NullMetadataStore.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/NullMetadataStore.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/NullMetadataStore.java
index b3c9648..8041870 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/NullMetadataStore.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/NullMetadataStore.java
@@ -76,4 +76,9 @@ public class NullMetadataStore implements MetadataStore {
   public void put(DirListingMetadata meta) throws IOException {
     return;
   }
+
+  @Override
+  public void destroy() throws IOException {
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d354cd18/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/PathMetadataDynamoDBTranslation.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/PathMetadataDynamoDBTranslation.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/PathMetadataDynamoDBTranslation.java
new file mode 100644
index 0000000..b3e23eb
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/PathMetadataDynamoDBTranslation.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.s3guard;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+
+import com.amazonaws.services.dynamodbv2.document.Item;
+import com.amazonaws.services.dynamodbv2.document.KeyAttribute;
+import com.amazonaws.services.dynamodbv2.document.PrimaryKey;
+import com.amazonaws.services.dynamodbv2.model.AttributeDefinition;
+import com.amazonaws.services.dynamodbv2.model.KeySchemaElement;
+import com.amazonaws.services.dynamodbv2.model.KeyType;
+import com.amazonaws.services.dynamodbv2.model.ScalarAttributeType;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.S3AFileStatus;
+
+/**
+ * Defines methods for translating between domain model objects and their
+ * representations in the DynamoDB schema.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+final class PathMetadataDynamoDBTranslation {
+
+  /** The HASH key name of each item. */
+  @VisibleForTesting
+  static final String PARENT = "parent";
+  /** The RANGE key name of each item. */
+  @VisibleForTesting
+  static final String CHILD = "child";
+  @VisibleForTesting
+  static final String IS_DIR = "is_dir";
+  @VisibleForTesting
+  static final String MOD_TIME = "mod_time";
+  @VisibleForTesting
+  static final String FILE_LENGTH = "file_length";
+  @VisibleForTesting
+  static final String BLOCK_SIZE = "block_size";
+
+  /**
+   * Returns the key schema for the DynamoDB table.
+   *
+   * @return DynamoDB key schema
+   */
+  static Collection<KeySchemaElement> keySchema() {
+    return Arrays.asList(
+        new KeySchemaElement(PARENT, KeyType.HASH),
+        new KeySchemaElement(CHILD, KeyType.RANGE));
+  }
+
+  /**
+   * Returns the attribute definitions for the DynamoDB table.
+   *
+   * @return DynamoDB attribute definitions
+   */
+  static Collection<AttributeDefinition> attributeDefinitions() {
+    return Arrays.asList(
+        new AttributeDefinition(PARENT, ScalarAttributeType.S),
+        new AttributeDefinition(CHILD, ScalarAttributeType.S));
+  }
+
+  /**
+   * Converts a DynamoDB item to a {@link PathMetadata}.
+   *
+   * @param s3aUri URI of the S3A filesystem, used to qualify the path
+   * @param item DynamoDB item to convert
+   * @param username username to attribute to the returned file status
+   * @return {@code item} converted to a {@link PathMetadata}
+   */
+  static PathMetadata itemToPathMetadata(URI s3aUri, Item item, String username)
+      throws IOException {
+    if (item == null) {
+      return null;
+    }
+
+    Path path = new Path(item.getString(PARENT), item.getString(CHILD));
+    if (!path.isAbsoluteAndSchemeAuthorityNull()) {
+      return null;
+    }
+
+    path = path.makeQualified(s3aUri, null);
+    boolean isDir = item.hasAttribute(IS_DIR) && item.getBoolean(IS_DIR);
+    final FileStatus fileStatus;
+    if (isDir) {
+      fileStatus = new S3AFileStatus(true, path, username);
+    } else {
+      long len = item.hasAttribute(FILE_LENGTH) ? item.getLong(FILE_LENGTH) : 0;
+      long modTime = item.hasAttribute(MOD_TIME) ? item.getLong(MOD_TIME) : 0;
+      long block = item.hasAttribute(BLOCK_SIZE) ? item.getLong(BLOCK_SIZE) : 0;
+      fileStatus = new S3AFileStatus(len, modTime, path, block, username);
+    }
+
+    return new PathMetadata(fileStatus);
+  }
+
+  /**
+   * Converts a {@link PathMetadata} to a DynamoDB item.
+   *
+   * @param meta {@link PathMetadata} to convert
+   * @return {@code meta} converted to DynamoDB item
+   */
+  static Item pathMetadataToItem(PathMetadata meta) {
+    Preconditions.checkNotNull(meta);
+    assert meta.getFileStatus() instanceof S3AFileStatus;
+    final S3AFileStatus status = (S3AFileStatus) meta.getFileStatus();
+    final Item item = new Item().withPrimaryKey(pathToKey(status.getPath()));
+    if (status.isDirectory()) {
+      item.withBoolean(IS_DIR, true);
+    } else {
+      item.withLong(FILE_LENGTH, status.getLen())
+          .withLong(MOD_TIME, status.getModificationTime())
+          .withLong(BLOCK_SIZE, status.getBlockSize());
+    }
+
+    return item;
+  }
+
+  /**
+   * Converts a collection of {@link PathMetadata} to a collection of
+   * DynamoDB items.
+   *
+   * @see #pathMetadataToItem(PathMetadata)
+   */
+  static Collection<Item> pathMetadataToItem(Collection<PathMetadata> metas) {
+    final List<Item> items = new ArrayList<>(metas.size());
+    for (PathMetadata meta : metas) {
+      items.add(pathMetadataToItem(meta));
+    }
+    return items;
+  }
+
+  /**
+   * Converts a {@link Path} to a DynamoDB equality condition on that path as
+   * parent, suitable for querying all direct children of the path.
+   *
+   * @param path the path; must not be null
+   * @return DynamoDB equality condition on {@code path} as parent
+   */
+  static KeyAttribute pathToParentKeyAttribute(Path path) {
+    path = removeSchemeAndAuthority(path);
+    return new KeyAttribute(PARENT, path.toUri().getPath());
+  }
+
+  /**
+   * Converts a {@link Path} to a DynamoDB key, suitable for getting the item
+   * matching the path.
+   *
+   * @param path the path; must not be null
+   * @return DynamoDB key for item matching {@code path}
+   */
+  static PrimaryKey pathToKey(Path path) {
+    path = removeSchemeAndAuthority(path);
+    Preconditions.checkArgument(!path.isRoot(),
+        "Root path is not mapped to any PrimaryKey");
+    return new PrimaryKey(PARENT, path.getParent().toUri().getPath(),
+        CHILD, path.getName());
+  }
+
+  /**
+   * Converts a collection of {@link Path} to an array of DynamoDB keys.
+   *
+   * @see #pathToKey(Path)
+   */
+  static PrimaryKey[] pathToKey(Collection<Path> paths) {
+    Preconditions.checkNotNull(paths);
+    final PrimaryKey[] keys = new PrimaryKey[paths.size()];
+    int i = 0;
+    for (Path p : paths) {
+      keys[i++] = pathToKey(p);
+    }
+    return keys;
+  }
+
+  private static Path removeSchemeAndAuthority(Path path) {
+    Preconditions.checkNotNull(path);
+    return Path.getPathWithoutSchemeAndAuthority(path);
+  }
+
+  /**
+   * There is no need to instantiate this class.
+   */
+  private PathMetadataDynamoDBTranslation() {
+  }
+
+}

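For example, under this translation a file at s3a://bucket/dir1/dir2/file1
(bucket name illustrative) is keyed as follows; both helpers are
package-private, so this sketch runs in the same package:

    Path p = new Path("s3a://bucket/dir1/dir2/file1");
    // PrimaryKey(parent="/dir1/dir2", child="file1")
    PrimaryKey key = PathMetadataDynamoDBTranslation.pathToKey(p);
    // KeyAttribute(parent="/dir1/dir2/file1"): equality condition for
    // listing the direct children of p itself
    KeyAttribute parent =
        PathMetadataDynamoDBTranslation.pathToParentKeyAttribute(p);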
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d354cd18/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3Guard.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3Guard.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3Guard.java
index 904a1c3..d8469de 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3Guard.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3Guard.java
@@ -19,6 +19,9 @@
 package org.apache.hadoop.fs.s3a.s3guard;
 
 import com.google.common.base.Preconditions;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -45,6 +48,57 @@ final public class S3Guard {
   public static final String S3_METADATA_STORE_IMPL =
       "fs.s3a.metadatastore.impl";
 
+  @InterfaceAudience.Private
+  @InterfaceStability.Unstable
+  static final String S3GUARD_DDB_CLIENT_FACTORY_IMPL =
+      "fs.s3a.s3guard.ddb.client.factory.impl";
+
+  static final Class<? extends DynamoDBClientFactory>
+      S3GUARD_DDB_CLIENT_FACTORY_IMPL_DEFAULT =
+      DynamoDBClientFactory.DefaultDynamoDBClientFactory.class;
+
+  /**
+   * The endpoint of the DynamoDB service.
+   *
+   * This config has no default value. If the user does not set it, the AWS
+   * SDK will find the endpoint automatically from the region.
+   */
+  @InterfaceStability.Unstable
+  static final String S3GUARD_DDB_ENDPOINT_KEY =
+      "fs.s3a.s3guard.ddb.endpoint";
+
+  /**
+   * The DynamoDB table name to use.
+   *
+   * This config has no default value. If the user does not set this, the
+   * S3Guard implementation will use the respective S3 bucket name.
+   */
+  @InterfaceStability.Unstable
+  static final String S3GUARD_DDB_TABLE_NAME_KEY =
+      "fs.s3a.s3guard.ddb.table";
+
+  /**
+   * Whether to create the table.
+   *
+   * This is for internal use; users should not set it directly.
+   */
+  @InterfaceAudience.Private
+  @InterfaceStability.Unstable
+  static final String S3GUARD_DDB_TABLE_CREATE_KEY =
+      "fs.s3a.s3guard.ddb.table.create";
+
+  @InterfaceStability.Unstable
+  static final String S3GUARD_DDB_TABLE_CAPACITY_READ_KEY =
+      "fs.s3a.s3guard.ddb.table.capacity.read";
+
+  static final long S3GUARD_DDB_TABLE_CAPACITY_READ_DEFAULT = 500;
+
+  @InterfaceStability.Unstable
+  static final String S3GUARD_DDB_TABLE_CAPACITY_WRITE_KEY =
+      "fs.s3a.s3guard.ddb.table.capacity.write";
+
+  static final long S3GUARD_DDB_TABLE_CAPACITY_WRITE_DEFAULT = 100;
+
   // Utility class.  All static functions.
   private S3Guard() { }
 

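For reference, a minimal sketch of wiring these keys up from client code; the
endpoint and table name below are placeholder values, while the capacity
numbers match the defaults defined above:

    import org.apache.hadoop.conf.Configuration;

    class S3GuardDdbConfigSketch {
      static Configuration example() {
        Configuration conf = new Configuration();
        // Optional explicit endpoint; if unset, the AWS SDK derives a
        // regional endpoint from the S3 region.
        conf.set("fs.s3a.s3guard.ddb.endpoint",
            "dynamodb.us-west-2.amazonaws.com");
        // Optional table name; if unset, the S3 bucket name is used.
        conf.set("fs.s3a.s3guard.ddb.table", "example-s3guard-table");
        // Provisioned throughput applied when S3Guard creates the table.
        conf.setLong("fs.s3a.s3guard.ddb.table.capacity.read", 500);
        conf.setLong("fs.s3a.s3guard.ddb.table.capacity.write", 100);
        return conf;
      }
    }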
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d354cd18/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/AbstractS3AMockTest.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/AbstractS3AMockTest.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/AbstractS3AMockTest.java
index 6734947..398d671 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/AbstractS3AMockTest.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/AbstractS3AMockTest.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.fs.s3a;
 
 import static org.apache.hadoop.fs.s3a.Constants.*;
+import static org.apache.hadoop.fs.s3a.s3guard.S3Guard.*;
 
 import com.amazonaws.AmazonServiceException;
 import com.amazonaws.services.s3.AmazonS3;
@@ -26,6 +27,8 @@ import com.amazonaws.services.s3.AmazonS3;
 import java.net.URI;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.s3a.s3guard.MetadataStore;
+import org.apache.hadoop.fs.s3a.s3guard.NullMetadataStore;
 
 import org.junit.After;
 import org.junit.Before;
@@ -33,7 +36,8 @@ import org.junit.Rule;
 import org.junit.rules.ExpectedException;
 
 /**
- * Abstract base class for S3A unit tests using a mock S3 client.
+ * Abstract base class for S3A unit tests using a mock S3 client and a null
+ * metadata store.
  */
 public abstract class AbstractS3AMockTest {
 
@@ -55,6 +59,10 @@ public abstract class AbstractS3AMockTest {
     Configuration conf = new Configuration();
     conf.setClass(S3_CLIENT_FACTORY_IMPL, MockS3ClientFactory.class,
         S3ClientFactory.class);
+    // We explicitly disable the MetadataStore even if one is configured, so
+    // that unit tests never issue requests to the AWS DynamoDB service.
+    conf.setClass(S3_METADATA_STORE_IMPL, NullMetadataStore.class,
+        MetadataStore.class);
     fs = new S3AFileSystem();
     URI uri = URI.create(FS_S3A + "://" + BUCKET);
     fs.initialize(uri, conf);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d354cd18/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEmptyDirectory.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEmptyDirectory.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEmptyDirectory.java
new file mode 100644
index 0000000..66367c6
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEmptyDirectory.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
+import org.junit.Test;
+
+import java.io.IOException;
+
+/**
+ * Tests which exercise treatment of empty/non-empty directories.
+ */
+public class ITestS3AEmptyDirectory extends AbstractS3ATestBase {
+
+  @Test
+  public void testDirectoryBecomesEmpty() throws Exception {
+    S3AFileSystem fs = getFileSystem();
+
+    // 1. set up non-empty dir
+    Path dir = path("testEmptyDir");
+    Path child = path("testEmptyDir/dir2");
+    mkdirs(child);
+
+    S3AFileStatus status = getS3AFileStatus(fs, dir);
+    assertFalse("Dir status should not be empty", status.isEmptyDirectory());
+
+    // 2. Make testEmptyDir empty
+    assertDeleted(child, false);
+    status = getS3AFileStatus(fs, dir);
+
+    assertTrue("Dir status should be empty", status.isEmptyDirectory());
+  }
+
+  @Test
+  public void testDirectoryBecomesNonEmpty() throws Exception {
+    S3AFileSystem fs = getFileSystem();
+
+    // 1. create empty dir
+    Path dir = path("testEmptyDir");
+    mkdirs(dir);
+
+    S3AFileStatus status = getS3AFileStatus(fs, dir);
+    assertTrue("Dir status should be empty", status.isEmptyDirectory());
+
+    // 2. Make testEmptyDir non-empty
+
+    ContractTestUtils.touch(fs, path("testEmptyDir/file1"));
+    status = getS3AFileStatus(fs, dir);
+
+    assertFalse("Dir status should not be empty", status.isEmptyDirectory());
+  }
+
+  private S3AFileStatus getS3AFileStatus(S3AFileSystem fs, Path p) throws
+      IOException {
+    FileStatus s = fs.getFileStatus(p);
+    assertTrue(s instanceof S3AFileStatus);
+    return (S3AFileStatus)s;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d354cd18/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/MetadataStoreTestBase.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/MetadataStoreTestBase.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/MetadataStoreTestBase.java
index fb46357..478cfb3 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/MetadataStoreTestBase.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/MetadataStoreTestBase.java
@@ -21,6 +21,8 @@ package org.apache.hadoop.fs.s3a.s3guard;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
+
+import com.google.common.collect.Sets;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -29,8 +31,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -50,11 +50,11 @@ public abstract class MetadataStoreTestBase extends Assert {
       LoggerFactory.getLogger(MetadataStoreTestBase.class);
 
   /** Some dummy values for sanity-checking FileStatus contents. */
-  protected static final long BLOCK_SIZE = 32 * 1024 * 1024;
-  protected static final int REPLICATION = 1;
-  private static final FsPermission PERMISSION = new FsPermission((short)0755);
-  private static final String OWNER = "bob";
-  private static final String GROUP = "uncles";
+  static final long BLOCK_SIZE = 32 * 1024 * 1024;
+  static final int REPLICATION = 1;
+  static final FsPermission PERMISSION = new FsPermission((short)0755);
+  static final String OWNER = "bob";
+  static final String GROUP = "uncles";
   private final long accessTime = System.currentTimeMillis();
   private final long modTime = accessTime - 5000;
 
@@ -62,7 +62,7 @@ public abstract class MetadataStoreTestBase extends Assert {
    * Each test should override this.
    * @return Contract which specifies the MetadataStore under test plus config.
    */
-  public abstract AbstractMSContract createContract();
+  public abstract AbstractMSContract createContract() throws IOException;
 
   /**
    * Tests assume that implementations will return recently set results.  If
@@ -94,11 +94,52 @@ public abstract class MetadataStoreTestBase extends Assert {
   public void tearDown() throws Exception {
     LOG.debug("== Tear down. ==");
     if (ms != null) {
-      ms.close();
+      ms.destroy();
       ms = null;
     }
   }
 
+  /**
+   * Test that we can get the whole sub-tree by iterating DescendantsIterator.
+   *
+   * The tree is similar to or the same as the example in the code comments.
+   */
+  @Test
+  public void testDescendantsIterator() throws IOException {
+    final String[] tree = new String[] {
+        "/dir1",
+        "/dir1/dir2",
+        "/dir1/dir3",
+        "/dir1/dir2/file1",
+        "/dir1/dir2/file2",
+        "/dir1/dir3/dir4",
+        "/dir1/dir3/dir5",
+        "/dir1/dir3/dir4/file3",
+        "/dir1/dir3/dir5/file4",
+        "/dir1/dir3/dir6"
+    };
+    // set up the example file system tree in the metadata store
+    for (String pathStr : tree) {
+      final FileStatus status = pathStr.contains("file")
+          ? basicFileStatus(strToPath(pathStr), 100, false)
+          : basicFileStatus(strToPath(pathStr), 0, true);
+      ms.put(new PathMetadata(status));
+    }
+
+    final Set<String> actual = new HashSet<>();
+    final PathMetadata rootMeta = new PathMetadata(makeDirStatus("/"));
+    for (DescendantsIterator desc = new DescendantsIterator(ms, rootMeta);
+         desc.hasNext();) {
+      final Path p = desc.next().getFileStatus().getPath();
+      actual.add(Path.getPathWithoutSchemeAndAuthority(p).toString());
+    }
+    LOG.info("We got {} by iterating DescendantsIterator", actual);
+
+    if (!allowMissing()) {
+      assertEquals(Sets.newHashSet(tree), actual);
+    }
+  }
+
   @Test
   public void testPutNew() throws Exception {
     /* create three dirs /da1, /da2, /da3 */
@@ -110,13 +151,13 @@ public abstract class MetadataStoreTestBase extends Assert {
      */
     ms.put(new PathMetadata(makeFileStatus("/da1/db1/fc1", 100)));
 
-    assertEmptyDirs("/da1", "/da2", "/da3");
+    assertEmptyDirs("/da2", "/da3");
     assertDirectorySize("/da1/db1", 1);
 
     /* Check contents of dir status. */
     PathMetadata dirMeta = ms.get(strToPath("/da1"));
     if (!allowMissing() || dirMeta != null) {
-      verifyDirStatus(dirMeta);
+      verifyDirStatus(dirMeta.getFileStatus());
     }
 
     /* This already exists, and should silently replace it. */
@@ -136,8 +177,7 @@ public abstract class MetadataStoreTestBase extends Assert {
     PathMetadata meta = ms.get(strToPath("/da1/db1/fc2"));
     if (!allowMissing() || meta != null) {
       assertNotNull("Get file after put new.", meta);
-      assertEquals("Cached file size correct.", 200,
-          meta.getFileStatus().getLen());
+      verifyFileStatus(meta.getFileStatus(), 200);
     }
   }
 
@@ -149,13 +189,13 @@ public abstract class MetadataStoreTestBase extends Assert {
     ms.put(new PathMetadata(makeDirStatus(dirPath)));
     PathMetadata meta = ms.get(strToPath(filePath));
     if (!allowMissing() || meta != null) {
-      verifyBasicFileStatus(meta);
+      verifyFileStatus(meta.getFileStatus(), 100);
     }
 
     ms.put(new PathMetadata(basicFileStatus(strToPath(filePath), 9999, false)));
     meta = ms.get(strToPath(filePath));
     if (!allowMissing() || meta != null) {
-      assertEquals("Updated size", 9999, meta.getFileStatus().getLen());
+      verifyFileStatus(meta.getFileStatus(), 9999);
     }
   }
 
@@ -194,7 +234,7 @@ public abstract class MetadataStoreTestBase extends Assert {
 
   @Test
   public void testDeleteSubtreeHostPath() throws Exception {
-    deleteSubtreeHelper("s3a://test-bucket-name");
+    deleteSubtreeHelper(contract.getFileSystem().getUri().toString());
   }
 
   private void deleteSubtreeHelper(String pathPrefix) throws Exception {
@@ -273,7 +313,7 @@ public abstract class MetadataStoreTestBase extends Assert {
     PathMetadata meta = ms.get(strToPath(filePath));
     if (!allowMissing() || meta != null) {
       assertNotNull("Get found file", meta);
-      verifyBasicFileStatus(meta);
+      verifyFileStatus(meta.getFileStatus(), 100);
     }
 
     meta = ms.get(strToPath(dirPath));
@@ -389,7 +429,7 @@ public abstract class MetadataStoreTestBase extends Assert {
     meta = ms.get(strToPath("/b1/file1"));
     if (!allowMissing() || meta != null) {
       assertNotNull("dest file not null", meta);
-      verifyBasicFileStatus(meta);
+      verifyFileStatus(meta.getFileStatus(), 100);
     }
 
     dirMeta = ms.listChildren(strToPath("/b1"));
@@ -455,7 +495,7 @@ public abstract class MetadataStoreTestBase extends Assert {
     for (String ps : pathStrs) {
       b.add(strToPath(ps));
     }
-    assertTrue("Same set of files", a.equals(b));
+    assertEquals("Same set of files", b, a);
   }
 
   private void putListStatusFiles(String dirPath, boolean authoritative,
@@ -508,21 +548,13 @@ public abstract class MetadataStoreTestBase extends Assert {
     assertNotNull(pathStr + " should be cached.", meta);
   }
 
-  // Convenience to add scheme if missing
+  /**
+   * Convenience method to create a fully qualified Path from a string.
+   */
   private Path strToPath(String p) {
-    Path path = new Path(p);
-    URI uri = path.toUri();
-    if (uri.getScheme() == null) {
-      String fsScheme = contract.getFileSystem().getScheme();
-      try {
-        return new Path(new URI(fsScheme, uri.getHost(), uri.getPath(),
-            uri.getFragment()));
-      } catch (URISyntaxException e) {
-        throw new IllegalArgumentException("FileStatus path invalid with " +
-            "scheme " + fsScheme + " added", e);
-      }
-    }
-    return path;
+    final Path path = new Path(p);
+    assert path.isAbsolute();
+    return path.makeQualified(contract.getFileSystem().getUri(), null);
   }
 
   private void assertEmptyDirectory(String pathStr) throws IOException {
@@ -535,40 +567,42 @@ public abstract class MetadataStoreTestBase extends Assert {
     }
   }
 
-  private FileStatus basicFileStatus(Path path, int size, boolean isDir) {
+  FileStatus basicFileStatus(Path path, int size, boolean isDir)
+      throws IOException {
     return new FileStatus(size, isDir, REPLICATION, BLOCK_SIZE, modTime,
         accessTime, PERMISSION, OWNER, GROUP, path);
   }
 
-  private FileStatus makeFileStatus(String pathStr, int size) {
+  private FileStatus makeFileStatus(String pathStr, int size)
+      throws IOException {
     return basicFileStatus(strToPath(pathStr), size, false);
   }
 
-  private void verifyBasicFileStatus(PathMetadata meta) {
-    FileStatus status = meta.getFileStatus();
+  void verifyFileStatus(FileStatus status, long size) {
     assertFalse("Not a dir", status.isDirectory());
-    assertEquals("Replication value", REPLICATION, status.getReplication());
-    assertEquals("Access time", accessTime, status.getAccessTime());
     assertEquals("Mod time", modTime, status.getModificationTime());
+    assertEquals("File size", size, status.getLen());
     assertEquals("Block size", BLOCK_SIZE, status.getBlockSize());
-    assertEquals("Owner", OWNER, status.getOwner());
-    assertEquals("Group", GROUP, status.getGroup());
-    assertEquals("Permission", PERMISSION, status.getPermission());
   }
 
-  private FileStatus makeDirStatus(String pathStr) {
+  private FileStatus makeDirStatus(String pathStr) throws IOException {
     return basicFileStatus(strToPath(pathStr), 0, true);
   }
 
-  private void verifyDirStatus(PathMetadata meta) {
-    FileStatus status = meta.getFileStatus();
+  /**
+   * Verify the directory file status. Subclasses may verify additional fields.
+   */
+  void verifyDirStatus(FileStatus status) {
     assertTrue("Is a dir", status.isDirectory());
     assertEquals("zero length", 0, status.getLen());
-    assertEquals("Replication value", REPLICATION, status.getReplication());
-    assertEquals("Access time", accessTime, status.getAccessTime());
-    assertEquals("Mod time", modTime, status.getModificationTime());
-    assertEquals("Owner", OWNER, status.getOwner());
-    assertEquals("Group", GROUP, status.getGroup());
-    assertEquals("Permission", PERMISSION, status.getPermission());
   }
+
+  long getModTime() {
+    return modTime;
+  }
+
+  long getAccessTime() {
+    return accessTime;
+  }
+
 }
\ No newline at end of file

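For context, the traversal exercised by testDescendantsIterator reduces to a
loop like the following (a sketch assuming same-package access and the
DescendantsIterator constructor signature used in the test above):

    import java.io.IOException;

    import org.apache.hadoop.fs.Path;

    class DescendantsWalkSketch {
      // Walk every path recorded under /dir1 in the metadata store.
      static void walk(MetadataStore ms) throws IOException {
        PathMetadata start = ms.get(new Path("/dir1"));
        DescendantsIterator descendants = new DescendantsIterator(ms, start);
        while (descendants.hasNext()) {
          System.out.println(descendants.next().getFileStatus().getPath());
        }
      }
    }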
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d354cd18/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/TestDynamoDBMetadataStore.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/TestDynamoDBMetadataStore.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/TestDynamoDBMetadataStore.java
new file mode 100644
index 0000000..daeb9ac
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/TestDynamoDBMetadataStore.java
@@ -0,0 +1,325 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.s3guard;
+
+import java.io.IOException;
+import java.net.URI;
+
+import com.amazonaws.AmazonServiceException;
+import com.amazonaws.services.dynamodbv2.document.DynamoDB;
+import com.amazonaws.services.dynamodbv2.document.Table;
+import com.amazonaws.services.dynamodbv2.local.main.ServerRunner;
+import com.amazonaws.services.dynamodbv2.local.server.DynamoDBProxyServer;
+import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughputDescription;
+import com.amazonaws.services.dynamodbv2.model.ResourceNotFoundException;
+import com.amazonaws.services.dynamodbv2.model.TableDescription;
+
+import org.apache.log4j.Level;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.Constants;
+import org.apache.hadoop.fs.s3a.MockS3ClientFactory;
+import org.apache.hadoop.fs.s3a.S3AFileStatus;
+import org.apache.hadoop.fs.s3a.S3AFileSystem;
+import org.apache.hadoop.fs.s3a.S3ClientFactory;
+import org.apache.hadoop.net.ServerSocketUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.test.GenericTestUtils;
+
+import static org.apache.hadoop.fs.s3a.Constants.S3_CLIENT_FACTORY_IMPL;
+import static org.apache.hadoop.fs.s3a.s3guard.PathMetadataDynamoDBTranslation.keySchema;
+
+/**
+ * Test that {@link DynamoDBMetadataStore} implements {@link MetadataStore}.
+ *
+ * In this unit test, we create an in-memory DynamoDBLocal server instance for
+ * all unit test cases, so you won't be charged for DynamoDB requests when
+ * you run this test.  An {@link S3AFileSystem} object is created and shared
+ * for initializing {@link DynamoDBMetadataStore} objects.  No real S3
+ * requests are issued, as the underlying AWS S3 client is mocked.
+ *
+ * Per the base class, every test case has an independent contract that
+ * creates a new {@link DynamoDBMetadataStore} instance and initializes it.
+ * The test contract creates a table for each test, and destroys it after
+ * the test case finishes.
+ */
+public class TestDynamoDBMetadataStore extends MetadataStoreTestBase {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestDynamoDBMetadataStore.class);
+  private static final String BUCKET = "TestDynamoDBMetadataStore";
+
+  /** The in-memory DynamoDBLocal server instance for testing. */
+  private static DynamoDBProxyServer dynamoDBLocalServer;
+  private static String ddbEndpoint;
+  /** The DynamoDB client that can issue requests directly to the server. */
+  private static DynamoDB dynamoDB;
+
+  @Rule
+  public final Timeout timeout = new Timeout(60 * 1000);
+
+  /**
+   * Sets up the in-memory DynamoDBLocal server and initializes the S3A file
+   * system used for testing.
+   */
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    GenericTestUtils.setLogLevel(DynamoDBMetadataStore.LOG, Level.ALL);
+    // sqlite4java library should have been copied to target/native-libs
+    System.setProperty("sqlite4java.library.path", "target/native-libs");
+
+    // Set up the in-memory local DynamoDB instance for all test cases
+    final String port = String.valueOf(ServerSocketUtil.getPort(0, 100));
+    dynamoDBLocalServer = ServerRunner.createServerFromCommandLineArgs(
+        new String[] {"-inMemory", "-port", port});
+    dynamoDBLocalServer.start();
+    ddbEndpoint = "http://localhost:" + port;
+    LOG.info("DynamoDBLocal for test was started at {}", ddbEndpoint);
+
+    try {
+      dynamoDB = new DynamoDBMSContract().getMetadataStore().getDynamoDB();
+    } catch (AmazonServiceException e) {
+      final String msg = "Cannot initialize a DynamoDBMetadataStore instance "
+          + "against the local DynamoDB server. Perhaps the DynamoDBLocal "
+          + "server is not configured correctly. ";
+      LOG.error(msg, e);
+      // fail fast if the DynamoDBLocal server is not working
+      fail(msg + e.getMessage());
+    }
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    if (dynamoDB != null) {
+      dynamoDB.shutdown();
+    }
+    if (dynamoDBLocalServer != null) {
+      LOG.info("Shutting down the in-memory local DynamoDB server");
+      try {
+        dynamoDBLocalServer.stop();
+      } catch (Exception e) {
+        final String msg = "Exception when stopping the DynamoDBLocal server. ";
+        LOG.error(msg, e);
+        fail(msg + e.getLocalizedMessage());
+      }
+    }
+  }
+
+  /**
+   * Each contract has its own S3AFileSystem and DynamoDBMetadataStore objects.
+   */
+  private static class DynamoDBMSContract extends AbstractMSContract {
+    private final S3AFileSystem s3afs;
+    private final DynamoDBMetadataStore ms = new DynamoDBMetadataStore();
+
+    DynamoDBMSContract() throws IOException {
+      final Configuration conf = new Configuration();
+      // using mocked S3 clients
+      conf.setClass(S3_CLIENT_FACTORY_IMPL, MockS3ClientFactory.class,
+          S3ClientFactory.class);
+      conf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY,
+          URI.create(Constants.FS_S3A + "://" + BUCKET).toString());
+      // set config for creating a DynamoDB client against the local server
+      conf.set(Constants.ACCESS_KEY, "dummy-access-key");
+      conf.set(Constants.SECRET_KEY, "dummy-secret-key");
+      conf.set(S3Guard.S3GUARD_DDB_ENDPOINT_KEY, ddbEndpoint);
+
+      // always create a new file system object for each test contract
+      s3afs = (S3AFileSystem) FileSystem.newInstance(conf);
+      ms.initialize(s3afs);
+    }
+
+    @Override
+    public S3AFileSystem getFileSystem() {
+      return s3afs;
+    }
+
+    @Override
+    public DynamoDBMetadataStore getMetadataStore() {
+      return ms;
+    }
+  }
+
+  @Override
+  public DynamoDBMSContract createContract() throws IOException {
+    return new DynamoDBMSContract();
+  }
+
+  @Override
+  FileStatus basicFileStatus(Path path, int size, boolean isDir)
+      throws IOException {
+    String owner = UserGroupInformation.getCurrentUser().getShortUserName();
+    return isDir
+        ? new S3AFileStatus(false, path, owner)
+        : new S3AFileStatus(size, getModTime(), path, BLOCK_SIZE, owner);
+  }
+
+  /**
+   * This tests that after initialize() using an S3AFileSystem object, the
+   * instance is initialized successfully and its table is ACTIVE.
+   */
+  @Test
+  public void testInitialize() throws IOException {
+    final String tableName = "testInitializeWithFileSystem";
+    final S3AFileSystem s3afs = createContract().getFileSystem();
+    final Configuration conf = s3afs.getConf();
+    conf.set(S3Guard.S3GUARD_DDB_TABLE_NAME_KEY, tableName);
+    try (DynamoDBMetadataStore ddbms = new DynamoDBMetadataStore()) {
+      ddbms.initialize(s3afs);
+      verifyTableInitialized(tableName);
+      assertNotNull(ddbms.getTable());
+      assertEquals(tableName, ddbms.getTable().getTableName());
+      assertEquals("DynamoDB table should be in the same region as S3 bucket",
+          s3afs.getAmazonS3Client().getBucketLocation(tableName),
+          ddbms.getRegion());
+    }
+  }
+
+  /**
+   * This tests that after initialize() using a Configuration object, the
+   * instance is initialized successfully and its table is ACTIVE.
+   */
+  @Test
+  public void testInitializeWithConfiguration() throws IOException {
+    final String tableName = "testInitializeWithConfiguration";
+    final Configuration conf = createContract().getFileSystem().getConf();
+    conf.set(S3Guard.S3GUARD_DDB_TABLE_NAME_KEY, tableName);
+    try (DynamoDBMetadataStore ddbms = new DynamoDBMetadataStore()) {
+      ddbms.initialize(conf);
+      verifyTableInitialized(tableName);
+      assertNotNull(ddbms.getTable());
+      assertEquals(tableName, ddbms.getTable().getTableName());
+      assertEquals("Unexpected key schema found!",
+          keySchema(),
+          ddbms.getTable().describe().getKeySchema());
+    }
+  }
+
+  @Test
+  public void testCreateExistingTable() throws IOException {
+    final DynamoDBMetadataStore ddbms = createContract().getMetadataStore();
+    verifyTableInitialized(BUCKET);
+    // create the table again; it already exists, so this should not fail
+    ddbms.createTable();
+    verifyTableInitialized(BUCKET);
+  }
+
+  /**
+   * Test cases for the root directory, which is not stored in the DynamoDB
+   * table.
+   */
+  @Test
+  public void testRootDirectory() throws IOException {
+    final DynamoDBMetadataStore ddbms = createContract().getMetadataStore();
+    verifyRootDirectory(ddbms.get(new Path("/")), true);
+
+    ddbms.put(new PathMetadata(new S3AFileStatus(true,
+        new Path("/foo"),
+        UserGroupInformation.getCurrentUser().getShortUserName())));
+    verifyRootDirectory(ddbms.get(new Path("/")), false);
+  }
+
+  private void verifyRootDirectory(PathMetadata rootMeta, boolean isEmpty) {
+    assertNotNull(rootMeta);
+    final S3AFileStatus status = (S3AFileStatus) rootMeta.getFileStatus();
+    assertNotNull(status);
+    assertTrue(status.isDirectory());
+    assertEquals(isEmpty, status.isEmptyDirectory());
+  }
+
+  @Test
+  public void testProvisionTable() throws IOException {
+    final DynamoDBMetadataStore ddbms = createContract().getMetadataStore();
+    final ProvisionedThroughputDescription oldProvision =
+        dynamoDB.getTable(BUCKET).describe().getProvisionedThroughput();
+    ddbms.provisionTable(oldProvision.getReadCapacityUnits() * 2,
+        oldProvision.getWriteCapacityUnits() * 2);
+    final ProvisionedThroughputDescription newProvision =
+        dynamoDB.getTable(BUCKET).describe().getProvisionedThroughput();
+    LOG.info("Old provision = {}, new provision = {}",
+        oldProvision, newProvision);
+    assertEquals(oldProvision.getReadCapacityUnits() * 2,
+        newProvision.getReadCapacityUnits().longValue());
+    assertEquals(oldProvision.getWriteCapacityUnits() * 2,
+        newProvision.getWriteCapacityUnits().longValue());
+  }
+
+  @Test
+  public void testDeleteTable() throws IOException {
+    final String tableName = "testDeleteTable";
+    final S3AFileSystem s3afs = createContract().getFileSystem();
+    final Configuration conf = s3afs.getConf();
+    conf.set(S3Guard.S3GUARD_DDB_TABLE_NAME_KEY, tableName);
+    try (DynamoDBMetadataStore ddbms = new DynamoDBMetadataStore()) {
+      ddbms.initialize(s3afs);
+      // we can list the empty table
+      ddbms.listChildren(new Path("/"));
+
+      ddbms.destroy();
+      verifyTableNotExist(tableName);
+
+      // delete the table once more; the ResourceNotFoundException should be
+      // swallowed silently
+      ddbms.destroy();
+      verifyTableNotExist(tableName);
+
+      try {
+        // we can no longer list the destroyed table
+        ddbms.listChildren(new Path("/"));
+        fail("Should have failed after the table is destroyed!");
+      } catch (IOException ignored) {
+      }
+    }
+  }
+
+  /**
+   * This validates that the table is created and ACTIVE in DynamoDB.
+   *
+   * This should not rely on the {@link DynamoDBMetadataStore} implementation.
+   */
+  private static void verifyTableInitialized(String tableName) {
+    final Table table = dynamoDB.getTable(tableName);
+    final TableDescription td = table.describe();
+    assertEquals(tableName, td.getTableName());
+    assertEquals("ACTIVE", td.getTableStatus());
+  }
+
+  /**
+   * This validates that the table does not exist in DynamoDB.
+   *
+   * This should not rely on the {@link DynamoDBMetadataStore} implementation.
+   */
+  private static void verifyTableNotExist(String tableName) {
+    final Table table = dynamoDB.getTable(tableName);
+    try {
+      table.describe();
+      fail("Expecting ResourceNotFoundException for table '" + tableName + "'");
+    } catch (ResourceNotFoundException ignored) {
+    }
+  }
+
+}

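Condensed, the store lifecycle these tests exercise against DynamoDBLocal
looks roughly like this sketch (same-package access is assumed for the
S3Guard constants; the endpoint, table name and credentials are dummy values,
as in the contract above):

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.s3a.Constants;

    class DdbmsLifecycleSketch {
      static void run(String endpoint) throws IOException {
        Configuration conf = new Configuration();
        conf.set(S3Guard.S3GUARD_DDB_ENDPOINT_KEY, endpoint);
        conf.set(S3Guard.S3GUARD_DDB_TABLE_NAME_KEY, "sketch-table");
        // DynamoDBLocal accepts any credentials.
        conf.set(Constants.ACCESS_KEY, "dummy-access-key");
        conf.set(Constants.SECRET_KEY, "dummy-secret-key");
        try (DynamoDBMetadataStore ms = new DynamoDBMetadataStore()) {
          ms.initialize(conf);             // creates the table if necessary
          ms.listChildren(new Path("/"));  // empty listing on a fresh table
          ms.destroy();                    // drops the backing table
        }
      }
    }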
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d354cd18/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/TestLocalMetadataStore.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/TestLocalMetadataStore.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/TestLocalMetadataStore.java
index 12aa9c6..68e9842 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/TestLocalMetadataStore.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/TestLocalMetadataStore.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.fs.s3a.s3guard;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.junit.Test;
@@ -100,4 +101,27 @@ public class TestLocalMetadataStore extends MetadataStoreTestBase {
     map.clear();
   }
 
+  @Override
+  protected void verifyFileStatus(FileStatus status, long size) {
+    super.verifyFileStatus(status, size);
+
+    assertEquals("Replication value", REPLICATION, status.getReplication());
+    assertEquals("Access time", getAccessTime(), status.getAccessTime());
+    assertEquals("Owner", OWNER, status.getOwner());
+    assertEquals("Group", GROUP, status.getGroup());
+    assertEquals("Permission", PERMISSION, status.getPermission());
+  }
+
+  @Override
+  protected void verifyDirStatus(FileStatus status) {
+    super.verifyDirStatus(status);
+
+    assertEquals("Mod time", getModTime(), status.getModificationTime());
+    assertEquals("Replication value", REPLICATION, status.getReplication());
+    assertEquals("Access time", getAccessTime(), status.getAccessTime());
+    assertEquals("Owner", OWNER, status.getOwner());
+    assertEquals("Group", GROUP, status.getGroup());
+    assertEquals("Permission", PERMISSION, status.getPermission());
+  }
+
 }

