Posted to common-commits@hadoop.apache.org by in...@apache.org on 2017/09/02 21:20:55 UTC

[11/48] hadoop git commit: HADOOP-13345 S3Guard: Improved Consistency for S3A. Contributed by: Chris Nauroth, Aaron Fabbri, Mingliang Liu, Lei (Eddy) Xu, Sean Mackrory, Steve Loughran and others.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/621b43e2/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardListConsistency.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardListConsistency.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardListConsistency.java
new file mode 100644
index 0000000..6cff533
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardListConsistency.java
@@ -0,0 +1,544 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import com.amazonaws.services.s3.model.ObjectListing;
+import com.amazonaws.services.s3.AmazonS3;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.contract.AbstractFSContract;
+import org.apache.hadoop.fs.contract.s3a.S3AContract;
+import org.junit.Assume;
+import org.junit.Test;
+
+import java.io.FileNotFoundException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+
+import static org.apache.hadoop.fs.contract.ContractTestUtils.touch;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.writeTextFile;
+import static org.apache.hadoop.fs.s3a.Constants.*;
+import static org.apache.hadoop.fs.s3a.InconsistentAmazonS3Client.*;
+
+/**
+ * Test S3Guard list consistency feature by injecting delayed listObjects()
+ * visibility via {@link InconsistentAmazonS3Client}.
+ *
+ * Tests here generally:
+ * 1. Use the inconsistency injection mentioned above.
+ * 2. Only run when S3Guard is enabled.
+ */
+public class ITestS3GuardListConsistency extends AbstractS3ATestBase {
+
+  @Override
+  protected AbstractFSContract createContract(Configuration conf) {
+    conf.setClass(S3_CLIENT_FACTORY_IMPL, InconsistentS3ClientFactory.class,
+        S3ClientFactory.class);
+    // Other configs would break test assumptions
+    conf.set(FAIL_INJECT_INCONSISTENCY_KEY, DEFAULT_DELAY_KEY_SUBSTRING);
+    conf.setFloat(FAIL_INJECT_INCONSISTENCY_PROBABILITY, 1.0f);
+    conf.setLong(FAIL_INJECT_INCONSISTENCY_MSEC, DEFAULT_DELAY_KEY_MSEC);
+    return new S3AContract(conf);
+  }
+
+  /**
+   * Helper function for other test cases: does a single rename operation and
+   * validates the aftermath.
+   * @param mkdirs Directories to create
+   * @param srcdirs Source paths for rename operation
+   * @param dstdirs Destination paths for rename operation
+   * @param yesdirs Paths that must exist post-rename (e.g. dstdirs and
+   *                their children)
+   * @param nodirs Paths that must not exist post-rename (e.g. srcdirs and
+   *               their children)
+   * @throws Exception on failure
+   */
+  private void doTestRenameSequence(Path[] mkdirs, Path[] srcdirs,
+      Path[] dstdirs, Path[] yesdirs, Path[] nodirs) throws Exception {
+    S3AFileSystem fs = getFileSystem();
+    Assume.assumeTrue(fs.hasMetadataStore());
+
+    if (mkdirs != null) {
+      for (Path mkdir : mkdirs) {
+        assertTrue(fs.mkdirs(mkdir));
+      }
+      clearInconsistency(fs);
+    }
+
+    assertTrue("srcdirs and dstdirs must have equal length",
+        srcdirs.length == dstdirs.length);
+    for (int i = 0; i < srcdirs.length; i++) {
+      assertTrue("Rename returned false: " + srcdirs[i] + " -> " + dstdirs[i],
+          fs.rename(srcdirs[i], dstdirs[i]));
+    }
+
+    for (Path yesdir : yesdirs) {
+      assertTrue("Path was supposed to exist: " + yesdir, fs.exists(yesdir));
+    }
+    for (Path nodir : nodirs) {
+      assertFalse("Path is not supposed to exist: " + nodir, fs.exists(nodir));
+    }
+  }
+
+  /**
+   * Tests that after renaming a directory, the original directory and its
+   * contents are indeed missing and the corresponding new paths are visible.
+   * @throws Exception
+   */
+  @Test
+  public void testConsistentListAfterRename() throws Exception {
+    Path[] mkdirs = {
+      path("d1/f"),
+      path("d1/f" + DEFAULT_DELAY_KEY_SUBSTRING)
+    };
+    Path[] srcdirs = {path("d1")};
+    Path[] dstdirs = {path("d2")};
+    Path[] yesdirs = {path("d2"), path("d2/f"),
+        path("d2/f" + DEFAULT_DELAY_KEY_SUBSTRING)};
+    Path[] nodirs = {path("d1"), path("d1/f"),
+        path("d1/f" + DEFAULT_DELAY_KEY_SUBSTRING)};
+    doTestRenameSequence(mkdirs, srcdirs, dstdirs, yesdirs, nodirs);
+    getFileSystem().delete(path("d1"), true);
+    getFileSystem().delete(path("d2"), true);
+  }
+
+  /**
+   * Tests a circular sequence of renames to verify that overwriting recently
+   * deleted files and reading recently created files from rename operations
+   * works as expected.
+   * @throws Exception
+   */
+  @Test
+  public void testRollingRenames() throws Exception {
+    Path[] dir0 = {path("rolling/1")};
+    Path[] dir1 = {path("rolling/2")};
+    Path[] dir2 = {path("rolling/3")};
+    // These sets have to be in reverse order compared to the movement
+    Path[] setA = {dir1[0], dir0[0]};
+    Path[] setB = {dir2[0], dir1[0]};
+    Path[] setC = {dir0[0], dir2[0]};
+
+    for (int i = 0; i < 2; i++) {
+      Path[] firstSet = i == 0 ? setA : null;
+      doTestRenameSequence(firstSet, setA, setB, setB, dir0);
+      doTestRenameSequence(null, setB, setC, setC, dir1);
+      doTestRenameSequence(null, setC, setA, setA, dir2);
+    }
+
+    S3AFileSystem fs = getFileSystem();
+    assertFalse("Renaming deleted file should have failed",
+        fs.rename(dir2[0], dir1[0]));
+    assertTrue("Renaming over existing file should have succeeded",
+        fs.rename(dir1[0], dir0[0]));
+  }
+
+  /**
+   * Tests that deleted files immediately stop manifesting in list operations
+   * even when the effect in S3 is delayed.
+   * @throws Exception
+   */
+  @Test
+  public void testConsistentListAfterDelete() throws Exception {
+    S3AFileSystem fs = getFileSystem();
+    // test will fail if NullMetadataStore (the default) is configured: skip it.
+    Assume.assumeTrue(fs.hasMetadataStore());
+
+    // Any S3 keys that contain DELAY_KEY_SUBSTRING will be delayed
+    // in listObjects() results via InconsistentS3Client
+    Path inconsistentPath =
+        path("a/b/dir3-" + DEFAULT_DELAY_KEY_SUBSTRING);
+
+    Path[] testDirs = {path("a/b/dir1"),
+        path("a/b/dir2"),
+        inconsistentPath};
+
+    for (Path path : testDirs) {
+      assertTrue(fs.mkdirs(path));
+    }
+    clearInconsistency(fs);
+    for (Path path : testDirs) {
+      assertTrue(fs.delete(path, false));
+    }
+
+    FileStatus[] paths = fs.listStatus(path("a/b/"));
+    List<Path> list = new ArrayList<>();
+    for (FileStatus fileState : paths) {
+      list.add(fileState.getPath());
+    }
+    assertFalse(list.contains(path("a/b/dir1")));
+    assertFalse(list.contains(path("a/b/dir2")));
+    // This should fail without S3Guard, and succeed with it.
+    assertFalse(list.contains(inconsistentPath));
+  }
+
+  /**
+   * Tests that a rename performed immediately after deleting files in the
+   * source directory results in exactly the correct set of destination files
+   * and none of the deleted source files.
+   * @throws Exception on failure
+   */
+  @Test
+  public void testConsistentRenameAfterDelete() throws Exception {
+    S3AFileSystem fs = getFileSystem();
+    // test will fail if NullMetadataStore (the default) is configured: skip it.
+    Assume.assumeTrue(fs.hasMetadataStore());
+
+    // Any S3 keys that contain DELAY_KEY_SUBSTRING will be delayed
+    // in listObjects() results via InconsistentS3Client
+    Path inconsistentPath =
+        path("a/b/dir3-" + DEFAULT_DELAY_KEY_SUBSTRING);
+
+    Path[] testDirs = {path("a/b/dir1"),
+        path("a/b/dir2"),
+        inconsistentPath};
+
+    for (Path path : testDirs) {
+      assertTrue(fs.mkdirs(path));
+    }
+    clearInconsistency(fs);
+    assertTrue(fs.delete(testDirs[1], false));
+    assertTrue(fs.delete(testDirs[2], false));
+
+    fs.rename(path("a"), path("a3"));
+    FileStatus[] paths = fs.listStatus(path("a3/b"));
+    List<Path> list = new ArrayList<>();
+    for (FileStatus fileState : paths) {
+      list.add(fileState.getPath());
+    }
+    assertTrue(list.contains(path("a3/b/dir1")));
+    assertFalse(list.contains(path("a3/b/dir2")));
+    // This should fail without S3Guard, and succeed with it.
+    assertFalse(list.contains(path("a3/b/dir3-" +
+        DEFAULT_DELAY_KEY_SUBSTRING)));
+
+    try {
+      fs.listFilesAndEmptyDirectories(path("a"), true);
+      fail("Recently renamed dir should not be visible");
+    } catch(FileNotFoundException e) {
+      // expected
+    }
+  }
+
+  @Test
+  public void testConsistentListStatusAfterPut() throws Exception {
+
+    S3AFileSystem fs = getFileSystem();
+
+    // This test will fail if NullMetadataStore (the default) is configured:
+    // skip it.
+    Assume.assumeTrue(fs.hasMetadataStore());
+
+    // Any S3 keys that contain DELAY_KEY_SUBSTRING will be delayed
+    // in listObjects() results via InconsistentS3Client
+    Path inconsistentPath =
+        path("a/b/dir3-" + DEFAULT_DELAY_KEY_SUBSTRING);
+
+    Path[] testDirs = {path("a/b/dir1"),
+        path("a/b/dir2"),
+        inconsistentPath};
+
+    for (Path path : testDirs) {
+      assertTrue(fs.mkdirs(path));
+    }
+
+    FileStatus[] paths = fs.listStatus(path("a/b/"));
+    List<Path> list = new ArrayList<>();
+    for (FileStatus fileState : paths) {
+      list.add(fileState.getPath());
+    }
+    assertTrue(list.contains(path("a/b/dir1")));
+    assertTrue(list.contains(path("a/b/dir2")));
+    // This should fail without S3Guard, and succeed with it.
+    assertTrue(list.contains(inconsistentPath));
+  }
+
+  /**
+   * Similar to {@link #testConsistentListStatusAfterPut()}, this tests that
+   * the FS listLocatedStatus() call will return a consistent list.
+   */
+  @Test
+  public void testConsistentListLocatedStatusAfterPut() throws Exception {
+    final S3AFileSystem fs = getFileSystem();
+    // This test will fail if NullMetadataStore (the default) is configured:
+    // skip it.
+    Assume.assumeTrue(fs.hasMetadataStore());
+    String rootDir = "doTestConsistentListLocatedStatusAfterPut";
+    fs.mkdirs(path(rootDir));
+
+    final int[] numOfPaths = {0, 1, 5};
+    for (int normalPathNum : numOfPaths) {
+      for (int delayedPathNum : new int[] {0, 2}) {
+        LOG.info("Testing with normalPathNum={}, delayedPathNum={}",
+            normalPathNum, delayedPathNum);
+        doTestConsistentListLocatedStatusAfterPut(fs, rootDir, normalPathNum,
+            delayedPathNum);
+      }
+    }
+  }
+
+  /**
+   * Helper method to implement the tests of consistent listLocatedStatus().
+   * @param fs The S3 file system from the contract
+   * @param rootDir base directory for the test paths
+   * @param normalPathNum number of paths listed directly from S3 without delay
+   * @param delayedPathNum number of paths listed with delayed visibility
+   * @throws Exception on failure
+   */
+  private void doTestConsistentListLocatedStatusAfterPut(S3AFileSystem fs,
+      String rootDir, int normalPathNum, int delayedPathNum) throws Exception {
+    final List<Path> testDirs = new ArrayList<>(normalPathNum + delayedPathNum);
+    int index = 0;
+    for (; index < normalPathNum; index++) {
+      testDirs.add(path(rootDir + "/dir-" +
+          index));
+    }
+    for (; index < normalPathNum + delayedPathNum; index++) {
+      // Any S3 keys that contain DELAY_KEY_SUBSTRING will be delayed
+      // in listObjects() results via InconsistentS3Client
+      testDirs.add(path(rootDir + "/dir-" + index +
+          DEFAULT_DELAY_KEY_SUBSTRING));
+    }
+
+    for (Path path : testDirs) {
+      // delete the old test path (if any) so that when we call mkdirs() later,
+      // the directories to be delayed are tracked via putObject() requests.
+      fs.delete(path, true);
+      assertTrue(fs.mkdirs(path));
+    }
+
+    // this should return the union data from S3 and MetadataStore
+    final RemoteIterator<LocatedFileStatus> statusIterator =
+        fs.listLocatedStatus(path(rootDir + "/"));
+    List<Path> list = new ArrayList<>();
+    while (statusIterator.hasNext()) {
+      list.add(statusIterator.next().getPath());
+    }
+
+    // This should fail without S3Guard, and succeed with it, because some of
+    // the children under the test path have delayed visibility
+    for (Path path : testDirs) {
+      assertTrue("listLocatedStatus should list " + path, list.contains(path));
+    }
+  }
+
+  /**
+   * Tests that the S3AFS listFiles() call will return a consistent file list.
+   */
+  @Test
+  public void testConsistentListFiles() throws Exception {
+    final S3AFileSystem fs = getFileSystem();
+    // This test will fail if NullMetadataStore (the default) is configured:
+    // skip it.
+    Assume.assumeTrue(fs.hasMetadataStore());
+
+    final int[] numOfPaths = {0, 2};
+    for (int dirNum : numOfPaths) {
+      for (int normalFile : numOfPaths) {
+        for (int delayedFile : new int[] {0, 1}) {
+          for (boolean recursive : new boolean[] {true, false}) {
+            doTestListFiles(fs, dirNum, normalFile, delayedFile, recursive);
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * Helper method to implement the tests of consistent listFiles().
+   *
+   * The file structure has dirNum subdirectories, and each directory (including
+   * the test base directory itself) has normalFileNum normal files and
+   * delayedFileNum delayed files.
+   *
+   * @param fs The S3 file system from the contract
+   * @param dirNum number of subdirectories
+   * @param normalFileNum number of files in each directory listed without delay
+   * @param delayedFileNum number of files in each directory listed with delay
+   * @param recursive listFiles recursively if true
+   * @throws Exception if any unexpected error
+   */
+  private void doTestListFiles(S3AFileSystem fs, int dirNum, int normalFileNum,
+      int delayedFileNum, boolean recursive) throws Exception {
+    describe("Testing dirNum=%d, normalFile=%d, delayedFile=%d, "
+        + "recursive=%s", dirNum, normalFileNum, delayedFileNum, recursive);
+    final Path baseTestDir = path("doTestListFiles-" + dirNum + "-"
+        + normalFileNum + "-" + delayedFileNum + "-" + recursive);
+    // delete the old test path (if any) so that when we call mkdirs() later,
+    // the subdirectories to be delayed are tracked via putObject() requests.
+    fs.delete(baseTestDir, true);
+
+    // make subdirectories (if any)
+    final List<Path> testDirs = new ArrayList<>(dirNum + 1);
+    assertTrue(fs.mkdirs(baseTestDir));
+    testDirs.add(baseTestDir);
+    for (int i = 0; i < dirNum; i++) {
+      final Path subdir = path(baseTestDir + "/dir-" + i);
+      assertTrue(fs.mkdirs(subdir));
+      testDirs.add(subdir);
+    }
+
+    final Collection<String> fileNames
+        = new ArrayList<>(normalFileNum + delayedFileNum);
+    int index = 0;
+    for (; index < normalFileNum; index++) {
+      fileNames.add("file-" + index);
+    }
+    for (; index < normalFileNum + delayedFileNum; index++) {
+      // Any S3 keys that contain DELAY_KEY_SUBSTRING will be delayed
+      // in listObjects() results via InconsistentS3Client
+      fileNames.add("file-" + index + "-" + DEFAULT_DELAY_KEY_SUBSTRING);
+    }
+
+    int filesAndEmptyDirectories = 0;
+
+    // create files under each test directory
+    for (Path dir : testDirs) {
+      for (String fileName : fileNames) {
+        writeTextFile(fs, new Path(dir, fileName), "I, " + fileName, false);
+        filesAndEmptyDirectories++;
+      }
+    }
+
+    // this should return the union data from S3 and MetadataStore
+    final RemoteIterator<LocatedFileStatus> statusIterator
+        = fs.listFiles(baseTestDir, recursive);
+    final Collection<Path> listedFiles = new HashSet<>();
+    while (statusIterator.hasNext()) {
+      final FileStatus status = statusIterator.next();
+      assertTrue("FileStatus " + status + " is not a file!", status.isFile());
+      listedFiles.add(status.getPath());
+    }
+    LOG.info("S3AFileSystem::listFiles('{}', {}) -> {}",
+        baseTestDir, recursive, listedFiles);
+
+    // This should fail without S3Guard, and succeed with it, because some of
+    // the files to list have delayed visibility
+    if (!recursive) {
+      // in this case only the top level files are listed
+      assertEquals("Unexpected number of files returned by listFiles() call",
+          normalFileNum + delayedFileNum, listedFiles.size());
+      verifyFileIsListed(listedFiles, baseTestDir, fileNames);
+    } else {
+      assertEquals("Unexpected number of files returned by listFiles() call",
+          filesAndEmptyDirectories,
+          listedFiles.size());
+      for (Path dir : testDirs) {
+        verifyFileIsListed(listedFiles, dir, fileNames);
+      }
+    }
+  }
+
+  private static void verifyFileIsListed(Collection<Path> listedFiles,
+      Path currentDir, Collection<String> fileNames) {
+    for (String fileName : fileNames) {
+      final Path file = new Path(currentDir, fileName);
+      assertTrue(file + " should have been listed", listedFiles.contains(file));
+    }
+  }
+
+  @Test
+  public void testCommitByRenameOperations() throws Throwable {
+    S3AFileSystem fs = getFileSystem();
+    Assume.assumeTrue(fs.hasMetadataStore());
+    Path work = path("test-commit-by-rename-" + DEFAULT_DELAY_KEY_SUBSTRING);
+    Path task00 = new Path(work, "task00");
+    fs.mkdirs(task00);
+    String name = "part-00";
+    try (FSDataOutputStream out =
+             fs.create(new Path(task00, name), false)) {
+      out.writeChars("hello");
+    }
+    for (FileStatus stat : fs.listStatus(task00)) {
+      fs.rename(stat.getPath(), work);
+    }
+    List<FileStatus> files = new ArrayList<>(2);
+    for (FileStatus stat : fs.listStatus(work)) {
+      if (stat.isFile()) {
+        files.add(stat);
+      }
+    }
+    assertFalse("renamed file " + name + " not found in " + work,
+        files.isEmpty());
+    assertEquals("more files found than expected in " + work
+        + " " + ls(work), 1, files.size());
+    FileStatus status = files.get(0);
+    assertEquals("Wrong filename in " + status,
+        name, status.getPath().getName());
+  }
+
+  @Test
+  public void testInconsistentS3ClientDeletes() throws Throwable {
+    S3AFileSystem fs = getFileSystem();
+    Path root = path("testInconsistentClient" + DEFAULT_DELAY_KEY_SUBSTRING);
+    for (int i = 0; i < 3; i++) {
+      fs.mkdirs(new Path(root, "dir" + i));
+      touch(fs, new Path(root, "file" + i));
+      for (int j = 0; j < 3; j++) {
+        touch(fs, new Path(new Path(root, "dir" + i), "file" + i + "-" + j));
+      }
+    }
+    clearInconsistency(fs);
+
+    AmazonS3 client = fs.getAmazonS3Client();
+    String key = fs.pathToKey(root) + "/";
+
+    ObjectListing preDeleteDelimited = client.listObjects(
+        fs.createListObjectsRequest(key, "/"));
+    ObjectListing preDeleteUndelimited = client.listObjects(
+        fs.createListObjectsRequest(key, null));
+
+    fs.delete(root, true);
+
+    ObjectListing postDeleteDelimited = client.listObjects(
+        fs.createListObjectsRequest(key, "/"));
+    ObjectListing postDeleteUndelimited = client.listObjects(
+        fs.createListObjectsRequest(key, null));
+
+    assertEquals("InconsistentAmazonS3Client added back objects incorrectly " +
+            "in a non-recursive listing",
+        preDeleteDelimited.getObjectSummaries().size(),
+        postDeleteDelimited.getObjectSummaries().size()
+    );
+    assertEquals("InconsistentAmazonS3Client added back prefixes incorrectly " +
+            "in a non-recursive listing",
+        preDeleteDelimited.getCommonPrefixes().size(),
+        postDeleteDelimited.getCommonPrefixes().size()
+    );
+    assertEquals("InconsistentAmazonS3Client added back objects incorrectly " +
+            "in a recursive listing",
+        preDeleteUndelimited.getObjectSummaries().size(),
+        postDeleteUndelimited.getObjectSummaries().size()
+    );
+    assertEquals("InconsistentAmazonS3Client added back prefixes incorrectly " +
+            "in a recursive listing",
+        preDeleteUndelimited.getCommonPrefixes().size(),
+        postDeleteUndelimited.getCommonPrefixes().size()
+    );
+  }
+
+  private static void clearInconsistency(S3AFileSystem fs) throws Exception {
+    AmazonS3 s3 = fs.getAmazonS3Client();
+    InconsistentAmazonS3Client ic = InconsistentAmazonS3Client.castFrom(s3);
+    ic.clearInconsistency();
+  }
+}
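
A note for readers on the injection mechanism above: any object key containing
the configured delay substring (DEFAULT_DELAY_KEY_SUBSTRING in these tests) is
omitted from listObjects() results for a configured interval after it is
written. Below is a minimal illustrative sketch of that idea only; the class
and method names are hypothetical, and the real InconsistentAmazonS3Client in
this patch is more complete.

    import java.util.HashMap;
    import java.util.Map;

    // Hypothetical sketch, not the patch's InconsistentAmazonS3Client.
    class DelayedVisibilitySketch {
      private final Map<String, Long> delayedPuts = new HashMap<>();
      private final long delayMsec;

      DelayedVisibilitySketch(long delayMsec) {
        this.delayMsec = delayMsec;
      }

      /** Record a PUT; keys matching the substring get delayed visibility. */
      void recordPut(String key, String delaySubstring) {
        if (key.contains(delaySubstring)) {
          delayedPuts.put(key, System.currentTimeMillis());
        }
      }

      /** A delayed key is omitted from listings until its delay elapses. */
      boolean isVisibleInListing(String key) {
        Long putTime = delayedPuts.get(key);
        return putTime == null
            || System.currentTimeMillis() - putTime >= delayMsec;
      }
    }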

http://git-wip-us.apache.org/repos/asf/hadoop/blob/621b43e2/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardWriteBack.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardWriteBack.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardWriteBack.java
new file mode 100644
index 0000000..a63b696
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardWriteBack.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.s3guard.DirListingMetadata;
+import org.junit.Assume;
+import org.junit.Test;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URI;
+import java.util.Arrays;
+
+/**
+ * Test cases that validate S3Guard's behavior for writing things like
+ * directory listings back to the MetadataStore.
+ */
+public class ITestS3GuardWriteBack extends AbstractS3ATestBase {
+
+  /**
+   * In listStatus(), when S3Guard is enabled, the full listing for a
+   * directory is "written back" to the MetadataStore before the listing is
+   * returned.  Currently this "write back" behavior occurs when
+   * fs.s3a.metadatastore.authoritative is true.  This test validates this
+   * behavior.
+   * @throws Exception on failure
+   */
+  @Test
+  public void testListStatusWriteBack() throws Exception {
+    Assume.assumeTrue(getFileSystem().hasMetadataStore());
+
+    Path directory = path("ListStatusWriteBack");
+
+    // "raw" S3AFileSystem without S3Guard
+    S3AFileSystem noS3Guard = createTestFS(directory.toUri(), true, false);
+
+    // Another with S3Guard and write-back disabled
+    S3AFileSystem noWriteBack = createTestFS(directory.toUri(), false, false);
+
+    // Another with S3Guard and write-back enabled
+    S3AFileSystem yesWriteBack = createTestFS(directory.toUri(), false, true);
+
+    // delete the existing directory (in case of last test failure)
+    noS3Guard.delete(directory, true);
+    // Create a directory on S3 only
+    noS3Guard.mkdirs(new Path(directory, "OnS3"));
+    // Create a directory on both S3 and metadata store
+    Path p = new Path(directory, "OnS3AndMS");
+    assertPathDoesntExist(noWriteBack, p);
+    noWriteBack.mkdirs(p);
+
+    FileStatus[] fsResults;
+    DirListingMetadata mdResults;
+
+    // FS should return both even though S3Guard is not writing back to MS
+    fsResults = noWriteBack.listStatus(directory);
+    assertEquals("Filesystem enabled S3Guard without write back should have "
+            + "both /OnS3 and /OnS3AndMS: " + Arrays.toString(fsResults),
+        2, fsResults.length);
+
+    // Metadata store without write-back should still only contain /OnS3AndMS,
+    // because newly discovered /OnS3 is not written back to metadata store
+    mdResults = noWriteBack.getMetadataStore().listChildren(directory);
+    assertEquals("Metadata store without write back should still only know "
+            + "about /OnS3AndMS, but it has: " + mdResults,
+        1, mdResults.numEntries());
+
+    // FS should return both (and will write it back)
+    fsResults = yesWriteBack.listStatus(directory);
+    assertEquals("Filesystem enabled S3Guard with write back should have "
+            + " both /OnS3 and /OnS3AndMS: " + Arrays.toString(fsResults),
+        2, fsResults.length);
+
+    // Metadata store with write-back should contain both because the newly
+    // discovered /OnS3 should have been written back to metadata store
+    mdResults = yesWriteBack.getMetadataStore().listChildren(directory);
+    assertEquals("Unexpected number of results from metadata store. "
+            + "Should have /OnS3 and /OnS3AndMS: " + mdResults,
+        2, mdResults.numEntries());
+
+    // If we don't clean this up, the next test run will fail: the metadata
+    // store will have recorded /OnS3 as deleted, and recreating it through
+    // noS3Guard does not clear that entry.
+    getFileSystem().getMetadataStore().forgetMetadata(
+        new Path(directory, "OnS3"));
+  }
+
+  /** Create a separate S3AFileSystem instance for testing. */
+  private S3AFileSystem createTestFS(URI fsURI, boolean disableS3Guard,
+      boolean authoritativeMeta) throws IOException {
+    Configuration conf;
+
+    // Create a FileSystem that is S3-backed only
+    conf = createConfiguration();
+    S3ATestUtils.disableFilesystemCaching(conf);
+    if (disableS3Guard) {
+      conf.set(Constants.S3_METADATA_STORE_IMPL,
+          Constants.S3GUARD_METASTORE_NULL);
+    } else {
+      S3ATestUtils.maybeEnableS3Guard(conf);
+      conf.setBoolean(Constants.METADATASTORE_AUTHORITATIVE, authoritativeMeta);
+    }
+    FileSystem fs = FileSystem.get(fsURI, conf);
+    return asS3AFS(fs);
+  }
+
+  private static S3AFileSystem asS3AFS(FileSystem fs) {
+    assertTrue("Not a S3AFileSystem: " + fs, fs instanceof S3AFileSystem);
+    return (S3AFileSystem)fs;
+  }
+
+  private static void assertPathDoesntExist(FileSystem fs, Path p)
+      throws IOException {
+    try {
+      fs.getFileStatus(p);
+    } catch (FileNotFoundException e) {
+      return;
+    }
+    fail("Path should not exist: " + p);
+  }
+
+}
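
The write-back behavior tested above is gated on the metadata store being
authoritative. A hedged configuration sketch using only the constants this
patch defines (the wrapper class is illustrative):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.s3a.Constants;

    public class AuthoritativeConfSketch {
      public static Configuration authoritativeConf() {
        Configuration conf = new Configuration();
        // select the local metadata store and allow authoritative listings,
        // which enables the listStatus() write-back exercised above
        conf.set(Constants.S3_METADATA_STORE_IMPL,
            Constants.S3GUARD_METASTORE_LOCAL);
        conf.setBoolean(Constants.METADATASTORE_AUTHORITATIVE, true);
        return conf;
      }
    }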

http://git-wip-us.apache.org/repos/asf/hadoop/blob/621b43e2/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/MockS3ClientFactory.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/MockS3ClientFactory.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/MockS3ClientFactory.java
index 9e0a5e4..4e25380 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/MockS3ClientFactory.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/MockS3ClientFactory.java
@@ -23,6 +23,7 @@ import static org.mockito.Mockito.*;
 import java.net.URI;
 
 import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.model.Region;
 
 /**
  * An {@link S3ClientFactory} that returns Mockito mocks of the {@link AmazonS3}
@@ -35,6 +36,8 @@ public class MockS3ClientFactory implements S3ClientFactory {
     String bucket = name.getHost();
     AmazonS3 s3 = mock(AmazonS3.class);
     when(s3.doesBucketExist(bucket)).thenReturn(true);
+    when(s3.getBucketLocation(anyString()))
+        .thenReturn(Region.US_West.toString());
     return s3;
   }
 }
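
For orientation: a test selects this factory the same way
ITestS3GuardListConsistency selects InconsistentS3ClientFactory earlier in
this patch. A hedged sketch (assumes it lives in org.apache.hadoop.fs.s3a):

    import org.apache.hadoop.conf.Configuration;

    public class MockClientConfSketch {
      public static Configuration withMockS3Client() {
        Configuration conf = new Configuration();
        // bind the S3 client factory to the Mockito-backed implementation
        conf.setClass(Constants.S3_CLIENT_FACTORY_IMPL,
            MockS3ClientFactory.class, S3ClientFactory.class);
        return conf;
      }
    }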

http://git-wip-us.apache.org/repos/asf/hadoop/blob/621b43e2/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestConstants.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestConstants.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestConstants.java
index acbe610..2c4f009 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestConstants.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestConstants.java
@@ -135,6 +135,18 @@ public interface S3ATestConstants {
   String TEST_STS_ENDPOINT = "test.fs.s3a.sts.endpoint";
 
   /**
+   * Configuration keys for various S3Guard tests.
+   */
+  String TEST_S3GUARD_PREFIX = "fs.s3a.s3guard.test";
+  String TEST_S3GUARD_ENABLED = TEST_S3GUARD_PREFIX + ".enabled";
+  String TEST_S3GUARD_AUTHORITATIVE = TEST_S3GUARD_PREFIX + ".authoritative";
+  String TEST_S3GUARD_IMPLEMENTATION = TEST_S3GUARD_PREFIX + ".implementation";
+  String TEST_S3GUARD_IMPLEMENTATION_LOCAL = "local";
+  String TEST_S3GUARD_IMPLEMENTATION_DYNAMO = "dynamo";
+  String TEST_S3GUARD_IMPLEMENTATION_DYNAMODBLOCAL = "dynamodblocal";
+  String TEST_S3GUARD_IMPLEMENTATION_NONE = "none";
+
+  /**
    * Timeout in Milliseconds for standard tests: {@value}.
    */
   int S3A_TEST_TIMEOUT = 10 * 60 * 1000;
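
These keys are consumed by S3ATestUtils.maybeEnableS3Guard(), added later in
this patch. A hedged sketch of enabling S3Guard with the "local"
implementation for a test configuration (assumes the org.apache.hadoop.fs.s3a
package):

    import org.apache.hadoop.conf.Configuration;

    public class S3GuardTestEnableSketch {
      public static Configuration enableLocalS3Guard(Configuration conf) {
        // request S3Guard with the "local" metadata store implementation
        conf.setBoolean(S3ATestConstants.TEST_S3GUARD_ENABLED, true);
        conf.set(S3ATestConstants.TEST_S3GUARD_IMPLEMENTATION,
            S3ATestConstants.TEST_S3GUARD_IMPLEMENTATION_LOCAL);
        // maybeEnableS3Guard() maps these onto the fs.s3a.* store options
        S3ATestUtils.maybeEnableS3Guard(conf);
        return conf;
      }
    }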

http://git-wip-us.apache.org/repos/asf/hadoop/blob/621b43e2/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java
index 9528967..8dbf90a 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java
@@ -22,7 +22,14 @@ import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.fs.s3a.s3guard.DynamoDBClientFactory;
+import org.apache.hadoop.fs.s3a.s3guard.DynamoDBLocalClientFactory;
+import org.apache.hadoop.fs.s3a.s3guard.S3Guard;
+
+import org.hamcrest.core.Is;
 import org.junit.Assert;
 import org.junit.Assume;
 import org.junit.internal.AssumptionViolatedException;
@@ -31,11 +38,13 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.net.URI;
+import java.net.URISyntaxException;
 import java.util.List;
 
 import static org.apache.hadoop.fs.contract.ContractTestUtils.skip;
 import static org.apache.hadoop.fs.s3a.S3ATestConstants.*;
 import static org.apache.hadoop.fs.s3a.Constants.*;
+import static org.apache.hadoop.fs.s3a.S3AUtils.propagateBucketOptions;
 import static org.junit.Assert.*;
 
 /**
@@ -52,6 +61,15 @@ public final class S3ATestUtils {
   public static final String UNSET_PROPERTY = "unset";
 
   /**
+   * Get S3A FS name.
+   * @param conf configuration.
+   * @return S3A fs name.
+   */
+  public static String getFsName(Configuration conf) {
+    return conf.getTrimmed(TEST_FS_S3A_NAME, "");
+  }
+
+  /**
    * Create the test filesystem.
    *
    * If the test.fs.s3a.name property is not set, this will
@@ -97,6 +115,8 @@ public final class S3ATestUtils {
       throw new AssumptionViolatedException(
           "No test filesystem in " + TEST_FS_S3A_NAME);
     }
+    // patch in S3Guard options
+    maybeEnableS3Guard(conf);
     S3AFileSystem fs1 = new S3AFileSystem();
     //enable purging in tests
     if (purge) {
@@ -137,6 +157,8 @@ public final class S3ATestUtils {
       throw new AssumptionViolatedException("No test filesystem in "
           + TEST_FS_S3A_NAME);
     }
+    // patch in S3Guard options
+    maybeEnableS3Guard(conf);
     FileContext fc = FileContext.getFileContext(testURI, conf);
     return fc;
   }
@@ -301,13 +323,96 @@ public final class S3ATestUtils {
    * @return a path
    */
   public static Path createTestPath(Path defVal) {
-    String testUniqueForkId = System.getProperty(
-        S3ATestConstants.TEST_UNIQUE_FORK_ID);
+    String testUniqueForkId =
+        System.getProperty(S3ATestConstants.TEST_UNIQUE_FORK_ID);
     return testUniqueForkId == null ? defVal :
         new Path("/" + testUniqueForkId, "test");
   }
 
   /**
+   * Test assumption that S3Guard is/is not enabled.
+   * @param shouldBeEnabled should S3Guard be enabled?
+   * @param originalConf configuration to check
+   * @throws URISyntaxException if the test filesystem URI is invalid
+   */
+  public static void assumeS3GuardState(boolean shouldBeEnabled,
+      Configuration originalConf) throws URISyntaxException {
+    boolean isEnabled = getTestPropertyBool(originalConf, TEST_S3GUARD_ENABLED,
+        originalConf.getBoolean(TEST_S3GUARD_ENABLED, false));
+    Assume.assumeThat("Unexpected S3Guard test state:"
+            + " shouldBeEnabled=" + shouldBeEnabled
+            + " and isEnabled=" + isEnabled,
+        shouldBeEnabled, Is.is(isEnabled));
+
+    final String fsname = originalConf.getTrimmed(TEST_FS_S3A_NAME);
+    Assume.assumeNotNull(fsname);
+    final String bucket = new URI(fsname).getHost();
+    final Configuration conf = propagateBucketOptions(originalConf, bucket);
+    boolean usingNullImpl = S3GUARD_METASTORE_NULL.equals(
+        conf.getTrimmed(S3_METADATA_STORE_IMPL, S3GUARD_METASTORE_NULL));
+    Assume.assumeThat("Unexpected S3Guard test state:"
+            + " shouldBeEnabled=" + shouldBeEnabled
+            + " but usingNullImpl=" + usingNullImpl,
+        shouldBeEnabled, Is.is(!usingNullImpl));
+  }
+
+  /**
+   * Conditionally set the S3Guard options from test properties.
+   * @param conf configuration
+   */
+  public static void maybeEnableS3Guard(Configuration conf) {
+    if (getTestPropertyBool(conf, TEST_S3GUARD_ENABLED,
+        conf.getBoolean(TEST_S3GUARD_ENABLED, false))) {
+      // S3Guard is enabled.
+      boolean authoritative = getTestPropertyBool(conf,
+          TEST_S3GUARD_AUTHORITATIVE,
+          conf.getBoolean(TEST_S3GUARD_AUTHORITATIVE, true));
+      String impl = getTestProperty(conf, TEST_S3GUARD_IMPLEMENTATION,
+          conf.get(TEST_S3GUARD_IMPLEMENTATION,
+              TEST_S3GUARD_IMPLEMENTATION_LOCAL));
+      String implClass = "";
+      switch (impl) {
+      case TEST_S3GUARD_IMPLEMENTATION_LOCAL:
+        implClass = S3GUARD_METASTORE_LOCAL;
+        break;
+      case TEST_S3GUARD_IMPLEMENTATION_DYNAMODBLOCAL:
+        conf.setClass(S3Guard.S3GUARD_DDB_CLIENT_FACTORY_IMPL,
+            DynamoDBLocalClientFactory.class, DynamoDBClientFactory.class);
+        // fall through: dynamodblocal uses the dynamo metastore implementation
+      case TEST_S3GUARD_IMPLEMENTATION_DYNAMO:
+        implClass = S3GUARD_METASTORE_DYNAMO;
+        break;
+      case TEST_S3GUARD_IMPLEMENTATION_NONE:
+        implClass = S3GUARD_METASTORE_NULL;
+        break;
+      default:
+        fail("Unknown s3guard back end: \"" + impl + "\"");
+      }
+      LOG.debug("Enabling S3Guard, authoritative={}, implementation={}",
+          authoritative, implClass);
+      conf.setBoolean(METADATASTORE_AUTHORITATIVE, authoritative);
+      conf.set(S3_METADATA_STORE_IMPL, implClass);
+      conf.setBoolean(S3GUARD_DDB_TABLE_CREATE_KEY, true);
+    }
+  }
+
+  /**
+   * Is there a MetadataStore configured for s3a with authoritative enabled?
+   * @param conf Configuration to test.
+   * @return true iff there is a MetadataStore configured, and it is
+   * configured to allow authoritative results.  This can reduce round
+   * trips to the S3 service for cached results, which may affect FS/FC
+   * statistics.
+   */
+  public static boolean isMetadataStoreAuthoritative(Configuration conf) {
+    if (conf == null) {
+      return Constants.DEFAULT_METADATASTORE_AUTHORITATIVE;
+    }
+    return conf.getBoolean(
+        Constants.METADATASTORE_AUTHORITATIVE,
+        Constants.DEFAULT_METADATASTORE_AUTHORITATIVE);
+  }
+
+  /**
    * Reset all metrics in a list.
    * @param metrics metrics to reset
    */
@@ -504,6 +609,94 @@ public final class S3ATestUtils {
   }
 
   /**
+   * Verify the core size, block size and timestamp values of a file.
+   * @param status status entry to check
+   * @param size file size
+   * @param blockSize block size
+   * @param modTime modified time
+   */
+  public static void verifyFileStatus(FileStatus status, long size,
+      long blockSize, long modTime) {
+    verifyFileStatus(status, size, 0, modTime, 0, blockSize, null, null, null);
+  }
+
+  /**
+   * Verify the status entry of a file matches that expected.
+   * @param status status entry to check
+   * @param size file size
+   * @param replication replication factor (may be 0)
+   * @param modTime modified time
+   * @param accessTime access time (may be 0)
+   * @param blockSize block size
+   * @param owner owner (may be null)
+   * @param group user group (may be null)
+   * @param permission permission (may be null)
+   */
+  public static void verifyFileStatus(FileStatus status,
+      long size,
+      int replication,
+      long modTime,
+      long accessTime,
+      long blockSize,
+      String owner,
+      String group,
+      FsPermission permission) {
+    String details = status.toString();
+    assertFalse("Not a dir: " + details, status.isDirectory());
+    assertEquals("Mod time: " + details, modTime, status.getModificationTime());
+    assertEquals("File size: " + details, size, status.getLen());
+    assertEquals("Block size: " + details, blockSize, status.getBlockSize());
+    if (replication > 0) {
+      assertEquals("Replication value: " + details, replication,
+          status.getReplication());
+    }
+    if (accessTime != 0) {
+      assertEquals("Access time: " + details, accessTime,
+          status.getAccessTime());
+    }
+    if (owner != null) {
+      assertEquals("Owner: " + details, owner, status.getOwner());
+    }
+    if (group != null) {
+      assertEquals("Group: " + details, group, status.getGroup());
+    }
+    if (permission != null) {
+      assertEquals("Permission: " + details, permission,
+          status.getPermission());
+    }
+  }
+
+  /**
+   * Verify the status entry of a directory matches that expected.
+   * @param status status entry to check
+   * @param replication replication factor
+   * @param modTime modified time
+   * @param accessTime access time
+   * @param owner owner
+   * @param group user group
+   * @param permission permission.
+   */
+  public static void verifyDirStatus(FileStatus status,
+      int replication,
+      long modTime,
+      long accessTime,
+      String owner,
+      String group,
+      FsPermission permission) {
+    String details = status.toString();
+    assertTrue("Is a dir: " + details, status.isDirectory());
+    assertEquals("zero length: " + details, 0, status.getLen());
+
+    assertEquals("Mod time: " + details, modTime, status.getModificationTime());
+    assertEquals("Replication value: " + details, replication,
+        status.getReplication());
+    assertEquals("Access time: " + details, accessTime, status.getAccessTime());
+    assertEquals("Owner: " + details, owner, status.getOwner());
+    assertEquals("Group: " + details, group, status.getGroup());
+    assertEquals("Permission: " + details, permission, status.getPermission());
+  }
+
+  /**
    * Set a bucket specific property to a particular value.
    * If the generic key passed in has an {@code fs.s3a. prefix},
   * that's stripped off, so that when the bucket properties are propagated
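
A hedged usage sketch for the new verifyFileStatus() overload above; the file
path and expected size are illustrative, and `fs` is assumed to be an
initialized S3AFileSystem inside a test case:

    // assumes a 1024-byte file was just written at this (illustrative) path
    Path file = new Path("/test/verify-me.txt");
    FileStatus status = fs.getFileStatus(file);
    // check size, block size and timestamp in a single call
    S3ATestUtils.verifyFileStatus(status,
        1024,                            // expected size
        fs.getDefaultBlockSize(file),    // expected block size
        status.getModificationTime());   // timestamp read back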

http://git-wip-us.apache.org/repos/asf/hadoop/blob/621b43e2/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestListing.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestListing.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestListing.java
new file mode 100644
index 0000000..e647327
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestListing.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.Set;
+
+import static org.apache.hadoop.fs.s3a.Listing.ACCEPT_ALL;
+import static org.apache.hadoop.fs.s3a.Listing.ProvidedFileStatusIterator;
+
+/**
+ * Tests for the S3A {@link Listing} helper classes.
+ */
+public class TestListing extends AbstractS3AMockTest {
+
+  private static class MockRemoteIterator<T> implements RemoteIterator<T> {
+    private final Iterator<T> iterator;
+
+    MockRemoteIterator(Collection<T> source) {
+      iterator = source.iterator();
+    }
+
+    public boolean hasNext() {
+      return iterator.hasNext();
+    }
+
+    public T next() {
+      return iterator.next();
+    }
+  }
+
+  private FileStatus blankFileStatus(Path path) {
+    return new FileStatus(0, true, 0, 0, 0, path);
+  }
+
+  @Test
+  public void testTombstoneReconcilingIterator() throws Exception {
+    Path parent = new Path("/parent");
+    Path liveChild = new Path(parent, "/liveChild");
+    Path deletedChild = new Path(parent, "/deletedChild");
+
+    Listing listing = new Listing(fs);
+    Collection<FileStatus> statuses = new ArrayList<>();
+    statuses.add(blankFileStatus(parent));
+    statuses.add(blankFileStatus(liveChild));
+    statuses.add(blankFileStatus(deletedChild));
+
+    Set<Path> tombstones = new HashSet<>();
+    tombstones.add(deletedChild);
+
+    RemoteIterator<FileStatus> sourceIterator =
+        new MockRemoteIterator<>(statuses);
+    RemoteIterator<LocatedFileStatus> locatedIterator =
+        listing.createLocatedFileStatusIterator(sourceIterator);
+    RemoteIterator<LocatedFileStatus> reconcilingIterator =
+        listing.createTombstoneReconcilingIterator(locatedIterator, tombstones);
+
+    Set<Path> expectedPaths = new HashSet<>();
+    expectedPaths.add(parent);
+    expectedPaths.add(liveChild);
+
+    Set<Path> actualPaths = new HashSet<>();
+    while (reconcilingIterator.hasNext()) {
+      actualPaths.add(reconcilingIterator.next().getPath());
+    }
+    Assert.assertEquals(expectedPaths, actualPaths);
+  }
+
+  @Test
+  public void testProvidedFileStatusIteratorEnd() throws Exception {
+    FileStatus[] statuses = {
+        new FileStatus(100, false, 1, 8192, 0, new Path("s3a://blah/blah"))
+    };
+    ProvidedFileStatusIterator it = new ProvidedFileStatusIterator(statuses,
+        ACCEPT_ALL, new Listing.AcceptAllButS3nDirs());
+
+    Assert.assertTrue("hasNext() should return true first time", it.hasNext());
+    Assert.assertNotNull("first element should not be null", it.next());
+    Assert.assertFalse("hasNext() should now be false", it.hasNext());
+    try {
+      it.next();
+      Assert.fail("next() should have thrown exception");
+    } catch (NoSuchElementException e) {
+      // Correct behavior.  Any other exceptions are propagated as failure.
+      return;
+    }
+  }
+}
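
The tombstone reconciliation exercised by the first test amounts to filtering
a listing against a set of deletion markers. An illustrative sketch of the
idea, not the S3A Listing implementation:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Set;

    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.Path;

    final class TombstoneFilterSketch {
      /** Keep only entries whose paths carry no tombstone. */
      static List<FileStatus> reconcile(List<FileStatus> listed,
          Set<Path> tombstones) {
        List<FileStatus> out = new ArrayList<>();
        for (FileStatus st : listed) {
          if (!tombstones.contains(st.getPath())) {
            out.add(st);
          }
        }
        return out;
      }
    }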

http://git-wip-us.apache.org/repos/asf/hadoop/blob/621b43e2/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/fileContext/ITestS3AFileContextStatistics.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/fileContext/ITestS3AFileContextStatistics.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/fileContext/ITestS3AFileContextStatistics.java
index e1aef75..e493818 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/fileContext/ITestS3AFileContextStatistics.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/fileContext/ITestS3AFileContextStatistics.java
@@ -39,7 +39,9 @@ public class ITestS3AFileContextStatistics extends FCStatisticsBaseTest {
 
   @After
   public void tearDown() throws Exception {
-    fc.delete(fileContextTestHelper.getTestRootPath(fc, "test"), true);
+    if (fc != null) {
+      fc.delete(fileContextTestHelper.getTestRootPath(fc, "test"), true);
+    }
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/621b43e2/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/fileContext/ITestS3AFileContextURI.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/fileContext/ITestS3AFileContextURI.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/fileContext/ITestS3AFileContextURI.java
index fff1fcb..725646c 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/fileContext/ITestS3AFileContextURI.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/fileContext/ITestS3AFileContextURI.java
@@ -16,19 +16,29 @@ package org.apache.hadoop.fs.s3a.fileContext;
 import java.io.IOException;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileContextURIBase;
+import org.apache.hadoop.fs.s3a.S3AFileSystem;
 import org.apache.hadoop.fs.s3a.S3ATestUtils;
 import org.junit.Before;
 import org.junit.Ignore;
 import org.junit.Test;
 
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.assume;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.createTestFileSystem;
+
 /**
  * S3a implementation of FileContextURIBase.
  */
 public class ITestS3AFileContextURI extends FileContextURIBase {
 
+  private Configuration conf;
+  private boolean hasMetadataStore;
+
   @Before
   public void setUp() throws IOException, Exception {
-    Configuration conf = new Configuration();
+    conf = new Configuration();
+    try (S3AFileSystem s3aFS = createTestFileSystem(conf)) {
+      hasMetadataStore = s3aFS.hasMetadataStore();
+    }
     fc1 = S3ATestUtils.createTestFileContext(conf);
     fc2 = S3ATestUtils.createTestFileContext(conf); //different object, same FS
     super.setUp();
@@ -41,4 +51,11 @@ public class ITestS3AFileContextURI extends FileContextURIBase {
     // (the statistics tested with this method are not relevant for an S3FS)
   }
 
+  @Test
+  @Override
+  public void testModificationTime() throws IOException {
+    // skip modtime tests as there may be some inconsistency during creation
+    assume("modification time tests are skipped", !hasMetadataStore);
+    super.testModificationTime();
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/621b43e2/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/AbstractMSContract.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/AbstractMSContract.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/AbstractMSContract.java
new file mode 100644
index 0000000..921d4a6
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/AbstractMSContract.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.s3guard;
+
+import org.apache.hadoop.fs.FileSystem;
+
+import java.io.IOException;
+
+/**
+ * Test specification for MetadataStore contract tests. Supplies configuration
+ * and MetadataStore instance.
+ */
+public abstract class AbstractMSContract {
+
+  public abstract FileSystem getFileSystem() throws IOException;
+  public abstract MetadataStore getMetadataStore() throws IOException;
+}
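
A hypothetical concrete contract, to show the intended shape; the class name
is illustrative, while LocalMetadataStore is the in-memory store added
elsewhere in this patch:

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;

    public class LocalMSContractSketch extends AbstractMSContract {
      private final FileSystem fs;

      public LocalMSContractSketch(Configuration conf) throws IOException {
        fs = FileSystem.getLocal(conf);
      }

      @Override
      public FileSystem getFileSystem() {
        return fs;
      }

      @Override
      public MetadataStore getMetadataStore() throws IOException {
        MetadataStore ms = new LocalMetadataStore();
        ms.initialize(fs);  // bind the store to the test filesystem
        return ms;
      }
    }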

http://git-wip-us.apache.org/repos/asf/hadoop/blob/621b43e2/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/AbstractS3GuardToolTestBase.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/AbstractS3GuardToolTestBase.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/AbstractS3GuardToolTestBase.java
new file mode 100644
index 0000000..ceacdf3
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/AbstractS3GuardToolTestBase.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.s3guard;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
+import org.apache.hadoop.fs.s3a.AbstractS3ATestBase;
+import org.apache.hadoop.fs.s3a.Constants;
+import org.apache.hadoop.fs.s3a.S3AFileStatus;
+import org.apache.hadoop.fs.s3a.S3ATestUtils;
+import org.apache.hadoop.io.IOUtils;
+
+import static org.apache.hadoop.fs.s3a.s3guard.S3GuardTool.SUCCESS;
+
+/**
+ * Common functionality for S3GuardTool test cases.
+ */
+public abstract class AbstractS3GuardToolTestBase extends AbstractS3ATestBase {
+
+  protected static final String OWNER = "hdfs";
+
+  private MetadataStore ms;
+
+  protected static void expectResult(int expected,
+      String message,
+      S3GuardTool tool,
+      String... args) throws Exception {
+    assertEquals(message, expected, tool.run(args));
+  }
+
+  protected static void expectSuccess(
+      String message,
+      S3GuardTool tool,
+      String... args) throws Exception {
+    assertEquals(message, SUCCESS, tool.run(args));
+  }
+
+  protected MetadataStore getMetadataStore() {
+    return ms;
+  }
+
+  protected abstract MetadataStore newMetadataStore();
+
+  @Override
+  public void setup() throws Exception {
+    super.setup();
+    S3ATestUtils.assumeS3GuardState(true, getConfiguration());
+    ms = newMetadataStore();
+    ms.initialize(getFileSystem());
+  }
+
+  @Override
+  public void teardown() throws Exception {
+    super.teardown();
+    IOUtils.cleanupWithLogger(LOG, ms);
+  }
+
+  protected void mkdirs(Path path, boolean onS3, boolean onMetadataStore)
+      throws IOException {
+    if (onS3) {
+      getFileSystem().mkdirs(path);
+    }
+    if (onMetadataStore) {
+      S3AFileStatus status = new S3AFileStatus(true, path, OWNER);
+      ms.put(new PathMetadata(status));
+    }
+  }
+
+  protected static void putFile(MetadataStore ms, S3AFileStatus f)
+      throws IOException {
+    assertNotNull(f);
+    ms.put(new PathMetadata(f));
+    Path parent = f.getPath().getParent();
+    while (parent != null) {
+      S3AFileStatus dir = new S3AFileStatus(false, parent, f.getOwner());
+      ms.put(new PathMetadata(dir));
+      parent = parent.getParent();
+    }
+  }
+
+  /**
+   * Create file either on S3 or in metadata store.
+   * @param path the file path.
+   * @param onS3 set to true to create the file on S3.
+   * @param onMetadataStore set to true to create the file on the
+   *                        metadata store.
+   * @throws IOException IO problem
+   */
+  protected void createFile(Path path, boolean onS3, boolean onMetadataStore)
+      throws IOException {
+    if (onS3) {
+      ContractTestUtils.touch(getFileSystem(), path);
+    }
+
+    if (onMetadataStore) {
+      S3AFileStatus status = new S3AFileStatus(100L, System.currentTimeMillis(),
+          getFileSystem().qualify(path), 512L, OWNER);
+      putFile(ms, status);
+    }
+  }
+
+  private void testPruneCommand(Configuration cmdConf, String...args)
+      throws Exception {
+    Path parent = path("prune-cli");
+    try {
+      getFileSystem().mkdirs(parent);
+
+      S3GuardTool.Prune cmd = new S3GuardTool.Prune(cmdConf);
+      cmd.setMetadataStore(ms);
+
+      createFile(new Path(parent, "stale"), true, true);
+      Thread.sleep(TimeUnit.SECONDS.toMillis(2));
+      createFile(new Path(parent, "fresh"), true, true);
+
+      assertEquals(2, ms.listChildren(parent).getListing().size());
+      expectSuccess("Prune command did not exit successfully - see output", cmd,
+          args);
+      assertEquals(1, ms.listChildren(parent).getListing().size());
+    } finally {
+      getFileSystem().delete(parent, true);
+      ms.prune(Long.MAX_VALUE);
+    }
+  }
+
+  @Test
+  public void testPruneCommandCLI() throws Exception {
+    String testPath = path("testPruneCommandCLI").toString();
+    testPruneCommand(getFileSystem().getConf(),
+        "prune", "-seconds", "1", testPath);
+  }
+
+  @Test
+  public void testPruneCommandConf() throws Exception {
+    getConfiguration().setLong(Constants.S3GUARD_CLI_PRUNE_AGE,
+        TimeUnit.SECONDS.toMillis(1));
+    String testPath = path("testPruneCommandConf").toString();
+    testPruneCommand(getConfiguration(), "prune", testPath);
+  }
+}

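For reference, the prune invocations exercised by these tests map onto a
command line of roughly this shape (a sketch; the bucket and path are
placeholders, and the "hadoop s3guard" entry point is assumed rather than
shown in this patch):

    # remove metadata store entries under the path not modified in the
    # last second
    hadoop s3guard prune -seconds 1 s3a://example-bucket/prune-cli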
http://git-wip-us.apache.org/repos/asf/hadoop/blob/621b43e2/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBLocalClientFactory.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBLocalClientFactory.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBLocalClientFactory.java
new file mode 100644
index 0000000..0291acd
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBLocalClientFactory.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.s3guard;
+
+import java.io.File;
+import java.io.IOException;
+
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.client.builder.AwsClientBuilder;
+import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
+import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder;
+import com.amazonaws.services.dynamodbv2.local.main.ServerRunner;
+import com.amazonaws.services.dynamodbv2.local.server.DynamoDBProxyServer;
+import org.apache.commons.lang3.StringUtils;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.s3a.DefaultS3ClientFactory;
+import org.apache.hadoop.net.ServerSocketUtil;
+
+import static org.apache.hadoop.fs.s3a.S3AUtils.createAWSCredentialProviderSet;
+import static org.apache.hadoop.fs.s3a.s3guard.DynamoDBClientFactory.DefaultDynamoDBClientFactory.getRegion;
+
+/**
+ * A DynamoDBClientFactory implementation that creates AmazonDynamoDB clients
+ * against an in-memory DynamoDBLocal server instance.
+ *
+ * Requests issued against DynamoDBLocal incur no AWS charges. However,
+ * DynamoDBLocal is only a simulator of the DynamoDB web service, so its
+ * behavior may be stale or differ from the real service; for example, it
+ * does not yet support throttling. This factory is for testing only.
+ *
+ * To use this factory when creating a DynamoDB client in tests:
+ * <ol>
+ * <li>
+ *    Configure this class as the DynamoDBClientFactory implementation.
+ * </li>
+ * <li>
+ *    The singleton DynamoDBLocal server instance is started automatically
+ *    the first time an AmazonDynamoDB client is created; it is still worth
+ *    launching it before all the tests, so that startup errors fail fast.
+ * </li>
+ * <li>
+ *    The server can be stopped explicitly; tests need not bother, as JVM
+ *    termination will do that.
+ * </li>
+ * </ol>
+ *
+ * @see DefaultDynamoDBClientFactory
+ */
+public class DynamoDBLocalClientFactory extends Configured
+    implements DynamoDBClientFactory {
+
+  /** The singleton DynamoDBLocal server instance for testing. */
+  private static DynamoDBProxyServer dynamoDBLocalServer;
+  private static String ddbEndpoint;
+
+  private static final String SYSPROP_SQLITE_LIB = "sqlite4java.library.path";
+
+  @Override
+  public AmazonDynamoDB createDynamoDBClient(String defaultRegion)
+      throws IOException {
+    startSingletonServer();
+
+    final Configuration conf = getConf();
+    final AWSCredentialsProvider credentials =
+        createAWSCredentialProviderSet(null, conf);
+    final ClientConfiguration awsConf =
+        DefaultS3ClientFactory.createAwsConf(conf);
+    // fail fast in case of service errors
+    awsConf.setMaxErrorRetry(3);
+
+    final String region = getRegion(conf, defaultRegion);
+    LOG.info("Creating DynamoDBLocal client using endpoint {} in region {}",
+        ddbEndpoint, region);
+
+    return AmazonDynamoDBClientBuilder.standard()
+        .withCredentials(credentials)
+        .withClientConfiguration(awsConf)
+        .withEndpointConfiguration(
+            new AwsClientBuilder.EndpointConfiguration(ddbEndpoint, region))
+        .build();
+  }
+
+  /**
+   * Start a singleton in-memory DynamoDBLocal server if not started yet.
+   * @throws IOException if any error occurs
+   */
+  public static synchronized void startSingletonServer() throws IOException {
+    if (dynamoDBLocalServer != null) {
+      return;
+    }
+
+    // Set this property if it has not been set elsewhere
+    if (StringUtils.isEmpty(System.getProperty(SYSPROP_SQLITE_LIB))) {
+      String projectBuildDir = System.getProperty("project.build.directory");
+      if (StringUtils.isEmpty(projectBuildDir)) {
+        projectBuildDir = "target";
+      }
+      // sqlite4java lib should have been copied to $projectBuildDir/native-libs
+      System.setProperty(SYSPROP_SQLITE_LIB,
+          projectBuildDir + File.separator + "native-libs");
+      LOG.info("Setting {} -> {}",
+          SYSPROP_SQLITE_LIB, System.getProperty(SYSPROP_SQLITE_LIB));
+    }
+
+    try {
+      // Start an in-memory local DynamoDB instance
+      final String port = String.valueOf(ServerSocketUtil.getPort(0, 100));
+      ddbEndpoint = "http://localhost:" + port;
+      dynamoDBLocalServer = ServerRunner.createServerFromCommandLineArgs(
+          new String[]{"-inMemory", "-port", port});
+      dynamoDBLocalServer.start();
+      LOG.info("DynamoDBLocal singleton server was started at {}", ddbEndpoint);
+    } catch (Exception e) {
+      String msg = "Error starting DynamoDBLocal server at " + ddbEndpoint
+          + ": " + e;
+      LOG.error(msg, e);
+      throw new IOException(msg, e);
+    }
+  }
+
+  /**
+   * Stop the in-memory DynamoDBLocal server if it is started.
+   * @throws IOException if any error occurs
+   */
+  public static synchronized void stopSingletonServer() throws IOException {
+    if (dynamoDBLocalServer != null) {
+      LOG.info("Shutting down the in-memory DynamoDBLocal server");
+      try {
+        dynamoDBLocalServer.stop();
+      } catch (Throwable t) {
+        String msg = "Error stopping DynamoDBLocal server at " + ddbEndpoint;
+        LOG.error(msg, t);
+        throw new IOException(msg, t);
+      }
+    }
+  }
+
+}

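As the class javadoc above says, the factory must be configured like any
other DynamoDBClientFactory implementation. A minimal sketch of doing so in
a test, assuming the "fs.s3a.s3guard.ddb.client.factory.impl" key
(Constants.S3GUARD_DDB_CLIENT_FACTORY_IMPL) is what selects the
implementation:

    // Sketch only: the config key is an assumption, not part of this patch.
    Configuration conf = new Configuration();
    conf.setClass("fs.s3a.s3guard.ddb.client.factory.impl",
        DynamoDBLocalClientFactory.class, DynamoDBClientFactory.class);

    // Optionally launch the singleton server up front, so that any
    // startup problem fails fast before the first client is created.
    DynamoDBLocalClientFactory.startSingletonServer();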
http://git-wip-us.apache.org/repos/asf/hadoop/blob/621b43e2/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestS3GuardConcurrentOps.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestS3GuardConcurrentOps.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestS3GuardConcurrentOps.java
new file mode 100644
index 0000000..c6838a0
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestS3GuardConcurrentOps.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.s3guard;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.amazonaws.services.dynamodbv2.document.DynamoDB;
+import com.amazonaws.services.dynamodbv2.document.Table;
+import com.amazonaws.services.dynamodbv2.model.ResourceNotFoundException;
+import org.junit.Assume;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
+import org.apache.hadoop.fs.s3a.AbstractS3ATestBase;
+import org.apache.hadoop.fs.s3a.Constants;
+
+import static org.apache.hadoop.fs.s3a.Constants.S3GUARD_DDB_REGION_KEY;
+
+/**
+ * Tests concurrent operations on S3Guard.
+ */
+public class ITestS3GuardConcurrentOps extends AbstractS3ATestBase {
+
+  @Rule
+  public final Timeout timeout = new Timeout(5 * 60 * 1000);
+
+  private void failIfTableExists(DynamoDB db, String tableName) {
+    boolean tableExists = true;
+    try {
+      Table table = db.getTable(tableName);
+      table.describe();
+    } catch (ResourceNotFoundException e) {
+      tableExists = false;
+    }
+    if (tableExists) {
+      fail("Table already exists: " + tableName);
+    }
+  }
+
+  private void deleteTable(DynamoDB db, String tableName) throws
+      InterruptedException {
+    try {
+      Table table = db.getTable(tableName);
+      table.waitForActive();
+      table.delete();
+      table.waitForDelete();
+    } catch (ResourceNotFoundException e) {
+      LOG.warn("Failed to delete {}, as it was not found", tableName, e);
+    }
+  }
+
+  @Test
+  public void testConcurrentTableCreations() throws Exception {
+    final Configuration conf = getConfiguration();
+    Assume.assumeTrue("Test only applies when DynamoDB is used for S3Guard",
+        Constants.S3GUARD_METASTORE_DYNAMO.equals(
+            conf.get(Constants.S3_METADATA_STORE_IMPL)));
+
+    DynamoDBMetadataStore ms = new DynamoDBMetadataStore();
+    ms.initialize(getFileSystem());
+    DynamoDB db = ms.getDynamoDB();
+
+    String tableName = "testConcurrentTableCreations" + new Random().nextInt();
+    conf.setBoolean(Constants.S3GUARD_DDB_TABLE_CREATE_KEY, true);
+    conf.set(Constants.S3GUARD_DDB_TABLE_NAME_KEY, tableName);
+
+    String region = conf.getTrimmed(S3GUARD_DDB_REGION_KEY);
+    if (StringUtils.isEmpty(region)) {
+      // no region set, so pick it up from the test bucket
+      conf.set(S3GUARD_DDB_REGION_KEY, getFileSystem().getBucketLocation());
+    }
+    int concurrentOps = 16;
+    int iterations = 4;
+
+    failIfTableExists(db, tableName);
+
+    for (int i = 0; i < iterations; i++) {
+      ExecutorService executor = Executors.newFixedThreadPool(
+          concurrentOps, new ThreadFactory() {
+            private AtomicInteger count = new AtomicInteger(0);
+            @Override
+            public Thread newThread(Runnable r) {
+              return new Thread(r,
+                  "testConcurrentTableCreations" + count.getAndIncrement());
+            }
+          });
+      ((ThreadPoolExecutor) executor).prestartAllCoreThreads();
+      Future<Exception>[] futures = new Future[concurrentOps];
+      for (int f = 0; f < concurrentOps; f++) {
+        final int index = f;
+        futures[f] = executor.submit(new Callable<Exception>() {
+          @Override
+          public Exception call() throws Exception {
+
+            ContractTestUtils.NanoTimer timer =
+                new ContractTestUtils.NanoTimer();
+
+            Exception result = null;
+            try (DynamoDBMetadataStore store = new DynamoDBMetadataStore()) {
+              store.initialize(conf);
+            } catch (Exception e) {
+              LOG.error("DynamoDB client creation {} failed", index, e);
+              result = e;
+            }
+
+            timer.end("Parallel DynamoDB client creation %d", index);
+            LOG.info("Parallel DynamoDB client creation {} ran from {} to {}",
+                index, timer.getStartTime(), timer.getEndTime());
+            return result;
+          }
+        });
+      }
+      List<Exception> exceptions = new ArrayList<>(concurrentOps);
+      for (int f = 0; f < concurrentOps; f++) {
+        Exception outcome = futures[f].get();
+        if (outcome != null) {
+          exceptions.add(outcome);
+        }
+      }
+      deleteTable(db, tableName);
+      int exceptionsThrown = exceptions.size();
+      if (exceptionsThrown > 0) {
+        // at least one exception was thrown. Fail the test & nest the first
+        // exception caught
+        throw new AssertionError(exceptionsThrown + "/" + concurrentOps +
+            " threads threw exceptions while initializing on iteration " + i,
+            exceptions.get(0));
+      }
+    }
+  }
+}

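For reference, the Future[] fan-out in testConcurrentTableCreations() could
be written more compactly with ExecutorService.invokeAll(). A sketch using
the same conf, executor and concurrentOps as the test above (not part of
the patch):

    // Build one Callable per concurrent initialization attempt.
    List<Callable<Exception>> tasks = new ArrayList<>(concurrentOps);
    for (int f = 0; f < concurrentOps; f++) {
      tasks.add(new Callable<Exception>() {
        @Override
        public Exception call() {
          try (DynamoDBMetadataStore store = new DynamoDBMetadataStore()) {
            store.initialize(conf);
            return null;        // success
          } catch (Exception e) {
            return e;           // hand the failure back to the main thread
          }
        }
      });
    }
    List<Exception> failures = new ArrayList<>();
    // invokeAll() blocks until every task has completed.
    for (Future<Exception> future : executor.invokeAll(tasks)) {
      Exception outcome = future.get();
      if (outcome != null) {
        failures.add(outcome);
      }
    }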
http://git-wip-us.apache.org/repos/asf/hadoop/blob/621b43e2/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestS3GuardToolDynamoDB.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestS3GuardToolDynamoDB.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestS3GuardToolDynamoDB.java
new file mode 100644
index 0000000..c13dfc4
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestS3GuardToolDynamoDB.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.s3guard;
+
+import java.io.IOException;
+import java.util.Random;
+import java.util.concurrent.Callable;
+
+import com.amazonaws.services.dynamodbv2.document.DynamoDB;
+import com.amazonaws.services.dynamodbv2.document.Table;
+import com.amazonaws.services.dynamodbv2.model.ResourceNotFoundException;
+import org.junit.Test;
+
+import org.apache.hadoop.fs.s3a.S3AFileSystem;
+import org.apache.hadoop.fs.s3a.s3guard.S3GuardTool.Destroy;
+import org.apache.hadoop.fs.s3a.s3guard.S3GuardTool.Init;
+import org.apache.hadoop.test.LambdaTestUtils;
+
+/**
+ * Test S3Guard related CLI commands against DynamoDB.
+ */
+public class ITestS3GuardToolDynamoDB extends AbstractS3GuardToolTestBase {
+
+  @Override
+  protected MetadataStore newMetadataStore() {
+    return new DynamoDBMetadataStore();
+  }
+
+  /** Check the existence of a given DynamoDB table. */
+  private static boolean exist(DynamoDB dynamoDB, String tableName) {
+    assertNotNull(dynamoDB);
+    assertNotNull(tableName);
+    assertFalse("empty table name", tableName.isEmpty());
+    try {
+      Table table = dynamoDB.getTable(tableName);
+      table.describe();
+    } catch (ResourceNotFoundException e) {
+      return false;
+    }
+    return true;
+  }
+
+  @Test
+  public void testInvalidRegion() throws Exception {
+    final String testTableName = "testInvalidRegion" + new Random().nextInt();
+    final String testRegion = "invalidRegion";
+    // Initialize MetadataStore
+    final Init initCmd = new Init(getFileSystem().getConf());
+    LambdaTestUtils.intercept(IOException.class,
+        new Callable<String>() {
+          @Override
+          public String call() throws Exception {
+            int res = initCmd.run(new String[]{
+                "init",
+                "-region", testRegion,
+                "-meta", "dynamodb://" + testTableName
+            });
+            return "Use of invalid region did not fail, returning " + res
+                + " - table may have been "
+                + "created and not cleaned up: " + testTableName;
+          }
+        });
+  }
+
+  @Test
+  public void testDynamoDBInitDestroyCycle() throws Exception {
+    String testTableName = "testDynamoDBInitDestroy" + new Random().nextInt();
+    String testS3Url = path(testTableName).toString();
+    S3AFileSystem fs = getFileSystem();
+    DynamoDB db = null;
+    try {
+      // Initialize MetadataStore
+      Init initCmd = new Init(fs.getConf());
+      expectSuccess("Init command did not exit successfully - see output",
+          initCmd,
+          "init", "-meta", "dynamodb://" + testTableName, testS3Url);
+      // Verify it exists
+      MetadataStore ms = getMetadataStore();
+      assertTrue("metadata store should be DynamoDBMetadataStore",
+          ms instanceof DynamoDBMetadataStore);
+      DynamoDBMetadataStore dynamoMs = (DynamoDBMetadataStore) ms;
+      db = dynamoMs.getDynamoDB();
+      assertTrue(String.format("%s does not exist", testTableName),
+          exist(db, testTableName));
+
+      // Destroy MetadataStore
+      Destroy destroyCmd = new Destroy(fs.getConf());
+
+      expectSuccess("Destroy command did not exit successfully - see output",
+          destroyCmd,
+          "destroy", "-meta", "dynamodb://" + testTableName, testS3Url);
+      // Verify it does not exist
+      assertFalse(String.format("%s still exists", testTableName),
+          exist(db, testTableName));
+
+      // delete again and expect success again
+      expectSuccess("Destroy command did not exit successfully - see output",
+          destroyCmd,
+          "destroy", "-meta", "dynamodb://" + testTableName, testS3Url);
+    } catch (ResourceNotFoundException e) {
+      throw new AssertionError(
+          String.format("DynamoDB table %s does not exist", testTableName),
+          e);
+    } finally {
+      LOG.warn("Table may not have been cleaned up: " +
+          testTableName);
+      if (db != null) {
+        Table table = db.getTable(testTableName);
+        if (table != null) {
+          try {
+            table.delete();
+            table.waitForDelete();
+          } catch (ResourceNotFoundException e) { /* Ignore */ }
+        }
+      }
+    }
+  }
+}

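For reference, the init/destroy cycle above maps onto command lines of
roughly this shape (a sketch; the table and bucket names are placeholders):

    hadoop s3guard init -meta dynamodb://example-table s3a://example-bucket/path
    hadoop s3guard destroy -meta dynamodb://example-table s3a://example-bucket/path

Destroying an already-deleted table is expected to succeed, which is what
the second destroy in the test verifies.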
http://git-wip-us.apache.org/repos/asf/hadoop/blob/621b43e2/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestS3GuardToolLocal.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestS3GuardToolLocal.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestS3GuardToolLocal.java
new file mode 100644
index 0000000..181cdfb
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestS3GuardToolLocal.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.s3guard;
+
+import java.io.BufferedReader;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.PrintStream;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.junit.Test;
+
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.S3AFileSystem;
+import org.apache.hadoop.fs.s3a.s3guard.S3GuardTool.Diff;
+
+import static org.apache.hadoop.fs.s3a.s3guard.S3GuardTool.SUCCESS;
+
+/**
+ * Test S3Guard related CLI commands against a LocalMetadataStore.
+ */
+public class ITestS3GuardToolLocal extends AbstractS3GuardToolTestBase {
+
+  @Override
+  protected MetadataStore newMetadataStore() {
+    return new LocalMetadataStore();
+  }
+
+  @Test
+  public void testImportCommand() throws Exception {
+    S3AFileSystem fs = getFileSystem();
+    MetadataStore ms = getMetadataStore();
+    Path parent = path("test-import");
+    fs.mkdirs(parent);
+    Path dir = new Path(parent, "a");
+    fs.mkdirs(dir);
+    Path emptyDir = new Path(parent, "emptyDir");
+    fs.mkdirs(emptyDir);
+    for (int i = 0; i < 10; i++) {
+      String child = String.format("file-%d", i);
+      try (FSDataOutputStream out = fs.create(new Path(dir, child))) {
+        out.write(1);
+      }
+    }
+
+    S3GuardTool.Import cmd = new S3GuardTool.Import(fs.getConf());
+    cmd.setStore(ms);
+
+    expectSuccess("Import command did not exit successfully - see output",
+        cmd,
+        "import", parent.toString());
+
+    DirListingMetadata children =
+        ms.listChildren(dir);
+    assertEquals("Unexpected number of paths imported", 10, children
+        .getListing().size());
+    assertEquals("Expected 2 items: the empty directory and dir 'a'", 2,
+        ms.listChildren(parent).getListing().size());
+    // assertTrue(children.isAuthoritative());
+  }
+
+  @Test
+  public void testDiffCommand() throws IOException {
+    S3AFileSystem fs = getFileSystem();
+    MetadataStore ms = getMetadataStore();
+    Set<Path> filesOnS3 = new HashSet<>(); // files on S3.
+    Set<Path> filesOnMS = new HashSet<>(); // files on metadata store.
+
+    Path testPath = path("test-diff");
+    mkdirs(testPath, true, true);
+
+    Path msOnlyPath = new Path(testPath, "ms_only");
+    mkdirs(msOnlyPath, false, true);
+    filesOnMS.add(msOnlyPath);
+    for (int i = 0; i < 5; i++) {
+      Path file = new Path(msOnlyPath, String.format("file-%d", i));
+      createFile(file, false, true);
+      filesOnMS.add(file);
+    }
+
+    Path s3OnlyPath = new Path(testPath, "s3_only");
+    mkdirs(s3OnlyPath, true, false);
+    filesOnS3.add(s3OnlyPath);
+    for (int i = 0; i < 5; i++) {
+      Path file = new Path(s3OnlyPath, String.format("file-%d", i));
+      createFile(file, true, false);
+      filesOnS3.add(file);
+    }
+
+    ByteArrayOutputStream buf = new ByteArrayOutputStream();
+    PrintStream out = new PrintStream(buf);
+    Diff cmd = new Diff(fs.getConf());
+    cmd.setStore(ms);
+    assertEquals("Diff command did not exit successfully - see output", SUCCESS,
+        cmd.run(new String[]{"diff", "-meta", "local://metadata",
+            testPath.toString()}, out));
+    out.close();
+
+    Set<Path> actualOnS3 = new HashSet<>();
+    Set<Path> actualOnMS = new HashSet<>();
+    boolean duplicates = false;
+    try (BufferedReader reader =
+             new BufferedReader(new InputStreamReader(
+                 new ByteArrayInputStream(buf.toByteArray())))) {
+      String line;
+      while ((line = reader.readLine()) != null) {
+        String[] fields = line.split("\\s");
+        assertEquals("[" + line + "] does not have enough fields",
+            4, fields.length);
+        String where = fields[0];
+        Path path = new Path(fields[3]);
+        if (Diff.S3_PREFIX.equals(where)) {
+          duplicates = duplicates || actualOnS3.contains(path);
+          actualOnS3.add(path);
+        } else if (Diff.MS_PREFIX.equals(where)) {
+          duplicates = duplicates || actualOnMS.contains(path);
+          actualOnMS.add(path);
+        } else {
+          fail("Unknown prefix: " + where);
+        }
+      }
+    }
+    String actualOut = buf.toString();
+    assertEquals("Mismatched metadata store outputs: " + actualOut,
+        filesOnMS, actualOnMS);
+    assertEquals("Mismatched s3 outputs: " + actualOut, filesOnS3, actualOnS3);
+    assertFalse("Diff contained duplicates", duplicates);
+  }
+}

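For reference, the diff run above maps onto a command line of roughly this
shape (a sketch; the metadata store URI and path mirror the test's own
values):

    hadoop s3guard diff -meta local://metadata s3a://example-bucket/test-diff

As the parsing loop assumes, each output line carries four
whitespace-separated fields, starting with a location prefix (Diff.S3_PREFIX
or Diff.MS_PREFIX) and ending with the path.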
