Posted to common-commits@hadoop.apache.org by st...@apache.org on 2017/09/29 10:29:45 UTC

[8/8] hadoop git commit: HADOOP-13345 S3Guard: Improved Consistency for S3A. Contributed by: Chris Nauroth, Aaron Fabbri, Mingliang Liu, Lei (Eddy) Xu, Sean Mackrory, Steve Loughran and others.

HADOOP-13345 S3Guard: Improved Consistency for S3A.
Contributed by: Chris Nauroth, Aaron Fabbri, Mingliang Liu, Lei (Eddy) Xu,
Sean Mackrory, Steve Loughran and others.

(cherry-picked from/based on commit 621b43e254afaff708cd6fc4698b29628f6abc33)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/a1afc6aa
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/a1afc6aa
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/a1afc6aa

Branch: refs/heads/branch-2
Commit: a1afc6aa59188d32133bfa0367ff11219428d568
Parents: a641bce
Author: Steve Loughran <st...@apache.org>
Authored: Fri Sep 29 11:29:22 2017 +0100
Committer: Steve Loughran <st...@apache.org>
Committed: Fri Sep 29 11:29:22 2017 +0100

----------------------------------------------------------------------
 hadoop-common-project/hadoop-common/pom.xml     |    5 +
 .../hadoop-common/src/main/bin/hadoop           |    3 +
 .../apache/hadoop/fs/AbstractFileSystem.java    |    8 +
 .../java/org/apache/hadoop/fs/FileContext.java  |    9 +
 .../src/main/resources/core-default.xml         |  108 ++
 .../fs/contract/AbstractContractRenameTest.java |   63 ++
 .../org/apache/hadoop/test/LambdaTestUtils.java |  112 ++
 hadoop-project/pom.xml                          |   13 +
 .../hadoop-aws/dev-support/findbugs-exclude.xml |    6 +
 hadoop-tools/hadoop-aws/pom.xml                 |  147 +++
 .../org/apache/hadoop/fs/s3a/Constants.java     |  133 ++-
 .../hadoop/fs/s3a/DefaultS3ClientFactory.java   |  233 ++++
 .../fs/s3a/InconsistentAmazonS3Client.java      |  434 ++++++++
 .../fs/s3a/InconsistentS3ClientFactory.java     |   40 +
 .../java/org/apache/hadoop/fs/s3a/Listing.java  |  263 ++++-
 .../hadoop/fs/s3a/S3ABlockOutputStream.java     |   17 +-
 .../org/apache/hadoop/fs/s3a/S3AFileStatus.java |   45 +-
 .../org/apache/hadoop/fs/s3a/S3AFileSystem.java |  572 ++++++++--
 .../hadoop/fs/s3a/S3AInstrumentation.java       |   81 +-
 .../apache/hadoop/fs/s3a/S3AOutputStream.java   |   14 +-
 .../java/org/apache/hadoop/fs/s3a/S3AUtils.java |   36 +-
 .../apache/hadoop/fs/s3a/S3ClientFactory.java   |  195 +---
 .../org/apache/hadoop/fs/s3a/Statistic.java     |   13 +-
 .../java/org/apache/hadoop/fs/s3a/Tristate.java |   32 +
 .../org/apache/hadoop/fs/s3a/UploadInfo.java    |   43 +
 .../fs/s3a/s3guard/DescendantsIterator.java     |  142 +++
 .../fs/s3a/s3guard/DirListingMetadata.java      |  322 ++++++
 .../fs/s3a/s3guard/DynamoDBClientFactory.java   |  132 +++
 .../fs/s3a/s3guard/DynamoDBMetadataStore.java   | 1010 ++++++++++++++++++
 .../fs/s3a/s3guard/LocalMetadataStore.java      |  435 ++++++++
 .../hadoop/fs/s3a/s3guard/LruHashMap.java       |   50 +
 .../hadoop/fs/s3a/s3guard/MetadataStore.java    |  221 ++++
 .../s3guard/MetadataStoreListFilesIterator.java |  169 +++
 .../fs/s3a/s3guard/NullMetadataStore.java       |  104 ++
 .../hadoop/fs/s3a/s3guard/PathMetadata.java     |  143 +++
 .../PathMetadataDynamoDBTranslation.java        |  304 ++++++
 .../apache/hadoop/fs/s3a/s3guard/S3Guard.java   |  463 ++++++++
 .../hadoop/fs/s3a/s3guard/S3GuardTool.java      |  924 ++++++++++++++++
 .../hadoop/fs/s3a/s3guard/package-info.java     |   30 +
 .../hadoop/fs/s3native/S3xLoginHelper.java      |    4 +
 .../src/site/markdown/tools/hadoop-aws/index.md |    3 +-
 .../site/markdown/tools/hadoop-aws/s3guard.md   |  610 +++++++++++
 .../site/markdown/tools/hadoop-aws/testing.md   |  288 ++++-
 .../fs/contract/s3a/ITestS3AContractCreate.java |   14 +
 .../fs/contract/s3a/ITestS3AContractDelete.java |   14 +
 .../fs/contract/s3a/ITestS3AContractDistCp.java |    7 +
 .../s3a/ITestS3AContractGetFileStatus.java      |    4 +
 .../fs/contract/s3a/ITestS3AContractMkdir.java  |   14 +
 .../fs/contract/s3a/ITestS3AContractOpen.java   |   14 +
 .../fs/contract/s3a/ITestS3AContractRename.java |   13 +
 .../contract/s3a/ITestS3AContractRootDir.java   |   14 +
 .../fs/contract/s3a/ITestS3AContractSeek.java   |   14 +
 .../hadoop/fs/s3a/AbstractS3AMockTest.java      |    9 +-
 .../hadoop/fs/s3a/AbstractS3ATestBase.java      |   26 +-
 .../fs/s3a/ITestS3AAWSCredentialsProvider.java  |    4 +
 .../hadoop/fs/s3a/ITestS3AConfiguration.java    |    3 +-
 .../hadoop/fs/s3a/ITestS3ACredentialsInURL.java |   11 +-
 .../hadoop/fs/s3a/ITestS3ADelayedFNF.java       |   62 ++
 .../hadoop/fs/s3a/ITestS3AEmptyDirectory.java   |   83 ++
 .../hadoop/fs/s3a/ITestS3AEncryptionSSEC.java   |  312 +++---
 .../fs/s3a/ITestS3AFileOperationCost.java       |   40 +-
 .../fs/s3a/ITestS3AFileSystemContract.java      |    3 +
 .../hadoop/fs/s3a/ITestS3AInconsistency.java    |  100 ++
 .../hadoop/fs/s3a/ITestS3AMiscOperations.java   |   27 +
 .../hadoop/fs/s3a/ITestS3GuardCreate.java       |   61 ++
 .../hadoop/fs/s3a/ITestS3GuardEmptyDirs.java    |   85 ++
 .../fs/s3a/ITestS3GuardListConsistency.java     |  544 ++++++++++
 .../hadoop/fs/s3a/ITestS3GuardWriteBack.java    |  141 +++
 .../hadoop/fs/s3a/MockS3ClientFactory.java      |    3 +
 .../apache/hadoop/fs/s3a/S3ATestConstants.java  |   12 +
 .../org/apache/hadoop/fs/s3a/S3ATestUtils.java  |  197 +++-
 .../org/apache/hadoop/fs/s3a/TestListing.java   |  118 ++
 .../ITestS3AFileContextStatistics.java          |    4 +-
 .../s3a/fileContext/ITestS3AFileContextURI.java |   19 +-
 .../fs/s3a/s3guard/AbstractMSContract.java      |   33 +
 .../s3guard/AbstractS3GuardToolTestBase.java    |  161 +++
 .../s3a/s3guard/DynamoDBLocalClientFactory.java |  157 +++
 .../s3a/s3guard/ITestS3GuardConcurrentOps.java  |  160 +++
 .../s3a/s3guard/ITestS3GuardToolDynamoDB.java   |  134 +++
 .../fs/s3a/s3guard/ITestS3GuardToolLocal.java   |  149 +++
 .../fs/s3a/s3guard/MetadataStoreTestBase.java   |  887 +++++++++++++++
 .../fs/s3a/s3guard/TestDirListingMetadata.java  |  303 ++++++
 .../s3a/s3guard/TestDynamoDBMetadataStore.java  |  594 ++++++++++
 .../fs/s3a/s3guard/TestLocalMetadataStore.java  |  140 +++
 .../fs/s3a/s3guard/TestNullMetadataStore.java   |   58 +
 .../TestPathMetadataDynamoDBTranslation.java    |  238 +++++
 .../hadoop/fs/s3a/s3guard/TestS3Guard.java      |   93 ++
 .../AbstractITestS3AMetadataStoreScale.java     |  250 +++++
 .../fs/s3a/scale/AbstractSTestS3AHugeFiles.java |   13 +-
 .../scale/ITestDynamoDBMetadataStoreScale.java  |   48 +
 .../s3a/scale/ITestLocalMetadataStoreScale.java |   37 +
 .../fs/s3a/scale/ITestS3AConcurrentOps.java     |    3 +-
 .../fs/s3a/scale/ITestS3ACreatePerformance.java |   86 ++
 .../s3a/scale/ITestS3ADirectoryPerformance.java |    5 +-
 .../scale/ITestS3AInputStreamPerformance.java   |    4 +-
 .../hadoop/fs/s3a/scale/S3AScaleTestBase.java   |    2 +-
 .../hadoop-aws/src/test/resources/core-site.xml |   26 +
 .../src/test/resources/log4j.properties         |   15 +-
 98 files changed, 13007 insertions(+), 540 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/a1afc6aa/hadoop-common-project/hadoop-common/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/pom.xml b/hadoop-common-project/hadoop-common/pom.xml
index 8972e71..6b4ace5 100644
--- a/hadoop-common-project/hadoop-common/pom.xml
+++ b/hadoop-common-project/hadoop-common/pom.xml
@@ -163,6 +163,11 @@
       <scope>compile</scope>
     </dependency>
     <dependency>
+      <groupId>org.apache.commons</groupId>
+      <artifactId>commons-lang3</artifactId>
+      <scope>compile</scope>
+    </dependency>
+    <dependency>
       <groupId>org.slf4j</groupId>
       <artifactId>slf4j-api</artifactId>
       <scope>compile</scope>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a1afc6aa/hadoop-common-project/hadoop-common/src/main/bin/hadoop
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/bin/hadoop b/hadoop-common-project/hadoop-common/src/main/bin/hadoop
index 1575996..74bf2d9 100755
--- a/hadoop-common-project/hadoop-common/src/main/bin/hadoop
+++ b/hadoop-common-project/hadoop-common/src/main/bin/hadoop
@@ -134,6 +134,9 @@ case $COMMAND in
         echo $CLASSPATH
         exit
       fi
+    elif [ "$COMMAND" = "s3guard" ] ; then
+      CLASS=org.apache.hadoop.fs.s3a.s3guard.S3GuardTool
+      CLASSPATH=${CLASSPATH}:${TOOL_PATH}
     elif [[ "$COMMAND" = -*  ]] ; then
         # class and package names cannot begin with a -
         echo "Error: No command named \`$COMMAND' was found. Perhaps you meant \`hadoop ${COMMAND#-}'"

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a1afc6aa/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/AbstractFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/AbstractFileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/AbstractFileSystem.java
index 9bea8f9..df14ee8 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/AbstractFileSystem.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/AbstractFileSystem.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.fs;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.ArrayList;
@@ -132,6 +133,13 @@ public abstract class AbstractFileSystem {
         CONSTRUCTOR_CACHE.put(theClass, meth);
       }
       result = meth.newInstance(uri, conf);
+    } catch (InvocationTargetException e) {
+      Throwable cause = e.getCause();
+      if (cause instanceof RuntimeException) {
+        throw (RuntimeException) cause;
+      } else {
+        throw new RuntimeException(cause);
+      }
     } catch (Exception e) {
       throw new RuntimeException(e);
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a1afc6aa/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileContext.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileContext.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileContext.java
index 341d11f..039dbad 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileContext.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileContext.java
@@ -342,6 +342,15 @@ public class FileContext {
           return AbstractFileSystem.get(uri, conf);
         }
       });
+    } catch (RuntimeException ex) {
+      // RTEs can wrap other exceptions; if there is an IOException inside,
+      // throw it directly.
+      Throwable cause = ex.getCause();
+      if (cause instanceof IOException) {
+        throw (IOException) cause;
+      } else {
+        throw ex;
+      }
     } catch (InterruptedException ex) {
       LOG.error(ex.toString());
       throw new IOException("Failed to get the AbstractFileSystem for path: "
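
Taken together with the AbstractFileSystem change above, this catch block lets an IOException raised inside a filesystem constructor (for example while initializing an S3A metadata store) reach FileContext callers directly rather than wrapped in a RuntimeException. A minimal sketch of the round trip, using hypothetical names (UnwrapSketch, construct, newFs):

    import java.io.IOException;
    import java.lang.reflect.Constructor;
    import java.lang.reflect.InvocationTargetException;

    public class UnwrapSketch {
      /** Reflective construction: rethrow the constructor's own RuntimeException. */
      static Object construct(Constructor<?> ctor, Object... args) {
        try {
          return ctor.newInstance(args);
        } catch (InvocationTargetException e) {
          Throwable cause = e.getCause();
          // Mirrors the AbstractFileSystem hunk: surface the original RTE.
          throw cause instanceof RuntimeException
              ? (RuntimeException) cause : new RuntimeException(cause);
        } catch (Exception e) {
          throw new RuntimeException(e);
        }
      }

      /** Caller side: mirrors the FileContext hunk, unwrapping a nested IOException. */
      static Object newFs(Constructor<?> ctor, Object... args) throws IOException {
        try {
          return construct(ctor, args);
        } catch (RuntimeException ex) {
          if (ex.getCause() instanceof IOException) {
            throw (IOException) ex.getCause();
          }
          throw ex;
        }
      }
    }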

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a1afc6aa/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
index 041a912..ed5a3a5 100644
--- a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
+++ b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
@@ -1312,12 +1312,120 @@
 </property>
 
 <property>
+    <name>fs.s3a.metadatastore.authoritative</name>
+    <value>false</value>
+    <description>
+        When true, allow MetadataStore implementations to act as source of
+        truth for getting file status and directory listings.  Even if this
+        is set to true, MetadataStore implementations may choose not to
+        return authoritative results.  If the configured MetadataStore does
+        not support being authoritative, this setting will have no effect.
+    </description>
+</property>
+
+<property>
+    <name>fs.s3a.metadatastore.impl</name>
+    <value>org.apache.hadoop.fs.s3a.s3guard.NullMetadataStore</value>
+    <description>
+        Fully-qualified name of the class that implements the MetadataStore
+        to be used by s3a.  The default class, NullMetadataStore, has no
+        effect: s3a will continue to treat the backing S3 service as the one
+        and only source of truth for file and directory metadata.
+    </description>
+</property>
+
+<property>
+    <name>fs.s3a.s3guard.cli.prune.age</name>
+    <value>86400000</value>
+    <description>
+        Default age (in milliseconds) after which to prune metadata from the
+        metadatastore when the prune command is run.  Can be overridden on the
+        command-line.
+    </description>
+</property>
+
+
+<property>
   <name>fs.s3a.impl</name>
   <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
   <description>The implementation class of the S3A Filesystem</description>
 </property>
 
 <property>
+  <name>fs.s3a.s3guard.ddb.region</name>
+  <value></value>
+  <description>
+    AWS DynamoDB region to connect to. An up-to-date list is
+    provided in the AWS Documentation: regions and endpoints. Without this
+    property, S3Guard will operate on a table in the associated S3 bucket's region.
+  </description>
+</property>
+
+<property>
+  <name>fs.s3a.s3guard.ddb.table</name>
+  <value></value>
+  <description>
+    The DynamoDB table name to operate on. Without this property, the respective
+    S3 bucket name will be used.
+  </description>
+</property>
+
+<property>
+  <name>fs.s3a.s3guard.ddb.table.create</name>
+  <value>false</value>
+  <description>
+    If true, the S3A client will create the table if it does not already exist.
+  </description>
+</property>
+
+<property>
+  <name>fs.s3a.s3guard.ddb.table.capacity.read</name>
+  <value>500</value>
+  <description>
+    Provisioned throughput requirements for read operations in terms of capacity
+    units for the DynamoDB table.  This config value will only be used when
+    creating a new DynamoDB table, though later you can manually provision by
+    increasing or decreasing read capacity as needed for existing tables.
+    See DynamoDB documents for more information.
+  </description>
+</property>
+
+<property>
+  <name>fs.s3a.s3guard.ddb.table.capacity.write</name>
+  <value>100</value>
+  <description>
+    Provisioned throughput requirements for write operations in terms of
+    capacity units for the DynamoDB table.  Refer to related config
+    fs.s3a.s3guard.ddb.table.capacity.read before usage.
+  </description>
+</property>
+
+<property>
+  <name>fs.s3a.s3guard.ddb.max.retries</name>
+  <value>9</value>
+    <description>
+      Max retries on batched DynamoDB operations before giving up and
+      throwing an IOException.  Each retry is delayed with an exponential
+      backoff timer which starts at 100 milliseconds and approximately
+      doubles each time.  The minimum wait before throwing an exception is
+      sum(100, 200, 400, 800, ..., 100*2^(N-1)) == 100 * ((2^N)-1) milliseconds,
+      so N = 9 yields at least 51.1 seconds (51,100 milliseconds) of blocking
+      before an IOException is thrown.
+    </description>
+</property>
+
+<property>
+  <name>fs.s3a.s3guard.ddb.background.sleep</name>
+  <value>25</value>
+  <description>
+    Length (in milliseconds) of pause between each batch of deletes when
+    pruning metadata.  Prevents prune operations (which can typically be low
+    priority background operations) from overly interfering with other I/O
+    operations.
+  </description>
+</property>
+
+<property>
   <name>fs.AbstractFileSystem.s3a.impl</name>
   <value>org.apache.hadoop.fs.s3a.S3A</value>
   <description>The implementation class of the S3A AbstractFileSystem.</description>
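
For illustration, the new metadata store settings above could also be set programmatically rather than in core-site.xml; a hedged sketch, where the table name and region are placeholders:

    import org.apache.hadoop.conf.Configuration;

    public class S3GuardConfSketch {
      /** Build a Configuration that switches S3A from NullMetadataStore to DynamoDB. */
      public static Configuration enableDynamoStore() {
        Configuration conf = new Configuration();
        conf.set("fs.s3a.metadatastore.impl",
            "org.apache.hadoop.fs.s3a.s3guard.DynamoDBMetadataStore");
        // Placeholder table and region; create the table if it does not exist yet.
        conf.set("fs.s3a.s3guard.ddb.table", "example-s3guard-table");
        conf.set("fs.s3a.s3guard.ddb.region", "us-west-2");
        conf.setBoolean("fs.s3a.s3guard.ddb.table.create", true);
        return conf;
      }
    }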

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a1afc6aa/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractRenameTest.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractRenameTest.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractRenameTest.java
index b0dcb93..b6d0a49 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractRenameTest.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractRenameTest.java
@@ -222,4 +222,67 @@ public abstract class AbstractContractRenameTest extends
     assertPathDoesNotExist("not deleted",
         new Path(srcDir, "source.txt"));
   }
+
+  /**
+   * Test that after renaming, the nested subdirectory is moved along with all
+   * its ancestors.
+   */
+  @Test
+  public void testRenamePopulatesDirectoryAncestors() throws IOException {
+    final FileSystem fs = getFileSystem();
+    final Path src = path("testRenamePopulatesDirectoryAncestors/source");
+    fs.mkdirs(src);
+    final String nestedDir = "/dir1/dir2/dir3/dir4";
+    fs.mkdirs(path(src + nestedDir));
+
+    Path dst = path("testRenamePopulatesDirectoryAncestorsNew");
+
+    fs.rename(src, dst);
+    validateAncestorsMoved(src, dst, nestedDir);
+  }
+
+  /**
+   * Test that after renaming, the nested file is moved along with all its
+   * ancestors. It is similar to {@link #testRenamePopulatesDirectoryAncestors}.
+   */
+  @Test
+  public void testRenamePopulatesFileAncestors() throws IOException {
+    final FileSystem fs = getFileSystem();
+    final Path src = path("testRenamePopulatesFileAncestors/source");
+    fs.mkdirs(src);
+    final String nestedFile = "/dir1/dir2/dir3/file4";
+    byte[] srcDataset = dataset(256, 'a', 'z');
+    writeDataset(fs, path(src + nestedFile), srcDataset, srcDataset.length,
+        1024, false);
+
+    Path dst = path("testRenamePopulatesFileAncestorsNew");
+
+    fs.rename(src, dst);
+    validateAncestorsMoved(src, dst, nestedFile);
+  }
+
+  /**
+   * Validate that the nested path and its ancestors should have been moved.
+   *
+   * @param src the source root to move
+   * @param dst the destination root to move
+   * @param nestedPath the nested path to move
+   */
+  private void validateAncestorsMoved(Path src, Path dst, String nestedPath)
+      throws IOException {
+    assertIsDirectory(dst);
+    assertPathDoesNotExist("src path should not exist", path(src + nestedPath));
+    assertPathExists("dst path should exist", path(dst + nestedPath));
+
+    Path path = new Path(nestedPath).getParent();
+    while (path != null && !path.isRoot()) {
+      final Path parentSrc = path(src + path.toString());
+      assertPathDoesNotExist(parentSrc + " is not deleted", parentSrc);
+      final Path parentDst = path(dst + path.toString());
+      assertPathExists(parentDst + " should exist after rename", parentDst);
+      assertIsDirectory(parentDst);
+      path = path.getParent();
+    }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a1afc6aa/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/LambdaTestUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/LambdaTestUtils.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/LambdaTestUtils.java
index f5be132..1f906be 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/LambdaTestUtils.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/LambdaTestUtils.java
@@ -278,6 +278,23 @@ public final class LambdaTestUtils {
   }
 
   /**
+   * Variant of {@link #eventually(int, Callable, Callable)} method for
+   * void lambda expressions.
+   * @param timeoutMillis timeout in milliseconds.
+   * Can be zero, in which case only one attempt is made before failing.
+   * @param eval expression to evaluate
+   * @param retry retry interval generator
+   * @throws Exception the last exception thrown before timeout was triggered
+   * @throws FailFastException if raised; no retry attempt is made.
+   * @throws InterruptedException if interrupted during the sleep operation.
+   */
+  public static void eventually(int timeoutMillis,
+      VoidCallable eval,
+      Callable<Integer> retry) throws Exception {
+    eventually(timeoutMillis, new VoidCaller(eval), retry);
+  }
+
+  /**
    * Simplified {@link #eventually(int, Callable, Callable)} method
    * with a fixed interval.
    * <p>
@@ -306,6 +323,25 @@ public final class LambdaTestUtils {
   }
 
   /**
+   * Variant of {@link #eventually(int, int, Callable)} method for
+   * void lambda expressions.
+   * @param timeoutMillis timeout in milliseconds.
+   * Can be zero, in which case only one attempt is made before failing.
+   * @param intervalMillis interval in milliseconds
+   * @param eval expression to evaluate
+   * @throws Exception the last exception thrown before timeout was triggered
+   * @throws FailFastException if raised; no retry attempt is made.
+   * @throws InterruptedException if interrupted during the sleep operation.
+   */
+  public static void eventually(int timeoutMillis,
+      int intervalMillis,
+      VoidCallable eval) throws Exception {
+    eventually(timeoutMillis, eval,
+        new FixedRetryInterval(intervalMillis));
+  }
+
+  /**
    * Intercept an exception; throw an {@code AssertionError} if one not raised.
    * The caught exception is rethrown if it is of the wrong class or
    * does not contain the text defined in {@code contained}.
@@ -348,6 +384,32 @@ public final class LambdaTestUtils {
   }
 
   /**
+   * Variant of {@link #intercept(Class, Callable)} to simplify void
+   * invocations.
+   * @param clazz class of exception; the raised exception must be this class
+   * <i>or a subclass</i>.
+   * @param eval expression to eval
+   * @param <E> exception class
+   * @return the caught exception if it was of the expected type
+   * @throws Exception any other exception raised
+   * @throws AssertionError if the evaluation call didn't raise an exception.
+   */
+  public static <E extends Throwable> E intercept(
+      Class<E> clazz,
+      VoidCallable eval)
+      throws Exception {
+    try {
+      eval.call();
+      throw new AssertionError("Expected an exception");
+    } catch (Throwable e) {
+      if (clazz.isAssignableFrom(e.getClass())) {
+        return (E)e;
+      }
+      throw e;
+    }
+  }
+
+  /**
    * Intercept an exception; throw an {@code AssertionError} if one not raised.
    * The caught exception is rethrown if it is of the wrong class or
    * does not contain the text defined in {@code contained}.
@@ -388,6 +450,29 @@ public final class LambdaTestUtils {
   }
 
   /**
+   * Variant of {@link #intercept(Class, Callable)} to simplify void
+   * invocations.
+   * @param clazz class of exception; the raised exception must be this class
+   * <i>or a subclass</i>.
+   * @param contained string which must be in the {@code toString()} value
+   * of the exception
+   * @param eval expression to eval
+   * @param <E> exception class
+   * @return the caught exception if it was of the expected type
+   * @throws Exception any other exception raised
+   * @throws AssertionError if the evaluation call didn't raise an exception.
+   */
+  public static <E extends Throwable> E intercept(
+      Class<E> clazz,
+      String contained,
+      VoidCallable eval)
+      throws Exception {
+    E ex = intercept(clazz, eval);
+    GenericTestUtils.assertExceptionContains(contained, ex);
+    return ex;
+  }
+
+  /**
    * Robust string converter for exception messages; if the {@code toString()}
    * method throws an exception then that exception is caught and logged,
    * then a simple string of the classname logged.
@@ -547,4 +632,31 @@ public final class LambdaTestUtils {
       return new FailFastException(String.format(format, args));
     }
   }
+
+  /**
+   * A simple interface for lambdas, which returns nothing; this exists
+   * to simplify lambda tests on operations with no return value.
+   */
+  public interface VoidCallable {
+    void call() throws Exception;
+  }
+
+  /**
+   * Bridge class to make {@link VoidCallable} something to use in anything
+   * which takes an {@link Callable}.
+   */
+  public static class VoidCaller implements Callable<Void> {
+    private final VoidCallable callback;
+
+    public VoidCaller(VoidCallable callback) {
+      this.callback = callback;
+    }
+
+    @Override
+    public Void call() throws Exception {
+      callback.call();
+      return null;
+    }
+  }
+
 }
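
As a usage illustration of the new VoidCallable variants (a sketch assuming a Java 8 test with a FileSystem instance and test paths supplied by the surrounding fixture):

    import java.io.FileNotFoundException;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.junit.Assert;
    import static org.apache.hadoop.test.LambdaTestUtils.eventually;
    import static org.apache.hadoop.test.LambdaTestUtils.intercept;

    public class LambdaUsageSketch {
      /** fs, missing and expected are assumed to come from the test fixture. */
      static void demo(FileSystem fs, Path missing, Path expected) throws Exception {
        // Void lambda: assert the probe raises FileNotFoundException.
        intercept(FileNotFoundException.class, () -> {
          fs.getFileStatus(missing);
        });
        // Void lambda: retry every second, up to 30s, until the assertion passes.
        eventually(30000, 1000,
            () -> Assert.assertTrue("still absent", fs.exists(expected)));
      }
    }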

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a1afc6aa/hadoop-project/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-project/pom.xml b/hadoop-project/pom.xml
index 62e9fb4..e41c4c1 100644
--- a/hadoop-project/pom.xml
+++ b/hadoop-project/pom.xml
@@ -766,6 +766,11 @@
         <version>1.6</version>
       </dependency>
       <dependency>
+        <groupId>org.apache.commons</groupId>
+        <artifactId>commons-lang3</artifactId>
+        <version>3.4</version>
+      </dependency>
+      <dependency>
         <groupId>org.slf4j</groupId>
         <artifactId>slf4j-api</artifactId>
         <version>${slf4j.version}</version>
@@ -1509,4 +1514,12 @@
       </build>
     </profile>
   </profiles>
+
+  <repositories>
+    <repository>
+      <id>dynamodb-local-oregon</id>
+      <name>DynamoDB Local Release Repository</name>
+      <url>https://s3-us-west-2.amazonaws.com/dynamodb-local/release</url>
+    </repository>
+  </repositories>
 </project>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a1afc6aa/hadoop-tools/hadoop-aws/dev-support/findbugs-exclude.xml
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/dev-support/findbugs-exclude.xml b/hadoop-tools/hadoop-aws/dev-support/findbugs-exclude.xml
index ffb0a79..82ec16e 100644
--- a/hadoop-tools/hadoop-aws/dev-support/findbugs-exclude.xml
+++ b/hadoop-tools/hadoop-aws/dev-support/findbugs-exclude.xml
@@ -26,4 +26,10 @@
   <Match>
     <Class name="org.apache.hadoop.fs.s3.INode" />
   </Match>
+  <!-- Redundant null check makes code clearer, future-proof here. -->
+  <Match>
+    <Class name="org.apache.hadoop.fs.s3a.S3AFileSystem" />
+    <Method name="s3Exists" />
+    <Bug pattern="RCN_REDUNDANT_NULLCHECK_OF_NONNULL_VALUE" />
+  </Match>
 </FindBugsFilter>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a1afc6aa/hadoop-tools/hadoop-aws/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/pom.xml b/hadoop-tools/hadoop-aws/pom.xml
index 59b0c82..aa7403b 100644
--- a/hadoop-tools/hadoop-aws/pom.xml
+++ b/hadoop-tools/hadoop-aws/pom.xml
@@ -36,6 +36,7 @@
     <downloadSources>true</downloadSources>
     <hadoop.tmp.dir>${project.build.directory}/test</hadoop.tmp.dir>
 
+    <dynamodb.local.version>1.11.86</dynamodb.local.version>
     <!-- are scale tests enabled ? -->
     <fs.s3a.scale.test.enabled>unset</fs.s3a.scale.test.enabled>
     <!-- Size in MB of huge files. -->
@@ -44,6 +45,11 @@
     <fs.s3a.scale.test.huge.partitionsize>unset</fs.s3a.scale.test.huge.partitionsize>
     <!-- Timeout in seconds for scale tests.-->
     <fs.s3a.scale.test.timeout>3600</fs.s3a.scale.test.timeout>
+    <!-- are S3Guard tests enabled ? -->
+    <fs.s3a.s3guard.test.enabled>false</fs.s3a.s3guard.test.enabled>
+    <fs.s3a.s3guard.test.authoritative>false</fs.s3a.s3guard.test.authoritative>
+    <fs.s3a.s3guard.test.implementation>local</fs.s3a.s3guard.test.implementation>
+
   </properties>
 
   <profiles>
@@ -164,6 +170,11 @@
                     <fs.s3a.scale.test.huge.filesize>${fs.s3a.scale.test.huge.filesize}</fs.s3a.scale.test.huge.filesize>
                     <fs.s3a.scale.test.huge.huge.partitionsize>${fs.s3a.scale.test.huge.partitionsize}</fs.s3a.scale.test.huge.huge.partitionsize>
                     <fs.s3a.scale.test.timeout>${fs.s3a.scale.test.timeout}</fs.s3a.scale.test.timeout>
+                    <!-- S3Guard -->
+                    <fs.s3a.s3guard.test.enabled>${fs.s3a.s3guard.test.enabled}</fs.s3a.s3guard.test.enabled>
+                    <fs.s3a.s3guard.test.authoritative>${fs.s3a.s3guard.test.authoritative}</fs.s3a.s3guard.test.authoritative>
+                    <fs.s3a.s3guard.test.implementation>${fs.s3a.s3guard.test.implementation}</fs.s3a.s3guard.test.implementation>
+
                   </systemPropertyVariables>
                   <!-- Some tests cannot run in parallel.  Tests that cover -->
                   <!-- access to the root directory must run in isolation -->
@@ -206,6 +217,10 @@
                     <fs.s3a.scale.test.huge.filesize>${fs.s3a.scale.test.huge.filesize}</fs.s3a.scale.test.huge.filesize>
                     <fs.s3a.scale.test.huge.huge.partitionsize>${fs.s3a.scale.test.huge.partitionsize}</fs.s3a.scale.test.huge.huge.partitionsize>
                     <fs.s3a.scale.test.timeout>${fs.s3a.scale.test.timeout}</fs.s3a.scale.test.timeout>
+                    <!-- S3Guard -->
+                    <fs.s3a.s3guard.test.enabled>${fs.s3a.s3guard.test.enabled}</fs.s3a.s3guard.test.enabled>
+                    <fs.s3a.s3guard.test.implementation>${fs.s3a.s3guard.test.implementation}</fs.s3a.s3guard.test.implementation>
+                    <fs.s3a.s3guard.test.authoritative>${fs.s3a.s3guard.test.authoritative}</fs.s3a.s3guard.test.authoritative>
                   </systemPropertyVariables>
                   <!-- Do a sequential run for tests that cannot handle -->
                   <!-- parallel execution. -->
@@ -249,6 +264,10 @@
                     <fs.s3a.scale.test.enabled>${fs.s3a.scale.test.enabled}</fs.s3a.scale.test.enabled>
                     <fs.s3a.scale.test.huge.filesize>${fs.s3a.scale.test.huge.filesize}</fs.s3a.scale.test.huge.filesize>
                     <fs.s3a.scale.test.timeout>${fs.s3a.scale.test.timeout}</fs.s3a.scale.test.timeout>
+                    <!-- S3Guard -->
+                    <fs.s3a.s3guard.test.enabled>${fs.s3a.s3guard.test.enabled}</fs.s3a.s3guard.test.enabled>
+                    <fs.s3a.s3guard.test.implementation>${fs.s3a.s3guard.test.implementation}</fs.s3a.s3guard.test.implementation>
+                    <fs.s3a.s3guard.test.authoritative>${fs.s3a.s3guard.test.authoritative}</fs.s3a.s3guard.test.authoritative>
                   </systemPropertyVariables>
                   <forkedProcessTimeoutInSeconds>${fs.s3a.scale.test.timeout}</forkedProcessTimeoutInSeconds>
                 </configuration>
@@ -271,6 +290,60 @@
         <fs.s3a.scale.test.enabled>true</fs.s3a.scale.test.enabled>
       </properties>
     </profile>
+
+    <!-- Turn on S3Guard tests-->
+    <profile>
+      <id>s3guard</id>
+      <activation>
+        <property>
+          <name>s3guard</name>
+        </property>
+      </activation>
+      <properties >
+        <fs.s3a.s3guard.test.enabled>true</fs.s3a.s3guard.test.enabled>
+      </properties>
+    </profile>
+
+    <!-- Switch to DynamoDB for S3Guard. Has no effect unless S3Guard is enabled -->
+    <profile>
+      <id>dynamo</id>
+      <activation>
+        <property>
+          <name>dynamo</name>
+        </property>
+      </activation>
+      <properties >
+        <fs.s3a.s3guard.test.implementation>dynamo</fs.s3a.s3guard.test.implementation>
+      </properties>
+    </profile>
+
+    <!-- Switch to DynamoDBLocal for S3Guard. Has no effect unless S3Guard is enabled -->
+    <profile>
+      <id>dynamodblocal</id>
+      <activation>
+        <property>
+          <name>dynamodblocal</name>
+        </property>
+      </activation>
+      <properties>
+        <fs.s3a.s3guard.test.implementation>dynamodblocal</fs.s3a.s3guard.test.implementation>
+      </properties>
+    </profile>
+
+    <!-- Switch S3Guard from Authoritative=false to true
+     Has no effect unless S3Guard is enabled -->
+    <profile>
+      <id>non-auth</id>
+      <activation>
+        <property>
+          <name>auth</name>
+        </property>
+      </activation>
+      <properties >
+        <fs.s3a.s3guard.test.authoritative>true</fs.s3a.s3guard.test.authoritative>
+      </properties>
+    </profile>
+
   </profiles>
 
   <build>
@@ -301,6 +374,33 @@
           <forkedProcessTimeoutInSeconds>3600</forkedProcessTimeoutInSeconds>
         </configuration>
       </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-dependency-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>copy</id>
+            <phase>test-compile</phase>
+            <goals>
+              <goal>copy-dependencies</goal>
+            </goals>
+            <configuration>
+              <includeScope>test</includeScope>
+              <includeTypes>so,dll,dylib</includeTypes>
+              <outputDirectory>${project.build.directory}/native-libs</outputDirectory>
+            </configuration>
+          </execution>
+          <execution>
+            <phase>package</phase>
+            <goals>
+              <goal>copy-dependencies</goal>
+            </goals>
+            <configuration>
+              <outputDirectory>${project.build.directory}/lib</outputDirectory>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
     </plugins>
   </build>
 
@@ -309,12 +409,32 @@
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-common</artifactId>
       <scope>provided</scope>
+      <exclusions>
+        <exclusion>
+          <artifactId>servlet-api</artifactId>
+          <groupId>javax.servlet</groupId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.commons</groupId>
+          <artifactId>commons-lang3</artifactId>
+        </exclusion>
+      </exclusions>
     </dependency>
     <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-common</artifactId>
       <scope>test</scope>
       <type>test-jar</type>
+      <exclusions>
+        <exclusion>
+          <artifactId>servlet-api</artifactId>
+          <groupId>javax.servlet</groupId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.commons</groupId>
+          <artifactId>commons-lang3</artifactId>
+        </exclusion>
+      </exclusions>
     </dependency>
     <dependency>
       <groupId>com.amazonaws</groupId>
@@ -322,6 +442,33 @@
       <scope>compile</scope>
     </dependency>
     <dependency>
+      <groupId>com.amazonaws</groupId>
+      <artifactId>DynamoDBLocal</artifactId>
+      <version>${dynamodb.local.version}</version>
+      <scope>test</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>org.hamcrest</groupId>
+          <artifactId>hamcrest-core</artifactId>
+        </exclusion>
+<!--
+        <exclusion>
+          <groupId>org.eclipse.jetty</groupId>
+          <artifactId>jetty-http</artifactId>
+        </exclusion>
+-->
+        <exclusion>
+          <groupId>org.apache.commons</groupId>
+          <artifactId>commons-lang3</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.commons</groupId>
+      <artifactId>commons-lang3</artifactId>
+      <scope>compile</scope>
+    </dependency>
+    <dependency>
       <groupId>junit</groupId>
       <artifactId>junit</artifactId>
       <scope>test</scope>
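
These new systemPropertyVariables are how the s3guard, dynamo, dynamodblocal and auth profiles reach the test JVMs; a hedged sketch of how a test helper could consult them (property names from this POM, defaults assumed):

    public class S3GuardTestProps {
      /** Read the S3Guard test sysprops passed through by surefire/failsafe. */
      static boolean s3guardEnabled() {
        return Boolean.parseBoolean(
            System.getProperty("fs.s3a.s3guard.test.enabled", "false"));
      }
      static boolean authoritative() {
        return Boolean.parseBoolean(
            System.getProperty("fs.s3a.s3guard.test.authoritative", "false"));
      }
      static String implementation() {
        return System.getProperty("fs.s3a.s3guard.test.implementation", "local");
      }
    }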

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a1afc6aa/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
index 091e691..c3fc034 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
@@ -272,6 +272,11 @@ public final class Constants {
 
   public static final String USER_AGENT_PREFIX = "fs.s3a.user.agent.prefix";
 
+  /** Whether or not to allow MetadataStore to be source of truth. */
+  public static final String METADATASTORE_AUTHORITATIVE =
+      "fs.s3a.metadatastore.authoritative";
+  public static final boolean DEFAULT_METADATASTORE_AUTHORITATIVE = false;
+
   /** read ahead buffer size to prevent connection re-establishments. */
   public static final String READAHEAD_RANGE = "fs.s3a.readahead.range";
   public static final long DEFAULT_READAHEAD_RANGE = 64 * 1024;
@@ -317,7 +322,7 @@ public final class Constants {
   @InterfaceStability.Unstable
   public static final Class<? extends S3ClientFactory>
       DEFAULT_S3_CLIENT_FACTORY_IMPL =
-          S3ClientFactory.DefaultS3ClientFactory.class;
+          DefaultS3ClientFactory.class;
 
   /**
    * Maximum number of partitions in a multipart upload: {@value}.
@@ -325,4 +330,130 @@ public final class Constants {
   @InterfaceAudience.Private
   public static final int MAX_MULTIPART_COUNT = 10000;
 
+  /**
+   * Classname of the S3A-specific output committer factory. This
+   * is what must be declared when attempting to use it.
+   */
+  @InterfaceStability.Unstable
+  public static final String S3A_OUTPUT_COMMITTER_FACTORY =
+      "org.apache.hadoop.fs.s3a.commit.S3AOutputCommitterFactory";
+
+  /* Constants. */
+  public static final String S3_METADATA_STORE_IMPL =
+      "fs.s3a.metadatastore.impl";
+
+  /** Minimum period of time (in milliseconds) to keep metadata (may only be
+   * applied when a prune command is manually run).
+   */
+  @InterfaceStability.Unstable
+  public static final String S3GUARD_CLI_PRUNE_AGE =
+      "fs.s3a.s3guard.cli.prune.age";
+
+  /**
+   * The region of the DynamoDB service.
+   *
+   * This config has no default value. If the user does not set this,
+   * S3Guard will operate on a table in the associated S3 bucket's region.
+   */
+  @InterfaceStability.Unstable
+  public static final String S3GUARD_DDB_REGION_KEY =
+      "fs.s3a.s3guard.ddb.region";
+
+  /**
+   * The DynamoDB table name to use.
+   *
+   * This config has no default value. If the user does not set this, the
+   * S3Guard implementation will use the respective S3 bucket name.
+   */
+  @InterfaceStability.Unstable
+  public static final String S3GUARD_DDB_TABLE_NAME_KEY =
+      "fs.s3a.s3guard.ddb.table";
+
+  /**
+   * Whether to create the DynamoDB table if the table does not exist.
+   */
+  @InterfaceStability.Unstable
+  public static final String S3GUARD_DDB_TABLE_CREATE_KEY =
+      "fs.s3a.s3guard.ddb.table.create";
+
+  @InterfaceStability.Unstable
+  public static final String S3GUARD_DDB_TABLE_CAPACITY_READ_KEY =
+      "fs.s3a.s3guard.ddb.table.capacity.read";
+  public static final long S3GUARD_DDB_TABLE_CAPACITY_READ_DEFAULT = 500;
+  @InterfaceStability.Unstable
+  public static final String S3GUARD_DDB_TABLE_CAPACITY_WRITE_KEY =
+      "fs.s3a.s3guard.ddb.table.capacity.write";
+  public static final long S3GUARD_DDB_TABLE_CAPACITY_WRITE_DEFAULT = 100;
+
+  /**
+   * The maximum put or delete requests per BatchWriteItem request.
+   *
+   * Refer to Amazon API reference for this limit.
+   */
+  public static final int S3GUARD_DDB_BATCH_WRITE_REQUEST_LIMIT = 25;
+
+  @InterfaceStability.Unstable
+  public static final String S3GUARD_DDB_MAX_RETRIES =
+      "fs.s3a.s3guard.ddb.max.retries";
+  /**
+   * Max retries on batched DynamoDB operations before giving up and
+   * throwing an IOException.  Default is {@value}. See core-default.xml for
+   * more detail.
+   */
+  public static final int S3GUARD_DDB_MAX_RETRIES_DEFAULT = 9;
+
+  /**
+   * Period of time (in milliseconds) to sleep between batches of writes.
+   * Currently only applies to prune operations, as they are naturally a
+   * lower priority than other operations.
+   */
+  @InterfaceStability.Unstable
+  public static final String S3GUARD_DDB_BACKGROUND_SLEEP_MSEC_KEY =
+      "fs.s3a.s3guard.ddb.background.sleep";
+  public static final int S3GUARD_DDB_BACKGROUND_SLEEP_MSEC_DEFAULT = 25;
+
+  /**
+   * V1 committer.
+   */
+  @InterfaceStability.Unstable
+  public static final String S3A_OUTPUT_COMMITTER_MRV1 =
+      "org.apache.hadoop.fs.s3a.commit.S3OutputCommitterMRv1";
+
+  /**
+   * The default "Null" metadata store: {@value}.
+   */
+  @InterfaceStability.Unstable
+  public static final String S3GUARD_METASTORE_NULL
+      = "org.apache.hadoop.fs.s3a.s3guard.NullMetadataStore";
+
+  /**
+   * Use Local memory for the metadata: {@value}.
+   * This is not coherent across processes and must be used for testing only.
+   */
+  @InterfaceStability.Unstable
+  public static final String S3GUARD_METASTORE_LOCAL
+      = "org.apache.hadoop.fs.s3a.s3guard.LocalMetadataStore";
+
+  /**
+   * Use DynamoDB for the metadata: {@value}.
+   */
+  @InterfaceStability.Unstable
+  public static final String S3GUARD_METASTORE_DYNAMO
+      = "org.apache.hadoop.fs.s3a.s3guard.DynamoDBMetadataStore";
+
+  /**
+   * Inconsistency (visibility delay) injection settings.
+   */
+  @InterfaceStability.Unstable
+  public static final String FAIL_INJECT_INCONSISTENCY_KEY =
+      "fs.s3a.failinject.inconsistency.key.substring";
+
+  @InterfaceStability.Unstable
+  public static final String FAIL_INJECT_INCONSISTENCY_MSEC =
+      "fs.s3a.failinject.inconsistency.msec";
+
+  @InterfaceStability.Unstable
+  public static final String FAIL_INJECT_INCONSISTENCY_PROBABILITY =
+      "fs.s3a.failinject.inconsistency.probability";
+
 }
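
A short sketch of how these keys are meant to be consumed, including the backoff arithmetic documented for fs.s3a.s3guard.ddb.max.retries (a hedged illustration, not code from this patch):

    import org.apache.hadoop.conf.Configuration;
    import static org.apache.hadoop.fs.s3a.Constants.*;

    public class S3GuardConstantsSketch {
      /** Read the store class, authoritative flag and retry limit with their defaults. */
      static void summarize(Configuration conf) {
        String store = conf.getTrimmed(S3_METADATA_STORE_IMPL, S3GUARD_METASTORE_NULL);
        boolean authoritative = conf.getBoolean(METADATASTORE_AUTHORITATIVE,
            DEFAULT_METADATASTORE_AUTHORITATIVE);
        int retries = conf.getInt(S3GUARD_DDB_MAX_RETRIES,
            S3GUARD_DDB_MAX_RETRIES_DEFAULT);
        // Minimum blocking time before an IOException, per the exponential
        // backoff documented above: 100 * (2^N - 1) ms; N = 9 gives 51,100 ms.
        long minBlockMs = 100L * ((1L << retries) - 1);
        System.out.println(store + " authoritative=" + authoritative
            + " minBlockMs=" + minBlockMs);
      }
    }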

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a1afc6aa/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java
new file mode 100644
index 0000000..f33b25e
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.Protocol;
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.AmazonS3Client;
+import com.amazonaws.services.s3.S3ClientOptions;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.util.VersionInfo;
+import org.slf4j.Logger;
+
+import java.io.IOException;
+import java.net.URI;
+
+import static org.apache.hadoop.fs.s3a.Constants.*;
+import static org.apache.hadoop.fs.s3a.S3AUtils.createAWSCredentialProviderSet;
+import static org.apache.hadoop.fs.s3a.S3AUtils.intOption;
+
+/**
+ * The default factory implementation, which calls the AWS SDK to configure
+ * and create an {@link AmazonS3Client} that communicates with the S3 service.
+ */
+public class DefaultS3ClientFactory extends Configured implements
+    S3ClientFactory {
+
+  protected static final Logger LOG = S3AFileSystem.LOG;
+
+  @Override
+  public AmazonS3 createS3Client(URI name) throws IOException {
+    Configuration conf = getConf();
+    AWSCredentialsProvider credentials =
+        createAWSCredentialProviderSet(name, conf);
+    final ClientConfiguration awsConf = createAwsConf(getConf());
+    AmazonS3 s3 = newAmazonS3Client(credentials, awsConf);
+    return createAmazonS3Client(s3, conf, credentials, awsConf);
+  }
+
+  /**
+   * Create a new {@link ClientConfiguration}.
+   * @param conf The Hadoop configuration
+   * @return new AWS client configuration
+   */
+  public static ClientConfiguration createAwsConf(Configuration conf) {
+    final ClientConfiguration awsConf = new ClientConfiguration();
+    initConnectionSettings(conf, awsConf);
+    initProxySupport(conf, awsConf);
+    initUserAgent(conf, awsConf);
+    return awsConf;
+  }
+
+  /**
+   * Wrapper around constructor for {@link AmazonS3} client.  Override this to
+   * provide an extended version of the client
+   * @param credentials credentials to use
+   * @param awsConf  AWS configuration
+   * @return  new AmazonS3 client
+   */
+  protected AmazonS3 newAmazonS3Client(
+      AWSCredentialsProvider credentials, ClientConfiguration awsConf) {
+    return new AmazonS3Client(credentials, awsConf);
+  }
+
+  /**
+   * Initializes all AWS SDK settings related to connection management.
+   *
+   * @param conf Hadoop configuration
+   * @param awsConf AWS SDK configuration
+   */
+  private static void initConnectionSettings(Configuration conf,
+      ClientConfiguration awsConf) {
+    awsConf.setMaxConnections(intOption(conf, MAXIMUM_CONNECTIONS,
+        DEFAULT_MAXIMUM_CONNECTIONS, 1));
+    boolean secureConnections = conf.getBoolean(SECURE_CONNECTIONS,
+        DEFAULT_SECURE_CONNECTIONS);
+    awsConf.setProtocol(secureConnections ?  Protocol.HTTPS : Protocol.HTTP);
+    awsConf.setMaxErrorRetry(intOption(conf, MAX_ERROR_RETRIES,
+        DEFAULT_MAX_ERROR_RETRIES, 0));
+    awsConf.setConnectionTimeout(intOption(conf, ESTABLISH_TIMEOUT,
+        DEFAULT_ESTABLISH_TIMEOUT, 0));
+    awsConf.setSocketTimeout(intOption(conf, SOCKET_TIMEOUT,
+        DEFAULT_SOCKET_TIMEOUT, 0));
+    int sockSendBuffer = intOption(conf, SOCKET_SEND_BUFFER,
+        DEFAULT_SOCKET_SEND_BUFFER, 2048);
+    int sockRecvBuffer = intOption(conf, SOCKET_RECV_BUFFER,
+        DEFAULT_SOCKET_RECV_BUFFER, 2048);
+    awsConf.setSocketBufferSizeHints(sockSendBuffer, sockRecvBuffer);
+    String signerOverride = conf.getTrimmed(SIGNING_ALGORITHM, "");
+    if (!signerOverride.isEmpty()) {
+      LOG.debug("Signer override = {}", signerOverride);
+      awsConf.setSignerOverride(signerOverride);
+    }
+  }
+
+  /**
+   * Initializes AWS SDK proxy support if configured.
+   *
+   * @param conf Hadoop configuration
+   * @param awsConf AWS SDK configuration
+   * @throws IllegalArgumentException if misconfigured
+   */
+  private static void initProxySupport(Configuration conf,
+      ClientConfiguration awsConf) throws IllegalArgumentException {
+    String proxyHost = conf.getTrimmed(PROXY_HOST, "");
+    int proxyPort = conf.getInt(PROXY_PORT, -1);
+    if (!proxyHost.isEmpty()) {
+      awsConf.setProxyHost(proxyHost);
+      if (proxyPort >= 0) {
+        awsConf.setProxyPort(proxyPort);
+      } else {
+        if (conf.getBoolean(SECURE_CONNECTIONS, DEFAULT_SECURE_CONNECTIONS)) {
+          LOG.warn("Proxy host set without port. Using HTTPS default 443");
+          awsConf.setProxyPort(443);
+        } else {
+          LOG.warn("Proxy host set without port. Using HTTP default 80");
+          awsConf.setProxyPort(80);
+        }
+      }
+      String proxyUsername = conf.getTrimmed(PROXY_USERNAME);
+      String proxyPassword = conf.getTrimmed(PROXY_PASSWORD);
+      if ((proxyUsername == null) != (proxyPassword == null)) {
+        String msg = "Proxy error: " + PROXY_USERNAME + " or " +
+            PROXY_PASSWORD + " set without the other.";
+        LOG.error(msg);
+        throw new IllegalArgumentException(msg);
+      }
+      awsConf.setProxyUsername(proxyUsername);
+      awsConf.setProxyPassword(proxyPassword);
+      awsConf.setProxyDomain(conf.getTrimmed(PROXY_DOMAIN));
+      awsConf.setProxyWorkstation(conf.getTrimmed(PROXY_WORKSTATION));
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Using proxy server {}:{} as user {} with password {} on " +
+                "domain {} as workstation {}", awsConf.getProxyHost(),
+            awsConf.getProxyPort(),
+            String.valueOf(awsConf.getProxyUsername()),
+            awsConf.getProxyPassword(), awsConf.getProxyDomain(),
+            awsConf.getProxyWorkstation());
+      }
+    } else if (proxyPort >= 0) {
+      String msg =
+          "Proxy error: " + PROXY_PORT + " set without " + PROXY_HOST;
+      LOG.error(msg);
+      throw new IllegalArgumentException(msg);
+    }
+  }
+
+  /**
+   * Initializes the User-Agent header to send in HTTP requests to the S3
+   * back-end.  We always include the Hadoop version number.  The user also
+   * may set an optional custom prefix to put in front of the Hadoop version
+   * number.  The AWS SDK internally appends its own information, which seems
+   * to include the AWS SDK version, OS and JVM version.
+   *
+   * @param conf Hadoop configuration
+   * @param awsConf AWS SDK configuration
+   */
+  private static void initUserAgent(Configuration conf,
+      ClientConfiguration awsConf) {
+    String userAgent = "Hadoop " + VersionInfo.getVersion();
+    String userAgentPrefix = conf.getTrimmed(USER_AGENT_PREFIX, "");
+    if (!userAgentPrefix.isEmpty()) {
+      userAgent = userAgentPrefix + ", " + userAgent;
+    }
+    LOG.debug("Using User-Agent: {}", userAgent);
+    awsConf.setUserAgentPrefix(userAgent);
+  }
+
+  /**
+   * Creates an {@link AmazonS3Client} from the established configuration.
+   *
+   * @param conf Hadoop configuration
+   * @param credentials AWS credentials
+   * @param awsConf AWS SDK configuration
+   * @return S3 client
+   * @throws IllegalArgumentException if misconfigured
+   */
+  private static AmazonS3 createAmazonS3Client(AmazonS3 s3, Configuration conf,
+      AWSCredentialsProvider credentials, ClientConfiguration awsConf)
+      throws IllegalArgumentException {
+    String endPoint = conf.getTrimmed(ENDPOINT, "");
+    if (!endPoint.isEmpty()) {
+      try {
+        s3.setEndpoint(endPoint);
+      } catch (IllegalArgumentException e) {
+        String msg = "Incorrect endpoint: "  + e.getMessage();
+        LOG.error(msg);
+        throw new IllegalArgumentException(msg, e);
+      }
+    }
+    enablePathStyleAccessIfRequired(s3, conf);
+    return s3;
+  }
+
+  /**
+   * Enables path-style access to S3 buckets if configured.  By default, the
+   * behavior is to use virtual hosted-style access with URIs of the form
+   * http://bucketname.s3.amazonaws.com.  Enabling path-style access and a
+   * region-specific endpoint switches the behavior to use URIs of the form
+   * http://s3-eu-west-1.amazonaws.com/bucketname.
+   *
+   * @param s3 S3 client
+   * @param conf Hadoop configuration
+   */
+  private static void enablePathStyleAccessIfRequired(AmazonS3 s3,
+      Configuration conf) {
+    final boolean pathStyleAccess = conf.getBoolean(PATH_STYLE_ACCESS, false);
+    if (pathStyleAccess) {
+      LOG.debug("Enabling path style access!");
+      s3.setS3ClientOptions(S3ClientOptions.builder()
+          .setPathStyleAccess(true)
+          .build());
+    }
+  }
+}
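
The protected newAmazonS3Client() hook above is the extension point that the fault-injection factory in this patch plugs into; a minimal subclass sketch (essentially what InconsistentS3ClientFactory does, given the constructor shown in the next file):

    package org.apache.hadoop.fs.s3a;

    import com.amazonaws.ClientConfiguration;
    import com.amazonaws.auth.AWSCredentialsProvider;
    import com.amazonaws.services.s3.AmazonS3;

    /** Sketch: swap in an extended AmazonS3Client via the factory hook. */
    public class FaultInjectingS3ClientFactory extends DefaultS3ClientFactory {
      @Override
      protected AmazonS3 newAmazonS3Client(AWSCredentialsProvider credentials,
          ClientConfiguration awsConf) {
        // Return the inconsistency-injecting client instead of the stock client.
        return new InconsistentAmazonS3Client(credentials, awsConf, getConf());
      }
    }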

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a1afc6aa/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/InconsistentAmazonS3Client.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/InconsistentAmazonS3Client.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/InconsistentAmazonS3Client.java
new file mode 100644
index 0000000..5e9cb3f
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/InconsistentAmazonS3Client.java
@@ -0,0 +1,434 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import com.amazonaws.AmazonClientException;
+import com.amazonaws.AmazonServiceException;
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.AmazonS3Client;
+import com.amazonaws.services.s3.model.DeleteObjectRequest;
+import com.amazonaws.services.s3.model.DeleteObjectsRequest;
+import com.amazonaws.services.s3.model.DeleteObjectsResult;
+import com.amazonaws.services.s3.model.ListObjectsRequest;
+import com.amazonaws.services.s3.model.ObjectListing;
+import com.amazonaws.services.s3.model.PutObjectRequest;
+import com.amazonaws.services.s3.model.PutObjectResult;
+import com.amazonaws.services.s3.model.S3ObjectSummary;
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.hadoop.fs.s3a.Constants.*;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A wrapper around {@link com.amazonaws.services.s3.AmazonS3} that injects
+ * inconsistency and/or errors.  Used for testing S3Guard.
+ * Currently only delays listing visibility, not affecting GET.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class InconsistentAmazonS3Client extends AmazonS3Client {
+
+  /**
+   * Keys containing this substring will be subject to delayed visibility.
+   */
+  public static final String DEFAULT_DELAY_KEY_SUBSTRING = "DELAY_LISTING_ME";
+
+  /**
+   * How many milliseconds affected keys will be delayed from appearing in listing.
+   * This should probably be a config value.
+   */
+  public static final long DEFAULT_DELAY_KEY_MSEC = 5 * 1000;
+
+  public static final float DEFAULT_DELAY_KEY_PROBABILITY = 1.0f;
+
+  /** Special config value since we can't store empty strings in XML. */
+  public static final String MATCH_ALL_KEYS = "*";
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(InconsistentAmazonS3Client.class);
+
+  /** Empty string matches all keys. */
+  private String delayKeySubstring;
+
+  /** Probability to delay visibility of a matching key. */
+  private float delayKeyProbability;
+
+  /** Time in milliseconds to delay visibility of newly modified object. */
+  private long delayKeyMsec;
+
+  /**
+   * Composite of data we need to track about recently deleted objects:
+   * when it was deleted (the same is tracked for recently put objects) and the
+   * object summary (since we should keep returning it for some time after its
+   * deletion).
+   */
+  private static class Delete {
+    private Long time;
+    private S3ObjectSummary summary;
+
+    Delete(Long time, S3ObjectSummary summary) {
+      this.time = time;
+      this.summary = summary;
+    }
+
+    public Long time() {
+      return time;
+    }
+
+    public S3ObjectSummary summary() {
+      return summary;
+    }
+  }
+
+  /** Map of key to delay -> time it was deleted plus its object summary
+   * (the summary is null for prefixes). */
+  private Map<String, Delete> delayedDeletes = new HashMap<>();
+
+  /** Map of key to delay -> time it was created. */
+  private Map<String, Long> delayedPutKeys = new HashMap<>();
+
+  public InconsistentAmazonS3Client(AWSCredentialsProvider credentials,
+      ClientConfiguration clientConfiguration, Configuration conf) {
+    super(credentials, clientConfiguration);
+    setupConfig(conf);
+  }
+
+  protected void setupConfig(Configuration conf) {
+
+    delayKeySubstring = conf.get(FAIL_INJECT_INCONSISTENCY_KEY,
+        DEFAULT_DELAY_KEY_SUBSTRING);
+    // "" is a substring of all strings, use it to match all keys.
+    if (delayKeySubstring.equals(MATCH_ALL_KEYS)) {
+      delayKeySubstring = "";
+    }
+    delayKeyProbability = conf.getFloat(FAIL_INJECT_INCONSISTENCY_PROBABILITY,
+        DEFAULT_DELAY_KEY_PROBABILITY);
+    delayKeyMsec = conf.getLong(FAIL_INJECT_INCONSISTENCY_MSEC,
+        DEFAULT_DELAY_KEY_MSEC);
+    LOG.info("Enabled with {} msec delay, substring {}, probability {}",
+        delayKeyMsec, delayKeySubstring, delayKeyProbability);
+  }
+
+  /**
+   * Clear all outstanding inconsistent keys.  After calling this function,
+   * listings should behave normally (no failure injection), until additional
+   * keys are matched for delay, e.g. via putObject(), deleteObject().
+   */
+  public void clearInconsistency() {
+    LOG.info("clearing all delayed puts / deletes");
+    delayedDeletes.clear();
+    delayedPutKeys.clear();
+  }
+
+  /**
+   * Convenience function for test code to cast from supertype.
+   * @param c supertype to cast from
+   * @return subtype, not null
+   * @throws Exception on error
+   */
+  public static InconsistentAmazonS3Client castFrom(AmazonS3 c) throws
+      Exception {
+    InconsistentAmazonS3Client ic = null;
+    if (c instanceof InconsistentAmazonS3Client) {
+      ic = (InconsistentAmazonS3Client) c;
+    }
+    Preconditions.checkNotNull(ic, "Not an instance of " +
+        "InconsistentAmazonS3Client");
+    return ic;
+  }
+
+  @Override
+  public DeleteObjectsResult deleteObjects(DeleteObjectsRequest
+      deleteObjectsRequest)
+      throws AmazonClientException, AmazonServiceException {
+    for (DeleteObjectsRequest.KeyVersion keyVersion :
+        deleteObjectsRequest.getKeys()) {
+      registerDeleteObject(keyVersion.getKey(), deleteObjectsRequest
+          .getBucketName());
+    }
+    return super.deleteObjects(deleteObjectsRequest);
+  }
+
+  @Override
+  public void deleteObject(DeleteObjectRequest deleteObjectRequest)
+      throws AmazonClientException, AmazonServiceException {
+    String key = deleteObjectRequest.getKey();
+    LOG.debug("key {}", key);
+    registerDeleteObject(key, deleteObjectRequest.getBucketName());
+    super.deleteObject(deleteObjectRequest);
+  }
+
+  /* We should only need to override this version of putObject() */
+  @Override
+  public PutObjectResult putObject(PutObjectRequest putObjectRequest)
+      throws AmazonClientException, AmazonServiceException {
+    LOG.debug("key {}", putObjectRequest.getKey());
+    registerPutObject(putObjectRequest);
+    return super.putObject(putObjectRequest);
+  }
+
+  /* We should only need to override this version of listObjects() */
+  @Override
+  public ObjectListing listObjects(ListObjectsRequest listObjectsRequest)
+      throws AmazonClientException, AmazonServiceException {
+    LOG.debug("prefix {}", listObjectsRequest.getPrefix());
+    ObjectListing listing = super.listObjects(listObjectsRequest);
+    listing = filterListObjects(listObjectsRequest, listing);
+    listing = restoreListObjects(listObjectsRequest, listing);
+    return listing;
+  }
+
+  private void addSummaryIfNotPresent(List<S3ObjectSummary> list,
+      S3ObjectSummary item) {
+    // S3ObjectSummary does not implement value equality, so match on key.
+    String key = item.getKey();
+    for (S3ObjectSummary member : list) {
+      if (member.getKey().equals(key)) {
+        return;
+      }
+    }
+    list.add(item);
+  }
+
+  /**
+   * Add the prefix of a child key to the given list.  The added prefix is
+   * the ancestor plus the next directory level of the child, e.g.:
+   * if ancestor is "a/b/c" and child is "a/b/c/d/e/file" then "a/b/c/d" is
+   * added to the list.
+   * @param prefixes list to add to
+   * @param ancestor path we are listing in
+   * @param child full path to get prefix from
+   */
+  private void addPrefixIfNotPresent(List<String> prefixes, String ancestor,
+      String child) {
+    Path prefixCandidate = new Path(child).getParent();
+    Path ancestorPath = new Path(ancestor);
+    Preconditions.checkArgument(child.startsWith(ancestor), "%s does not " +
+        "start with %s", child, ancestor);
+    while (!prefixCandidate.isRoot()) {
+      Path nextParent = prefixCandidate.getParent();
+      if (nextParent.equals(ancestorPath)) {
+        String prefix = prefixCandidate.toString();
+        if (!prefixes.contains(prefix)) {
+          prefixes.add(prefix);
+        }
+        return;
+      }
+      prefixCandidate = nextParent;
+    }
+  }
+
+  /**
+   * Checks whether the parent key is an ancestor of the child key.
+   * @param parent key that may be the parent.
+   * @param child key that may be the child.
+   * @param recursive if false, only return true for direct children.  If
+   *                  true, any descendant will count.
+   * @return true if parent is an ancestor of child
+   */
+  private boolean isDescendant(String parent, String child, boolean recursive) {
+    if (recursive) {
+      if (!parent.endsWith("/")) {
+        parent = parent + "/";
+      }
+      return child.startsWith(parent);
+    } else {
+      Path actualParentPath = new Path(child).getParent();
+      Path expectedParentPath = new Path(parent);
+      return actualParentPath.equals(expectedParentPath);
+    }
+  }
+
+  /**
+   * Simulate eventual consistency of delete for this list operation: any
+   * recently-deleted keys will be added back to the listing.
+   * @param request List request
+   * @param rawListing listing returned from underlying S3
+   * @return listing with recently-deleted items restored
+   */
+  private ObjectListing restoreListObjects(ListObjectsRequest request,
+      ObjectListing rawListing) {
+    List<S3ObjectSummary> outputList = rawListing.getObjectSummaries();
+    List<String> outputPrefixes = rawListing.getCommonPrefixes();
+    // recursive list has no delimiter, returns everything that matches a
+    // prefix.
+    boolean recursiveObjectList = !("/".equals(request.getDelimiter()));
+
+    // Go through all deleted keys
+    for (String key : new HashSet<>(delayedDeletes.keySet())) {
+      Delete delete = delayedDeletes.get(key);
+      if (isKeyDelayed(delete.time(), key)) {
+        if (isDescendant(request.getPrefix(), key, recursiveObjectList)) {
+          if (delete.summary() != null) {
+            addSummaryIfNotPresent(outputList, delete.summary());
+          }
+        }
+        // Non-recursive list has delimiter: will return rolled-up prefixes for
+        // all keys that are not direct children
+        if (!recursiveObjectList) {
+          if (isDescendant(request.getPrefix(), key, true)) {
+            addPrefixIfNotPresent(outputPrefixes, request.getPrefix(), key);
+          }
+        }
+      } else {
+        // Clean up any expired entries
+        delayedDeletes.remove(key);
+      }
+    }
+
+    return new CustomObjectListing(rawListing, outputList, outputPrefixes);
+  }
+
+  private ObjectListing filterListObjects(ListObjectsRequest request,
+      ObjectListing rawListing) {
+
+    // Filter object listing
+    List<S3ObjectSummary> outputList = new ArrayList<>();
+    for (S3ObjectSummary s : rawListing.getObjectSummaries()) {
+      String key = s.getKey();
+      if (!isKeyDelayed(delayedPutKeys.get(key), key)) {
+        outputList.add(s);
+      }
+    }
+
+    // Filter prefixes (directories)
+    List<String> outputPrefixes = new ArrayList<>();
+    for (String key : rawListing.getCommonPrefixes()) {
+      if (!isKeyDelayed(delayedPutKeys.get(key), key)) {
+        outputPrefixes.add(key);
+      }
+    }
+
+    return new CustomObjectListing(rawListing, outputList, outputPrefixes);
+  }
+
+  private boolean isKeyDelayed(Long enqueueTime, String key) {
+    if (enqueueTime == null) {
+      LOG.debug("no delay for key {}", key);
+      return false;
+    }
+    long currentTime = System.currentTimeMillis();
+    long deadline = enqueueTime + delayKeyMsec;
+    if (currentTime >= deadline) {
+      delayedDeletes.remove(key);
+      LOG.debug("no longer delaying {}", key);
+      return false;
+    } else  {
+      LOG.info("delaying {}", key);
+      return true;
+    }
+  }
+
+  private void registerDeleteObject(String key, String bucket) {
+    if (shouldDelay(key)) {
+      // Record summary so we can add it back for some time post-deletion
+      S3ObjectSummary summary = null;
+      ObjectListing list = listObjects(bucket, key);
+      for (S3ObjectSummary result : list.getObjectSummaries()) {
+        if (result.getKey().equals(key)) {
+          summary = result;
+          break;
+        }
+      }
+      delayedDeletes.put(key, new Delete(System.currentTimeMillis(), summary));
+    }
+  }
+
+  private void registerPutObject(PutObjectRequest req) {
+    String key = req.getKey();
+    if (shouldDelay(key)) {
+      enqueueDelayedPut(key);
+    }
+  }
+
+  /**
+   * Should we delay listing visibility for this key?
+   * @param key key which is being put or deleted
+   * @return true if we should delay
+   */
+  private boolean shouldDelay(String key) {
+    boolean delay = key.contains(delayKeySubstring);
+    delay = delay && trueWithProbability(delayKeyProbability);
+    LOG.debug("{} -> {}", key, delay);
+    return delay;
+  }
+
+
+  private boolean trueWithProbability(float p) {
+    return Math.random() < p;
+  }
+
+  /**
+   * Record this key as something that should not become visible in
+   * listObject replies for a while, to simulate eventual list consistency.
+   * @param key key to delay visibility of
+   */
+  private void enqueueDelayedPut(String key) {
+    LOG.debug("delaying put of {}", key);
+    delayedPutKeys.put(key, System.currentTimeMillis());
+  }
+
+  /** Since ObjectListing is immutable, we just override it with a wrapper. */
+  private static class CustomObjectListing extends ObjectListing {
+
+    private final List<S3ObjectSummary> customListing;
+    private final List<String> customPrefixes;
+
+    CustomObjectListing(ObjectListing rawListing,
+        List<S3ObjectSummary> customListing,
+        List<String> customPrefixes) {
+      super();
+      this.customListing = customListing;
+      this.customPrefixes = customPrefixes;
+
+      this.setBucketName(rawListing.getBucketName());
+      this.setCommonPrefixes(rawListing.getCommonPrefixes());
+      this.setDelimiter(rawListing.getDelimiter());
+      this.setEncodingType(rawListing.getEncodingType());
+      this.setMarker(rawListing.getMarker());
+      this.setMaxKeys(rawListing.getMaxKeys());
+      this.setNextMarker(rawListing.getNextMarker());
+      this.setPrefix(rawListing.getPrefix());
+      this.setTruncated(rawListing.isTruncated());
+    }
+
+    @Override
+    public List<S3ObjectSummary> getObjectSummaries() {
+      return customListing;
+    }
+
+    @Override
+    public List<String> getCommonPrefixes() {
+      return customPrefixes;
+    }
+  }
+}
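
For illustration, the injected behaviour can be exercised directly; a minimal
sketch, assuming only the constructor and constants above (the credential
provider, marker string and timings are arbitrary choices):

    import com.amazonaws.ClientConfiguration;
    import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.s3a.InconsistentAmazonS3Client;

    import static org.apache.hadoop.fs.s3a.Constants.*;

    public class InconsistencySketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Delay every key containing the marker, always, for ~5 seconds.
        conf.set(FAIL_INJECT_INCONSISTENCY_KEY, "DELAY_LISTING_ME");
        conf.setFloat(FAIL_INJECT_INCONSISTENCY_PROBABILITY, 1.0f);
        conf.setLong(FAIL_INJECT_INCONSISTENCY_MSEC, 5000);

        InconsistentAmazonS3Client s3 = new InconsistentAmazonS3Client(
            new DefaultAWSCredentialsProviderChain(),
            new ClientConfiguration(),
            conf);

        // putObject()/deleteObject() calls whose key contains the marker are
        // now hidden from, or re-added to, listObjects() results for ~5s.
        // Tests reset the injected state between cases:
        s3.clearInconsistency();
      }
    }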

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a1afc6aa/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/InconsistentS3ClientFactory.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/InconsistentS3ClientFactory.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/InconsistentS3ClientFactory.java
new file mode 100644
index 0000000..17d268b
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/InconsistentS3ClientFactory.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.services.s3.AmazonS3;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * S3 Client factory used for testing with eventual consistency fault injection.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class InconsistentS3ClientFactory extends DefaultS3ClientFactory {
+
+  @Override
+  protected AmazonS3 newAmazonS3Client(AWSCredentialsProvider credentials,
+      ClientConfiguration awsConf) {
+    LOG.warn("** FAILURE INJECTION ENABLED.  Do not run in production! **");
+    return new InconsistentAmazonS3Client(credentials, awsConf, getConf());
+  }
+}
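
A hedged sketch of how a test run might select this factory through
configuration; the "fs.s3a.s3.client.factory.impl" key and the bucket URI are
assumptions for illustration rather than values defined in this file:

    import java.net.URI;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class FaultInjectingListing {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Assumed client-factory key: swap in the fault-injecting client.
        conf.set("fs.s3a.s3.client.factory.impl",
            "org.apache.hadoop.fs.s3a.InconsistentS3ClientFactory");
        // Hypothetical bucket; keys containing DELAY_LISTING_ME get delayed
        // listing visibility.
        try (FileSystem fs =
            FileSystem.get(URI.create("s3a://example-bucket/"), conf)) {
          fs.listStatus(new Path("/tests/DELAY_LISTING_ME/"));
        }
      }
    }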

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a1afc6aa/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Listing.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Listing.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Listing.java
index 30d8e6f..8efa218 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Listing.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Listing.java
@@ -22,18 +22,25 @@ import com.amazonaws.AmazonClientException;
 import com.amazonaws.services.s3.model.ListObjectsRequest;
 import com.amazonaws.services.s3.model.ObjectListing;
 import com.amazonaws.services.s3.model.S3ObjectSummary;
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.fs.RemoteIterator;
+
+import com.google.common.base.Preconditions;
 import org.slf4j.Logger;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.ListIterator;
 import java.util.NoSuchElementException;
+import java.util.Set;
 
 import static org.apache.hadoop.fs.s3a.Constants.S3N_FOLDER_SUFFIX;
 import static org.apache.hadoop.fs.s3a.S3AUtils.createFileStatus;
@@ -54,6 +61,43 @@ public class Listing {
   }
 
   /**
+   * Create a FileStatus iterator against a provided list of file status, with
+   * a given status filter.
+   *
+   * @param fileStatuses the provided list of file status. NO remote calls.
+   * @param filter file path filter on which paths to accept
+   * @param acceptor the file status acceptor
+   * @return the file status iterator
+   */
+  ProvidedFileStatusIterator createProvidedFileStatusIterator(
+      FileStatus[] fileStatuses,
+      PathFilter filter,
+      FileStatusAcceptor acceptor) {
+    return new ProvidedFileStatusIterator(fileStatuses, filter, acceptor);
+  }
+
+  /**
+   * Create a FileStatus iterator against a path, with a given list object
+   * request.
+   *
+   * @param listPath path of the listing
+   * @param request initial request to make
+   * @param filter the filter on which paths to accept
+   * @param acceptor the class/predicate to decide which entries to accept
+   * in the listing based on the full file status.
+   * @return the iterator
+   * @throws IOException IO Problems
+   */
+  FileStatusListingIterator createFileStatusListingIterator(
+      Path listPath,
+      ListObjectsRequest request,
+      PathFilter filter,
+      Listing.FileStatusAcceptor acceptor) throws IOException {
+    return createFileStatusListingIterator(listPath, request, filter, acceptor,
+        null);
+  }
+
+  /**
    * Create a FileStatus iterator against a path, with a given
    * list object request.
    * @param listPath path of the listing
@@ -61,6 +105,8 @@ public class Listing {
    * @param filter the filter on which paths to accept
    * @param acceptor the class/predicate to decide which entries to accept
    * in the listing based on the full file status.
+   * @param providedStatus the provided list of file status, which may contain
+   *                       items that are not listed from source.
    * @return the iterator
    * @throws IOException IO Problems
    */
@@ -68,11 +114,13 @@ public class Listing {
       Path listPath,
       ListObjectsRequest request,
       PathFilter filter,
-      Listing.FileStatusAcceptor acceptor) throws IOException {
+      Listing.FileStatusAcceptor acceptor,
+      RemoteIterator<FileStatus> providedStatus) throws IOException {
     return new FileStatusListingIterator(
         new ObjectListingIterator(listPath, request),
         filter,
-        acceptor);
+        acceptor,
+        providedStatus);
   }
 
   /**
@@ -80,12 +128,27 @@ public class Listing {
    * @param statusIterator an iterator over the remote status entries
    * @return a new remote iterator
    */
+  @VisibleForTesting
   LocatedFileStatusIterator createLocatedFileStatusIterator(
       RemoteIterator<FileStatus> statusIterator) {
     return new LocatedFileStatusIterator(statusIterator);
   }
 
   /**
+   * Create a located status iterator that wraps another to filter out a set
+   * of recently deleted items.
+   * @param iterator an iterator over the remote located status entries.
+   * @param tombstones set of paths that are recently deleted and should be
+   *                   filtered.
+   * @return a new remote iterator.
+   */
+  @VisibleForTesting
+  TombstoneReconcilingIterator createTombstoneReconcilingIterator(
+      RemoteIterator<LocatedFileStatus> iterator, Set<Path> tombstones) {
+    return new TombstoneReconcilingIterator(iterator, tombstones);
+  }
+
+  /**
    * Interface to implement by the logic deciding whether to accept a summary
    * entry or path as a valid file or directory.
    */
@@ -108,6 +171,13 @@ public class Listing {
      * should be generated.)
      */
     boolean accept(Path keyPath, String commonPrefix);
+
+    /**
+     * Predicate to decide whether or not to accept a file status.
+     * @param status file status containing file path information
+     * @return true if the status is accepted else false
+     */
+    boolean accept(FileStatus status);
   }
 
   /**
@@ -115,9 +185,9 @@ public class Listing {
    * value.
    *
    * If the status value is null, the iterator declares that it has no data.
-   * This iterator is used to handle {@link listStatus()} calls where the path
-   * handed in refers to a file, not a directory: this is the iterator
-   * returned.
+   * This iterator is used to handle {@link S3AFileSystem#listStatus} calls
+   * where the path handed in refers to a file, not a directory: this is the
+   * iterator returned.
    */
   static final class SingleStatusRemoteIterator
       implements RemoteIterator<LocatedFileStatus> {
@@ -169,6 +239,47 @@ public class Listing {
   }
 
   /**
+   * This wraps up a provided non-null list of file status as a remote iterator.
+   *
+   * The provided list is filtered up front; subsequent {@link #next} calls
+   * return entries from the filtered list. This does not scale well if the
+   * provided list is very large.
+   *
+   * There is no remote data to fetch.
+   */
+  static class ProvidedFileStatusIterator
+      implements RemoteIterator<FileStatus> {
+    private final ArrayList<FileStatus> filteredStatusList;
+    private int index = 0;
+
+    ProvidedFileStatusIterator(FileStatus[] fileStatuses, PathFilter filter,
+        FileStatusAcceptor acceptor) {
+      Preconditions.checkArgument(fileStatuses != null, "Null status list!");
+
+      filteredStatusList = new ArrayList<>(fileStatuses.length);
+      for (FileStatus status : fileStatuses) {
+        if (filter.accept(status.getPath()) && acceptor.accept(status)) {
+          filteredStatusList.add(status);
+        }
+      }
+      filteredStatusList.trimToSize();
+    }
+
+    @Override
+    public boolean hasNext() throws IOException {
+      return index < filteredStatusList.size();
+    }
+
+    @Override
+    public FileStatus next() throws IOException {
+      if (!hasNext()) {
+        throw new NoSuchElementException();
+      }
+      return filteredStatusList.get(index++);
+    }
+  }
+
+  /**
    * Wraps up object listing into a remote iterator which will ask for more
    * listing data if needed.
    *
@@ -179,7 +290,7 @@ public class Listing {
    * iterator can declare that there is more data available.
    *
    * The need to filter the results precludes the iterator from simply
-   * declaring that if the {@link S3AFileSystem.ObjectListingIterator#hasNext()}
+   * declaring that if the {@link ObjectListingIterator#hasNext()}
    * is true then there are more results. Instead the next batch of results must
    * be retrieved and filtered.
    *
@@ -208,20 +319,33 @@ public class Listing {
     /** Iterator over the current set of results. */
     private ListIterator<FileStatus> statusBatchIterator;
 
+    private final Set<FileStatus> providedStatus;
+    private Iterator<FileStatus> providedStatusIterator;
+
     /**
      * Create an iterator over file status entries.
      * @param source the listing iterator from a listObjects call.
      * @param filter the filter on which paths to accept
      * @param acceptor the class/predicate to decide which entries to accept
      * in the listing based on the full file status.
+     * @param providedStatus the provided list of file status, which may contain
+     *                       items that are not listed from source.
      * @throws IOException IO Problems
      */
     FileStatusListingIterator(ObjectListingIterator source,
         PathFilter filter,
-        FileStatusAcceptor acceptor) throws IOException {
+        FileStatusAcceptor acceptor,
+        RemoteIterator<FileStatus> providedStatus) throws IOException {
       this.source = source;
       this.filter = filter;
       this.acceptor = acceptor;
+      this.providedStatus = new HashSet<>();
+      for (; providedStatus != null && providedStatus.hasNext();) {
+        final FileStatus status = providedStatus.next();
+        if (filter.accept(status.getPath()) && acceptor.accept(status)) {
+          this.providedStatus.add(status);
+        }
+      }
       // build the first set of results. This will not trigger any
       // remote IO, assuming the source iterator is in its initial
       // iteration
@@ -233,26 +357,53 @@ public class Listing {
      * If there is data in the local filtered list, return true.
      * Else: request more data until that condition is met, or there
      * is no more remote listing data.
+     * Finally, return true if the {@code providedStatusIterator}
+     * still has items.
      * @return true if a call to {@link #next()} will succeed.
      * @throws IOException
      */
     @Override
     public boolean hasNext() throws IOException {
-      return statusBatchIterator.hasNext() || requestNextBatch();
+      return sourceHasNext() || providedStatusIterator.hasNext();
+    }
+
+    private boolean sourceHasNext() throws IOException {
+      if (statusBatchIterator.hasNext() || requestNextBatch()) {
+        return true;
+      } else {
+        // turn to file status that are only in provided list
+        if (providedStatusIterator == null) {
+          LOG.debug("Start iterating the provided status.");
+          providedStatusIterator = providedStatus.iterator();
+        }
+        return false;
+      }
     }
 
     @Override
     public FileStatus next() throws IOException {
-      if (!hasNext()) {
-        throw new NoSuchElementException();
+      final FileStatus status;
+      if (sourceHasNext()) {
+        status = statusBatchIterator.next();
+        // Remove the status from the provided set once S3 itself lists it,
+        // so that this iterator does not return duplicate items.
+        LOG.debug("Removing the status from provided file status {}", status);
+        providedStatus.remove(status);
+      } else {
+        if (providedStatusIterator.hasNext()) {
+          status = providedStatusIterator.next();
+          LOG.debug("Returning provided file status {}", status);
+        } else {
+          throw new NoSuchElementException();
+        }
       }
-      return statusBatchIterator.next();
+      return status;
     }
 
     /**
      * Try to retrieve another batch.
      * Note that for the initial batch,
-     * {@link S3AFileSystem.ObjectListingIterator} does not generate a request;
+     * {@link ObjectListingIterator} does not generate a request;
      * it simply returns the initial set.
      *
      * @return true if a new batch was created.
@@ -312,7 +463,7 @@ public class Listing {
       for (String prefix : objects.getCommonPrefixes()) {
         Path keyPath = owner.keyToQualifiedPath(prefix);
         if (acceptor.accept(keyPath, prefix) && filter.accept(keyPath)) {
-          FileStatus status = new S3AFileStatus(false, keyPath,
+          FileStatus status = new S3AFileStatus(Tristate.FALSE, keyPath,
               owner.getUsername());
           LOG.debug("Adding directory: {}", status);
           added++;
@@ -352,7 +503,7 @@ public class Listing {
    * instance.
    *
    * 2. Second and later invocations will continue the ongoing listing,
-   * calling {@link #continueListObjects(ObjectListing)} to request the next
+   * calling {@link S3AFileSystem#continueListObjects} to request the next
    * batch of results.
    *
    * 3. The {@link #hasNext()} predicate returns true for the initial call,
@@ -504,6 +655,11 @@ public class Listing {
     public boolean accept(Path keyPath, String prefix) {
       return false;
     }
+
+    @Override
+    public boolean accept(FileStatus status) {
+      return (status != null) && status.isFile();
+    }
   }
 
   /**
@@ -534,6 +690,80 @@ public class Listing {
   }
 
   /**
+   * Wraps another iterator and filters out files that appear in the provided
+   * set of tombstones.  Will read ahead in the iterator when necessary to
+   * ensure that emptiness is detected early enough if only deleted objects
+   * remain in the source iterator.
+   */
+  static class TombstoneReconcilingIterator implements
+      RemoteIterator<LocatedFileStatus> {
+    private LocatedFileStatus next = null;
+    private final RemoteIterator<LocatedFileStatus> iterator;
+    private final Set<Path> tombstones;
+
+    /**
+     * @param iterator Source iterator to filter
+     * @param tombstones set of tombstone markers to filter out of results
+     */
+    TombstoneReconcilingIterator(RemoteIterator<LocatedFileStatus>
+        iterator, Set<Path> tombstones) {
+      this.iterator = iterator;
+      if (tombstones != null) {
+        this.tombstones = tombstones;
+      } else {
+        this.tombstones = Collections.emptySet();
+      }
+    }
+
+    private boolean fetch() throws IOException {
+      while (next == null && iterator.hasNext()) {
+        LocatedFileStatus candidate = iterator.next();
+        if (!tombstones.contains(candidate.getPath())) {
+          next = candidate;
+          return true;
+        }
+      }
+      return false;
+    }
+
+    public boolean hasNext() throws IOException {
+      if (next != null) {
+        return true;
+      }
+      return fetch();
+    }
+
+    public LocatedFileStatus next() throws IOException {
+      if (hasNext()) {
+        LocatedFileStatus result = next;
+        next = null;
+        fetch();
+        return result;
+      }
+      throw new NoSuchElementException();
+    }
+  }
+
+  /**
+   * Accept all entries except those which map to S3N pseudo directory markers.
+   */
+  static class AcceptAllButS3nDirs implements FileStatusAcceptor {
+
+    public boolean accept(Path keyPath, S3ObjectSummary summary) {
+      return !summary.getKey().endsWith(S3N_FOLDER_SUFFIX);
+    }
+
+    public boolean accept(Path keyPath, String prefix) {
+      return !keyPath.toString().endsWith(S3N_FOLDER_SUFFIX);
+    }
+
+    public boolean accept(FileStatus status) {
+      return !status.getPath().toString().endsWith(S3N_FOLDER_SUFFIX);
+    }
+
+  }
+
+  /**
    * Accept all entries except the base path and those which map to S3N
    * pseudo directory markers.
    */
@@ -575,6 +805,11 @@ public class Listing {
     public boolean accept(Path keyPath, String prefix) {
       return !keyPath.equals(qualifiedPath);
     }
+
+    @Override
+    public boolean accept(FileStatus status) {
+      return (status != null) && !status.getPath().equals(qualifiedPath);
+    }
   }
 
   /**

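The tombstone reconciliation above is a read-ahead filter over a listing
iterator. As a standalone illustration of the same pattern (plain Java, no
S3A types; the paths are made up):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Collections;
    import java.util.Iterator;
    import java.util.List;
    import java.util.Set;

    public class TombstoneFilterSketch {
      public static void main(String[] args) {
        Iterator<String> source =
            Arrays.asList("dir/file1", "dir/deleted", "dir/file2").iterator();
        Set<String> tombstones = Collections.singleton("dir/deleted");

        // Read ahead until the next non-tombstoned entry, as
        // TombstoneReconcilingIterator.fetch() does.
        List<String> visible = new ArrayList<>();
        String next = null;
        while (true) {
          while (next == null && source.hasNext()) {
            String candidate = source.next();
            if (!tombstones.contains(candidate)) {
              next = candidate;
            }
          }
          if (next == null) {
            break;   // only tombstoned entries (or nothing) remained
          }
          visible.add(next);
          next = null;
        }
        System.out.println(visible);   // prints [dir/file1, dir/file2]
      }
    }
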
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a1afc6aa/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java
index 3fbdcb0..f846689 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java
@@ -79,6 +79,9 @@ class S3ABlockOutputStream extends OutputStream {
   /** Size of all blocks. */
   private final int blockSize;
 
+  /** Total bytes for uploads submitted so far. */
+  private long bytesSubmitted;
+
   /** Callback for progress. */
   private final ProgressListener progressListener;
   private final ListeningExecutorService executorService;
@@ -302,6 +305,7 @@ class S3ABlockOutputStream extends OutputStream {
     }
     try {
       multiPartUpload.uploadBlockAsync(getActiveBlock());
+      bytesSubmitted += getActiveBlock().dataSize();
     } finally {
       // set the block to null, so the next write will create a new block.
       clearActiveBlock();
@@ -330,13 +334,14 @@ class S3ABlockOutputStream extends OutputStream {
         this,
         blockCount,
         hasBlock ? block : "(none)");
+    long bytes = 0;
     try {
       if (multiPartUpload == null) {
         if (hasBlock) {
           // no uploads of data have taken place, put the single block up.
           // This must happen even if there is no data, so that 0 byte files
           // are created.
-          putObject();
+          bytes = putObject();
         }
       } else {
         // there has already been at least one block scheduled for upload;
@@ -350,6 +355,7 @@ class S3ABlockOutputStream extends OutputStream {
             multiPartUpload.waitForAllPartUploads();
         // then complete the operation
         multiPartUpload.complete(partETags);
+        bytes = bytesSubmitted;
       }
       LOG.debug("Upload complete for {}", writeOperationHelper);
     } catch (IOException ioe) {
@@ -362,7 +368,7 @@ class S3ABlockOutputStream extends OutputStream {
       clearActiveBlock();
     }
     // All end of write operations, including deleting fake parent directories
-    writeOperationHelper.writeSuccessful();
+    writeOperationHelper.writeSuccessful(bytes);
   }
 
   /**
@@ -370,8 +376,11 @@ class S3ABlockOutputStream extends OutputStream {
    * is empty a 0-byte PUT will be invoked, as it is needed to create an
    * entry at the far end.
    * @throws IOException any problem.
+   * @return number of bytes uploaded. If thread was interrupted while
+   * waiting for upload to complete, returns zero with interrupted flag set
+   * on this thread.
    */
-  private void putObject() throws IOException {
+  private int putObject() throws IOException {
     LOG.debug("Executing regular upload for {}", writeOperationHelper);
 
     final S3ADataBlocks.DataBlock block = getActiveBlock();
@@ -405,9 +414,11 @@ class S3ABlockOutputStream extends OutputStream {
     //wait for completion
     try {
       putObjectResult.get();
+      return size;
     } catch (InterruptedException ie) {
       LOG.warn("Interrupted object upload", ie);
       Thread.currentThread().interrupt();
+      return 0;
     } catch (ExecutionException ee) {
       throw extractException("regular upload", key, ee);
     }

