You are viewing a plain text version of this content. The canonical link for it is here.
Posted to by on 2017/09/03 23:55:41 UTC

[61/77] [abbrv] hadoop git commit: HADOOP-13345 HS3Guard: Improved Consistency for S3A. Contributed by: Chris Nauroth, Aaron Fabbri, Mingliang Liu, Lei (Eddy) Xu, Sean Mackrory, Steve Loughran and others.
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/MetadataStoreTestBase.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/MetadataStoreTestBase.java
new file mode 100644
index 0000000..c19ae91
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/MetadataStoreTestBase.java
@@ -0,0 +1,887 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.s3a.s3guard;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.fs.s3a.S3ATestUtils;
+import org.apache.hadoop.fs.s3a.Tristate;
+import org.apache.hadoop.io.IOUtils;
+/**
+ * Main test class for MetadataStore implementations.
+ * Implementations should each create a test by subclassing this and
+ * overriding {@link #createContract()}.
+ * If your implementation may return missing results for recently set paths,
+ * override {@link MetadataStoreTestBase#allowMissing()}.
+ */
+public abstract class MetadataStoreTestBase extends Assert {
+  /** Logger shared by all tests in this class. */
+  private static final Logger LOG =
+      LoggerFactory.getLogger(MetadataStoreTestBase.class);
+  /** Some dummy values for sanity-checking FileStatus contents. */
+  static final long BLOCK_SIZE = 32 * 1024 * 1024;
+  static final int REPLICATION = 1;
+  static final FsPermission PERMISSION = new FsPermission((short)0755);
+  static final String OWNER = "bob";
+  static final String GROUP = "uncles";
+  /** Access time stamped on test entries; captured once per test instance. */
+  private final long accessTime = System.currentTimeMillis();
+  /** Modification time stamped on test entries: 5000 ms before accessTime. */
+  private final long modTime = accessTime - 5000;
+  /**
+   * Each test should override this.  Will use a new Configuration instance.
+   * Called from {@link #setUp()} before every test.
+   * @return Contract which specifies the MetadataStore under test plus config.
+   * @throws IOException declared for implementations; presumably raised when
+   *     the contract cannot be created (confirm against subclasses).
+   */
+  public abstract AbstractMSContract createContract() throws IOException;
+  /**
+   * Each test should override this.
+   * @param conf Base configuration instance to use.
+   * @return Contract which specifies the MetadataStore under test plus config.
+   * @throws IOException declared for implementations; presumably raised when
+   *     the contract cannot be created (confirm against subclasses).
+   */
+  public abstract AbstractMSContract createContract(Configuration conf)
+      throws IOException;
+  /**
+   * Tests assume that implementations will return recently set results.  If
+   * your implementation does not always hold onto metadata (e.g. LRU or
+   * time-based expiry) you can override this to return true, which makes the
+   * tests skip checks on entries the store did not return.
+   * The base implementation returns false.
+   * @return true if the test should succeed when null results are returned
+   *  from the MetadataStore under test.
+   */
+  public boolean allowMissing() {
+    return false;
+  }
+  /**
+   * Pruning is an optional feature for metadata store implementations.
+   * Tests will only check that functionality if it is expected to work.
+   * The base implementation returns true; testPruneFiles and testPruneDirs
+   * are skipped via Assume when this returns false.
+   * @return true if the test should expect pruning to work.
+   */
+  public boolean supportsPruning() {
+    return true;
+  }
+  /** The MetadataStore contract used to test against. */
+  private AbstractMSContract contract;
+  /** Store under test; taken from the contract in setUp(), nulled in tearDown(). */
+  private MetadataStore ms;
+  /**
+   * Accessor for subclasses.
+   * @return reference to the test contract; this is null until
+   *     {@link #setUp()} has assigned it.
+   */
+  protected AbstractMSContract getContract() {
+    return contract;
+  }
+  /**
+   * Creates the contract via {@link #createContract()}, takes its
+   * MetadataStore, and initializes the store with the contract's FileSystem.
+   * Fails the test if either the store or the FileSystem is null.
+   */
+  @Before
+  public void setUp() throws Exception {
+    LOG.debug("== Setup. ==");
+    contract = createContract();
+    ms = contract.getMetadataStore();
+    assertNotNull("null MetadataStore", ms);
+    assertNotNull("null FileSystem", contract.getFileSystem());
+    ms.initialize(contract.getFileSystem());
+  }
+  /**
+   * Destroys and closes the MetadataStore used by the test, if there is one.
+   * A failing destroy() is only logged, so the store is still closed and
+   * the reference cleared afterwards.
+   */
+  @After
+  public void tearDown() throws Exception {
+    LOG.debug("== Tear down. ==");
+    if (ms == null) {
+      // nothing was created (or it was already torn down)
+      return;
+    }
+    try {
+      ms.destroy();
+    } catch (Exception e) {
+      LOG.warn("Failed to destroy tables in teardown", e);
+    }
+    IOUtils.closeStream(ms);
+    ms = null;
+  }
+  /**
+   * Helper function for verifying DescendantsIterator and
+   * MetadataStoreListFilesIterator behavior.
+   * @param implementation iterator class to exercise; must be
+   *     DescendantsIterator.class or MetadataStoreListFilesIterator.class
+   * @param createNodes List of paths to create
+   * @param checkNodes List of paths that the iterator should return
+   * @throws UnsupportedOperationException if implementation is neither of
+   *     the two supported classes
+   */
+  private void doTestDescendantsIterator(
+      Class<?> implementation, String[] createNodes,
+      String[] checkNodes) throws Exception {
+    // we set up the example file system tree in metadata store: a path whose
+    // string contains "file" becomes a 100-byte file, anything else a dir
+    for (String pathStr : createNodes) {
+      final FileStatus status = pathStr.contains("file")
+          ? basicFileStatus(strToPath(pathStr), 100, false)
+          : basicFileStatus(strToPath(pathStr), 0, true);
+      ms.put(new PathMetadata(status));
+    }
+    final PathMetadata rootMeta = new PathMetadata(makeDirStatus("/"));
+    RemoteIterator<FileStatus> iterator;
+    if (implementation == DescendantsIterator.class) {
+      iterator = new DescendantsIterator(ms, rootMeta);
+    } else if (implementation == MetadataStoreListFilesIterator.class) {
+      iterator = new MetadataStoreListFilesIterator(ms, rootMeta, false);
+    } else {
+      throw new UnsupportedOperationException("Unrecognized class");
+    }
+    // collect what the iterator yields, stripped of scheme and authority so
+    // the result can be compared against the plain path strings
+    final Set<String> actual = new HashSet<>();
+    while (iterator.hasNext()) {
+      final Path p = iterator.next().getPath();
+      actual.add(Path.getPathWithoutSchemeAndAuthority(p).toString());
+    }
+    LOG.info("We got {} by iterating DescendantsIterator", actual);
+    if (!allowMissing()) {
+      final Set<String> expected = new HashSet<>(Arrays.asList(checkNodes));
+      assertEquals(expected, actual);
+    }
+  }
+  /**
+   * Test that we can get the whole sub-tree by iterating DescendantsIterator.
+   *
+   * The tree is similar to or same as the example in code comment.
+   * Every created entry, directory or file, is expected back.
+   */
+  @Test
+  public void testDescendantsIterator() throws Exception {
+    final String[] tree = {
+        "/dir1",
+        "/dir1/dir2",
+        "/dir1/dir3",
+        "/dir1/dir2/file1",
+        "/dir1/dir2/file2",
+        "/dir1/dir3/dir4",
+        "/dir1/dir3/dir5",
+        "/dir1/dir3/dir4/file3",
+        "/dir1/dir3/dir5/file4",
+        "/dir1/dir3/dir6"
+    };
+    // created set and expected set are the same array
+    doTestDescendantsIterator(DescendantsIterator.class, tree, tree);
+  }
+  /**
+   * Test that we can get the correct subset of the tree with
+   * MetadataStoreListFilesIterator.
+   *
+   * The tree is similar to or same as the example in code comment.
+   * Only the four file entries are expected back, not the directories.
+   */
+  @Test
+  public void testMetadataStoreListFilesIterator() throws Exception {
+    final String[] wholeTree = {
+        "/dir1",
+        "/dir1/dir2",
+        "/dir1/dir3",
+        "/dir1/dir2/file1",
+        "/dir1/dir2/file2",
+        "/dir1/dir3/dir4",
+        "/dir1/dir3/dir5",
+        "/dir1/dir3/dir4/file3",
+        "/dir1/dir3/dir5/file4",
+        "/dir1/dir3/dir6"
+    };
+    final String[] leafNodes = {
+        "/dir1/dir2/file1",
+        "/dir1/dir2/file2",
+        "/dir1/dir3/dir4/file3",
+        "/dir1/dir3/dir5/file4"
+    };
+    doTestDescendantsIterator(
+        MetadataStoreListFilesIterator.class, wholeTree, leafNodes);
+  }
+  /**
+   * Puts directories and files and checks that each put only affects the
+   * listing of the entry's direct parent directory.
+   */
+  @Test
+  public void testPutNew() throws Exception {
+    /* create three dirs /da1, /da2, /da3 */
+    createNewDirs("/da1", "/da2", "/da3");
+    /* It is caller's responsibility to set up ancestor entries beyond the
+     * containing directory.  We only track direct children of the directory.
+     * Thus this will not affect entry for /da1.
+     */
+    ms.put(new PathMetadata(makeFileStatus("/da1/db1/fc1", 100)));
+    assertEmptyDirs("/da2", "/da3");
+    assertDirectorySize("/da1/db1", 1);
+    /* Check contents of dir status. */
+    PathMetadata dirMeta = ms.get(strToPath("/da1"));
+    if (!allowMissing() || dirMeta != null) {
+      // fail with a message rather than a NullPointerException when a
+      // store that must not lose entries returns nothing
+      assertNotNull("Get dir after put new.", dirMeta);
+      verifyDirStatus(dirMeta.getFileStatus());
+    }
+    /* This already exists, and should silently replace it. */
+    ms.put(new PathMetadata(makeDirStatus("/da1/db1")));
+    /* If we had putNew(), and used it above, this would be empty again. */
+    assertDirectorySize("/da1", 1);
+    assertEmptyDirs("/da2", "/da3");
+    /* Ensure new files update correct parent dirs. */
+    ms.put(new PathMetadata(makeFileStatus("/da1/db1/fc1", 100)));
+    ms.put(new PathMetadata(makeFileStatus("/da1/db1/fc2", 200)));
+    assertDirectorySize("/da1", 1);
+    assertDirectorySize("/da1/db1", 2);
+    assertEmptyDirs("/da2", "/da3");
+    PathMetadata meta = ms.get(strToPath("/da1/db1/fc2"));
+    if (!allowMissing() || meta != null) {
+      assertNotNull("Get file after put new.", meta);
+      verifyFileStatus(meta.getFileStatus(), 200);
+    }
+  }
+  /**
+   * Puts a file entry, then puts it again with a different length, and
+   * checks that get() reflects the latest put each time.
+   */
+  @Test
+  public void testPutOverwrite() throws Exception {
+    final String filePath = "/a1/b1/c1/some_file";
+    final String dirPath = "/a1/b1/c1/d1";
+    ms.put(new PathMetadata(makeFileStatus(filePath, 100)));
+    ms.put(new PathMetadata(makeDirStatus(dirPath)));
+    PathMetadata meta = ms.get(strToPath(filePath));
+    if (!allowMissing() || meta != null) {
+      // report a missing entry as an assertion failure, not an NPE
+      assertNotNull("Get file after put.", meta);
+      verifyFileStatus(meta.getFileStatus(), 100);
+    }
+    ms.put(new PathMetadata(basicFileStatus(strToPath(filePath), 9999, false)));
+    meta = ms.get(strToPath(filePath));
+    if (!allowMissing() || meta != null) {
+      assertNotNull("Get file after overwrite.", meta);
+      verifyFileStatus(meta.getFileStatus(), 9999);
+    }
+  }
+  /**
+   * Puts a single file directly under the root and checks that the root
+   * listing is non-authoritative and contains exactly that file.
+   */
+  @Test
+  public void testRootDirPutNew() throws Exception {
+    Path rootPath = strToPath("/");
+    ms.put(new PathMetadata(makeFileStatus("/file1", 100)));
+    DirListingMetadata dir = ms.listChildren(rootPath);
+    // stores that may drop entries are allowed to return no listing at all
+    if (!allowMissing() || dir != null) {
+      assertNotNull("Root dir cached", dir);
+      assertFalse("Root not fully cached", dir.isAuthoritative());
+      assertNotNull("have root dir file listing", dir.getListing());
+      assertEquals("One file in root dir", 1, dir.getListing().size());
+      assertEquals("file1 in root dir", strToPath("/file1"),
+          dir.getListing().iterator().next().getFileStatus().getPath());
+    }
+  }
+  /**
+   * Deletes one of the two files created by setUpDeleteTest() and checks
+   * that a single live entry remains in the parent directory.
+   */
+  @Test
+  public void testDelete() throws Exception {
+    final String victim = "/ADirectory1/db1/file2";
+    setUpDeleteTest();
+    ms.delete(strToPath(victim));
+    /* Ensure delete happened. */
+    assertDirectorySize("/ADirectory1/db1", 1);
+    // either no entry at all or a tombstone is acceptable
+    final PathMetadata remaining = ms.get(strToPath(victim));
+    assertTrue("File deleted", remaining == null || remaining.isDeleted());
+  }
+  /** Runs the subtree-deletion scenario with plain (prefix-less) paths. */
+  @Test
+  public void testDeleteSubtree() throws Exception {
+    deleteSubtreeHelper("");
+  }
+  /**
+   * Runs the subtree-deletion scenario with every path prefixed by the
+   * string form of the contract FileSystem's URI.
+   */
+  @Test
+  public void testDeleteSubtreeHostPath() throws Exception {
+    deleteSubtreeHelper(contract.getFileSystem().getUri().toString());
+  }
+  private void deleteSubtreeHelper(String pathPrefix) throws Exception {
+    String p = pathPrefix;
+    setUpDeleteTest(p);
+    createNewDirs(p + "/ADirectory1/db1/dc1", p + "/ADirectory1/db1/dc1/dd1");
+    ms.put(new PathMetadata(
+        makeFileStatus(p + "/ADirectory1/db1/dc1/dd1/deepFile", 100)));
+    if (!allowMissing()) {
+      assertCached(p + "/ADirectory1/db1");
+    }
+    ms.deleteSubtree(strToPath(p + "/ADirectory1/db1/"));
+    assertEmptyDirectory(p + "/ADirectory1");
+    assertDeleted(p + "/ADirectory1/db1");
+    assertDeleted(p + "/ADirectory1/file1");
+    assertDeleted(p + "/ADirectory1/file2");
+    assertDeleted(p + "/ADirectory1/db1/dc1/dd1/deepFile");
+    assertEmptyDirectory(p + "/ADirectory2");
+  }
+  /*
+   * Some implementations might not support this.  It was useful to test
+   * correctness of the LocalMetadataStore implementation, but feel free to
+   * override this to be a no-op.
+   *
+   * Deletes the subtree rooted at "/" and checks that nothing created by
+   * setUpDeleteTest() is still cached.
+   */
+  @Test
+  public void testDeleteRecursiveRoot() throws Exception {
+    setUpDeleteTest();
+    ms.deleteSubtree(strToPath("/"));
+    assertDeleted("/ADirectory1");
+    assertDeleted("/ADirectory2");
+    // entries that setUpDeleteTest() really created below /ADirectory1
+    assertDeleted("/ADirectory1/db1");
+    assertDeleted("/ADirectory1/db1/file1");
+    assertDeleted("/ADirectory1/db1/file2");
+    // paths that were never created must not show up as cached either
+    assertDeleted("/ADirectory2/db1");
+    assertDeleted("/ADirectory2/db1/file1");
+    assertDeleted("/ADirectory2/db1/file2");
+  }
+  /**
+   * Calls delete() and deleteSubtree() on paths that were never put; the
+   * test passes as long as neither call throws.
+   */
+  @Test
+  public void testDeleteNonExisting() throws Exception {
+    // Path doesn't exist, but should silently succeed
+    ms.delete(strToPath("/bobs/your/uncle"));
+    // Ditto.
+    ms.deleteSubtree(strToPath("/internets"));
+  }
+  /** Builds the delete-test tree using an empty path prefix. */
+  private void setUpDeleteTest() throws IOException {
+    setUpDeleteTest("");
+  }
+  private void setUpDeleteTest(String prefix) throws IOException {
+    createNewDirs(prefix + "/ADirectory1", prefix + "/ADirectory2",
+        prefix + "/ADirectory1/db1");
+    ms.put(new PathMetadata(makeFileStatus(prefix + "/ADirectory1/db1/file1",
+        100)));
+    ms.put(new PathMetadata(makeFileStatus(prefix + "/ADirectory1/db1/file2",
+        100)));
+    PathMetadata meta = ms.get(strToPath(prefix + "/ADirectory1/db1/file2"));
+    if (!allowMissing() || meta != null) {
+      assertNotNull("Found test file", meta);
+      assertDirectorySize(prefix + "/ADirectory1/db1", 2);
+    }
+  }
+  /**
+   * Checks get() for an existing file, a deleted file (which must leave a
+   * tombstone unless the store is a NullMetadataStore), an existing
+   * directory, and a path that was never put.
+   */
+  @Test
+  public void testGet() throws Exception {
+    final String filePath = "/a1/b1/c1/some_file";
+    final String dirPath = "/a1/b1/c1/d1";
+    ms.put(new PathMetadata(makeFileStatus(filePath, 100)));
+    ms.put(new PathMetadata(makeDirStatus(dirPath)));
+    PathMetadata meta = ms.get(strToPath(filePath));
+    if (!allowMissing() || meta != null) {
+      assertNotNull("Get found file", meta);
+      verifyFileStatus(meta.getFileStatus(), 100);
+    }
+    if (!(ms instanceof NullMetadataStore)) {
+      ms.delete(strToPath(filePath));
+      meta = ms.get(strToPath(filePath));
+      // a store that drops the entry instead of leaving a tombstone should
+      // fail with this message rather than with a NullPointerException
+      assertNotNull("Tombstone not left for deleted file", meta);
+      assertTrue("Tombstone not left for deleted file", meta.isDeleted());
+    }
+    meta = ms.get(strToPath(dirPath));
+    if (!allowMissing() || meta != null) {
+      assertNotNull("Get found file (dir)", meta);
+      assertTrue("Found dir", meta.getFileStatus().isDirectory());
+    }
+    meta = ms.get(strToPath("/bollocks"));
+    assertNull("Don't get non-existent file", meta);
+  }
+  /**
+   * After an authoritative, empty listing is put for a directory, get()
+   * with the empty-directory flag must not report the directory as
+   * non-empty.
+   */
+  @Test
+  public void testGetEmptyDir() throws Exception {
+    final String dirPath = "/a1/b1/c1/d1";
+    // Creates /a1/b1/c1/d1 as an empty dir
+    setupListStatus();
+    // 1. Tell MetadataStore (MS) that there are zero children
+    putListStatusFiles(dirPath, true /* authoritative */
+        /* zero children */);
+    // 2. Request a file status for dir, including whether or not the dir
+    // is empty.
+    final PathMetadata dirEntry = ms.get(strToPath(dirPath), true);
+    // 3. Check that either (a) the MS doesn't track whether or not it is
+    // empty (which is allowed), or (b) the MS knows the dir is empty.
+    if (allowMissing() && dirEntry == null) {
+      return;
+    }
+    assertNotNull("Get should find meta for dir", dirEntry);
+    assertNotEquals("Dir is empty or unknown", Tristate.FALSE,
+        dirEntry.isEmptyDirectory());
+  }
+  /**
+   * For a directory that has a child entry, get() with the empty-directory
+   * flag must not report the directory as empty.
+   */
+  @Test
+  public void testGetNonEmptyDir() throws Exception {
+    final String dirPath = "/a1/b1/c1";
+    // Creates /a1/b1/c1 as an non-empty dir
+    setupListStatus();
+    // Request a file status for dir, including whether or not the dir
+    // is empty.
+    final PathMetadata dirEntry = ms.get(strToPath(dirPath), true);
+    // MetadataStore knows /a1/b1/c1 has at least one child.  It is valid
+    // for it to answer either (a) UNKNOWN: the MS doesn't track whether
+    // or not the dir is empty, or (b) the MS knows the dir is non-empty.
+    if (allowMissing() && dirEntry == null) {
+      return;
+    }
+    assertNotNull("Get should find meta for dir", dirEntry);
+    assertNotEquals("Dir is non-empty or unknown", Tristate.TRUE,
+        dirEntry.isEmptyDirectory());
+  }
+  /**
+   * For a directory whose children were never described to the store,
+   * get() with the empty-directory flag must answer UNKNOWN.
+   */
+  @Test
+  public void testGetDirUnknownIfEmpty() throws Exception {
+    final String dirPath = "/a1/b1/c1/d1";
+    // 1. Create /a1/b1/c1/d1 as an empty dir, but do not tell MetadataStore
+    // (MS) whether or not it has any children.
+    setupListStatus();
+    // 2. Request a file status for dir, including whether or not the dir
+    // is empty.
+    final PathMetadata dirEntry = ms.get(strToPath(dirPath), true);
+    // 3. Assert MS reports isEmptyDir as UNKNOWN: We haven't told MS
+    // whether or not the directory has any children.
+    if (allowMissing() && dirEntry == null) {
+      return;
+    }
+    assertNotNull("Get should find meta for dir", dirEntry);
+    assertEquals("Dir empty is unknown", Tristate.UNKNOWN,
+        dirEntry.isEmptyDirectory());
+  }
+  /**
+   * Lists the root, /a1 and /a1/b1 of the tree built by setupListStatus()
+   * and checks the children reported at each level.
+   */
+  @Test
+  public void testListChildren() throws Exception {
+    setupListStatus();
+    DirListingMetadata dirMeta;
+    dirMeta = ms.listChildren(strToPath("/"));
+    if (!allowMissing()) {
+      assertNotNull(dirMeta);
+        /* Cache has no way of knowing it has all entries for root unless we
+         * specifically tell it via put() with
+         * DirListingMetadata.isAuthoritative = true */
+      assertFalse("Root dir is not cached, or partially cached",
+          dirMeta.isAuthoritative());
+      assertListingsEqual(dirMeta.getListing(), "/a1", "/a2");
+    }
+    dirMeta = ms.listChildren(strToPath("/a1"));
+    if (!allowMissing() || dirMeta != null) {
+      // assert before dereferencing so a missing listing is reported as a
+      // test failure with a message instead of a NullPointerException
+      assertNotNull("Listing /a1", dirMeta);
+      dirMeta = dirMeta.withoutTombstones();
+      assertListingsEqual(dirMeta.getListing(), "/a1/b1", "/a1/b2");
+    }
+    // TODO HADOOP-14756 instrument MetadataStore for asserting & testing
+    dirMeta = ms.listChildren(strToPath("/a1/b1"));
+    if (!allowMissing() || dirMeta != null) {
+      assertNotNull("Listing /a1/b1", dirMeta);
+      assertListingsEqual(dirMeta.getListing(), "/a1/b1/file1", "/a1/b1/file2",
+          "/a1/b1/c1");
+    }
+  }
+  /** Runs the shared put-listing-then-list check against the root path. */
+  @Test
+  public void testDirListingRoot() throws Exception {
+    commonTestPutListStatus("/");
+  }
+  /** Runs the shared put-listing-then-list check against the path "/a". */
+  @Test
+  public void testPutDirListing() throws Exception {
+    commonTestPutListStatus("/a");
+  }
+  /** listChildren() on a path that was never put must return null. */
+  @Test
+  public void testInvalidListChildren() throws Exception {
+    setupListStatus();
+    assertNull("missing path returns null",
+        ms.listChildren(strToPath("/a1/b1x")));
+  }
+  /**
+   * Moves the directory /a1/b1 with its two files to /b1 via move() and
+   * checks that the source entries are gone (or tombstoned) and that the
+   * destination entries are present.
+   */
+  @Test
+  public void testMove() throws Exception {
+    // Create test dir structure
+    createNewDirs("/a1", "/a2", "/a3");
+    createNewDirs("/a1/b1", "/a1/b2");
+    putListStatusFiles("/a1/b1", false, "/a1/b1/file1", "/a1/b1/file2");
+    // Assert root listing as expected
+    Collection<PathMetadata> entries;
+    DirListingMetadata dirMeta = ms.listChildren(strToPath("/"));
+    if (!allowMissing() || dirMeta != null) {
+      // the null check has to come before the listing is dereferenced
+      assertNotNull("Listing root", dirMeta);
+      dirMeta = dirMeta.withoutTombstones();
+      entries = dirMeta.getListing();
+      assertListingsEqual(entries, "/a1", "/a2", "/a3");
+    }
+    // Assert src listing as expected
+    dirMeta = ms.listChildren(strToPath("/a1/b1"));
+    if (!allowMissing() || dirMeta != null) {
+      assertNotNull("Listing /a1/b1", dirMeta);
+      entries = dirMeta.getListing();
+      assertListingsEqual(entries, "/a1/b1/file1", "/a1/b1/file2");
+    }
+    // Do the move(): rename(/a1/b1, /b1)
+    Collection<Path> srcPaths = Arrays.asList(strToPath("/a1/b1"),
+        strToPath("/a1/b1/file1"), strToPath("/a1/b1/file2"));
+    ArrayList<PathMetadata> destMetas = new ArrayList<>();
+    destMetas.add(new PathMetadata(makeDirStatus("/b1")));
+    destMetas.add(new PathMetadata(makeFileStatus("/b1/file1", 100)));
+    destMetas.add(new PathMetadata(makeFileStatus("/b1/file2", 100)));
+    ms.move(srcPaths, destMetas);
+    // Assert src is no longer there
+    dirMeta = ms.listChildren(strToPath("/a1"));
+    if (!allowMissing() || dirMeta != null) {
+      assertNotNull("Listing /a1", dirMeta);
+      entries = dirMeta.withoutTombstones().getListing();
+      assertListingsEqual(entries, "/a1/b2");
+    }
+    PathMetadata meta = ms.get(strToPath("/a1/b1/file1"));
+    assertTrue("Src path deleted", meta == null || meta.isDeleted());
+    // Assert dest looks right
+    meta = ms.get(strToPath("/b1/file1"));
+    if (!allowMissing() || meta != null) {
+      assertNotNull("dest file not null", meta);
+      verifyFileStatus(meta.getFileStatus(), 100);
+    }
+    dirMeta = ms.listChildren(strToPath("/b1"));
+    if (!allowMissing() || dirMeta != null) {
+      assertNotNull("dest listing not null", dirMeta);
+      entries = dirMeta.getListing();
+      assertListingsEqual(entries, "/b1/file1", "/b1/file2");
+    }
+  }
+  /**
+   * Test that the MetadataStore keeps entries of different buckets apart:
+   * putting or deleting a path in one bucket must not affect a path in
+   * the other.
+   * NOTE(review): the two paths also differ in their last component
+   * (path1 vs path2), so this does not strictly exercise "the same path in
+   * two buckets".
+   */
+  @Test
+  public void testMultiBucketPaths() throws Exception {
+    final String p1 = "s3a://bucket-a/path1";
+    final String p2 = "s3a://bucket-b/path2";
+    final Path first = new Path(p1);
+    final Path second = new Path(p2);
+    // Make sure we start out empty
+    assertNull("Path should not be present yet.", ms.get(first));
+    assertNull("Path2 should not be present yet.", ms.get(second));
+    // Put p1, assert p2 doesn't match
+    ms.put(new PathMetadata(makeFileStatus(p1, 100)));
+    assertNull("Path 2 should not match path 1.", ms.get(second));
+    // Make sure delete is correct as well
+    if (!allowMissing()) {
+      ms.delete(second);
+      assertNotNull("Path should not have been deleted", ms.get(first));
+    }
+    ms.delete(first);
+  }
+  /**
+   * Puts an old and a new file around a cutoff timestamp, prunes with that
+   * cutoff, and checks that only the old file is removed.  Skipped when
+   * {@link #supportsPruning()} is false.
+   */
+  @Test
+  public void testPruneFiles() throws Exception {
+    Assume.assumeTrue(supportsPruning());
+    createNewDirs("/pruneFiles");
+    long oldTime = getTime();
+    ms.put(new PathMetadata(makeFileStatus("/pruneFiles/old", 1, oldTime,
+        oldTime)));
+    DirListingMetadata ls2 = ms.listChildren(strToPath("/pruneFiles"));
+    if (!allowMissing()) {
+      // fail with a message, not an NPE, if the listing is absent
+      assertNotNull("Listing /pruneFiles after first put", ls2);
+      assertListingsEqual(ls2.getListing(), "/pruneFiles/old");
+    }
+    // It's possible for the Local implementation to get from /pruneFiles/old's
+    // modification time to here in under 1ms, causing it to not get pruned
+    Thread.sleep(1);
+    long cutoff = System.currentTimeMillis();
+    long newTime = getTime();
+    ms.put(new PathMetadata(makeFileStatus("/pruneFiles/new", 1, newTime,
+        newTime)));
+    DirListingMetadata ls;
+    ls = ms.listChildren(strToPath("/pruneFiles"));
+    if (!allowMissing()) {
+      assertNotNull("Listing /pruneFiles after second put", ls);
+      assertListingsEqual(ls.getListing(), "/pruneFiles/new",
+          "/pruneFiles/old");
+    }
+    ms.prune(cutoff);
+    ls = ms.listChildren(strToPath("/pruneFiles"));
+    if (allowMissing()) {
+      assertDeleted("/pruneFiles/old");
+    } else {
+      assertNotNull("Listing /pruneFiles after prune", ls);
+      assertListingsEqual(ls.getListing(), "/pruneFiles/new");
+    }
+  }
+  /**
+   * Puts a file inside a directory, prunes with a later cutoff, and checks
+   * that the file is gone.  Skipped when {@link #supportsPruning()} is
+   * false.
+   */
+  @Test
+  public void testPruneDirs() throws Exception {
+    Assume.assumeTrue(supportsPruning());
+    // We only test that files, not dirs, are removed during prune.
+    // We specifically allow directories to remain, as it is more robust
+    // for DynamoDBMetadataStore's prune() implementation: If a
+    // file was created in a directory while it was being pruned, it would
+    // violate the invariant that all ancestors of a file exist in the table.
+    createNewDirs("/pruneDirs/dir");
+    final long staleTime = getTime();
+    ms.put(new PathMetadata(makeFileStatus("/pruneDirs/dir/file",
+        1, staleTime, staleTime)));
+    // It's possible for the Local implementation to get from the old
+    // modification time to here in under 1ms, causing it to not get pruned
+    Thread.sleep(1);
+    // the cutoff is sampled only after the sleep above
+    ms.prune(getTime());
+    assertDeleted("/pruneDirs/dir/file");
+  }
+  /**
+   * Puts one file stamped just before and one just after a cutoff, prunes
+   * at the cutoff, and checks that every directory of the chain that the
+   * store still returns has a non-authoritative listing.
+   */
+  @Test
+  public void testPruneUnsetsAuthoritative() throws Exception {
+    final String rootDir = "/unpruned-root-dir";
+    final String grandparentDir = rootDir + "/pruned-grandparent-dir";
+    final String parentDir = grandparentDir + "/pruned-parent-dir";
+    final String staleFile = parentDir + "/stale-file";
+    final String freshFile = rootDir + "/fresh-file";
+    final String[] directories = {rootDir, grandparentDir, parentDir};
+    createNewDirs(rootDir, grandparentDir, parentDir);
+    final long time = System.currentTimeMillis();
+    ms.put(new PathMetadata(
+        new FileStatus(0, false, 0, 0, time - 1, strToPath(staleFile)),
+        Tristate.FALSE, false));
+    ms.put(new PathMetadata(
+        new FileStatus(0, false, 0, 0, time + 1, strToPath(freshFile)),
+        Tristate.FALSE, false));
+    ms.prune(time);
+    for (String directory : directories) {
+      final Path path = strToPath(directory);
+      if (ms.get(path) == null) {
+        // directory entry is not in the store; nothing to check
+        continue;
+      }
+      assertFalse(ms.listChildren(path).isAuthoritative());
+    }
+  }
+  /*
+   * Helper functions.
+   */
+  /** Modifies paths input array and returns it. */
+  private String[] buildPathStrings(String parent, String... paths)
+      throws IOException {
+    for (int i = 0; i < paths.length; i++) {
+      Path p = new Path(strToPath(parent), paths[i]);
+      paths[i] = p.toString();
+    }
+    return paths;
+  }
+  /**
+   * Puts an authoritative listing of file1..file3 under parent and checks
+   * that listChildren() returns exactly those three entries.
+   * @param parent directory path string to put the listing under
+   */
+  private void commonTestPutListStatus(final String parent) throws IOException {
+    putListStatusFiles(parent, true, buildPathStrings(parent, "file1", "file2",
+        "file3"));
+    DirListingMetadata dirMeta = ms.listChildren(strToPath(parent));
+    if (!allowMissing() || dirMeta != null) {
+      // check for null before the listing is dereferenced; previously the
+      // assertion came after withoutTombstones() and could never fire
+      assertNotNull("list after putListStatus", dirMeta);
+      dirMeta = dirMeta.withoutTombstones();
+      Collection<PathMetadata> entries = dirMeta.getListing();
+      assertNotNull("listStatus has entries", entries);
+      assertListingsEqual(entries,
+          buildPathStrings(parent, "file1", "file2", "file3"));
+    }
+  }
+  private void setupListStatus() throws IOException {
+    createNewDirs("/a1", "/a2", "/a1/b1", "/a1/b2", "/a1/b1/c1",
+        "/a1/b1/c1/d1");
+    ms.put(new PathMetadata(makeFileStatus("/a1/b1/file1", 100)));
+    ms.put(new PathMetadata(makeFileStatus("/a1/b1/file2", 100)));
+  }
+  private void assertListingsEqual(Collection<PathMetadata> listing,
+      String ...pathStrs) throws IOException {
+    Set<Path> a = new HashSet<>();
+    for (PathMetadata meta : listing) {
+      a.add(meta.getFileStatus().getPath());
+    }
+    Set<Path> b = new HashSet<>();
+    for (String ps : pathStrs) {
+      b.add(strToPath(ps));
+    }
+    assertEquals("Same set of files", b, a);
+  }
+  private void putListStatusFiles(String dirPath, boolean authoritative,
+      String... filenames) throws IOException {
+    ArrayList<PathMetadata> metas = new ArrayList<>(filenames .length);
+    for (String filename : filenames) {
+      metas.add(new PathMetadata(makeFileStatus(filename, 100)));
+    }
+    DirListingMetadata dirMeta =
+        new DirListingMetadata(strToPath(dirPath), metas, authoritative);
+    ms.put(dirMeta);
+  }
+  private void createNewDirs(String... dirs)
+      throws IOException {
+    for (String pathStr : dirs) {
+      ms.put(new PathMetadata(makeDirStatus(pathStr)));
+    }
+  }
+  private void assertDirectorySize(String pathStr, int size)
+      throws IOException {
+    DirListingMetadata dirMeta = ms.listChildren(strToPath(pathStr));
+    if (!allowMissing()) {
+      assertNotNull("Directory " + pathStr + " in cache", dirMeta);
+    }
+    if (!allowMissing() || dirMeta != null) {
+      dirMeta = dirMeta.withoutTombstones();
+      assertEquals("Number of entries in dir " + pathStr, size,
+          nonDeleted(dirMeta.getListing()).size());
+    }
+  }
+  /** @return only file statuses which are *not* marked deleted. */
+  private Collection<PathMetadata> nonDeleted(
+      Collection<PathMetadata> statuses) {
+    Collection<PathMetadata> currentStatuses = new ArrayList<>();
+    for (PathMetadata status : statuses) {
+      if (!status.isDeleted()) {
+        currentStatuses.add(status);
+      }
+    }
+    return currentStatuses;
+  }
+  private void assertDeleted(String pathStr) throws IOException {
+    Path path = strToPath(pathStr);
+    PathMetadata meta = ms.get(path);
+    boolean cached = meta != null && !meta.isDeleted();
+    assertFalse(pathStr + " should not be cached.", cached);
+  }
+  protected void assertCached(String pathStr) throws IOException {
+    Path path = strToPath(pathStr);
+    PathMetadata meta = ms.get(path);
+    boolean cached = meta != null && !meta.isDeleted();
+    assertTrue(pathStr + " should be cached.", cached);
+  }
+  /**
+   * Convenience to create a fully qualified Path from string.
+   * The path is qualified against the URI of the contract's FileSystem.
+   * @param p path string; must denote an absolute path
+   * @return the qualified path
+   */
+  Path strToPath(String p) throws IOException {
+    final Path path = new Path(p);
+    // JUnit assertion instead of the Java assert statement, so the check
+    // still runs when the JVM is started without -ea
+    assertTrue("Path must be absolute: " + p, path.isAbsolute());
+    return path.makeQualified(contract.getFileSystem().getUri(), null);
+  }
+  /** Asserts that the listing of pathStr has no live entries. */
+  private void assertEmptyDirectory(String pathStr) throws IOException {
+    assertDirectorySize(pathStr, 0);
+  }
+  /**
+   * Asserts, in order, that each given directory has no live entries.
+   * @param dirs directory path strings
+   */
+  private void assertEmptyDirs(String ...dirs) throws IOException {
+    for (int i = 0; i < dirs.length; i++) {
+      assertEmptyDirectory(dirs[i]);
+    }
+  }
+  /**
+   * Builds a FileStatus stamped with this test instance's default
+   * modification and access times.
+   * @param path path of the entry
+   * @param size length to record
+   * @param isDir true to mark the entry as a directory
+   * @return the new FileStatus
+   */
+  FileStatus basicFileStatus(Path path, int size, boolean isDir) throws
+      IOException {
+    return basicFileStatus(path, size, isDir, modTime, accessTime);
+  }
+  /**
+   * Builds a FileStatus from the given values plus the class constants
+   * REPLICATION, BLOCK_SIZE, PERMISSION, OWNER and GROUP.
+   * @param path path of the entry
+   * @param size length to record
+   * @param isDir true to mark the entry as a directory
+   * @param newModTime modification time to record
+   * @param newAccessTime access time to record
+   * @return the new FileStatus
+   */
+  FileStatus basicFileStatus(Path path, int size, boolean isDir,
+      long newModTime, long newAccessTime) throws IOException {
+    return new FileStatus(size, isDir, REPLICATION, BLOCK_SIZE, newModTime,
+        newAccessTime, PERMISSION, OWNER, GROUP, path);
+  }
+  /**
+   * Builds a file (non-directory) status for pathStr using this test
+   * instance's default modification and access times.
+   * @param pathStr path string of the file
+   * @param size length to record
+   */
+  private FileStatus makeFileStatus(String pathStr, int size) throws
+      IOException {
+    return makeFileStatus(pathStr, size, modTime, accessTime);
+  }
+  /**
+   * Builds a file (non-directory) status for pathStr, qualified through
+   * strToPath(), with explicit timestamps.
+   * @param pathStr path string of the file
+   * @param size length to record
+   * @param newModTime modification time to record
+   * @param newAccessTime access time to record
+   */
+  private FileStatus makeFileStatus(String pathStr, int size, long newModTime,
+      long newAccessTime) throws IOException {
+    return basicFileStatus(strToPath(pathStr), size, false,
+        newModTime, newAccessTime);
+  }
+  /**
+   * Delegates to S3ATestUtils.verifyFileStatus with the expected size, the
+   * BLOCK_SIZE constant and this test instance's default modification time.
+   * @param status status to verify
+   * @param size expected length
+   */
+  void verifyFileStatus(FileStatus status, long size) {
+    S3ATestUtils.verifyFileStatus(status, size, BLOCK_SIZE, modTime);
+  }
+  /**
+   * Builds a zero-length directory status for pathStr, qualified through
+   * strToPath(), with this test instance's default timestamps.
+   * @param pathStr path string of the directory
+   */
+  private FileStatus makeDirStatus(String pathStr) throws IOException {
+    return basicFileStatus(strToPath(pathStr), 0, true, modTime, accessTime);
+  }
+  /**
+   * Verify the directory file status. Subclass may verify additional fields.
+   * This base version checks that the status is a directory of length 0.
+   * @param status status to verify
+   */
+  void verifyDirStatus(FileStatus status) {
+    assertTrue("Is a dir", status.isDirectory());
+    assertEquals("zero length", 0, status.getLen());
+  }
+  /** @return the default modification time stamped on test entries. */
+  long getModTime() {
+    return modTime;
+  }
+  /** @return the default access time stamped on test entries. */
+  long getAccessTime() {
+    return accessTime;
+  }
+  /** @return the current wall-clock time in milliseconds. */
+  protected static long getTime() {
+    return System.currentTimeMillis();
+  }
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/TestDirListingMetadata.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/TestDirListingMetadata.java
new file mode 100644
index 0000000..8458252
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/TestDirListingMetadata.java
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.s3a.s3guard;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.S3AFileStatus;
+import static org.hamcrest.CoreMatchers.notNullValue;
+import static org.junit.Assert.*;
+/**
+ * Unit tests of {@link DirListingMetadata}.
+ */
+public class TestDirListingMetadata {
+  /** Owner name given to every S3AFileStatus built in these tests. */
+  private static final String TEST_OWNER = "hadoop";
+  /** Rule through which tests declare the exception they expect. */
+  @Rule
+  public ExpectedException exception = ExpectedException.none();
+  /**
+   * Constructing with a null path must raise a NullPointerException that
+   * carries a message.
+   */
+  @Test
+  public void testNullPath() {
+    exception.expect(NullPointerException.class);
+    exception.expectMessage(notNullValue(String.class));
+    final Path missingPath = null;
+    new DirListingMetadata(missingPath, null, false);
+  }
+  /**
+   * A null listing passed to the constructor must surface as a non-null,
+   * empty listing on a non-authoritative entry.
+   */
+  @Test
+  public void testNullListing() {
+    final Path dir = new Path("/path");
+    final DirListingMetadata dirMeta =
+        new DirListingMetadata(dir, null, false);
+    assertEquals(dir, dirMeta.getPath());
+    assertNotNull(dirMeta.getListing());
+    assertTrue(dirMeta.getListing().isEmpty());
+    assertFalse(dirMeta.isAuthoritative());
+  }
+  /**
+   * An empty listing passed to the constructor must be reported back as a
+   * non-null, empty listing on a non-authoritative entry.
+   */
+  @Test
+  public void testEmptyListing() {
+    final Path dir = new Path("/path");
+    final List<PathMetadata> noEntries = new ArrayList<PathMetadata>(0);
+    final DirListingMetadata dirMeta =
+        new DirListingMetadata(dir, noEntries, false);
+    assertEquals(dir, dirMeta.getPath());
+    assertNotNull(dirMeta.getListing());
+    assertTrue(dirMeta.getListing().isEmpty());
+    assertFalse(dirMeta.isAuthoritative());
+  }
+  /**
+   * A listing of two directories and one file must be fully present in the
+   * metadata built from it.
+   */
+  @Test
+  public void testListing() {
+    final Path dir = new Path("/path");
+    final PathMetadata firstDir = new PathMetadata(
+        new S3AFileStatus(true, new Path(dir, "dir1"), TEST_OWNER));
+    final PathMetadata secondDir = new PathMetadata(
+        new S3AFileStatus(true, new Path(dir, "dir2"), TEST_OWNER));
+    final PathMetadata file = new PathMetadata(
+        new S3AFileStatus(123, 456, new Path(dir, "file1"), 8192, TEST_OWNER));
+    final List<PathMetadata> entries =
+        Arrays.asList(firstDir, secondDir, file);
+    final DirListingMetadata dirMeta =
+        new DirListingMetadata(dir, entries, false);
+    assertEquals(dir, dirMeta.getPath());
+    assertNotNull(dirMeta.getListing());
+    assertFalse(dirMeta.getListing().isEmpty());
+    // every entry handed to the constructor must be in the listing
+    for (PathMetadata entry : entries) {
+      assertTrue(dirMeta.getListing().contains(entry));
+    }
+    assertFalse(dirMeta.isAuthoritative());
+  }
+  /**
+   * Clearing the collection returned by getListing() must raise
+   * UnsupportedOperationException.
+   */
+  @Test
+  public void testListingUnmodifiable() {
+    final DirListingMetadata dirMeta = makeTwoDirsOneFile(new Path("/path"));
+    assertNotNull(dirMeta.getListing());
+    // arm the rule only now: the mutation below is the call under test
+    exception.expect(UnsupportedOperationException.class);
+    dirMeta.getListing().clear();
+  }
+  /**
+   * The authoritative flag passed to the constructor must be reported by
+   * isAuthoritative().
+   */
+  @Test
+  public void testAuthoritative() {
+    final Path dir = new Path("/path");
+    final DirListingMetadata dirMeta =
+        new DirListingMetadata(dir, null, true);
+    assertEquals(dir, dirMeta.getPath());
+    assertNotNull(dirMeta.getListing());
+    assertTrue(dirMeta.getListing().isEmpty());
+    assertTrue(dirMeta.isAuthoritative());
+  }
+  /**
+   * setAuthoritative(true) must flip an entry that was created
+   * non-authoritative.
+   */
+  @Test
+  public void testSetAuthoritative() {
+    final Path dir = new Path("/path");
+    final DirListingMetadata dirMeta =
+        new DirListingMetadata(dir, null, false);
+    // state straight after construction
+    assertEquals(dir, dirMeta.getPath());
+    assertNotNull(dirMeta.getListing());
+    assertTrue(dirMeta.getListing().isEmpty());
+    assertFalse(dirMeta.isAuthoritative());
+    // state after the setter
+    dirMeta.setAuthoritative(true);
+    assertTrue(dirMeta.isAuthoritative());
+  }
+  /**
+   * get() must return each listed entry by its path, and null for a child
+   * path that is not in the listing.
+   */
+  @Test
+  public void testGet() {
+    final Path dir = new Path("/path");
+    final PathMetadata firstDir = new PathMetadata(
+        new S3AFileStatus(true, new Path(dir, "dir1"), TEST_OWNER));
+    final PathMetadata secondDir = new PathMetadata(
+        new S3AFileStatus(true, new Path(dir, "dir2"), TEST_OWNER));
+    final PathMetadata file = new PathMetadata(
+        new S3AFileStatus(123, 456, new Path(dir, "file1"), 8192, TEST_OWNER));
+    final List<PathMetadata> entries =
+        Arrays.asList(firstDir, secondDir, file);
+    final DirListingMetadata dirMeta =
+        new DirListingMetadata(dir, entries, false);
+    assertEquals(dir, dirMeta.getPath());
+    assertNotNull(dirMeta.getListing());
+    assertFalse(dirMeta.getListing().isEmpty());
+    for (PathMetadata entry : entries) {
+      assertTrue(dirMeta.getListing().contains(entry));
+    }
+    assertFalse(dirMeta.isAuthoritative());
+    // lookup by path, in listing order
+    for (PathMetadata entry : entries) {
+      assertEquals(entry, dirMeta.get(entry.getFileStatus().getPath()));
+    }
+    assertNull(dirMeta.get(new Path(dir, "notfound")));
+  }
+  /**
+   * get(null) must raise a NullPointerException that carries a message.
+   */
+  @Test
+  public void testGetNull() {
+    final DirListingMetadata dirMeta =
+        new DirListingMetadata(new Path("/path"), null, false);
+    exception.expect(NullPointerException.class);
+    exception.expectMessage(notNullValue(String.class));
+    dirMeta.get(null);
+  }
+  /**
+   * get() of the root path must raise an IllegalArgumentException that
+   * carries a message.
+   */
+  @Test
+  public void testGetRoot() {
+    final DirListingMetadata dirMeta =
+        new DirListingMetadata(new Path("/path"), null, false);
+    exception.expect(IllegalArgumentException.class);
+    exception.expectMessage(notNullValue(String.class));
+    final Path root = new Path("/");
+    dirMeta.get(root);
+  }
+  /**
+   * get() of a path under a different ancestor must raise an
+   * IllegalArgumentException that carries a message.
+   */
+  @Test
+  public void testGetNotChild() {
+    final DirListingMetadata dirMeta =
+        new DirListingMetadata(new Path("/path"), null, false);
+    exception.expect(IllegalArgumentException.class);
+    exception.expectMessage(notNullValue(String.class));
+    final Path unrelated = new Path("/different/ancestor");
+    dirMeta.get(unrelated);
+  }
+  /**
+   * put() of a new child status must make the entry visible both in the
+   * listing and through get().
+   */
+  @Test
+  public void testPut() {
+    final Path dir = new Path("/path");
+    final PathMetadata firstDir = new PathMetadata(
+        new S3AFileStatus(true, new Path(dir, "dir1"), TEST_OWNER));
+    final PathMetadata secondDir = new PathMetadata(
+        new S3AFileStatus(true, new Path(dir, "dir2"), TEST_OWNER));
+    final PathMetadata file = new PathMetadata(
+        new S3AFileStatus(123, 456, new Path(dir, "file1"), 8192, TEST_OWNER));
+    final List<PathMetadata> entries =
+        Arrays.asList(firstDir, secondDir, file);
+    final DirListingMetadata dirMeta =
+        new DirListingMetadata(dir, entries, false);
+    assertEquals(dir, dirMeta.getPath());
+    assertNotNull(dirMeta.getListing());
+    assertFalse(dirMeta.getListing().isEmpty());
+    for (PathMetadata entry : entries) {
+      assertTrue(dirMeta.getListing().contains(entry));
+    }
+    assertFalse(dirMeta.isAuthoritative());
+    // add a fourth child and look it up again
+    final PathMetadata addedDir = new PathMetadata(
+        new S3AFileStatus(true, new Path(dir, "dir3"), TEST_OWNER));
+    dirMeta.put(addedDir.getFileStatus());
+    assertTrue(dirMeta.getListing().contains(addedDir));
+    assertEquals(addedDir, dirMeta.get(addedDir.getFileStatus().getPath()));
+  }
+  /**
+   * put(null) must raise a NullPointerException that carries a message.
+   */
+  @Test
+  public void testPutNull() {
+    final DirListingMetadata dirMeta =
+        new DirListingMetadata(new Path("/path"), null, false);
+    exception.expect(NullPointerException.class);
+    exception.expectMessage(notNullValue(String.class));
+    dirMeta.put(null);
+  }
+  /**
+   * put() of a status whose path is null must raise a NullPointerException
+   * that carries a message.
+   */
+  @Test
+  public void testPutNullPath() {
+    final DirListingMetadata dirMeta =
+        new DirListingMetadata(new Path("/path"), null, false);
+    exception.expect(NullPointerException.class);
+    exception.expectMessage(notNullValue(String.class));
+    dirMeta.put(
+        new S3AFileStatus(true, null, TEST_OWNER));
+  }
+  /**
+   * put() of a status for the root path must raise an
+   * IllegalArgumentException that carries a message.
+   */
+  @Test
+  public void testPutRoot() {
+    final DirListingMetadata dirMeta =
+        new DirListingMetadata(new Path("/path"), null, false);
+    exception.expect(IllegalArgumentException.class);
+    exception.expectMessage(notNullValue(String.class));
+    final Path root = new Path("/");
+    dirMeta.put(new S3AFileStatus(true, root, TEST_OWNER));
+  }
+  /**
+   * put() of a status under a different ancestor must raise an
+   * IllegalArgumentException that carries a message.
+   */
+  @Test
+  public void testPutNotChild() {
+    final DirListingMetadata dirMeta =
+        new DirListingMetadata(new Path("/path"), null, false);
+    exception.expect(IllegalArgumentException.class);
+    exception.expectMessage(notNullValue(String.class));
+    final Path unrelated = new Path("/different/ancestor");
+    dirMeta.put(new S3AFileStatus(true, unrelated, TEST_OWNER));
+  }
+  /**
+   * remove() of a listed child must take it out of the listing and make
+   * get() return null for its path.
+   */
+  @Test
+  public void testRemove() {
+    final Path dir = new Path("/path");
+    final PathMetadata firstDir = new PathMetadata(
+        new S3AFileStatus(true, new Path(dir, "dir1"), TEST_OWNER));
+    final PathMetadata secondDir = new PathMetadata(
+        new S3AFileStatus(true, new Path(dir, "dir2"), TEST_OWNER));
+    final PathMetadata file = new PathMetadata(
+        new S3AFileStatus(123, 456, new Path(dir, "file1"), 8192, TEST_OWNER));
+    final List<PathMetadata> entries =
+        Arrays.asList(firstDir, secondDir, file);
+    final DirListingMetadata dirMeta =
+        new DirListingMetadata(dir, entries, false);
+    assertEquals(dir, dirMeta.getPath());
+    assertNotNull(dirMeta.getListing());
+    assertFalse(dirMeta.getListing().isEmpty());
+    for (PathMetadata entry : entries) {
+      assertTrue(dirMeta.getListing().contains(entry));
+    }
+    assertFalse(dirMeta.isAuthoritative());
+    // drop the first directory and confirm it is gone
+    dirMeta.remove(firstDir.getFileStatus().getPath());
+    assertFalse(dirMeta.getListing().contains(firstDir));
+    assertNull(dirMeta.get(firstDir.getFileStatus().getPath()));
+  }
+  /**
+   * remove(null) must raise a NullPointerException that carries a message.
+   */
+  @Test
+  public void testRemoveNull() {
+    final DirListingMetadata dirMeta =
+        new DirListingMetadata(new Path("/path"), null, false);
+    exception.expect(NullPointerException.class);
+    exception.expectMessage(notNullValue(String.class));
+    dirMeta.remove(null);
+  }
+  /**
+   * remove() of the root path must raise an IllegalArgumentException that
+   * carries a message.
+   */
+  @Test
+  public void testRemoveRoot() {
+    final DirListingMetadata dirMeta =
+        new DirListingMetadata(new Path("/path"), null, false);
+    exception.expect(IllegalArgumentException.class);
+    exception.expectMessage(notNullValue(String.class));
+    final Path root = new Path("/");
+    dirMeta.remove(root);
+  }
+  /**
+   * remove() of a path under a different ancestor must raise an
+   * IllegalArgumentException that carries a message.
+   */
+  @Test
+  public void testRemoveNotChild() {
+    final DirListingMetadata dirMeta =
+        new DirListingMetadata(new Path("/path"), null, false);
+    exception.expect(IllegalArgumentException.class);
+    exception.expectMessage(notNullValue(String.class));
+    final Path unrelated = new Path("/different/ancestor");
+    dirMeta.remove(unrelated);
+  }
+  /*
+   * Create DirListingMetadata with two dirs and one file living in directory
+   * 'parent'
+   */
+  private static DirListingMetadata makeTwoDirsOneFile(Path parent) {
+    PathMetadata pathMeta1 = new PathMetadata(
+        new S3AFileStatus(true, new Path(parent, "dir1"), TEST_OWNER));
+    PathMetadata pathMeta2 = new PathMetadata(
+        new S3AFileStatus(true, new Path(parent, "dir2"), TEST_OWNER));
+    PathMetadata pathMeta3 = new PathMetadata(
+        new S3AFileStatus(123, 456, new Path(parent, "file1"), 8192,
+            TEST_OWNER));
+    List<PathMetadata> listing = Arrays.asList(pathMeta1, pathMeta2, pathMeta3);
+    return new DirListingMetadata(parent, listing, false);
+  }
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/TestDynamoDBMetadataStore.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/TestDynamoDBMetadataStore.java
new file mode 100644
index 0000000..02eb7b8
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/TestDynamoDBMetadataStore.java
@@ -0,0 +1,594 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.s3a.s3guard;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import com.amazonaws.AmazonServiceException;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.hadoop.fs.s3a.Tristate;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.MockS3ClientFactory;
+import org.apache.hadoop.fs.s3a.S3AFileStatus;
+import org.apache.hadoop.fs.s3a.S3AFileSystem;
+import org.apache.hadoop.fs.s3a.S3ClientFactory;
+import static org.apache.hadoop.fs.s3a.Constants.*;
+import static org.apache.hadoop.fs.s3a.s3guard.PathMetadataDynamoDBTranslation.*;
+import static org.apache.hadoop.fs.s3a.s3guard.DynamoDBMetadataStore.*;
+import static org.apache.hadoop.test.LambdaTestUtils.*;
+/**
+ * Test that {@link DynamoDBMetadataStore} implements {@link MetadataStore}.
+ *
+ * In this unit test, we use an in-memory DynamoDBLocal server instead of real
+ * AWS DynamoDB. An {@link S3AFileSystem} object is created and shared for
+ * initializing {@link DynamoDBMetadataStore} objects.  No real S3 requests
+ * are issued, as the underlying AWS S3Client is mocked.  You won't be
+ * charged for AWS S3 or DynamoDB when you run this test.
+ *
+ * According to the base class, every test case has an independent contract
+ * that creates a new {@link DynamoDBMetadataStore} instance and initializes
+ * it.  A table will be created for each test by the test contract, and will
+ * be destroyed after the test case finishes.
+ */
+public class TestDynamoDBMetadataStore extends MetadataStoreTestBase {
+  /** Logger of this test class. */
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestDynamoDBMetadataStore.class);
+  /** Bucket name used to build the test filesystem URI. */
+  private static final String BUCKET = "TestDynamoDBMetadataStore";
+  /** Filesystem URI string of the test bucket, with a trailing slash. */
+  private static final String S3URI =
+      URI.create(FS_S3A + "://" + BUCKET + "/").toString();
+  /** Primary key built from the store's version marker name. */
+  public static final PrimaryKey
+      VERSION_MARKER_PRIMARY_KEY = createVersionMarkerPrimaryKey(
+      DynamoDBMetadataStore.VERSION_MARKER);
+  /** The DynamoDB instance that can issue requests directly to server. */
+  private static DynamoDB dynamoDB;
+  /** Timeout rule applied to each test: 60 * 1000 milliseconds. */
+  @Rule
+  public final Timeout timeout = new Timeout(60 * 1000);
+  /**
+   * Start the singleton DynamoDBLocal server, then take the DynamoDB handle
+   * from the metadata store of a freshly created contract.
+   * @throws Exception on any failure; an AmazonServiceException raised while
+   * building the contract is logged and rethrown
+   */
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    DynamoDBLocalClientFactory.startSingletonServer();
+    try {
+      final DynamoDBMetadataStore bootstrapStore =
+          new DynamoDBMSContract().getMetadataStore();
+      dynamoDB = bootstrapStore.getDynamoDB();
+    } catch (AmazonServiceException e) {
+      // log, then rethrow: fail fast if the DynamoDBLocal server can not work
+      LOG.error("Cannot initialize a DynamoDBMetadataStore instance "
+          + "against the local DynamoDB server. Perhaps the DynamoDBLocal "
+          + "server is not configured correctly. ", e);
+      throw e;
+    }
+  }
+  /**
+   * Shut down the shared DynamoDB handle, if one was obtained, and stop the
+   * singleton DynamoDBLocal server.
+   * @throws Exception if the shutdown or the server stop fails
+   */
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    try {
+      if (dynamoDB != null) {
+        dynamoDB.shutdown();
+      }
+    } finally {
+      // stop the server even when the client shutdown throws, so that a
+      // failing teardown does not leave the local server running
+      DynamoDBLocalClientFactory.stopSingletonServer();
+    }
+  }
+  /**
+   * Test contract that owns one S3AFileSystem and one DynamoDBMetadataStore;
+   * both are created per contract instance.
+   */
+  private static class DynamoDBMSContract extends AbstractMSContract {
+    /** Filesystem created by the constructor from the configuration. */
+    private final S3AFileSystem s3afs;
+    /** Store that the constructor initializes with the filesystem. */
+    private final DynamoDBMetadataStore ms = new DynamoDBMetadataStore();
+    /**
+     * Create a contract from a new, default Configuration.
+     * @throws IOException if the configuring constructor fails
+     */
+    DynamoDBMSContract() throws IOException {
+      this(new Configuration());
+    }
+    /**
+     * Configure {@code conf} for the mocked S3 client and the local DynamoDB
+     * client factory, create the filesystem and initialize the store.
+     * @param conf configuration to modify and build the filesystem from
+     * @throws IOException if filesystem creation or store initialization
+     * fails
+     */
+    DynamoDBMSContract(Configuration conf) throws IOException {
+      // S3 side: mocked client factory; the test bucket is the default FS
+      conf.setClass(S3_CLIENT_FACTORY_IMPL, MockS3ClientFactory.class,
+          S3ClientFactory.class);
+      conf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, S3URI);
+      // DynamoDB side: dummy credentials, table creation switched on, and
+      // the client factory that targets the local server
+      conf.set(ACCESS_KEY, "dummy-access-key");
+      conf.set(SECRET_KEY, "dummy-secret-key");
+      conf.setBoolean(S3GUARD_DDB_TABLE_CREATE_KEY, true);
+      conf.setClass(S3Guard.S3GUARD_DDB_CLIENT_FACTORY_IMPL,
+          DynamoDBLocalClientFactory.class, DynamoDBClientFactory.class);
+      // newInstance(): always a new file system object for a test contract
+      s3afs = (S3AFileSystem) FileSystem.newInstance(conf);
+      ms.initialize(s3afs);
+    }
+    /** @return the filesystem created for this contract */
+    @Override
+    public S3AFileSystem getFileSystem() {
+      return s3afs;
+    }
+    /** @return the store initialized by the constructor */
+    @Override
+    public DynamoDBMetadataStore getMetadataStore() {
+      return ms;
+    }
+  }
+  /**
+   * @return a new contract built from a default configuration
+   * @throws IOException if the contract cannot be created
+   */
+  @Override
+  public DynamoDBMSContract createContract() throws IOException {
+    final DynamoDBMSContract contract = new DynamoDBMSContract();
+    return contract;
+  }
+  @Override
+  public DynamoDBMSContract createContract(Configuration conf) throws
+      IOException {
+    return new DynamoDBMSContract(conf);
+  }
+  /**
+   * Build an S3AFileStatus owned by the current user's short name: a
+   * directory status when {@code isDir} is set, otherwise a file status
+   * with the given size, this test's modification time and block size.
+   * @param path path of the entry
+   * @param size length to record for a file; unused for a directory
+   * @param isDir true for a directory status
+   * @return the new status
+   * @throws IOException if the current user cannot be determined
+   */
+  @Override
+  FileStatus basicFileStatus(Path path, int size, boolean isDir)
+      throws IOException {
+    String owner = UserGroupInformation.getCurrentUser().getShortUserName();
+    if (isDir) {
+      return new S3AFileStatus(true, path, owner);
+    }
+    return new S3AFileStatus(size, getModTime(), path, BLOCK_SIZE, owner);
+  }
+  private DynamoDBMetadataStore getDynamoMetadataStore() throws IOException {
+    return (DynamoDBMetadataStore) getContract().getMetadataStore();
+  }
+  private S3AFileSystem getFileSystem() throws IOException {
+    return (S3AFileSystem) getContract().getFileSystem();
+  }
+  /**
+   * After initialize() with an S3AFileSystem object, the store must expose
+   * a table of the configured name, the table must be ACTIVE, and the store
+   * region must match the expected one.
+   */
+  @Test
+  public void testInitialize() throws IOException {
+    final String tableName = "testInitializeWithFileSystem";
+    final S3AFileSystem fs = getFileSystem();
+    final Configuration conf = fs.getConf();
+    conf.set(S3GUARD_DDB_TABLE_NAME_KEY, tableName);
+    try (DynamoDBMetadataStore store = new DynamoDBMetadataStore()) {
+      store.initialize(fs);
+      verifyTableInitialized(tableName);
+      assertNotNull(store.getTable());
+      assertEquals(tableName, store.getTable().getTableName());
+      // NOTE(review): the fallback region is looked up with the table name
+      // as the bucket argument - confirm this is intended
+      final String fallbackRegion = fs.getBucketLocation(tableName);
+      final String expectedRegion =
+          conf.get(S3GUARD_DDB_REGION_KEY, fallbackRegion);
+      assertEquals("DynamoDB table should be in configured region or the same" +
+              " region as S3 bucket",
+          expectedRegion,
+          store.getRegion());
+    }
+  }
+  /**
+   * initialize() with a Configuration object must fail while the table name
+   * or the region is missing, and succeed with an ACTIVE table carrying the
+   * expected key schema once both are set.
+   */
+  @Test
+  public void testInitializeWithConfiguration() throws IOException {
+    final String tableName = "testInitializeWithConfiguration";
+    final Configuration conf = getFileSystem().getConf();
+    // start with neither table name nor region; keep the region (or the
+    // bucket location as fallback) for the last step
+    conf.unset(S3GUARD_DDB_TABLE_NAME_KEY);
+    final String regionForLastStep = conf.get(S3GUARD_DDB_REGION_KEY,
+        getFileSystem().getBucketLocation());
+    conf.unset(S3GUARD_DDB_REGION_KEY);
+    // step 1: no table name
+    try (DynamoDBMetadataStore store = new DynamoDBMetadataStore()) {
+      store.initialize(conf);
+      fail("Should have failed because the table name is not set!");
+    } catch (IllegalArgumentException expected) {
+      // rejection of the configuration is the outcome under test
+    }
+    // step 2: table name set, region still missing
+    conf.set(S3GUARD_DDB_TABLE_NAME_KEY, tableName);
+    try (DynamoDBMetadataStore store = new DynamoDBMetadataStore()) {
+      store.initialize(conf);
+      fail("Should have failed because as the region is not set!");
+    } catch (IllegalArgumentException expected) {
+      // rejection of the configuration is the outcome under test
+    }
+    // step 3: both set
+    conf.set(S3GUARD_DDB_REGION_KEY, regionForLastStep);
+    try (DynamoDBMetadataStore store = new DynamoDBMetadataStore()) {
+      store.initialize(conf);
+      verifyTableInitialized(tableName);
+      assertNotNull(store.getTable());
+      assertEquals(tableName, store.getTable().getTableName());
+      assertEquals("Unexpected key schema found!",
+          keySchema(),
+          store.getTable().describe().getKeySchema());
+    }
+  }
+  /**
+   * Run doTestBatchWrite for every pairing of delete and put counts drawn
+   * from: null collection, empty, one entry, the batch write request limit
+   * and one more than the limit.
+   */
+  @Test
+  public void testBatchWrite() throws IOException {
+    final int[] batchSizes = {
+        -1, // null
+        0, // empty collection
+        1, // one path
+        S3GUARD_DDB_BATCH_WRITE_REQUEST_LIMIT, // exact limit of a batch request
+        S3GUARD_DDB_BATCH_WRITE_REQUEST_LIMIT + 1 // limit + 1
+    };
+    for (int d = 0; d < batchSizes.length; d++) {
+      for (int p = 0; p < batchSizes.length; p++) {
+        doTestBatchWrite(batchSizes[d], batchSizes[p]);
+      }
+    }
+  }
+  /**
+   * Put {@code numDelete} children under an "oldDir", then move() them away
+   * while creating {@code numPut} children under a "newDir", and verify the
+   * listings of both directories.
+   * @param numDelete number of old entries to create and delete; a negative
+   * value passes a null path collection to move()
+   * @param numPut number of new entries to create; a negative value passes a
+   * null metadata collection to move()
+   * @throws IOException if a store operation fails
+   */
+  private void doTestBatchWrite(int numDelete, int numPut) throws IOException {
+    final String root = S3URI + "/testBatchWrite_" + numDelete + '_' + numPut;
+    final Path oldDir = new Path(root, "oldDir");
+    final Path newDir = new Path(root, "newDir");
+    LOG.info("doTestBatchWrite: oldDir={}, newDir={}", oldDir, newDir);
+    DynamoDBMetadataStore ms = getDynamoMetadataStore();
+    ms.put(new PathMetadata(basicFileStatus(oldDir, 0, true)));
+    ms.put(new PathMetadata(basicFileStatus(newDir, 0, true)));
+    // a negative count stands for "null collection"; the fill loops below
+    // do not run in that case, so the null lists are never dereferenced
+    final List<PathMetadata> oldMetas =
+        numDelete < 0 ? null : new ArrayList<PathMetadata>(numDelete);
+    for (int i = 0; i < numDelete; i++) {
+      oldMetas.add(new PathMetadata(
+          basicFileStatus(new Path(oldDir, "child" + i), i, true)));
+    }
+    final List<PathMetadata> newMetas =
+        numPut < 0 ? null : new ArrayList<PathMetadata>(numPut);
+    for (int i = 0; i < numPut; i++) {
+      newMetas.add(new PathMetadata(
+          basicFileStatus(new Path(newDir, "child" + i), i, false)));
+    }
+    Collection<Path> pathsToDelete = null;
+    if (oldMetas != null) {
+      // put all metadata of old paths and verify
+      ms.put(new DirListingMetadata(oldDir, oldMetas, false));
+      assertEquals(0, ms.listChildren(newDir).withoutTombstones().numEntries());
+      assertTrue(CollectionUtils.isEqualCollection(oldMetas,
+          ms.listChildren(oldDir).getListing()));
+      pathsToDelete = new ArrayList<>(oldMetas.size());
+      for (PathMetadata meta : oldMetas) {
+        pathsToDelete.add(meta.getFileStatus().getPath());
+      }
+    }
+    // move the old paths to new paths and verify
+    ms.move(pathsToDelete, newMetas);
+    assertEquals(0, ms.listChildren(oldDir).withoutTombstones().numEntries());
+    if (newMetas != null) {
+      assertTrue(CollectionUtils.isEqualCollection(newMetas,
+          ms.listChildren(newDir).getListing()));
+    }
+  }
+  /**
+   * Calling initTable() when the table already exists must leave the table
+   * ACTIVE.
+   */
+  @Test
+  public void testInitExistingTable() throws IOException {
+    final DynamoDBMetadataStore store = getDynamoMetadataStore();
+    final String existingTable = store.getTable().getTableName();
+    verifyTableInitialized(existingTable);
+    // second initialization against the table that is already there
+    store.initTable();
+    verifyTableInitialized(existingTable);
+  }
+  /**
+   * Test the low level version check code with a marker item carrying the
+   * current VERSION.
+   */
+  @Test
+  public void testItemVersionCompatibility() throws Throwable {
+    final Item currentMarker =
+        createVersionMarker(VERSION_MARKER, VERSION, 0);
+    verifyVersionCompatibility("table", currentMarker);
+  }
+  /**
+   * A version marker item that holds only the primary key, without the
+   * version field, must be rejected with an IOException containing
+   * E_NOT_VERSION_MARKER.
+   */
+  @Test
+  public void testItemLacksVersion() throws Throwable {
+    final VoidCallable checkMarkerWithoutVersion = new VoidCallable() {
+      @Override
+      public void call() throws Exception {
+        verifyVersionCompatibility("table",
+            new Item().withPrimaryKey(
+                createVersionMarkerPrimaryKey(VERSION_MARKER)));
+      }
+    };
+    intercept(IOException.class, E_NOT_VERSION_MARKER,
+        checkMarkerWithoutVersion);
+  }
+  /**
+   * Delete the version marker and verify that table init fails with an
+   * IOException containing E_NO_VERSION_MARKER.
+   * The retry count is lowered to 3 for the duration of the test and put
+   * back afterwards.
+   */
+  @Test
+  public void testTableVersionRequired() throws Exception {
+    Configuration conf = getFileSystem().getConf();
+    int maxRetries = conf.getInt(S3GUARD_DDB_MAX_RETRIES,
+        S3GUARD_DDB_MAX_RETRIES_DEFAULT);
+    conf.setInt(S3GUARD_DDB_MAX_RETRIES, 3);
+    try {
+      final DynamoDBMetadataStore ddbms =
+          createContract(conf).getMetadataStore();
+      String tableName = conf.get(S3GUARD_DDB_TABLE_NAME_KEY, BUCKET);
+      Table table = verifyTableInitialized(tableName);
+      table.deleteItem(VERSION_MARKER_PRIMARY_KEY);
+      // create existing table
+      intercept(IOException.class, E_NO_VERSION_MARKER,
+          new VoidCallable() {
+            @Override
+            public void call() throws Exception {
+              ddbms.initTable();
+            }
+          });
+    } finally {
+      // restore the retry count even when an assertion above fails
+      conf.setInt(S3GUARD_DDB_MAX_RETRIES, maxRetries);
+    }
+  }
+  /**
+   * Replace the version marker with one whose version is 200 and verify
+   * that table init fails with an IOException containing
+   * E_INCOMPATIBLE_VERSION.
+   */
+  @Test
+  public void testTableVersionMismatch() throws Exception {
+    final DynamoDBMetadataStore ddbms = createContract().getMetadataStore();
+    // same lookup as testTableVersionRequired: configured name, else BUCKET
+    String tableName = getFileSystem().getConf()
+        .get(S3GUARD_DDB_TABLE_NAME_KEY, BUCKET);
+    Table table = verifyTableInitialized(tableName);
+    table.deleteItem(VERSION_MARKER_PRIMARY_KEY);
+    Item v200 = createVersionMarker(VERSION_MARKER, 200, 0);
+    table.putItem(v200);
+    // create existing table
+    intercept(IOException.class, E_INCOMPATIBLE_VERSION,
+        new VoidCallable() {
+          @Override
+          public void call() throws Exception {
+            ddbms.initTable();
+          }
+        });
+  }
+  /**
+   * With the table-create option unset and a table name that does not
+   * exist, initialize() must fail with an IOException.
+   */
+  @Test
+  public void testFailNonexistentTable() throws IOException {
+    final String missingTable = "testFailNonexistentTable";
+    final S3AFileSystem fs = getFileSystem();
+    final Configuration conf = fs.getConf();
+    conf.set(S3GUARD_DDB_TABLE_NAME_KEY, missingTable);
+    conf.unset(S3GUARD_DDB_TABLE_CREATE_KEY);
+    try (DynamoDBMetadataStore store = new DynamoDBMetadataStore()) {
+      store.initialize(fs);
+      fail("Should have failed as table does not exist and table auto-creation"
+          + " is disabled");
+    } catch (IOException expected) {
+      // the failure to initialize is the outcome under test
+    }
+  }
+  /**
+   * Test cases about root directory as it is not in the DynamoDB table:
+   * get() of the root must return a directory entry both before and after
+   * a child "foo" is put under it.
+   */
+  @Test
+  public void testRootDirectory() throws IOException {
+    final DynamoDBMetadataStore store = getDynamoMetadataStore();
+    final Path root = new Path(S3URI);
+    verifyRootDirectory(store.get(root), true);
+    final Path child = new Path(root, "foo");
+    final String owner =
+        UserGroupInformation.getCurrentUser().getShortUserName();
+    store.put(new PathMetadata(new S3AFileStatus(true, child, owner)));
+    verifyRootDirectory(store.get(new Path(S3URI)), false);
+  }
+  private void verifyRootDirectory(PathMetadata rootMeta, boolean isEmpty) {
+    assertNotNull(rootMeta);
+    final FileStatus status = rootMeta.getFileStatus();
+    assertNotNull(status);
+    assertTrue(status.isDirectory());
+    // UNKNOWN is always a valid option, but true / false should not contradict
+    if (isEmpty) {
+      assertNotSame("Should not be marked non-empty",
+          Tristate.FALSE,
+          rootMeta.isEmptyDirectory());
+    } else {
+      assertNotSame("Should not be marked empty",
+          Tristate.TRUE,
+          rootMeta.isEmptyDirectory());
+    }
+  }
+  /**
+   * Test that when moving nested paths, all its ancestors up to destination
+   * root will also be created.
+   * Here are the paths involved; only the subtree under 'a' is put into the
+   * store before the move, the path down to 'dest' is the move destination:
+   * <pre>
+   * testMovePopulatesAncestors
+   * ├── a
+   * │   └── b
+   * │       └── src
+   * │           ├── dir1
+   * │           │   └── dir2
+   * │           └── file1.txt
+   * └── c
+   *     └── d
+   *         └── e
+   *             └── dest
+   *</pre>
+   * As part of rename(a/b/src, c/d/e/dest), S3A will enumerate the subtree at
+   * a/b/src.  This test verifies that after the move, the new subtree at
+   * 'dest' is reachable from the root (i.e. c/, c/d and c/d/e exist in the
+   * table).  DynamoDBMetadataStore depends on this property to do recursive
+   * delete without a full table scan.
+   */
+  @Test
+  public void testMovePopulatesAncestors() throws IOException {
+    final DynamoDBMetadataStore ddbms = getDynamoMetadataStore();
+    final String testRoot = "/testMovePopulatesAncestors";
+    final String srcRoot = testRoot + "/a/b/src";
+    final String destRoot = testRoot + "/c/d/e/dest";
+    // populate the source subtree: one file and one nested directory
+    final Path nestedPath1 = strToPath(srcRoot + "/file1.txt");
+    ddbms.put(new PathMetadata(basicFileStatus(nestedPath1, 1024, false)));
+    final Path nestedPath2 = strToPath(srcRoot + "/dir1/dir2");
+    ddbms.put(new PathMetadata(basicFileStatus(nestedPath2, 0, true)));
+    // We don't put the destRoot path here, since put() would create ancestor
+    // entries, and we want to ensure that move() does it, instead.
+    // Build enumeration of src / dest paths and do the move()
+    final Collection<Path> fullSourcePaths = Lists.newArrayList(
+        strToPath(srcRoot),
+        strToPath(srcRoot + "/dir1"),
+        strToPath(srcRoot + "/dir1/dir2"),
+        strToPath(srcRoot + "/file1.txt")
+    );
+    final Collection<PathMetadata> pathsToCreate = Lists.newArrayList(
+        new PathMetadata(basicFileStatus(strToPath(destRoot),
+            0, true)),
+        new PathMetadata(basicFileStatus(strToPath(destRoot + "/dir1"),
+            0, true)),
+        new PathMetadata(basicFileStatus(strToPath(destRoot + "/dir1/dir2"),
+            0, true)),
+        new PathMetadata(basicFileStatus(strToPath(destRoot + "/file1.txt"),
+            1024, false))
+    );
+    ddbms.move(fullSourcePaths, pathsToCreate);
+    // assert that all the ancestors should have been populated automatically
+    assertCached(testRoot + "/c");
+    assertCached(testRoot + "/c/d");
+    assertCached(testRoot + "/c/d/e");
+    assertCached(destRoot /* /c/d/e/dest */);
+    // Also check moved files while we're at it
+    assertCached(destRoot + "/dir1");
+    assertCached(destRoot + "/dir1/dir2");
+    assertCached(destRoot + "/file1.txt");
+  }
+  /**
+   * Double the table's read and write capacity through provisionTable() and
+   * verify the new values by describing the table directly.
+   */
+  @Test
+  public void testProvisionTable() throws IOException {
+    final DynamoDBMetadataStore ddbms = getDynamoMetadataStore();
+    final String tableName = ddbms.getTable().getTableName();
+    final ProvisionedThroughputDescription oldProvision =
+        dynamoDB.getTable(tableName).describe().getProvisionedThroughput();
+    ddbms.provisionTable(oldProvision.getReadCapacityUnits() * 2,
+        oldProvision.getWriteCapacityUnits() * 2);
+    // describe again through the direct DynamoDB handle, not the store
+    final ProvisionedThroughputDescription newProvision =
+        dynamoDB.getTable(tableName).describe().getProvisionedThroughput();
+    LOG.info("Old provision = {}, new provision = {}",
+        oldProvision, newProvision);
+    assertEquals(oldProvision.getReadCapacityUnits() * 2,
+        newProvision.getReadCapacityUnits().longValue());
+    assertEquals(oldProvision.getWriteCapacityUnits() * 2,
+        newProvision.getWriteCapacityUnits().longValue());
+  }
+  /**
+   * destroy() must remove the table, a second destroy() must not fail, and
+   * listing children afterwards must raise an IOException.
+   */
+  @Test
+  public void testDeleteTable() throws IOException {
+    final String tableName = "testDeleteTable";
+    final S3AFileSystem fs = getFileSystem();
+    final Configuration conf = fs.getConf();
+    conf.set(S3GUARD_DDB_TABLE_NAME_KEY, tableName);
+    try (DynamoDBMetadataStore store = new DynamoDBMetadataStore()) {
+      store.initialize(fs);
+      // we can list the empty table
+      store.listChildren(new Path(S3URI));
+      store.destroy();
+      verifyTableNotExist(tableName);
+      // delete the table once more: the call is expected to return normally
+      // although the table is already gone
+      store.destroy();
+      verifyTableNotExist(tableName);
+      try {
+        // we can no longer list the destroyed table
+        store.listChildren(new Path(S3URI));
+        fail("Should have failed after the table is destroyed!");
+      } catch (IOException expected) {
+        // listing a destroyed table failing is the outcome under test
+      }
+    }
+  }
+  /**
+   * Check, through the direct DynamoDB handle, that the table has the given
+   * name and is ACTIVE.
+   *
+   * This should not rely on the {@link DynamoDBMetadataStore} implementation.
+   * @param tableName name of the table to describe
+   * @return the table
+   */
+  private static Table verifyTableInitialized(String tableName) {
+    final Table found = dynamoDB.getTable(tableName);
+    final TableDescription description = found.describe();
+    assertEquals(tableName, description.getTableName());
+    assertEquals("ACTIVE", description.getTableStatus());
+    return found;
+  }
+  /**
+   * Check, through the direct DynamoDB handle, that describing the table
+   * raises ResourceNotFoundException.
+   *
+   * This should not rely on the {@link DynamoDBMetadataStore} implementation.
+   * @param tableName name of the table expected to be absent
+   */
+  private static void verifyTableNotExist(String tableName) {
+    final Table absent = dynamoDB.getTable(tableName);
+    try {
+      absent.describe();
+      fail("Expecting ResourceNotFoundException for table '" + tableName + "'");
+    } catch (ResourceNotFoundException expected) {
+      // describe() failing this way is the outcome under test
+    }
+  }
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/TestLocalMetadataStore.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/TestLocalMetadataStore.java
new file mode 100644
index 0000000..1b765af
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/TestLocalMetadataStore.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.s3a.s3guard;
+import java.util.HashMap;
+import java.util.Map;
+import org.junit.Test;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.S3ATestUtils;
+/**
+ * MetadataStore unit test for {@link LocalMetadataStore}.
+ */
+public class TestLocalMetadataStore extends MetadataStoreTestBase {
+  /**
+   * Value written to {@link LocalMetadataStore#CONF_MAX_RECORDS} in the
+   * configuration used by the test contract.
+   */
+  private static final String MAX_ENTRIES_STR = "16";
+  /**
+   * Test contract that pairs the local file system with a
+   * {@link LocalMetadataStore}.
+   */
+  private static final class LocalMSContract extends AbstractMSContract {
+    /** Local file system obtained from the contract's configuration. */
+    private final FileSystem fs;
+    /**
+     * Create the contract from a default {@link Configuration}.
+     *
+     * @throws IOException if the local file system cannot be obtained
+     */
+    private LocalMSContract() throws IOException {
+      this(new Configuration());
+    }
+    /**
+     * Create the contract from the supplied configuration. The configuration
+     * is modified in place: {@link LocalMetadataStore#CONF_MAX_RECORDS} is
+     * set to {@code MAX_ENTRIES_STR} before the file system is looked up.
+     *
+     * @param config configuration to update and use
+     * @throws IOException if the local file system cannot be obtained
+     */
+    private LocalMSContract(Configuration config) throws IOException {
+      config.set(LocalMetadataStore.CONF_MAX_RECORDS, MAX_ENTRIES_STR);
+      fs = FileSystem.getLocal(config);
+    }
+    @Override
+    public FileSystem getFileSystem() {
+      return fs;
+    }
+    /**
+     * @return a newly constructed {@link LocalMetadataStore}; this method
+     * does not initialize it
+     */
+    @Override
+    public MetadataStore getMetadataStore() throws IOException {
+      return new LocalMetadataStore();
+    }
+  }
+  /**
+   * Create the contract for this suite from a default configuration.
+   *
+   * @return a new {@link LocalMSContract}
+   * @throws IOException if the contract cannot be created
+   */
+  @Override
+  public AbstractMSContract createContract() throws IOException {
+    return new LocalMSContract();
+  }
+  /**
+   * Create the contract for this suite from the given configuration.
+   *
+   * @param conf configuration handed to the {@link LocalMSContract}
+   * @return a new {@link LocalMSContract}
+   * @throws IOException if the contract cannot be created
+   */
+  @Override
+  public AbstractMSContract createContract(Configuration conf)
+      throws IOException {
+    return new LocalMSContract(conf);
+  }
+  /**
+   * Check how many live entries remain after
+   * {@code LocalMetadataStore.deleteHashByAncestor} is applied for several
+   * ancestors, first with bare paths and then with scheme/host qualified
+   * paths.
+   */
+  @Test
+  public void testClearByAncestor() {
+    final Map<Path, PathMetadata> entries = new HashMap<>();
+    // first pass: paths without scheme/host; second pass: with scheme/host
+    final String[] prefixes = {"", "s3a://fake-bucket-name"};
+    for (String prefix : prefixes) {
+      // clearing from the root leaves nothing
+      assertClearResult(entries, prefix, "/", 0);
+      // clearing below dirB is expected to leave two entries
+      assertClearResult(entries, prefix, "/dirA/dirB", 2);
+      // an ancestor matching nothing leaves all five populated entries
+      assertClearResult(entries, prefix, "/invalid", 5);
+    }
+  }
+  /**
+   * Add the five fixture paths, each prepended with {@code prefix}, to the
+   * map.
+   *
+   * @param map map to fill
+   * @param prefix string prepended to every fixture path; may be empty
+   */
+  private static void populateMap(Map<Path, PathMetadata> map,
+      String prefix) {
+    final String[] fixturePaths = {
+        "/dirA/dirB/",
+        "/dirA/dirB/dirC",
+        "/dirA/dirB/dirC/file1",
+        "/dirA/dirB/dirC/file2",
+        "/dirA/file1",
+    };
+    for (String fixturePath : fixturePaths) {
+      populateEntry(map, new Path(prefix + fixturePath));
+    }
+  }
+  /**
+   * Store a {@link PathMetadata} for {@code path} in the map. The wrapped
+   * {@link FileStatus} has all numeric fields set to zero and the directory
+   * flag set.
+   *
+   * @param map map to add the entry to
+   * @param path key of the entry and path of its status
+   */
+  private static void populateEntry(Map<Path, PathMetadata> map,
+      Path path) {
+    final FileStatus status = new FileStatus(0, true, 0, 0, 0, path);
+    final PathMetadata metadata = new PathMetadata(status);
+    map.put(path, metadata);
+  }
+  /**
+   * Count the entries of the map whose metadata is not flagged as deleted.
+   *
+   * @param map map to inspect
+   * @return number of values for which {@code isDeleted()} is false
+   */
+  private static int sizeOfMap(Map<Path, PathMetadata> map) {
+    int live = 0;
+    for (Map.Entry<Path, PathMetadata> entry : map.entrySet()) {
+      if (entry.getValue().isDeleted()) {
+        // entries flagged as deleted are not counted
+        continue;
+      }
+      live++;
+    }
+    return live;
+  }
+  /**
+   * Populate the map, delete by ancestor, assert the number of entries not
+   * flagged as deleted, and finally empty the map for the next run.
+   *
+   * @param map scratch map; cleared when the assertion passes
+   * @param prefixStr string prepended to all paths; may be empty
+   * @param pathStr ancestor path, appended to {@code prefixStr}
+   * @param leftoverSize expected number of non-deleted entries afterwards
+   */
+  private static void assertClearResult(Map<Path, PathMetadata> map,
+      String prefixStr, String pathStr, int leftoverSize) {
+    populateMap(map, prefixStr);
+    final Path ancestor = new Path(prefixStr + pathStr);
+    // NOTE(review): the trailing "true" presumably asks for entries to be
+    // flagged as deleted instead of removed -- confirm in LocalMetadataStore
+    LocalMetadataStore.deleteHashByAncestor(ancestor, map, true);
+    final String message =
+        String.format("Map should have %d entries", leftoverSize);
+    assertEquals(message, leftoverSize, sizeOfMap(map));
+    map.clear();
+  }
+  /**
+   * Check a file status against the expected size and the fixture values of
+   * this suite by delegating to {@code S3ATestUtils.verifyFileStatus}.
+   *
+   * @param status status to check
+   * @param size expected file size
+   */
+  @Override
+  protected void verifyFileStatus(FileStatus status, long size) {
+    // NOTE(review): the argument line following getAccessTime() was missing
+    // from this copy of the patch; the tail below mirrors verifyDirStatus()
+    // with BLOCK_SIZE added -- confirm against S3ATestUtils.verifyFileStatus
+    S3ATestUtils.verifyFileStatus(status, size, REPLICATION, getModTime(),
+        getAccessTime(),
+        BLOCK_SIZE, OWNER, GROUP, PERMISSION);
+  }
+  /**
+   * Check a directory status against the fixture values of this suite by
+   * delegating to {@code S3ATestUtils.verifyDirStatus}.
+   *
+   * @param status status to check
+   */
+  @Override
+  protected void verifyDirStatus(FileStatus status) {
+    S3ATestUtils.verifyDirStatus(status,
+        REPLICATION,
+        getModTime(),
+        getAccessTime(),
+        OWNER,
+        GROUP,
+        PERMISSION);
+  }
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/
new file mode 100644
index 0000000..c0541ea
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.s3a.s3guard;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+/**
+ * Run MetadataStore unit tests on the NullMetadataStore implementation.
+ */
+public class TestNullMetadataStore extends MetadataStoreTestBase {
+  /**
+   * Test contract that pairs the local file system with a
+   * {@link NullMetadataStore}.
+   */
+  private static class NullMSContract extends AbstractMSContract {
+    /**
+     * @return the local file system for a freshly created configuration
+     * @throws IOException if the local file system cannot be obtained
+     */
+    @Override
+    public FileSystem getFileSystem() throws IOException {
+      return FileSystem.getLocal(new Configuration());
+    }
+    /**
+     * @return a newly constructed {@link NullMetadataStore}
+     */
+    @Override
+    public MetadataStore getMetadataStore() throws IOException {
+      final MetadataStore store = new NullMetadataStore();
+      return store;
+    }
+  }
+  /**
+   * This MetadataStore always says "I don't know, ask the backing store",
+   * so this override reports {@code true} unconditionally.
+   *
+   * @return {@code true}, always
+   */
+  @Override
+  public boolean allowMissing() {
+    return true;
+  }
+  /**
+   * Create the contract for this suite.
+   *
+   * @return a new {@link NullMSContract}
+   */
+  @Override
+  public AbstractMSContract createContract() {
+    return new NullMSContract();
+  }
+  /**
+   * Create the contract for this suite. The supplied configuration is not
+   * used; the call is forwarded to {@link #createContract()}.
+   *
+   * @param conf ignored
+   * @return a new {@link NullMSContract}
+   */
+  @Override
+  public AbstractMSContract createContract(Configuration conf) {
+    return createContract();
+  }
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/
new file mode 100644
index 0000000..1678746
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.s3a.s3guard;
+import java.util.Collection;
+import java.util.concurrent.Callable;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.S3AFileStatus;
+import org.apache.hadoop.test.LambdaTestUtils;
+import static;
+import static;
+import static;
+import static org.hamcrest.CoreMatchers.anyOf;
+import static;
+import static org.apache.hadoop.fs.s3a.s3guard.PathMetadataDynamoDBTranslation.*;
+import static org.apache.hadoop.fs.s3a.s3guard.DynamoDBMetadataStore.VERSION_MARKER;
+import static org.apache.hadoop.fs.s3a.s3guard.DynamoDBMetadataStore.VERSION;
+/**
+ * Test that {@link PathMetadataDynamoDBTranslation} is able to translate
+ * between domain model objects and DynamoDB items.
+ */
+public class TestPathMetadataDynamoDBTranslation extends Assert {
+  /** Path of the directory fixture. */
+  private static final Path TEST_DIR_PATH = new Path("s3a://test-bucket/myDir");
+  /** DynamoDB item paired with {@link #TEST_DIR_PATH} in the tests. */
+  private static final Item TEST_DIR_ITEM = new Item();
+  /** Path metadata of the directory fixture; set in setUpBeforeClass(). */
+  private static PathMetadata testDirPathMetadata;
+  /** Length value used for the file fixture. */
+  private static final long TEST_FILE_LENGTH = 100;
+  /** Modification time value used for the file fixture. */
+  private static final long TEST_MOD_TIME = 9999;
+  /** Block size value used for the file fixture. */
+  private static final long TEST_BLOCK_SIZE = 128;
+  /** Path of the file fixture, a child of {@link #TEST_DIR_PATH}. */
+  private static final Path TEST_FILE_PATH = new Path(TEST_DIR_PATH, "myFile");
+  /** DynamoDB item paired with {@link #TEST_FILE_PATH} in the tests. */
+  private static final Item TEST_FILE_ITEM = new Item();
+  /** Path metadata of the file fixture; set in setUpBeforeClass(). */
+  private static PathMetadata testFilePathMetadata;
+  /**
+   * Build the directory and file fixtures: one {@link PathMetadata} and one
+   * matching DynamoDB {@link Item} for each of the two test paths.
+   *
+   * NOTE(review): this copy of the patch had lost the receivers of the two
+   * item builder chains, the S3AFileStatus constructor line of the file
+   * fixture and its FILE_LENGTH attribute. They are restored here from the
+   * declared fixtures; confirm the S3AFileStatus argument order against the
+   * class before relying on it.
+   *
+   * @throws IOException if the current user cannot be determined
+   */
+  @BeforeClass
+  public static void setUpBeforeClass() throws IOException {
+    String username = UserGroupInformation.getCurrentUser().getShortUserName();
+    // directory fixture: metadata object and the item it should map to
+    testDirPathMetadata =
+        new PathMetadata(new S3AFileStatus(false, TEST_DIR_PATH, username));
+    TEST_DIR_ITEM
+        .withPrimaryKey(PARENT, "/test-bucket", CHILD, TEST_DIR_PATH.getName())
+        .withBoolean(IS_DIR, true);
+    // file fixture: metadata object and the item it should map to
+    testFilePathMetadata = new PathMetadata(
+        new S3AFileStatus(TEST_FILE_LENGTH, TEST_MOD_TIME, TEST_FILE_PATH,
+            TEST_BLOCK_SIZE, username));
+    TEST_FILE_ITEM
+        .withPrimaryKey(PARENT, pathToParentKey(TEST_FILE_PATH.getParent()),
+            CHILD, TEST_FILE_PATH.getName())
+        .withBoolean(IS_DIR, false)
+        .withLong(FILE_LENGTH, TEST_FILE_LENGTH)
+        .withLong(MOD_TIME, TEST_MOD_TIME)
+        .withLong(BLOCK_SIZE, TEST_BLOCK_SIZE);
+  }
+  /**
+   * Per-test timeout. The tests should not take long, as they do not
+   * involve any remote server operation.
+   */
+  @Rule
+  public final Timeout timeout = new Timeout(30_000);
+  /**
+   * The key schema must hold exactly two elements, each named PARENT or
+   * CHILD and typed as a HASH or RANGE key.
+   */
+  @Test
+  public void testKeySchema() {
+    final Collection<KeySchemaElement> schema =
+        PathMetadataDynamoDBTranslation.keySchema();
+    assertNotNull(schema);
+    assertEquals("There should be HASH and RANGE key in key schema",
+        2, schema.size());
+    for (KeySchemaElement keyElement : schema) {
+      final String attributeName = keyElement.getAttributeName();
+      assertThat(attributeName, anyOf(is(PARENT), is(CHILD)));
+      final String keyType = keyElement.getKeyType();
+      assertThat(keyType, anyOf(is(HASH.toString()), is(RANGE.toString())));
+    }
+  }
+  /**
+   * The attribute definitions must hold exactly two entries, each named
+   * PARENT or CHILD and of the attribute type {@code S}.
+   */
+  @Test
+  public void testAttributeDefinitions() {
+    final Collection<AttributeDefinition> definitions =
+        PathMetadataDynamoDBTranslation.attributeDefinitions();
+    assertNotNull(definitions);
+    assertEquals("There should be HASH and RANGE attributes",
+        2, definitions.size());
+    for (AttributeDefinition attribute : definitions) {
+      final String attributeName = attribute.getAttributeName();
+      assertThat(attributeName, anyOf(is(PARENT), is(CHILD)));
+      assertEquals(S.toString(), attribute.getAttributeType());
+    }
+  }
+  /**
+   * A null item translates to null; the directory and file items translate
+   * to metadata carrying the same information.
+   */
+  @Test
+  public void testItemToPathMetadata() throws IOException {
+    final String currentUser =
+        UserGroupInformation.getCurrentUser().getShortUserName();
+    assertNull(itemToPathMetadata(null, currentUser));
+    final Item[] fixtures = {TEST_DIR_ITEM, TEST_FILE_ITEM};
+    for (Item fixture : fixtures) {
+      verify(fixture, itemToPathMetadata(fixture, currentUser));
+    }
+  }
+  /**
+   * Assert that an {@link Item} and a {@link PathMetadata} describe the same
+   * entry: key attributes, directory flag, length and block size.
+   *
+   * @param item DynamoDB item side of the comparison
+   * @param meta path metadata side of the comparison; must not be null
+   */
+  private static void verify(Item item, PathMetadata meta) {
+    assertNotNull(meta);
+    final FileStatus fileStatus = meta.getFileStatus();
+    final Path statusPath = fileStatus.getPath();
+    // key attributes
+    assertEquals(item.get(PARENT), pathToParentKey(statusPath.getParent()));
+    assertEquals(item.get(CHILD), statusPath.getName());
+    // directory flag: a missing attribute counts as "not a directory"
+    boolean expectDir = false;
+    if (item.hasAttribute(IS_DIR)) {
+      expectDir = item.getBoolean(IS_DIR);
+    }
+    assertEquals(expectDir, fileStatus.isDirectory());
+    // length and block size: a missing attribute counts as zero
+    long expectedLength = 0;
+    if (item.hasAttribute(FILE_LENGTH)) {
+      expectedLength = item.getLong(FILE_LENGTH);
+    }
+    assertEquals(expectedLength, fileStatus.getLen());
+    long expectedBlockSize = 0;
+    if (item.hasAttribute(BLOCK_SIZE)) {
+      expectedBlockSize = item.getLong(BLOCK_SIZE);
+    }
+    assertEquals(expectedBlockSize, fileStatus.getBlockSize());
+    /*
+     * The modification time is deliberately not compared:
+     * S3AFileStatus#getModificationTime() reports the current time, so the
+     * following assertion is failing.
+     *
+     * long modTime = item.hasAttribute(MOD_TIME) ? item.getLong(MOD_TIME) : 0;
+     * assertEquals(modTime, status.getModificationTime());
+     */
+  }
+  /**
+   * Translating the directory and file metadata to items must preserve the
+   * information checked by {@code verify}.
+   */
+  @Test
+  public void testPathMetadataToItem() {
+    final PathMetadata[] fixtures = {testDirPathMetadata, testFilePathMetadata};
+    for (PathMetadata fixture : fixtures) {
+      verify(pathMetadataToItem(fixture), fixture);
+    }
+  }
+  /**
+   * Run the parent key attribute check for the directory and file paths.
+   */
+  @Test
+  public void testPathToParentKeyAttribute() {
+    final Path[] fixtures = {TEST_DIR_PATH, TEST_FILE_PATH};
+    for (Path fixture : fixtures) {
+      doTestPathToParentKeyAttribute(fixture);
+    }
+  }
+  /**
+   * Assert that the parent key attribute built for {@code path} is named
+   * PARENT and carries the parent key computed by this test.
+   *
+   * @param path path to translate
+   */
+  private static void doTestPathToParentKeyAttribute(Path path) {
+    final KeyAttribute parentAttribute = pathToParentKeyAttribute(path);
+    assertNotNull(parentAttribute);
+    assertEquals(PARENT, parentAttribute.getName());
+    // this path is expected as the parent field
+    final String expectedKey = pathToParentKey(path);
+    assertEquals(expectedKey, parentAttribute.getValue());
+  }
+  /**
+   * Compute the expected parent key of a path: a slash, the URI host
+   * (bucket) and the URI path, without a single trailing slash.
+   *
+   * @param p path whose URI path must be absolute and whose URI has a host
+   * @return the key string
+   */
+  private static String pathToParentKey(Path p) {
+    Preconditions.checkArgument(p.isUriPathAbsolute());
+    final URI uri = p.toUri();
+    final String host = uri.getHost();
+    Preconditions.checkNotNull(host);
+    final String key = "/" + host + uri.getPath();
+    // strip trailing slash
+    return key.endsWith("/") ? key.substring(0, key.length() - 1) : key;
+  }
+  /**
+   * Translating the root path "/" must be rejected with an
+   * {@link IllegalArgumentException}; the directory and file paths must
+   * translate to well-formed keys.
+   */
+  @Test
+  public void testPathToKey() throws Exception {
+    final Callable<PrimaryKey> rootPathToKey = new Callable<PrimaryKey>() {
+      @Override
+      public PrimaryKey call() throws Exception {
+        return pathToKey(new Path("/"));
+      }
+    };
+    LambdaTestUtils.intercept(IllegalArgumentException.class, rootPathToKey);
+    doTestPathToKey(TEST_DIR_PATH);
+    doTestPathToKey(TEST_FILE_PATH);
+  }
+  /**
+   * Assert that the primary key built for {@code path} has two components:
+   * PARENT holding the parent key of the path's parent, and CHILD holding
+   * the last path element.
+   *
+   * @param path path to translate
+   */
+  private static void doTestPathToKey(Path path) {
+    final PrimaryKey primaryKey = pathToKey(path);
+    assertNotNull(primaryKey);
+    assertEquals("There should be both HASH and RANGE keys",
+        2, primaryKey.getComponents().size());
+    for (KeyAttribute component : primaryKey.getComponents()) {
+      final String componentName = component.getName();
+      assertThat(componentName, anyOf(is(PARENT), is(CHILD)));
+      final String expectedValue = PARENT.equals(componentName)
+          ? pathToParentKey(path.getParent())
+          : path.getName();
+      assertEquals(expectedValue, component.getValue());
+    }
+  }
+  /**
+   * A version written into a version marker item must be read back
+   * unchanged.
+   */
+  @Test
+  public void testVersionRoundTrip() throws Throwable {
+    final Item versionItem = createVersionMarker(VERSION_MARKER, VERSION, 0);
+    final String message = "Extracted version from " + versionItem;
+    assertEquals(message, VERSION, extractVersionFromMarker(versionItem));
+  }
+  /**
+   * A version marker item is not a path entry, so translating it to path
+   * metadata must yield null.
+   */
+  @Test
+  public void testVersionMarkerNotStatusIllegalPath() throws Throwable {
+    final Item marker = createVersionMarker(VERSION_MARKER, VERSION, 0);
+    assertNull("Path metadata from " + marker,
+        itemToPathMetadata(marker, "alice"));
+  }
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/
new file mode 100644
index 0000000..745e7aa
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.s3a.s3guard;
+import java.util.Arrays;
+import java.util.List;
+import org.junit.Assert;
+import org.junit.Test;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+/**
+ * Tests for the {@link S3Guard} utility class.
+ */
+public class TestS3Guard extends Assert {
+  /**
+   * Basic check that a listing from S3 and a listing from the MetadataStore
+   * are merged correctly: two entries known only to the store plus two
+   * entries known only to S3 must give a union of four.
+   */
+  @Test
+  public void testDirListingUnion() throws Exception {
+    final MetadataStore store = new LocalMetadataStore();
+    final Path dir = new Path("s3a://bucket/dir");
+    // metadata store side: two files
+    final PathMetadata storeFile1 =
+        makePathMeta("s3a://bucket/dir/ms-file1", false);
+    final PathMetadata storeFile2 =
+        makePathMeta("s3a://bucket/dir/ms-file2", false);
+    final DirListingMetadata storeListing = new DirListingMetadata(dir,
+        Arrays.asList(storeFile1, storeFile2), false);
+    // S3 side: two other files
+    final List<FileStatus> s3Files = Arrays.asList(
+        makeFileStatus("s3a://bucket/dir/s3-file3", false),
+        makeFileStatus("s3a://bucket/dir/s3-file4", false)
+    );
+    final FileStatus[] merged = S3Guard.dirListingUnion(store, dir, s3Files,
+        storeListing, false);
+    assertEquals("listing length", 4, merged.length);
+    final String[] expectedPaths = {
+        "s3a://bucket/dir/ms-file1",
+        "s3a://bucket/dir/ms-file2",
+        "s3a://bucket/dir/s3-file3",
+        "s3a://bucket/dir/s3-file4",
+    };
+    for (String expectedPath : expectedPaths) {
+      assertContainsPath(merged, expectedPath);
+    }
+  }
+  /**
+   * Assert that one of the statuses has the given path string.
+   *
+   * @param statuses listing to search
+   * @param pathStr expected path, compared as a string
+   */
+  void assertContainsPath(FileStatus[] statuses, String pathStr) {
+    final boolean present = containsPath(statuses, pathStr);
+    assertTrue("listing doesn't contain " + pathStr, present);
+  }
+  /**
+   * Search a listing for a status whose path, rendered as a string, equals
+   * {@code pathStr}.
+   *
+   * @param statuses listing to search
+   * @param pathStr path string to look for
+   * @return true if a matching status is found
+   */
+  boolean containsPath(FileStatus[] statuses, String pathStr) {
+    for (int i = 0; i < statuses.length; i++) {
+      final String candidate = statuses[i].getPath().toString();
+      if (candidate.equals(pathStr)) {
+        return true;
+      }
+    }
+    return false;
+  }
+  /**
+   * Build path metadata wrapping a status created by
+   * {@code makeFileStatus}.
+   *
+   * @param pathStr path of the entry
+   * @param isDir whether the entry is a directory
+   * @return the new metadata
+   */
+  private PathMetadata makePathMeta(String pathStr, boolean isDir) {
+    final FileStatus status = makeFileStatus(pathStr, isDir);
+    return new PathMetadata(status);
+  }
+  /**
+   * Build a status for the given path, stamped with the current time:
+   * length 0 for a directory, length 100 for a file.
+   *
+   * @param pathStr path of the entry
+   * @param isDir whether the entry is a directory
+   * @return the new status
+   */
+  private FileStatus makeFileStatus(String pathStr, boolean isDir) {
+    final Path path = new Path(pathStr);
+    final long length = isDir ? 0 : 100;
+    return new FileStatus(length, isDir, 1, 1, System.currentTimeMillis(),
+        path);
+  }

To unsubscribe, e-mail:
For additional commands, e-mail: