Posted to common-commits@hadoop.apache.org by st...@apache.org on 2017/09/01 13:16:29 UTC

[7/8] hadoop git commit: HADOOP-13345 S3Guard: Improved Consistency for S3A. Contributed by: Chris Nauroth, Aaron Fabbri, Mingliang Liu, Lei (Eddy) Xu, Sean Mackrory, Steve Loughran and others.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/621b43e2/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java
index 3fbdcb0..f846689 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java
@@ -79,6 +79,9 @@ class S3ABlockOutputStream extends OutputStream {
   /** Size of all blocks. */
   private final int blockSize;
 
+  /** Total bytes for uploads submitted so far. */
+  private long bytesSubmitted;
+
   /** Callback for progress. */
   private final ProgressListener progressListener;
   private final ListeningExecutorService executorService;
@@ -302,6 +305,7 @@ class S3ABlockOutputStream extends OutputStream {
     }
     try {
       multiPartUpload.uploadBlockAsync(getActiveBlock());
+      bytesSubmitted += getActiveBlock().dataSize();
     } finally {
       // set the block to null, so the next write will create a new block.
       clearActiveBlock();
@@ -330,13 +334,14 @@ class S3ABlockOutputStream extends OutputStream {
         this,
         blockCount,
         hasBlock ? block : "(none)");
+    long bytes = 0;
     try {
       if (multiPartUpload == null) {
         if (hasBlock) {
           // no uploads of data have taken place, put the single block up.
           // This must happen even if there is no data, so that 0 byte files
           // are created.
-          putObject();
+          bytes = putObject();
         }
       } else {
         // there has already been at least one block scheduled for upload;
@@ -350,6 +355,7 @@ class S3ABlockOutputStream extends OutputStream {
             multiPartUpload.waitForAllPartUploads();
         // then complete the operation
         multiPartUpload.complete(partETags);
+        bytes = bytesSubmitted;
       }
       LOG.debug("Upload complete for {}", writeOperationHelper);
     } catch (IOException ioe) {
@@ -362,7 +368,7 @@ class S3ABlockOutputStream extends OutputStream {
       clearActiveBlock();
     }
     // All end of write operations, including deleting fake parent directories
-    writeOperationHelper.writeSuccessful();
+    writeOperationHelper.writeSuccessful(bytes);
   }
 
   /**
@@ -370,8 +376,11 @@ class S3ABlockOutputStream extends OutputStream {
    * is empty a 0-byte PUT will be invoked, as it is needed to create an
    * entry at the far end.
    * @throws IOException any problem.
+   * @return number of bytes uploaded. If the thread was interrupted while
+   * waiting for the upload to complete, returns zero with the interrupted
+   * flag set on this thread.
    */
-  private void putObject() throws IOException {
+  private int putObject() throws IOException {
     LOG.debug("Executing regular upload for {}", writeOperationHelper);
 
     final S3ADataBlocks.DataBlock block = getActiveBlock();
@@ -405,9 +414,11 @@ class S3ABlockOutputStream extends OutputStream {
     //wait for completion
     try {
       putObjectResult.get();
+      return size;
     } catch (InterruptedException ie) {
       LOG.warn("Interrupted object upload", ie);
       Thread.currentThread().interrupt();
+      return 0;
     } catch (ExecutionException ee) {
       throw extractException("regular upload", key, ee);
     }
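
The revised putObject() contract (return zero with the interrupt flag set,
rather than throwing) pushes interrupt handling to the caller. A hedged
caller-side sketch of what that contract permits; the rethrow policy here is
illustrative, not what close() above actually does:

    // requires java.io.InterruptedIOException; putObject() is the private
    // method documented above, key the stream's destination key
    int uploaded = putObject();
    if (uploaded == 0 && Thread.currentThread().isInterrupted()) {
      // zero bytes plus a set interrupt flag means the wait was interrupted,
      // not that an empty object was PUT; a caller could surface that:
      throw new InterruptedIOException("interrupted while uploading " + key);
    }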

http://git-wip-us.apache.org/repos/asf/hadoop/blob/621b43e2/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileStatus.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileStatus.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileStatus.java
index b0f08e3..be08afe 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileStatus.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileStatus.java
@@ -31,7 +31,7 @@ import org.apache.hadoop.fs.Path;
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
 public class S3AFileStatus extends FileStatus {
-  private boolean isEmptyDirectory;
+  private Tristate isEmptyDirectory;
 
   /**
    * Create a directory status.
@@ -42,6 +42,18 @@ public class S3AFileStatus extends FileStatus {
   public S3AFileStatus(boolean isemptydir,
       Path path,
       String owner) {
+    this(Tristate.fromBool(isemptydir), path, owner);
+  }
+
+  /**
+   * Create a directory status.
+   * @param isemptydir is this an empty directory?
+   * @param path the path
+   * @param owner the owner
+   */
+  public S3AFileStatus(Tristate isemptydir,
+      Path path,
+      String owner) {
     super(0, true, 1, 0, 0, path);
     isEmptyDirectory = isemptydir;
     setOwner(owner);
@@ -59,12 +71,37 @@ public class S3AFileStatus extends FileStatus {
   public S3AFileStatus(long length, long modification_time, Path path,
       long blockSize, String owner) {
     super(length, false, 1, blockSize, modification_time, path);
-    isEmptyDirectory = false;
+    isEmptyDirectory = Tristate.FALSE;
     setOwner(owner);
     setGroup(owner);
   }
 
-  public boolean isEmptyDirectory() {
+  /**
+   * Convenience constructor for creating from a vanilla FileStatus plus
+   * an isEmptyDirectory flag.
+   * @param source FileStatus to convert to S3AFileStatus
+   * @param isEmptyDirectory TRUE/FALSE if known to be / not be an empty
+   *     directory, UNKNOWN if that information was not computed.
+   * @return a new S3AFileStatus
+   */
+  public static S3AFileStatus fromFileStatus(FileStatus source,
+      Tristate isEmptyDirectory) {
+    if (source.isDirectory()) {
+      return new S3AFileStatus(isEmptyDirectory, source.getPath(),
+          source.getOwner());
+    } else {
+      return new S3AFileStatus(source.getLen(), source.getModificationTime(),
+          source.getPath(), source.getBlockSize(), source.getOwner());
+    }
+  }
+
+
+  /**
+   * @return FALSE if status is not a directory, or it's a dir but known
+   * not to be empty.  TRUE if it is an empty directory.  UNKNOWN if it is
+   * a directory, but we have not computed whether or not it is empty.
+   */
+  public Tristate isEmptyDirectory() {
     return isEmptyDirectory;
   }
 
@@ -110,7 +147,7 @@ public class S3AFileStatus extends FileStatus {
   @Override
   public String toString() {
     return super.toString() +
-        String.format(" isEmptyDirectory=%s", isEmptyDirectory());
+        String.format(" isEmptyDirectory=%s", isEmptyDirectory().name());
   }
 
 }
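
The Tristate type is introduced elsewhere in this patch; a minimal sketch,
consistent with how this file uses it (fromBool(), the three constants, and
the enum-provided name()):

    /** Minimal sketch of org.apache.hadoop.fs.s3a.Tristate as used above. */
    public enum Tristate {
      TRUE, FALSE, UNKNOWN;

      public static Tristate fromBool(boolean v) {
        return v ? TRUE : FALSE;
      }
    }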

http://git-wip-us.apache.org/repos/asf/hadoop/blob/621b43e2/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
index 872dd5f..c22383a 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
@@ -25,12 +25,16 @@ import java.io.InputStream;
 import java.io.InterruptedIOException;
 import java.net.URI;
 import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.Date;
 import java.util.EnumSet;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ExecutorService;
+import java.util.Set;
 import java.util.Objects;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
@@ -92,6 +96,11 @@ import org.apache.hadoop.fs.PathIsNotEmptyDirectoryException;
 import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.fs.StorageStatistics;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.fs.s3a.s3guard.DirListingMetadata;
+import org.apache.hadoop.fs.s3a.s3guard.MetadataStoreListFilesIterator;
+import org.apache.hadoop.fs.s3a.s3guard.MetadataStore;
+import org.apache.hadoop.fs.s3a.s3guard.PathMetadata;
+import org.apache.hadoop.fs.s3a.s3guard.S3Guard;
 import org.apache.hadoop.fs.s3native.S3xLoginHelper;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.Progressable;
@@ -149,6 +158,8 @@ public class S3AFileSystem extends FileSystem {
   private long readAhead;
   private S3AInputPolicy inputPolicy;
   private final AtomicBoolean closed = new AtomicBoolean(false);
+  private MetadataStore metadataStore;
+  private boolean allowAuthoritative;
 
   // The maximum number of entries that can be deleted in any call to s3
   private static final int MAX_ENTRIES_TO_DELETE = 1000;
@@ -277,6 +288,10 @@ public class S3AFileSystem extends FileSystem {
       } else {
         LOG.debug("Using S3AOutputStream");
       }
+
+      metadataStore = S3Guard.getMetadataStore(this);
+      allowAuthoritative = conf.getBoolean(METADATASTORE_AUTHORITATIVE,
+          DEFAULT_METADATASTORE_AUTHORITATIVE);
     } catch (AmazonClientException e) {
       throw translateException("initializing ", new Path(name), e);
     }
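
initialize() only binds whatever store the configuration selects; with the
default NullMetadataStore, S3Guard stays inert. A hedged sketch of enabling
it, assuming the configuration keys this patch adds to Constants and the
DynamoDB-backed store it ships:

    Configuration conf = new Configuration();
    // choose a MetadataStore implementation (default is the no-op store)
    conf.set("fs.s3a.metadatastore.impl",
        "org.apache.hadoop.fs.s3a.s3guard.DynamoDBMetadataStore");
    // optionally trust cached directory listings without re-checking S3
    conf.setBoolean("fs.s3a.metadatastore.authoritative", false);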
@@ -388,12 +403,35 @@ public class S3AFileSystem extends FileSystem {
    * Returns the S3 client used by this filesystem.
    * @return AmazonS3Client
    */
-  @VisibleForTesting
   AmazonS3 getAmazonS3Client() {
     return s3;
   }
 
   /**
+   * Get the region of a bucket.
+   * @return the region in which a bucket is located
+   * @throws IOException on any failure.
+   */
+  public String getBucketLocation() throws IOException {
+    return getBucketLocation(bucket);
+  }
+
+  /**
+   * Get the region of a bucket.
+   * @param bucketName the name of the bucket
+   * @return the region in which a bucket is located
+   * @throws IOException on any failure.
+   */
+  public String getBucketLocation(String bucketName) throws IOException {
+    try {
+      return s3.getBucketLocation(bucketName);
+    } catch (AmazonClientException e) {
+      throw translateException("getBucketLocation()",
+          bucketName, e);
+    }
+  }
+
+  /**
    * Returns the read ahead range value used by this filesystem.
    * @return the readahead range, in bytes
    */
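
A hedged usage sketch of the new getBucketLocation() overloads (filesystem
instance and bucket name hypothetical); SDK failures arrive as IOExceptions
because translateException() has already been applied:

    S3AFileSystem s3a = (S3AFileSystem) FileSystem.get(uri, conf);
    try {
      String region = s3a.getBucketLocation();        // this FS's own bucket
      String other = s3a.getBucketLocation("logs");   // any named bucket
    } catch (IOException e) {
      // AmazonClientException was translated; handle like any IO failure
    }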
@@ -457,7 +495,7 @@ public class S3AFileSystem extends FileSystem {
    * @return a key excluding the leading "/", or, if it is the root path, ""
    */
   @VisibleForTesting
-  String pathToKey(Path path) {
+  public String pathToKey(Path path) {
     if (!path.isAbsolute()) {
       path = new Path(workingDir, path);
     }
@@ -508,7 +546,7 @@ public class S3AFileSystem extends FileSystem {
    * @param path path to qualify
    * @return a qualified path.
    */
-  Path qualify(Path path) {
+  public Path qualify(Path path) {
     return path.makeQualified(uri, workingDir);
   }
 
@@ -578,7 +616,7 @@ public class S3AFileSystem extends FileSystem {
       boolean overwrite, int bufferSize, short replication, long blockSize,
       Progressable progress) throws IOException {
     String key = pathToKey(f);
-    S3AFileStatus status = null;
+    FileStatus status = null;
     try {
       // get the status or throw an FNFE
       status = getFileStatus(f);
@@ -706,8 +744,8 @@ public class S3AFileSystem extends FileSystem {
    * the description of the operation.
    * This operation throws an exception on any failure which needs to be
    * reported and downgraded to a failure.
-   * @param src path to be renamed
-   * @param dst new path after rename
+   * @param source path to be renamed
+   * @param dest new path after rename
    * @throws RenameFailedException if some criteria for a state changing
    * rename was not met. This means work didn't happen; it's not something
    * which is reported upstream to the FileSystem APIs, for which the semantics
@@ -716,9 +754,12 @@ public class S3AFileSystem extends FileSystem {
    * @throws IOException on IO failure.
    * @throws AmazonClientException on failures inside the AWS SDK
    */
-  private boolean innerRename(Path src, Path dst)
+  private boolean innerRename(Path source, Path dest)
       throws RenameFailedException, FileNotFoundException, IOException,
         AmazonClientException {
+    Path src = qualify(source);
+    Path dst = qualify(dest);
+
     LOG.debug("Rename path {} to {}", src, dst);
     incrementStatistic(INVOCATION_RENAME);
 
@@ -734,7 +775,7 @@ public class S3AFileSystem extends FileSystem {
 
     // get the source file status; this raises a FNFE if there is no source
     // file.
-    S3AFileStatus srcStatus = getFileStatus(src);
+    S3AFileStatus srcStatus = innerGetFileStatus(src, true);
 
     if (srcKey.equals(dstKey)) {
       LOG.debug("rename: src and dest refer to the same file or directory: {}",
@@ -746,7 +787,7 @@ public class S3AFileSystem extends FileSystem {
 
     S3AFileStatus dstStatus = null;
     try {
-      dstStatus = getFileStatus(dst);
+      dstStatus = innerGetFileStatus(dst, true);
       // if there is no destination entry, an exception is raised.
       // hence this code sequence can assume that there is something
       // at the end of the path; the only detail being what it is and
@@ -756,7 +797,7 @@ public class S3AFileSystem extends FileSystem {
           throw new RenameFailedException(src, dst,
               "source is a directory and dest is a file")
               .withExitCode(srcStatus.isFile());
-        } else if (!dstStatus.isEmptyDirectory()) {
+        } else if (dstStatus.isEmptyDirectory() != Tristate.TRUE) {
           throw new RenameFailedException(src, dst,
               "Destination is a non-empty directory")
               .withExitCode(false);
@@ -778,7 +819,8 @@ public class S3AFileSystem extends FileSystem {
       Path parent = dst.getParent();
       if (!pathToKey(parent).isEmpty()) {
         try {
-          S3AFileStatus dstParentStatus = getFileStatus(dst.getParent());
+          S3AFileStatus dstParentStatus = innerGetFileStatus(dst.getParent(),
+              false);
           if (!dstParentStatus.isDirectory()) {
             throw new RenameFailedException(src, dst,
                 "destination parent is not a directory");
@@ -790,9 +832,20 @@ public class S3AFileSystem extends FileSystem {
       }
     }
 
+    // If we have a MetadataStore, track deletions/creations.
+    Collection<Path> srcPaths = null;
+    List<PathMetadata> dstMetas = null;
+    if (hasMetadataStore()) {
+      srcPaths = new HashSet<>(); // srcPaths need fast look up before put
+      dstMetas = new ArrayList<>();
+    }
+    // TODO S3Guard HADOOP-13761: retries when source paths are not visible yet
+    // TODO S3Guard: performance: mark destination dirs as authoritative
+
     // Ok! Time to start
     if (srcStatus.isFile()) {
       LOG.debug("rename: renaming file {} to {}", src, dst);
+      long length = srcStatus.getLen();
       if (dstStatus != null && dstStatus.isDirectory()) {
         String newDstKey = dstKey;
         if (!newDstKey.endsWith("/")) {
@@ -801,9 +854,14 @@ public class S3AFileSystem extends FileSystem {
         String filename =
             srcKey.substring(pathToKey(src.getParent()).length()+1);
         newDstKey = newDstKey + filename;
-        copyFile(srcKey, newDstKey, srcStatus.getLen());
+        copyFile(srcKey, newDstKey, length);
+        S3Guard.addMoveFile(metadataStore, srcPaths, dstMetas, src,
+            keyToQualifiedPath(newDstKey), length, getDefaultBlockSize(dst),
+            username);
       } else {
         copyFile(srcKey, dstKey, srcStatus.getLen());
+        S3Guard.addMoveFile(metadataStore, srcPaths, dstMetas, src, dst,
+            length, getDefaultBlockSize(dst), username);
       }
       innerDelete(srcStatus, false);
     } else {
@@ -825,42 +883,66 @@ public class S3AFileSystem extends FileSystem {
       }
 
       List<DeleteObjectsRequest.KeyVersion> keysToDelete = new ArrayList<>();
-      if (dstStatus != null && dstStatus.isEmptyDirectory()) {
+      if (dstStatus != null && dstStatus.isEmptyDirectory() == Tristate.TRUE) {
         // delete unnecessary fake directory.
         keysToDelete.add(new DeleteObjectsRequest.KeyVersion(dstKey));
       }
 
-      ListObjectsRequest request = new ListObjectsRequest();
-      request.setBucketName(bucket);
-      request.setPrefix(srcKey);
-      request.setMaxKeys(maxKeys);
-
-      ObjectListing objects = listObjects(request);
-
-      while (true) {
-        for (S3ObjectSummary summary : objects.getObjectSummaries()) {
-          keysToDelete.add(
-              new DeleteObjectsRequest.KeyVersion(summary.getKey()));
-          String newDstKey =
-              dstKey + summary.getKey().substring(srcKey.length());
-          copyFile(summary.getKey(), newDstKey, summary.getSize());
-
-          if (keysToDelete.size() == MAX_ENTRIES_TO_DELETE) {
-            removeKeys(keysToDelete, true, false);
+      Path parentPath = keyToPath(srcKey);
+      RemoteIterator<LocatedFileStatus> iterator = listFilesAndEmptyDirectories(
+          parentPath, true);
+      while (iterator.hasNext()) {
+        LocatedFileStatus status = iterator.next();
+        long length = status.getLen();
+        String key = pathToKey(status.getPath());
+        if (status.isDirectory() && !key.endsWith("/")) {
+          key += "/";
+        }
+        keysToDelete
+            .add(new DeleteObjectsRequest.KeyVersion(key));
+        String newDstKey =
+            dstKey + key.substring(srcKey.length());
+        copyFile(key, newDstKey, length);
+
+        if (hasMetadataStore()) {
+          // with a metadata store, the object entries need to be updated,
+          // including, potentially, the ancestors
+          Path childSrc = keyToQualifiedPath(key);
+          Path childDst = keyToQualifiedPath(newDstKey);
+          if (objectRepresentsDirectory(key, length)) {
+            S3Guard.addMoveDir(metadataStore, srcPaths, dstMetas, childSrc,
+                childDst, username);
+          } else {
+            S3Guard.addMoveFile(metadataStore, srcPaths, dstMetas, childSrc,
+                childDst, length, getDefaultBlockSize(childDst), username);
           }
+          // Ancestor directories may not be listed, so we explicitly add them
+          S3Guard.addMoveAncestors(metadataStore, srcPaths, dstMetas,
+              keyToQualifiedPath(srcKey), childSrc, childDst, username);
         }
 
-        if (objects.isTruncated()) {
-          objects = continueListObjects(objects);
-        } else {
-          if (!keysToDelete.isEmpty()) {
-            removeKeys(keysToDelete, false, false);
-          }
-          break;
+        if (keysToDelete.size() == MAX_ENTRIES_TO_DELETE) {
+          removeKeys(keysToDelete, true, false);
         }
       }
+      if (!keysToDelete.isEmpty()) {
+        removeKeys(keysToDelete, false, false);
+      }
+
+      // We moved all the children, now move the top-level dir
+      // Empty directory should have been added as the object summary
+      if (hasMetadataStore()
+          && srcPaths != null
+          && !srcPaths.contains(src)) {
+        LOG.debug("To move the non-empty top-level dir src={} and dst={}",
+            src, dst);
+        S3Guard.addMoveDir(metadataStore, srcPaths, dstMetas, src, dst,
+            username);
+      }
     }
 
+    metadataStore.move(srcPaths, dstMetas);
+
     if (src.getParent() != dst.getParent()) {
       deleteUnnecessaryFakeDirectories(dst.getParent());
       createFakeDirectoryIfNecessary(src.getParent());
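
Note the shape of the rename bookkeeping above: MetadataStore changes are
accumulated while objects are copied, then committed once. A condensed,
hedged sketch of the pattern (helper names from the diff, paths
hypothetical):

    Collection<Path> srcPaths = new HashSet<>();   // fast membership checks
    List<PathMetadata> dstMetas = new ArrayList<>();
    // for each object copied, record its deletion and creation...
    S3Guard.addMoveFile(metadataStore, srcPaths, dstMetas,
        childSrc, childDst, length, blockSize, username);
    // ...then apply every recorded change to the store in one call
    metadataStore.move(srcPaths, dstMetas);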
@@ -880,6 +962,31 @@ public class S3AFileSystem extends FileSystem {
   }
 
   /**
+   * Does this Filesystem have a metadata store?
+   * @return true iff the FS has been instantiated with a metadata store
+   */
+  public boolean hasMetadataStore() {
+    return !S3Guard.isNullMetadataStore(metadataStore);
+  }
+
+  /**
+   * Get the metadata store.
+   * This will always be non-null, but may be bound to the
+   * {@code NullMetadataStore}.
+   * @return the metadata store of this FS instance
+   */
+  @VisibleForTesting
+  MetadataStore getMetadataStore() {
+    return metadataStore;
+  }
+
+  /** For testing only.  See ITestS3GuardEmptyDirs. */
+  @VisibleForTesting
+  void setMetadataStore(MetadataStore ms) {
+    metadataStore = ms;
+  }
+
+  /**
    * Increment a statistic by 1.
    * @param statistic The operation to increment
    */
@@ -1063,8 +1170,9 @@ public class S3AFileSystem extends FileSystem {
    * @param inputStream source data.
    * @return the request
    */
-  private PutObjectRequest newPutObjectRequest(String key,
-      ObjectMetadata metadata, InputStream inputStream) {
+  PutObjectRequest newPutObjectRequest(String key,
+      ObjectMetadata metadata,
+      InputStream inputStream) {
     Preconditions.checkNotNull(inputStream);
     PutObjectRequest putObjectRequest = new PutObjectRequest(bucket, key,
         inputStream, metadata);
@@ -1115,7 +1223,7 @@ public class S3AFileSystem extends FileSystem {
    * @param putObjectRequest the request
    * @return the upload initiated
    */
-  public Upload putObject(PutObjectRequest putObjectRequest) {
+  public UploadInfo putObject(PutObjectRequest putObjectRequest) {
     long len;
     if (putObjectRequest.getFile() != null) {
       len = putObjectRequest.getFile().length();
@@ -1126,7 +1234,7 @@ public class S3AFileSystem extends FileSystem {
     try {
       Upload upload = transfers.upload(putObjectRequest);
       incrementPutCompletedStatistics(true, len);
-      return upload;
+      return new UploadInfo(upload, len);
     } catch (AmazonClientException e) {
       incrementPutCompletedStatistics(false, len);
       throw e;
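
UploadInfo is a new value type in this patch; a minimal sketch consistent
with its use here (it pairs the TransferManager Upload with the length that
putObject() computed):

    import com.amazonaws.services.s3.transfer.Upload;

    /** Minimal sketch of UploadInfo as used by putObject() above. */
    public class UploadInfo {
      private final Upload upload;
      private final long length;

      public UploadInfo(Upload upload, long length) {
        this.upload = upload;
        this.length = length;
      }

      public Upload getUpload() { return upload; }
      public long getLength() { return length; }
    }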
@@ -1142,14 +1250,10 @@ public class S3AFileSystem extends FileSystem {
    * @return the upload initiated
    * @throws AmazonClientException on problems
    */
-  public PutObjectResult putObjectDirect(PutObjectRequest putObjectRequest)
+  PutObjectResult putObjectDirect(PutObjectRequest putObjectRequest)
       throws AmazonClientException {
-    long len;
-    if (putObjectRequest.getFile() != null) {
-      len = putObjectRequest.getFile().length();
-    } else {
-      len = putObjectRequest.getMetadata().getContentLength();
-    }
+    long len = getPutRequestLength(putObjectRequest);
+    LOG.debug("PUT {} bytes to {}", len, putObjectRequest.getKey());
     incrementPutStartStatistics(len);
     try {
       PutObjectResult result = s3.putObject(putObjectRequest);
@@ -1162,6 +1266,23 @@ public class S3AFileSystem extends FileSystem {
   }
 
   /**
+   * Get the length of the PUT, verifying that the length is known.
+   * @param putObjectRequest a request bound to a file or a stream.
+   * @return the request length
+   * @throws IllegalArgumentException if the length is negative
+   */
+  private long getPutRequestLength(PutObjectRequest putObjectRequest) {
+    long len;
+    if (putObjectRequest.getFile() != null) {
+      len = putObjectRequest.getFile().length();
+    } else {
+      len = putObjectRequest.getMetadata().getContentLength();
+    }
+    Preconditions.checkState(len >= 0, "Cannot PUT object of unknown length");
+    return len;
+  }
+
+  /**
    * Upload part of a multi-partition file.
    * Increments the write and put counters.
    * <i>Important: this call does not close any input stream in the request.</i>
@@ -1288,7 +1409,7 @@ public class S3AFileSystem extends FileSystem {
    */
   public boolean delete(Path f, boolean recursive) throws IOException {
     try {
-      return innerDelete(getFileStatus(f), recursive);
+      return innerDelete(innerGetFileStatus(f, true), recursive);
     } catch (FileNotFoundException e) {
       LOG.debug("Couldn't delete {} - does not exist", f);
       instrumentation.errorIgnored();
@@ -1318,6 +1439,9 @@ public class S3AFileSystem extends FileSystem {
 
     if (status.isDirectory()) {
       LOG.debug("delete: Path is a directory: {}", f);
+      Preconditions.checkArgument(
+          status.isEmptyDirectory() != Tristate.UNKNOWN,
+          "File status must have directory emptiness computed");
 
       if (!key.endsWith("/")) {
         key = key + "/";
@@ -1327,13 +1451,15 @@ public class S3AFileSystem extends FileSystem {
         return rejectRootDirectoryDelete(status, recursive);
       }
 
-      if (!recursive && !status.isEmptyDirectory()) {
+      if (!recursive && status.isEmptyDirectory() == Tristate.FALSE) {
         throw new PathIsNotEmptyDirectoryException(f.toString());
       }
 
-      if (status.isEmptyDirectory()) {
+      if (status.isEmptyDirectory() == Tristate.TRUE) {
         LOG.debug("Deleting fake empty directory {}", key);
+        // HADOOP-13761 S3Guard: retries here
         deleteObject(key);
+        metadataStore.delete(f);
         instrumentation.directoryDeleted();
       } else {
         LOG.debug("Getting objects for directory prefix {} to delete", key);
@@ -1349,6 +1475,7 @@ public class S3AFileSystem extends FileSystem {
             LOG.debug("Got object to delete {}", summary.getKey());
 
             if (keys.size() == MAX_ENTRIES_TO_DELETE) {
+              // TODO: HADOOP-13761 S3Guard: retries
               removeKeys(keys, true, false);
             }
           }
@@ -1357,16 +1484,19 @@ public class S3AFileSystem extends FileSystem {
             objects = continueListObjects(objects);
           } else {
             if (!keys.isEmpty()) {
+              // TODO: HADOOP-13761 S3Guard: retries
               removeKeys(keys, false, false);
             }
             break;
           }
         }
       }
+      metadataStore.deleteSubtree(f);
     } else {
       LOG.debug("delete: Path is a file");
       instrumentation.fileDeleted(1);
       deleteObject(key);
+      metadataStore.delete(f);
     }
 
     Path parent = f.getParent();
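
Mirroring deletions into the MetadataStore (delete() for a single path,
deleteSubtree() after a recursive prefix delete) is what closes the
read-after-delete consistency gap. A hedged illustration with hypothetical
paths:

    fs.delete(new Path("s3a://bucket/table/part-0000"), false);
    // Even if S3's eventually consistent listing still shows the key,
    // the store's tombstone makes the next lookup fail fast:
    fs.getFileStatus(new Path("s3a://bucket/table/part-0000"));
    // -> FileNotFoundException: "Path ... is recorded as deleted by S3Guard"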
@@ -1390,7 +1520,7 @@ public class S3AFileSystem extends FileSystem {
   private boolean rejectRootDirectoryDelete(S3AFileStatus status,
       boolean recursive) throws IOException {
     LOG.info("s3a delete the {} root directory of {}", bucket, recursive);
-    boolean emptyRoot = status.isEmptyDirectory();
+    boolean emptyRoot = status.isEmptyDirectory() == Tristate.TRUE;
     if (emptyRoot) {
       return true;
     }
@@ -1405,7 +1535,7 @@ public class S3AFileSystem extends FileSystem {
   private void createFakeDirectoryIfNecessary(Path f)
       throws IOException, AmazonClientException {
     String key = pathToKey(f);
-    if (!key.isEmpty() && !exists(f)) {
+    if (!key.isEmpty() && !s3Exists(f)) {
       LOG.debug("Creating new fake directory at {}", f);
       createFakeDirectory(key);
     }
@@ -1454,6 +1584,11 @@ public class S3AFileSystem extends FileSystem {
         key = key + '/';
       }
 
+      DirListingMetadata dirMeta = metadataStore.listChildren(path);
+      if (allowAuthoritative && dirMeta != null && dirMeta.isAuthoritative()) {
+        return S3Guard.dirMetaToStatuses(dirMeta);
+      }
+
       ListObjectsRequest request = createListObjectsRequest(key, "/");
       LOG.debug("listStatus: doing listObjects for directory {}", key);
 
@@ -1466,7 +1601,8 @@ public class S3AFileSystem extends FileSystem {
       while (files.hasNext()) {
         result.add(files.next());
       }
-      return result.toArray(new FileStatus[result.size()]);
+      return S3Guard.dirListingUnion(metadataStore, path, result, dirMeta,
+          allowAuthoritative);
     } else {
       LOG.debug("Adding: rd (not a dir): {}", path);
       FileStatus[] stats = new FileStatus[1];
@@ -1482,7 +1618,8 @@ public class S3AFileSystem extends FileSystem {
    * @param delimiter any delimiter
    * @return the request
    */
-  private ListObjectsRequest createListObjectsRequest(String key,
+  @VisibleForTesting
+  ListObjectsRequest createListObjectsRequest(String key,
       String delimiter) {
     ListObjectsRequest request = new ListObjectsRequest();
     request.setBucketName(bucket);
@@ -1541,23 +1678,30 @@ public class S3AFileSystem extends FileSystem {
       throw translateException("innerMkdirs", path, e);
     }
   }
+
   /**
    *
    * Make the given path and all non-existent parents into
    * directories.
    * See {@link #mkdirs(Path, FsPermission)}
-   * @param f path to create
+   * @param p path to create
-   * @param permission to apply to f
+   * @param permission to apply to p
-   * @return true if a directory was created
+   * @return true if a directory was created or already existed
    * @throws FileAlreadyExistsException there is a file at the path specified
    * @throws IOException other IO problems
    * @throws AmazonClientException on failures inside the AWS SDK
    */
-  private boolean innerMkdirs(Path f, FsPermission permission)
+  private boolean innerMkdirs(Path p, FsPermission permission)
       throws IOException, FileAlreadyExistsException, AmazonClientException {
+    Path f = qualify(p);
     LOG.debug("Making directory: {}", f);
     incrementStatistic(INVOCATION_MKDIRS);
     FileStatus fileStatus;
+    List<Path> metadataStoreDirs = null;
+    if (hasMetadataStore()) {
+      metadataStoreDirs = new ArrayList<>();
+    }
+
     try {
       fileStatus = getFileStatus(f);
 
@@ -1567,8 +1711,12 @@ public class S3AFileSystem extends FileSystem {
         throw new FileAlreadyExistsException("Path is a file: " + f);
       }
     } catch (FileNotFoundException e) {
+      // Walk path to root, ensuring closest ancestor is a directory, not file
       Path fPart = f.getParent();
-      do {
+      if (metadataStoreDirs != null) {
+        metadataStoreDirs.add(f);
+      }
+      while (fPart != null) {
         try {
           fileStatus = getFileStatus(fPart);
           if (fileStatus.isDirectory()) {
@@ -1581,12 +1729,17 @@ public class S3AFileSystem extends FileSystem {
           }
         } catch (FileNotFoundException fnfe) {
           instrumentation.errorIgnored();
+          // We create all missing directories in MetadataStore; it does not
+          // infer directories exist by prefix like S3.
+          if (metadataStoreDirs != null) {
+            metadataStoreDirs.add(fPart);
+          }
         }
         fPart = fPart.getParent();
-      } while (fPart != null);
-
+      }
       String key = pathToKey(f);
       createFakeDirectory(key);
+      S3Guard.makeDirsOrdered(metadataStore, metadataStoreDirs, username, true);
       // this is complicated because getParent(a/b/c/) returns a/b/c, but
       // we want a/b. See HADOOP-14428 for more details.
       deleteUnnecessaryFakeDirectories(new Path(f.toString()).getParent());
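
The parent walk above collects the missing directories leaf-first;
makeDirsOrdered() is then responsible for persisting them parent-first so no
child entry lands in the store before its parent. A hedged trace with
hypothetical paths:

    // mkdirs("/a/b/c") where only "/a" already exists:
    List<Path> metadataStoreDirs = new ArrayList<>();
    metadataStoreDirs.add(new Path("/a/b/c"));  // the leaf being created
    metadataStoreDirs.add(new Path("/a/b"));    // absent ancestor, found on walk
    // "/a" was found to be a directory, so the walk stopped there;
    // makeDirsOrdered() writes /a/b to the store before /a/b/c.
    S3Guard.makeDirsOrdered(metadataStore, metadataStoreDirs, username, true);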
@@ -1598,21 +1751,93 @@ public class S3AFileSystem extends FileSystem {
    * Return a file status object that represents the path.
    * @param f The path we want information from
    * @return a FileStatus object
-   * @throws java.io.FileNotFoundException when the path does not exist;
+   * @throws FileNotFoundException when the path does not exist
    * @throws IOException on other problems.
    */
-  public S3AFileStatus getFileStatus(final Path f) throws IOException {
+  public FileStatus getFileStatus(final Path f) throws IOException {
+    return innerGetFileStatus(f, false);
+  }
+
+  /**
+   * Internal version of {@link #getFileStatus(Path)}.
+   * @param f The path we want information from
+   * @param needEmptyDirectoryFlag if true, implementation will calculate
+   *        a TRUE or FALSE value for {@link S3AFileStatus#isEmptyDirectory()}
+   * @return a S3AFileStatus object
+   * @throws FileNotFoundException when the path does not exist
+   * @throws IOException on other problems.
+   */
+  @VisibleForTesting
+  S3AFileStatus innerGetFileStatus(final Path f,
+      boolean needEmptyDirectoryFlag) throws IOException {
     incrementStatistic(INVOCATION_GET_FILE_STATUS);
     final Path path = qualify(f);
     String key = pathToKey(path);
-    LOG.debug("Getting path status for {}  ({})", path , key);
+    LOG.debug("Getting path status for {}  ({})", path, key);
+
+    // Check MetadataStore, if any.
+    PathMetadata pm = metadataStore.get(path, needEmptyDirectoryFlag);
+    Set<Path> tombstones = Collections.EMPTY_SET;
+    if (pm != null) {
+      if (pm.isDeleted()) {
+        throw new FileNotFoundException("Path " + f + " is recorded as " +
+            "deleted by S3Guard");
+      }
+
+      FileStatus msStatus = pm.getFileStatus();
+      if (needEmptyDirectoryFlag && msStatus.isDirectory()) {
+        if (pm.isEmptyDirectory() != Tristate.UNKNOWN) {
+          // We have a definitive true / false from MetadataStore, we are done.
+          return S3AFileStatus.fromFileStatus(msStatus, pm.isEmptyDirectory());
+        } else {
+          DirListingMetadata children = metadataStore.listChildren(path);
+          if (children != null) {
+            tombstones = children.listTombstones();
+          }
+          LOG.debug("MetadataStore doesn't know if dir is empty, using S3.");
+        }
+      } else {
+        // Either this is not a directory, or we don't care if it is empty
+        return S3AFileStatus.fromFileStatus(msStatus, pm.isEmptyDirectory());
+      }
+
+      // If the metadata store has no children for it and it's not listed
+      // in S3 yet, assume the directory is empty.
+      S3AFileStatus s3FileStatus;
+      try {
+        s3FileStatus = s3GetFileStatus(path, key, tombstones);
+      } catch (FileNotFoundException e) {
+        return S3AFileStatus.fromFileStatus(msStatus, Tristate.TRUE);
+      }
+      // entry was found, save in S3Guard
+      return S3Guard.putAndReturn(metadataStore, s3FileStatus, instrumentation);
+    } else {
+      // there was no entry in S3Guard
+      // retrieve the data and update the metadata store in the process.
+      return S3Guard.putAndReturn(metadataStore,
+          s3GetFileStatus(path, key, tombstones), instrumentation);
+    }
+  }
+
+  /**
+   * Raw {@code getFileStatus} that talks direct to S3.
+   * Used to implement {@link #innerGetFileStatus(Path, boolean)},
+   * and for direct management of empty directory blobs.
+   * @param path Qualified path
+   * @param key  Key string for the path
+   * @return Status
+   * @throws FileNotFoundException when the path does not exist
+   * @throws IOException on other problems.
+   */
+  private S3AFileStatus s3GetFileStatus(final Path path, String key,
+      Set<Path> tombstones) throws IOException {
     if (!key.isEmpty()) {
       try {
         ObjectMetadata meta = getObjectMetadata(key);
 
         if (objectRepresentsDirectory(key, meta.getContentLength())) {
           LOG.debug("Found exact file: fake directory");
-          return new S3AFileStatus(true, path, username);
+          return new S3AFileStatus(Tristate.TRUE, path, username);
         } else {
           LOG.debug("Found exact file: normal file");
           return new S3AFileStatus(meta.getContentLength(),
@@ -1637,16 +1862,16 @@ public class S3AFileSystem extends FileSystem {
 
           if (objectRepresentsDirectory(newKey, meta.getContentLength())) {
             LOG.debug("Found file (with /): fake directory");
-            return new S3AFileStatus(true, path, username);
+            return new S3AFileStatus(Tristate.TRUE, path, username);
           } else {
             LOG.warn("Found file (with /): real file? should not happen: {}",
                 key);
 
             return new S3AFileStatus(meta.getContentLength(),
-                dateToLong(meta.getLastModified()),
-                path,
-                getDefaultBlockSize(path),
-                username);
+                    dateToLong(meta.getLastModified()),
+                    path,
+                    getDefaultBlockSize(path),
+                    username);
           }
         } catch (AmazonServiceException e) {
           if (e.getStatusCode() != 404) {
@@ -1668,25 +1893,26 @@ public class S3AFileSystem extends FileSystem {
 
       ObjectListing objects = listObjects(request);
 
-      if (!objects.getCommonPrefixes().isEmpty()
-          || !objects.getObjectSummaries().isEmpty()) {
+      Collection<String> prefixes = objects.getCommonPrefixes();
+      Collection<S3ObjectSummary> summaries = objects.getObjectSummaries();
+      if (!isEmptyOfKeys(prefixes, tombstones) ||
+          !isEmptyOfObjects(summaries, tombstones)) {
         if (LOG.isDebugEnabled()) {
           LOG.debug("Found path as directory (with /): {}/{}",
-              objects.getCommonPrefixes().size() ,
-              objects.getObjectSummaries().size());
+              prefixes.size(), summaries.size());
 
-          for (S3ObjectSummary summary : objects.getObjectSummaries()) {
+          for (S3ObjectSummary summary : summaries) {
             LOG.debug("Summary: {} {}", summary.getKey(), summary.getSize());
           }
-          for (String prefix : objects.getCommonPrefixes()) {
+          for (String prefix : prefixes) {
             LOG.debug("Prefix: {}", prefix);
           }
         }
 
-        return new S3AFileStatus(false, path, username);
+        return new S3AFileStatus(Tristate.FALSE, path, username);
       } else if (key.isEmpty()) {
         LOG.debug("Found root directory");
-        return new S3AFileStatus(true, path, username);
+        return new S3AFileStatus(Tristate.TRUE, path, username);
       }
     } catch (AmazonServiceException e) {
       if (e.getStatusCode() != 404) {
@@ -1701,6 +1927,64 @@ public class S3AFileSystem extends FileSystem {
   }
 
   /**
+   * Helper function to determine if a collection of paths is empty
+   * after accounting for tombstone markers (if provided).
+   * @param keys Collection of paths (prefixes / directories or keys).
+   * @param tombstones Set of tombstone markers, or null if not applicable.
+   * @return false if keys contains entries not accounted for by
+   * tombstones.
+   */
+  private boolean isEmptyOfKeys(Collection<String> keys, Set<Path>
+      tombstones) {
+    if (tombstones == null) {
+      return keys.isEmpty();
+    }
+    for (String key : keys) {
+      Path qualified = keyToQualifiedPath(key);
+      if (!tombstones.contains(qualified)) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Helper function to determine if a collection of object summaries is empty
+   * after accounting for tombstone markers (if provided).
+   * @param summaries Collection of objects as returned by listObjects.
+   * @param tombstones Set of tombstone markers, or null if not applicable.
+   * @return false if summaries contains objects not accounted for by
+   * tombstones.
+   */
+  private boolean isEmptyOfObjects(Collection<S3ObjectSummary> summaries,
+      Set<Path> tombstones) {
+    if (tombstones == null) {
+      return summaries.isEmpty();
+    }
+    Collection<String> stringCollection = new ArrayList<>(summaries.size());
+    for (S3ObjectSummary summary : summaries) {
+      stringCollection.add(summary.getKey());
+    }
+    return isEmptyOfKeys(stringCollection, tombstones);
+  }
+
+  /**
+   * Raw version of {@link FileSystem#exists(Path)} which uses S3 only:
+   * S3Guard MetadataStore, if any, will be skipped.
+   * @return true if path exists in S3
+   */
+  private boolean s3Exists(final Path f) throws IOException {
+    Path path = qualify(f);
+    String key = pathToKey(path);
+    try {
+      s3GetFileStatus(path, key, null);
+      return true;
+    } catch (FileNotFoundException e) {
+      return false;
+    }
+  }
+
+  /**
    * The src file is on the local disk.  Add it to FS at
    * the given dst name.
    *
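
A hedged worked example of the tombstone filtering above (paths
hypothetical; the helpers are private and shown purely for illustration). A
raw S3 listing may still contain recently deleted children, so a prefix only
counts as non-empty if something under it is not tombstoned:

    Set<Path> tombstones = new HashSet<>();
    tombstones.add(keyToQualifiedPath("data/old.csv"));

    // the listing still shows only the deleted key: treated as empty
    isEmptyOfKeys(Arrays.asList("data/old.csv"), tombstones);      // true
    // a live key alongside it: non-empty
    isEmptyOfKeys(Arrays.asList("data/old.csv", "data/new.csv"),
        tombstones);                                               // false
    // null tombstones degrade to a plain emptiness check
    isEmptyOfKeys(Collections.<String>emptyList(), null);          // true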
@@ -1777,12 +2061,13 @@ public class S3AFileSystem extends FileSystem {
     final String key = pathToKey(dst);
     final ObjectMetadata om = newObjectMetadata(srcfile.length());
     PutObjectRequest putObjectRequest = newPutObjectRequest(key, om, srcfile);
-    Upload up = putObject(putObjectRequest);
+    UploadInfo info = putObject(putObjectRequest);
+    Upload upload = info.getUpload();
     ProgressableProgressListener listener = new ProgressableProgressListener(
-        this, key, up, null);
-    up.addProgressListener(listener);
+        this, key, upload, null);
+    upload.addProgressListener(listener);
     try {
-      up.waitForUploadResult();
+      upload.waitForUploadResult();
     } catch (InterruptedException e) {
       throw new InterruptedIOException("Interrupted copying " + src
           + " to "  + dst + ", cancelling");
@@ -1790,7 +2075,7 @@ public class S3AFileSystem extends FileSystem {
     listener.uploadCompleted();
 
     // This will delete unnecessary fake parent directories
-    finishedWrite(key);
+    finishedWrite(key, info.getLength());
 
     if (delSrc) {
       local.delete(src, false);
@@ -1814,6 +2099,10 @@ public class S3AFileSystem extends FileSystem {
         transfers.shutdownNow(true);
         transfers = null;
       }
+      if (metadataStore != null) {
+        metadataStore.close();
+        metadataStore = null;
+      }
     }
   }
 
@@ -1956,11 +2245,38 @@ public class S3AFileSystem extends FileSystem {
 
   /**
    * Perform post-write actions.
+   * This operation MUST be called after any PUT/multipart PUT completes
+   * successfully.
+   * This includes
+   * <ol>
+   *   <li>Calling {@link #deleteUnnecessaryFakeDirectories(Path)}</li>
+   *   <li>Updating any metadata store with details on the newly created
+   *   object.</li>
+   * </ol>
    * @param key key written to
+   * @param length  total length of file written
    */
-  public void finishedWrite(String key) {
-    LOG.debug("Finished write to {}", key);
-    deleteUnnecessaryFakeDirectories(keyToPath(key).getParent());
+  @InterfaceAudience.Private
+  void finishedWrite(String key, long length) {
+    LOG.debug("Finished write to {}, len {}", key, length);
+    Path p = keyToQualifiedPath(key);
+    deleteUnnecessaryFakeDirectories(p.getParent());
+    Preconditions.checkArgument(length >= 0, "content length is negative");
+
+    // See note about failure semantics in S3Guard documentation
+    try {
+      if (hasMetadataStore()) {
+        S3Guard.addAncestors(metadataStore, p, username);
+        S3AFileStatus status = createUploadFileStatus(p,
+            S3AUtils.objectRepresentsDirectory(key, length), length,
+            getDefaultBlockSize(p), username);
+        S3Guard.putAndReturn(metadataStore, status, instrumentation);
+      }
+    } catch (IOException e) {
+      LOG.error("S3Guard: Error updating MetadataStore for write to {}:",
+          key, e);
+      instrumentation.errorIgnored();
+    }
   }
 
   /**
@@ -2015,9 +2331,9 @@ public class S3AFileSystem extends FileSystem {
     PutObjectRequest putObjectRequest = newPutObjectRequest(objectName,
         newObjectMetadata(0L),
         im);
-    Upload upload = putObject(putObjectRequest);
+    UploadInfo info = putObject(putObjectRequest);
     try {
-      upload.waitForUploadResult();
+      info.getUpload().waitForUploadResult();
     } catch (InterruptedException e) {
       throw new InterruptedIOException("Interrupted creating " + objectName);
     }
@@ -2123,6 +2439,8 @@ public class S3AFileSystem extends FileSystem {
     if (blockFactory != null) {
       sb.append(", blockFactory=").append(blockFactory);
     }
+    sb.append(", metastore=").append(metadataStore);
+    sb.append(", authoritative=").append(allowAuthoritative);
     sb.append(", boundedExecutor=").append(boundedThreadPool);
     sb.append(", unboundedExecutor=").append(unboundedThreadPool);
     sb.append(", statistics {")
@@ -2241,6 +2559,18 @@ public class S3AFileSystem extends FileSystem {
   @Override
   public RemoteIterator<LocatedFileStatus> listFiles(Path f,
       boolean recursive) throws FileNotFoundException, IOException {
+    return innerListFiles(f, recursive,
+        new Listing.AcceptFilesOnly(qualify(f)));
+  }
+
+  public RemoteIterator<LocatedFileStatus> listFilesAndEmptyDirectories(Path f,
+      boolean recursive) throws IOException {
+    return innerListFiles(f, recursive,
+        new Listing.AcceptAllButS3nDirs());
+  }
+
+  private RemoteIterator<LocatedFileStatus> innerListFiles(Path f, boolean
+      recursive, Listing.FileStatusAcceptor acceptor) throws IOException {
     incrementStatistic(INVOCATION_LIST_FILES);
     Path path = qualify(f);
     LOG.debug("listFiles({}, {})", path, recursive);
@@ -2258,13 +2588,42 @@ public class S3AFileSystem extends FileSystem {
         String delimiter = recursive ? null : "/";
         LOG.debug("Requesting all entries under {} with delimiter '{}'",
             key, delimiter);
-        return listing.createLocatedFileStatusIterator(
-            listing.createFileStatusListingIterator(path,
-                createListObjectsRequest(key, delimiter),
-                ACCEPT_ALL,
-                new Listing.AcceptFilesOnly(path)));
+        final RemoteIterator<FileStatus> cachedFilesIterator;
+        final Set<Path> tombstones;
+        if (recursive) {
+          final PathMetadata pm = metadataStore.get(path, true);
+          // shouldn't need to check pm.isDeleted() because that will have
+          // been caught by getFileStatus above.
+          MetadataStoreListFilesIterator metadataStoreListFilesIterator =
+              new MetadataStoreListFilesIterator(metadataStore, pm,
+                  allowAuthoritative);
+          tombstones = metadataStoreListFilesIterator.listTombstones();
+          cachedFilesIterator = metadataStoreListFilesIterator;
+        } else {
+          DirListingMetadata meta = metadataStore.listChildren(path);
+          if (meta != null) {
+            tombstones = meta.listTombstones();
+          } else {
+            tombstones = null;
+          }
+          cachedFilesIterator = listing.createProvidedFileStatusIterator(
+              S3Guard.dirMetaToStatuses(meta), ACCEPT_ALL, acceptor);
+          if (allowAuthoritative && meta != null && meta.isAuthoritative()) {
+            // metadata listing is authoritative, so return it directly
+            return listing.createLocatedFileStatusIterator(cachedFilesIterator);
+          }
+        }
+        return listing.createTombstoneReconcilingIterator(
+            listing.createLocatedFileStatusIterator(
+                listing.createFileStatusListingIterator(path,
+                    createListObjectsRequest(key, delimiter),
+                    ACCEPT_ALL,
+                    acceptor,
+                    cachedFilesIterator)),
+            tombstones);
       }
     } catch (AmazonClientException e) {
+      // TODO S3Guard: retry on file not found exception
       throw translateException("listFiles", path, e);
     }
   }
@@ -2309,12 +2668,21 @@ public class S3AFileSystem extends FileSystem {
             filter.accept(path) ? toLocatedFileStatus(fileStatus) : null);
       } else {
         // directory: trigger a lookup
-        String key = maybeAddTrailingSlash(pathToKey(path));
-        return listing.createLocatedFileStatusIterator(
-            listing.createFileStatusListingIterator(path,
-                createListObjectsRequest(key, "/"),
-                filter,
-                new Listing.AcceptAllButSelfAndS3nDirs(path)));
+        final String key = maybeAddTrailingSlash(pathToKey(path));
+        final Listing.FileStatusAcceptor acceptor =
+            new Listing.AcceptAllButSelfAndS3nDirs(path);
+        DirListingMetadata meta = metadataStore.listChildren(path);
+        final RemoteIterator<FileStatus> cachedFileStatusIterator =
+            listing.createProvidedFileStatusIterator(
+                S3Guard.dirMetaToStatuses(meta), filter, acceptor);
+        return (allowAuthoritative && meta != null && meta.isAuthoritative())
+            ? listing.createLocatedFileStatusIterator(cachedFileStatusIterator)
+            : listing.createLocatedFileStatusIterator(
+                listing.createFileStatusListingIterator(path,
+                    createListObjectsRequest(key, "/"),
+                    filter,
+                    acceptor,
+                    cachedFileStatusIterator));
       }
     } catch (AmazonClientException e) {
       throw translateException("listLocatedStatus", path, e);
@@ -2389,8 +2757,8 @@ public class S3AFileSystem extends FileSystem {
     /**
      * Callback on a successful write.
      */
-    void writeSuccessful() {
-      finishedWrite(key);
+    void writeSuccessful(long length) {
+      finishedWrite(key, length);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/621b43e2/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java
index d2e7a88..da1fc5a 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java
@@ -23,6 +23,7 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.FileSystem.Statistics;
 import org.apache.hadoop.metrics2.MetricStringBuilder;
 import org.apache.hadoop.metrics2.annotation.Metrics;
 import org.apache.hadoop.metrics2.lib.Interns;
@@ -30,6 +31,7 @@ import org.apache.hadoop.metrics2.lib.MetricsRegistry;
 import org.apache.hadoop.metrics2.lib.MutableCounterLong;
 import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
 import org.apache.hadoop.metrics2.lib.MutableMetric;
+import org.apache.hadoop.metrics2.lib.MutableQuantiles;
 
 import java.io.Closeable;
 import java.net.URI;
@@ -38,7 +40,6 @@ import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
-import org.apache.hadoop.fs.FileSystem.Statistics;
 
 import static org.apache.hadoop.fs.s3a.Statistic.*;
 
@@ -90,6 +91,10 @@ public class S3AInstrumentation {
   private final Map<String, MutableCounterLong> streamMetrics =
       new HashMap<>(30);
 
+  /** Instantiate this without caring whether or not S3Guard is enabled. */
+  private final S3GuardInstrumentation s3GuardInstrumentation
+      = new S3GuardInstrumentation();
+
   private static final Statistic[] COUNTERS_TO_CREATE = {
       INVOCATION_COPY_FROM_LOCAL_FILE,
       INVOCATION_EXISTS,
@@ -117,6 +122,8 @@ public class S3AInstrumentation {
       STREAM_WRITE_BLOCK_UPLOADS_ABORTED,
       STREAM_WRITE_TOTAL_TIME,
       STREAM_WRITE_TOTAL_DATA,
+      S3GUARD_METADATASTORE_PUT_PATH_REQUEST,
+      S3GUARD_METADATASTORE_INITIALIZATION
   };
 
 
@@ -171,6 +178,9 @@ public class S3AInstrumentation {
     for (Statistic statistic : GAUGES_TO_CREATE) {
       gauge(statistic.getSymbol(), statistic.getDescription());
     }
+    //todo need a config for the quantiles interval?
+    quantiles(S3GUARD_METADATASTORE_PUT_PATH_LATENCY,
+        "ops", "latency", 1);
   }
 
   /**
@@ -227,6 +237,22 @@ public class S3AInstrumentation {
   }
 
   /**
+   * Create a quantiles metric in the registry.
+   * @param op  statistic to collect
+   * @param sampleName sample name of the quantiles
+   * @param valueName value name of the quantiles
+   * @param interval interval of the quantiles in seconds
+   * @return the created quantiles metric
+   */
+  protected final MutableQuantiles quantiles(Statistic op,
+      String sampleName,
+      String valueName,
+      int interval) {
+    return registry.newQuantiles(op.getSymbol(), op.getDescription(),
+        sampleName, valueName, interval);
+  }
+
+  /**
    * Get the metrics registry.
    * @return the registry
    */
@@ -311,6 +337,20 @@ public class S3AInstrumentation {
   }
 
   /**
+   * Look up a quantiles metric.
+   * @param name quantiles name
+   * @return the quantiles or null
+   * @throws ClassCastException if the metric is not a Quantiles.
+   */
+  public MutableQuantiles lookupQuantiles(String name) {
+    MutableMetric metric = lookupMetric(name);
+    if (metric == null) {
+      LOG.debug("No quantiles {}", name);
+    }
+    return (MutableQuantiles) metric;
+  }
+
+  /**
    * Look up a metric from both the registered set and the lighter weight
    * stream entries.
    * @param name metric name
@@ -391,6 +431,21 @@ public class S3AInstrumentation {
       counter.incr(count);
     }
   }
+
+  /**
+   * Add a value to a quantiles statistic. No-op if the quantile
+   * isn't found.
+   * @param op operation to look up.
+   * @param value value to add.
+   * @throws ClassCastException if the metric is not a Quantiles.
+   */
+  public void addValueToQuantiles(Statistic op, long value) {
+    MutableQuantiles quantiles = lookupQuantiles(op.getSymbol());
+    if (quantiles != null) {
+      quantiles.add(value);
+    }
+  }
+
   /**
    * Increment a specific counter.
    * No-op if not defined.
@@ -442,6 +497,15 @@ public class S3AInstrumentation {
   }
 
   /**
+   * Create a S3Guard instrumentation instance.
+   * There's likely to be at most one instance of this per FS instance.
+   * @return the S3Guard instrumentation point.
+   */
+  public S3GuardInstrumentation getS3GuardInstrumentation() {
+    return s3GuardInstrumentation;
+  }
+
+  /**
    * Merge in the statistics of a single input stream into
    * the filesystem-wide statistics.
    * @param statistics stream statistics
@@ -840,4 +904,19 @@ public class S3AInstrumentation {
       return sb.toString();
     }
   }
+
+  /**
+   * Instrumentation exported to S3Guard.
+   */
+  public final class S3GuardInstrumentation {
+
+    /** Initialized event. */
+    public void initialized() {
+      incrementCounter(S3GUARD_METADATASTORE_INITIALIZATION, 1);
+    }
+
+    /** Close event; currently a no-op placeholder. */
+    public void storeClosed() {
+    }
+  }
 }
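
A hedged sketch of how the new quantiles plumbing is intended to be fed
(statistic symbols from this patch; the millisecond unit is an assumption of
this sketch, not something the diff pins down):

    long startNanos = System.nanoTime();
    metadataStore.put(pathMetadata);   // the operation being measured
    long elapsedMs = (System.nanoTime() - startNanos) / 1_000_000;
    instrumentation.incrementCounter(S3GUARD_METADATASTORE_PUT_PATH_REQUEST, 1);
    instrumentation.addValueToQuantiles(
        S3GUARD_METADATASTORE_PUT_PATH_LATENCY, elapsedMs);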

http://git-wip-us.apache.org/repos/asf/hadoop/blob/621b43e2/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AOutputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AOutputStream.java
index 6ebc9e4..e723b75 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AOutputStream.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AOutputStream.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.fs.s3a;
 
 import com.amazonaws.AmazonClientException;
 import com.amazonaws.services.s3.model.ObjectMetadata;
-import com.amazonaws.services.s3.transfer.Upload;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
@@ -101,19 +100,20 @@ public class S3AOutputStream extends OutputStream {
 
     try {
       final ObjectMetadata om = fs.newObjectMetadata(backupFile.length());
-      Upload upload = fs.putObject(
+      UploadInfo info = fs.putObject(
           fs.newPutObjectRequest(
               key,
               om,
               backupFile));
       ProgressableProgressListener listener =
-          new ProgressableProgressListener(fs, key, upload, progress);
-      upload.addProgressListener(listener);
+          new ProgressableProgressListener(fs, key, info.getUpload(), progress);
+      info.getUpload().addProgressListener(listener);
 
-      upload.waitForUploadResult();
+      info.getUpload().waitForUploadResult();
       listener.uploadCompleted();
-      // This will delete unnecessary fake parent directories
-      fs.finishedWrite(key);
+      // This will delete unnecessary fake parent directories, update any
+      // MetadataStore
+      fs.finishedWrite(key, info.getLength());
     } catch (InterruptedException e) {
       throw (InterruptedIOException) new InterruptedIOException(e.toString())
           .initCause(e);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/621b43e2/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java
index 27406b6..9dd5def 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java
@@ -294,12 +294,38 @@ public final class S3AUtils {
       S3ObjectSummary summary,
       long blockSize,
       String owner) {
-    if (objectRepresentsDirectory(summary.getKey(), summary.getSize())) {
-      return new S3AFileStatus(true, keyPath, owner);
+    long size = summary.getSize();
+    return createFileStatus(keyPath,
+        objectRepresentsDirectory(summary.getKey(), size),
+        size, summary.getLastModified(), blockSize, owner);
+  }
+
+  /**
+   * Create a file status for an object we just uploaded.  For files, we use
+   * current time as modification time, since s3a uses S3's service-based
+   * modification time, which will not be available until we do a
+   * getFileStatus() later on.
+   * @param keyPath path for created object
+   * @param isDir true iff directory
+   * @param size file length
+   * @param blockSize block size for file status
+   * @param owner Hadoop username
+   * @return a status entry
+   */
+  public static S3AFileStatus createUploadFileStatus(Path keyPath,
+      boolean isDir, long size, long blockSize, String owner) {
+    Date date = isDir ? null : new Date();
+    return createFileStatus(keyPath, isDir, size, date, blockSize, owner);
+  }
+
+  /* Date 'modified' is ignored when isDir is true. */
+  private static S3AFileStatus createFileStatus(Path keyPath, boolean isDir,
+      long size, Date modified, long blockSize, String owner) {
+    if (isDir) {
+      return new S3AFileStatus(Tristate.UNKNOWN, keyPath, owner);
     } else {
-      return new S3AFileStatus(summary.getSize(),
-          dateToLong(summary.getLastModified()), keyPath,
-          blockSize, owner);
+      return new S3AFileStatus(size, dateToLong(modified), keyPath, blockSize,
+          owner);
     }
   }
 

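As a hedged sketch of the intended call pattern (qualify(), bytesWritten and getUsername() are illustrative assumptions, not part of this patch), a writer would synthesize a status immediately after a successful PUT, before S3 can report a modification time of its own:

    // Sketch: record a freshly written file before S3's own timestamp exists.
    S3AFileStatus status = S3AUtils.createUploadFileStatus(
        qualify(key),            // assumed helper mapping an S3 key to a Path
        false,                   // a file, not a directory
        bytesWritten,            // length reported by the upload
        getDefaultBlockSize(),   // FileSystem block size accessor
        getUsername());          // assumed accessor for the Hadoop user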
http://git-wip-us.apache.org/repos/asf/hadoop/blob/621b43e2/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ClientFactory.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ClientFactory.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ClientFactory.java
index d4e09e3..e7603d9 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ClientFactory.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ClientFactory.java
@@ -18,33 +18,20 @@
 
 package org.apache.hadoop.fs.s3a;
 
-import static org.apache.hadoop.fs.s3a.Constants.*;
-import static org.apache.hadoop.fs.s3a.S3AUtils.*;
-
 import java.io.IOException;
 import java.net.URI;
 
-import com.amazonaws.ClientConfiguration;
-import com.amazonaws.Protocol;
-import com.amazonaws.auth.AWSCredentialsProvider;
 import com.amazonaws.services.s3.AmazonS3;
-import com.amazonaws.services.s3.AmazonS3Client;
-import com.amazonaws.services.s3.S3ClientOptions;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.util.VersionInfo;
-
-import org.slf4j.Logger;
 
 /**
- * Factory for creation of S3 client instances to be used by {@link S3Store}.
+ * Factory for creation of {@link AmazonS3} client instances.
  */
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
-interface S3ClientFactory {
+public interface S3ClientFactory {
 
   /**
    * Creates a new {@link AmazonS3} client.  This method accepts the S3A file
@@ -57,177 +44,4 @@ interface S3ClientFactory {
    */
   AmazonS3 createS3Client(URI name) throws IOException;
 
-  /**
-   * The default factory implementation, which calls the AWS SDK to configure
-   * and create an {@link AmazonS3Client} that communicates with the S3 service.
-   */
-  static class DefaultS3ClientFactory extends Configured
-      implements S3ClientFactory {
-
-    private static final Logger LOG = S3AFileSystem.LOG;
-
-    @Override
-    public AmazonS3 createS3Client(URI name) throws IOException {
-      Configuration conf = getConf();
-      AWSCredentialsProvider credentials =
-          createAWSCredentialProviderSet(name, conf);
-      ClientConfiguration awsConf = new ClientConfiguration();
-      initConnectionSettings(conf, awsConf);
-      initProxySupport(conf, awsConf);
-      initUserAgent(conf, awsConf);
-      return createAmazonS3Client(conf, credentials, awsConf);
-    }
-
-    /**
-     * Initializes all AWS SDK settings related to connection management.
-     *
-     * @param conf Hadoop configuration
-     * @param awsConf AWS SDK configuration
-     */
-    private static void initConnectionSettings(Configuration conf,
-        ClientConfiguration awsConf) {
-      awsConf.setMaxConnections(intOption(conf, MAXIMUM_CONNECTIONS,
-          DEFAULT_MAXIMUM_CONNECTIONS, 1));
-      boolean secureConnections = conf.getBoolean(SECURE_CONNECTIONS,
-          DEFAULT_SECURE_CONNECTIONS);
-      awsConf.setProtocol(secureConnections ?  Protocol.HTTPS : Protocol.HTTP);
-      awsConf.setMaxErrorRetry(intOption(conf, MAX_ERROR_RETRIES,
-          DEFAULT_MAX_ERROR_RETRIES, 0));
-      awsConf.setConnectionTimeout(intOption(conf, ESTABLISH_TIMEOUT,
-          DEFAULT_ESTABLISH_TIMEOUT, 0));
-      awsConf.setSocketTimeout(intOption(conf, SOCKET_TIMEOUT,
-          DEFAULT_SOCKET_TIMEOUT, 0));
-      int sockSendBuffer = intOption(conf, SOCKET_SEND_BUFFER,
-          DEFAULT_SOCKET_SEND_BUFFER, 2048);
-      int sockRecvBuffer = intOption(conf, SOCKET_RECV_BUFFER,
-          DEFAULT_SOCKET_RECV_BUFFER, 2048);
-      awsConf.setSocketBufferSizeHints(sockSendBuffer, sockRecvBuffer);
-      String signerOverride = conf.getTrimmed(SIGNING_ALGORITHM, "");
-      if (!signerOverride.isEmpty()) {
-        LOG.debug("Signer override = {}", signerOverride);
-        awsConf.setSignerOverride(signerOverride);
-      }
-    }
-
-    /**
-     * Initializes AWS SDK proxy support if configured.
-     *
-     * @param conf Hadoop configuration
-     * @param awsConf AWS SDK configuration
-     * @throws IllegalArgumentException if misconfigured
-     */
-    private static void initProxySupport(Configuration conf,
-        ClientConfiguration awsConf) throws IllegalArgumentException {
-      String proxyHost = conf.getTrimmed(PROXY_HOST, "");
-      int proxyPort = conf.getInt(PROXY_PORT, -1);
-      if (!proxyHost.isEmpty()) {
-        awsConf.setProxyHost(proxyHost);
-        if (proxyPort >= 0) {
-          awsConf.setProxyPort(proxyPort);
-        } else {
-          if (conf.getBoolean(SECURE_CONNECTIONS, DEFAULT_SECURE_CONNECTIONS)) {
-            LOG.warn("Proxy host set without port. Using HTTPS default 443");
-            awsConf.setProxyPort(443);
-          } else {
-            LOG.warn("Proxy host set without port. Using HTTP default 80");
-            awsConf.setProxyPort(80);
-          }
-        }
-        String proxyUsername = conf.getTrimmed(PROXY_USERNAME);
-        String proxyPassword = conf.getTrimmed(PROXY_PASSWORD);
-        if ((proxyUsername == null) != (proxyPassword == null)) {
-          String msg = "Proxy error: " + PROXY_USERNAME + " or " +
-              PROXY_PASSWORD + " set without the other.";
-          LOG.error(msg);
-          throw new IllegalArgumentException(msg);
-        }
-        awsConf.setProxyUsername(proxyUsername);
-        awsConf.setProxyPassword(proxyPassword);
-        awsConf.setProxyDomain(conf.getTrimmed(PROXY_DOMAIN));
-        awsConf.setProxyWorkstation(conf.getTrimmed(PROXY_WORKSTATION));
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Using proxy server {}:{} as user {} with password {} on " +
-                  "domain {} as workstation {}", awsConf.getProxyHost(),
-              awsConf.getProxyPort(),
-              String.valueOf(awsConf.getProxyUsername()),
-              awsConf.getProxyPassword(), awsConf.getProxyDomain(),
-              awsConf.getProxyWorkstation());
-        }
-      } else if (proxyPort >= 0) {
-        String msg =
-            "Proxy error: " + PROXY_PORT + " set without " + PROXY_HOST;
-        LOG.error(msg);
-        throw new IllegalArgumentException(msg);
-      }
-    }
-
-    /**
-     * Initializes the User-Agent header to send in HTTP requests to the S3
-     * back-end.  We always include the Hadoop version number.  The user also
-     * may set an optional custom prefix to put in front of the Hadoop version
-     * number.  The AWS SDK internally appends its own information, which seems
-     * to include the AWS SDK version, OS and JVM version.
-     *
-     * @param conf Hadoop configuration
-     * @param awsConf AWS SDK configuration
-     */
-    private static void initUserAgent(Configuration conf,
-        ClientConfiguration awsConf) {
-      String userAgent = "Hadoop " + VersionInfo.getVersion();
-      String userAgentPrefix = conf.getTrimmed(USER_AGENT_PREFIX, "");
-      if (!userAgentPrefix.isEmpty()) {
-        userAgent = userAgentPrefix + ", " + userAgent;
-      }
-      LOG.debug("Using User-Agent: {}", userAgent);
-      awsConf.setUserAgentPrefix(userAgent);
-    }
-
-    /**
-     * Creates an {@link AmazonS3Client} from the established configuration.
-     *
-     * @param conf Hadoop configuration
-     * @param credentials AWS credentials
-     * @param awsConf AWS SDK configuration
-     * @return S3 client
-     * @throws IllegalArgumentException if misconfigured
-     */
-    private static AmazonS3 createAmazonS3Client(Configuration conf,
-        AWSCredentialsProvider credentials, ClientConfiguration awsConf)
-        throws IllegalArgumentException {
-      AmazonS3 s3 = new AmazonS3Client(credentials, awsConf);
-      String endPoint = conf.getTrimmed(ENDPOINT, "");
-      if (!endPoint.isEmpty()) {
-        try {
-          s3.setEndpoint(endPoint);
-        } catch (IllegalArgumentException e) {
-          String msg = "Incorrect endpoint: "  + e.getMessage();
-          LOG.error(msg);
-          throw new IllegalArgumentException(msg, e);
-        }
-      }
-      enablePathStyleAccessIfRequired(s3, conf);
-      return s3;
-    }
-
-    /**
-     * Enables path-style access to S3 buckets if configured.  By default, the
-     * behavior is to use virtual hosted-style access with URIs of the form
-     * http://bucketname.s3.amazonaws.com.  Enabling path-style access and a
-     * region-specific endpoint switches the behavior to use URIs of the form
-     * http://s3-eu-west-1.amazonaws.com/bucketname.
-     *
-     * @param s3 S3 client
-     * @param conf Hadoop configuration
-     */
-    private static void enablePathStyleAccessIfRequired(AmazonS3 s3,
-        Configuration conf) {
-      final boolean pathStyleAccess = conf.getBoolean(PATH_STYLE_ACCESS, false);
-      if (pathStyleAccess) {
-        LOG.debug("Enabling path style access!");
-        s3.setS3ClientOptions(S3ClientOptions.builder()
-            .setPathStyleAccess(true)
-            .build());
-      }
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/621b43e2/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java
index 789c6d7..777c161 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java
@@ -140,7 +140,18 @@ public enum Statistic {
   STREAM_WRITE_TOTAL_DATA("stream_write_total_data",
       "Count of total data uploaded in block output"),
   STREAM_WRITE_QUEUE_DURATION("stream_write_queue_duration",
-      "Total queue duration of all block uploads");
+      "Total queue duration of all block uploads"),
+
+  // S3Guard stats
+  S3GUARD_METADATASTORE_PUT_PATH_REQUEST(
+      "s3guard_metadatastore_put_path_request",
+      "s3guard metadata store put one metadata path request"),
+  S3GUARD_METADATASTORE_PUT_PATH_LATENCY(
+      "s3guard_metadatastore_put_path_latency",
+      "s3guard metadata store put one metadata path lantency"),
+  S3GUARD_METADATASTORE_INITIALIZATION("s3guard_metadatastore_initialization",
+      "s3guard metadata store initialization times");
+
 
   private static final Map<String, Statistic> SYMBOL_MAP =
       new HashMap<>(Statistic.values().length);

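To show how these statistics combine with the quantiles support added to S3AInstrumentation earlier in this patch, here is a hedged sketch (the timing logic and variable names are illustrative; only the Statistic values and the instrumentation methods come from the patch):

    // Sketch: timing one put-path request against the new metrics.
    long startNanos = System.nanoTime();
    metadataStore.put(pathMetadata);  // any MetadataStore implementation
    instrumentation.incrementCounter(
        Statistic.S3GUARD_METADATASTORE_PUT_PATH_REQUEST, 1);
    instrumentation.addValueToQuantiles(
        Statistic.S3GUARD_METADATASTORE_PUT_PATH_LATENCY,
        (System.nanoTime() - startNanos) / 1000000);  // milliseconds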
http://git-wip-us.apache.org/repos/asf/hadoop/blob/621b43e2/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Tristate.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Tristate.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Tristate.java
new file mode 100644
index 0000000..0462ccf
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Tristate.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+/**
+ * Simple enum to express {true, false, don't know}.
+ */
+public enum Tristate {
+  // Do not add additional values here.  Logic will assume there are exactly
+  // three possibilities.
+  TRUE, FALSE, UNKNOWN;
+
+  /** Map a definite boolean onto the corresponding enum value. */
+  public static Tristate fromBool(boolean v) {
+    return v ? TRUE : FALSE;
+  }
+}

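A quick hedged illustration of the intended use (dirMeta and children are assumed variables; DirListingMetadata appears later in this patch): the point of the enum is that callers can propagate "don't know" instead of guessing a boolean:

    // Sketch: acting on a three-valued emptiness answer.
    Tristate empty = dirMeta.isEmpty();
    if (empty == Tristate.UNKNOWN) {
      // fall back to a listing against S3 itself
    }
    // Lifting a definite boolean into the enum:
    Tristate known = Tristate.fromBool(children.isEmpty());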
http://git-wip-us.apache.org/repos/asf/hadoop/blob/621b43e2/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/UploadInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/UploadInfo.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/UploadInfo.java
new file mode 100644
index 0000000..238cd97
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/UploadInfo.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import com.amazonaws.services.s3.transfer.Upload;
+
+/**
+ * Simple struct that contains information about an S3 upload.
+ */
+public class UploadInfo {
+  private final Upload upload;
+  private final long length;
+
+  public UploadInfo(Upload upload, long length) {
+    this.upload = upload;
+    this.length = length;
+  }
+
+  public Upload getUpload() {
+    return upload;
+  }
+
+  public long getLength() {
+    return length;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/621b43e2/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DescendantsIterator.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DescendantsIterator.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DescendantsIterator.java
new file mode 100644
index 0000000..dcee358
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DescendantsIterator.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.s3guard;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.NoSuchElementException;
+import java.util.Queue;
+
+import com.google.common.base.Preconditions;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+
+/**
+ * {@code DescendantsIterator} is a {@link RemoteIterator} that implements
+ * pre-ordering breadth-first traversal (BFS) of a path and all of its
+ * descendants recursively.  After visiting each path, that path's direct
+ * children are discovered by calling {@link MetadataStore#listChildren(Path)}.
+ * Each iteration returns the next direct child, and if that child is a
+ * directory, also pushes it onto a queue to discover its children later.
+ *
+ * For example, assume the consistent store contains metadata representing this
+ * file system structure:
+ *
+ * <pre>
+ * /dir1
+ * |-- dir2
+ * |   |-- file1
+ * |   `-- file2
+ * `-- dir3
+ *     |-- dir4
+ *     |   `-- file3
+ *     |-- dir5
+ *     |   `-- file4
+ *     `-- dir6
+ * </pre>
+ *
+ * Consider this code sample:
+ * <pre>
+ * final PathMetadata dir1 = get(new Path("/dir1"));
+ * for (DescendantsIterator descendants = new DescendantsIterator(dir1);
+ *     descendants.hasNext(); ) {
+ *   final FileStatus status = descendants.next().getFileStatus();
+ *   System.out.printf("%s %s%n", status.isDirectory() ? 'D' : 'F',
+ *       status.getPath());
+ * }
+ * </pre>
+ *
+ * The output is:
+ * <pre>
+ * D /dir1
+ * D /dir1/dir2
+ * D /dir1/dir3
+ * F /dir1/dir2/file1
+ * F /dir1/dir2/file2
+ * D /dir1/dir3/dir4
+ * D /dir1/dir3/dir5
+ * F /dir1/dir3/dir4/file3
+ * F /dir1/dir3/dir5/file4
+ * D /dir1/dir3/dir6
+ * </pre>
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class DescendantsIterator implements RemoteIterator<FileStatus> {
+
+  private final MetadataStore metadataStore;
+  private final Queue<PathMetadata> queue = new LinkedList<>();
+
+  /**
+   * Creates a new {@code DescendantsIterator}.
+   *
+   * @param ms the associated {@link MetadataStore}
+   * @param meta base path for the iteration; it is returned first (unless
+   *     it is the root). A null value yields an empty iterator.
+   * @throws IOException if errors happen during metadata store listing
+   */
+  public DescendantsIterator(MetadataStore ms, PathMetadata meta)
+      throws IOException {
+    Preconditions.checkNotNull(ms);
+    this.metadataStore = ms;
+
+    if (meta != null) {
+      final Path path = meta.getFileStatus().getPath();
+      if (path.isRoot()) {
+        DirListingMetadata rootListing = ms.listChildren(path);
+        if (rootListing != null) {
+          rootListing = rootListing.withoutTombstones();
+          queue.addAll(rootListing.getListing());
+        }
+      } else {
+        queue.add(meta);
+      }
+    }
+  }
+
+  @Override
+  public boolean hasNext() throws IOException {
+    return !queue.isEmpty();
+  }
+
+  @Override
+  public FileStatus next() throws IOException {
+    if (!hasNext()) {
+      throw new NoSuchElementException("No more descendants.");
+    }
+    final PathMetadata next = queue.poll();
+    if (next.getFileStatus().isDirectory()) {
+      final Path path = next.getFileStatus().getPath();
+      DirListingMetadata meta = metadataStore.listChildren(path);
+      if (meta != null) {
+        Collection<PathMetadata> more = meta.withoutTombstones().getListing();
+        if (!more.isEmpty()) {
+          queue.addAll(more);
+        }
+      }
+    }
+    return next.getFileStatus();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/621b43e2/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DirListingMetadata.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DirListingMetadata.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DirListingMetadata.java
new file mode 100644
index 0000000..e5b4fb5
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DirListingMetadata.java
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.s3guard;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import com.google.common.base.Preconditions;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.Tristate;
+
+/**
+ * {@code DirListingMetadata} models a directory listing stored in a
+ * {@link MetadataStore}.  Instances of this class are mutable and thread-safe.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class DirListingMetadata {
+
+  /**
+   * Convenience parameter for passing into constructor.
+   */
+  public static final Collection<PathMetadata> EMPTY_DIR =
+      Collections.emptyList();
+
+  private final Path path;
+
+  /** Using a map for fast find / remove with large directories. */
+  private Map<Path, PathMetadata> listMap = new ConcurrentHashMap<>();
+
+  private boolean isAuthoritative;
+
+  /**
+   * Create a directory listing metadata container.
+   *
+   * @param path Path of the directory. If this path has a host component, then
+   *     all paths added later via {@link #put(FileStatus)} must also have
+   *     the same host.
+   * @param listing Entries in the directory.
+   * @param isAuthoritative true iff listing is the full contents of the
+   *     directory, and the calling client reports that this may be cached as
+   *     the full and authoritative listing of all files in the directory.
+   */
+  public DirListingMetadata(Path path, Collection<PathMetadata> listing,
+      boolean isAuthoritative) {
+
+    checkPathAbsolute(path);
+    this.path = path;
+
+    if (listing != null) {
+      for (PathMetadata entry : listing) {
+        Path childPath = entry.getFileStatus().getPath();
+        checkChildPath(childPath);
+        listMap.put(childPath, entry);
+      }
+    }
+    this.isAuthoritative  = isAuthoritative;
+  }
+
+  /**
+   * Copy constructor.
+   * @param d the existing {@link DirListingMetadata} object.
+   */
+  public DirListingMetadata(DirListingMetadata d) {
+    path = d.path;
+    isAuthoritative = d.isAuthoritative;
+    listMap = new ConcurrentHashMap<>(d.listMap);
+  }
+
+  /**
+   * @return {@code Path} of the directory that contains this listing.
+   */
+  public Path getPath() {
+    return path;
+  }
+
+  /**
+   * @return entries in the directory
+   */
+  public Collection<PathMetadata> getListing() {
+    return Collections.unmodifiableCollection(listMap.values());
+  }
+
+  /**
+   * @return paths in this listing that are marked deleted (tombstones).
+   */
+  public Set<Path> listTombstones() {
+    Set<Path> tombstones = new HashSet<>();
+    for (PathMetadata meta : listMap.values()) {
+      if (meta.isDeleted()) {
+        tombstones.add(meta.getFileStatus().getPath());
+      }
+    }
+    return tombstones;
+  }
+
+  /**
+   * @return a copy of this listing with all tombstone entries removed.
+   */
+  public DirListingMetadata withoutTombstones() {
+    Collection<PathMetadata> filteredList = new ArrayList<>();
+    for (PathMetadata meta : listMap.values()) {
+      if (!meta.isDeleted()) {
+        filteredList.add(meta);
+      }
+    }
+    return new DirListingMetadata(path, filteredList, isAuthoritative);
+  }
+
+  /**
+   * @return number of entries tracked.  This is not the same as the number
+   * of entries in the actual directory unless {@link #isAuthoritative()} is
+   * true.
+   */
+  public int numEntries() {
+    return listMap.size();
+  }
+
+  /**
+   * @return true iff this directory listing is full and authoritative within
+   * the scope of the {@code MetadataStore} that returned it.
+   */
+  public boolean isAuthoritative() {
+    return isAuthoritative;
+  }
+
+  /**
+   * Is the underlying directory known to be empty?
+   * @return FALSE if directory is known to have a child entry, TRUE if
+   * directory is known to be empty, UNKNOWN otherwise.
+   */
+  public Tristate isEmpty() {
+    if (getListing().isEmpty()) {
+      if (isAuthoritative()) {
+        return Tristate.TRUE;
+      } else {
+        // This listing is empty, but may not be full list of underlying dir.
+        return Tristate.UNKNOWN;
+      }
+    } else { // not empty listing
+      // There exists at least one child, dir not empty.
+      return Tristate.FALSE;
+    }
+  }
+
+  /**
+   * Marks this directory listing as full and authoritative.
+   * @param authoritative see {@link #isAuthoritative()}.
+   */
+  public void setAuthoritative(boolean authoritative) {
+    this.isAuthoritative = authoritative;
+  }
+
+  /**
+   * Lookup entry within this directory listing.  This may return null if the
+   * {@code MetadataStore} only tracks a partial set of the directory entries.
+   * In the case where {@link #isAuthoritative()} is true, however, this
+   * function returns null iff the directory is known not to contain an
+   * entry at the given path (within the scope of the {@code MetadataStore}
+   * that returned it).
+   *
+   * @param childPath path of entry to look for.
+   * @return entry, or null if it is not present or not being tracked.
+   */
+  public PathMetadata get(Path childPath) {
+    checkChildPath(childPath);
+    return listMap.get(childPath);
+  }
+
+  /**
+   * Replace an entry with a tombstone.
+   * @param childPath path of entry to replace.
+   */
+  public void markDeleted(Path childPath) {
+    checkChildPath(childPath);
+    listMap.put(childPath, PathMetadata.tombstone(childPath));
+  }
+
+  /**
+   * Remove entry from this directory.
+   *
+   * @param childPath path of entry to remove.
+   */
+  public void remove(Path childPath) {
+    checkChildPath(childPath);
+    listMap.remove(childPath);
+  }
+
+  /**
+   * Add an entry to the directory listing.  If this listing already contains a
+   * {@code FileStatus} with the same path, it will be replaced.
+   *
+   * @param childFileStatus entry to add to this directory listing.
+   * @return true if the status was added or replaced with a new value. False
+   * if the same FileStatus value was already present.
+   */
+  public boolean put(FileStatus childFileStatus) {
+    Preconditions.checkNotNull(childFileStatus,
+        "childFileStatus must be non-null");
+    Path childPath = childStatusToPathKey(childFileStatus);
+    PathMetadata newValue = new PathMetadata(childFileStatus);
+    PathMetadata oldValue = listMap.put(childPath, newValue);
+    return oldValue == null || !oldValue.equals(newValue);
+  }
+
+  @Override
+  public String toString() {
+    return "DirListingMetadata{" +
+        "path=" + path +
+        ", listMap=" + listMap +
+        ", isAuthoritative=" + isAuthoritative +
+        '}';
+  }
+
+  /**
+   * Log contents to supplied StringBuilder in a pretty fashion.
+   * @param sb target StringBuilder
+   */
+  public void prettyPrint(StringBuilder sb) {
+    sb.append(String.format("DirMeta %-20s %-18s",
+        path.toString(),
+        isAuthoritative ? "Authoritative" : "Not Authoritative"));
+    for (Map.Entry<Path, PathMetadata> entry : listMap.entrySet()) {
+      sb.append("\n   key: ").append(entry.getKey()).append(": ");
+      entry.getValue().prettyPrint(sb);
+    }
+    sb.append("\n");
+  }
+
+  public String prettyPrint() {
+    StringBuilder sb = new StringBuilder();
+    prettyPrint(sb);
+    return sb.toString();
+  }
+
+  /**
+   * Checks that child path is valid.
+   * @param childPath path to check.
+   */
+  private void checkChildPath(Path childPath) {
+    checkPathAbsolute(childPath);
+
+    // If this dir's path has host (and thus scheme), so must its children
+    URI parentUri = path.toUri();
+    if (parentUri.getHost() != null) {
+      URI childUri = childPath.toUri();
+      Preconditions.checkNotNull(childUri.getHost(), "Expected non-null URI " +
+          "host: %s", childUri);
+      Preconditions.checkArgument(
+          childUri.getHost().equals(parentUri.getHost()),
+          "childUri %s and parentUri %s must have the same host",
+          childUri, parentUri);
+      Preconditions.checkNotNull(childUri.getScheme(), "No scheme in path %s",
+          childUri);
+    }
+    Preconditions.checkArgument(!childPath.isRoot(),
+        "childPath cannot be the root path: %s", childPath);
+    Preconditions.checkArgument(childPath.getParent().equals(path),
+        "childPath %s must be a child of %s", childPath, path);
+  }
+
+  /**
+   * For Paths that are handed in directly, we assert they are in a
+   * consistent format via checkChildPath().  For paths supplied embedded in
+   * FileStatus, we attempt to fill in missing scheme and host, when this
+   * DirListingMetadata is associated with one.
+   *
+   * @return Path suitable for consistent hashtable lookups
+   * @throws NullPointerException null status argument
+   * @throws IllegalArgumentException bad status values or failure to
+   *                                  create a URI.
+   */
+  private Path childStatusToPathKey(FileStatus status) {
+    Path p = status.getPath();
+    Preconditions.checkNotNull(p, "Child status' path cannot be null");
+    Preconditions.checkArgument(!p.isRoot(),
+        "childPath cannot be the root path: %s", p);
+    Preconditions.checkArgument(p.getParent().equals(path),
+        "childPath %s must be a child of %s", p, path);
+    URI uri = p.toUri();
+    URI parentUri = path.toUri();
+    // If FileStatus' path is missing host, but should have one, add it.
+    if (uri.getHost() == null && parentUri.getHost() != null) {
+      try {
+        return new Path(new URI(parentUri.getScheme(), parentUri.getHost(),
+            uri.getPath(), uri.getFragment()));
+      } catch (URISyntaxException e) {
+        throw new IllegalArgumentException("FileStatus path invalid with" +
+            " added " + parentUri.getScheme() + "://" + parentUri.getHost() +
+            " added", e);
+      }
+    }
+    return p;
+  }
+
+  private void checkPathAbsolute(Path p) {
+    Preconditions.checkNotNull(p, "path must be non-null");
+    Preconditions.checkArgument(p.isAbsolute(), "path must be absolute: %s", p);
+  }
+}

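Tying these classes together, a hedged usage sketch of DirListingMetadata (the bucket, paths and sizes are illustrative only):

    // Sketch: building and mutating a directory listing entry.
    Path dir = new Path("s3a://bucket/work");
    FileStatus child = new S3AFileStatus(1024L, System.currentTimeMillis(),
        new Path(dir, "part-0000"), 64 * 1024 * 1024, "hadoop");
    DirListingMetadata listing =
        new DirListingMetadata(dir, DirListingMetadata.EMPTY_DIR, false);
    listing.put(child);                           // add or replace an entry
    listing.markDeleted(new Path(dir, "stale"));  // record a tombstone
    // isEmpty() is now Tristate.FALSE: at least one entry is tracked.
    DirListingMetadata visible = listing.withoutTombstones();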
