You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by st...@apache.org on 2020/09/02 15:28:59 UTC

[hadoop] branch branch-3.0 updated: HADOOP-17199. S3A Directory Marker HADOOP-13230 backport

This is an automated email from the ASF dual-hosted git repository.

stevel pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 105fd92  HADOOP-17199. S3A Directory Marker HADOOP-13230 backport
105fd92 is described below

commit 105fd92e82400df11cb5d3748265e70779c8fe25
Author: Steve Loughran <st...@cloudera.com>
AuthorDate: Sat Aug 29 12:17:27 2020 +0100

    HADOOP-17199. S3A Directory Marker HADOOP-13230 backport
    
    This backports the listing-side changes of HADOOP-13230.
    
    With this patch in, this branch of Hadoop is compatible with S3A clients
    which do not delete directory markers when files are created underneath.
    
    It does not allow this version to disable marker deletion; if the
    fs.s3a.directory.marker.retention option is changed to request this,
    a message is printed at INFO and the policy remains at "delete".
    
    The s3guard bucket-info command has been extended to support
    probing for marker retention, as has the hasPathCapability method on
    S3AFileSystem.
    
    Read the documentation!
    
    Change-Id: I72c9ee96e11880ed8ecbb9d3427ac02ff95249e8
---
 .../AbstractContractGetFileStatusTest.java         |   8 +-
 .../java/org/apache/hadoop/fs/s3a/Constants.java   |  91 +++
 .../hadoop/fs/s3a/InconsistentAmazonS3Client.java  |   4 +-
 .../org/apache/hadoop/fs/s3a/S3AFileSystem.java    | 296 +++++---
 .../org/apache/hadoop/fs/s3a/S3ListResult.java     | 112 ++-
 .../apache/hadoop/fs/s3a/impl/DirectoryPolicy.java | 110 +++
 .../hadoop/fs/s3a/impl/DirectoryPolicyImpl.java    | 180 +++++
 .../apache/hadoop/fs/s3a/impl/StatusProbeEnum.java |  57 ++
 .../apache/hadoop/fs/s3a/s3guard/S3GuardTool.java  |  69 +-
 .../markdown/tools/hadoop-aws/directory_markers.md | 295 ++++++++
 .../src/site/markdown/tools/hadoop-aws/index.md    |  23 +-
 .../apache/hadoop/fs/s3a/AbstractS3ATestBase.java  |   4 +
 .../hadoop/fs/s3a/ITestS3AFileOperationCost.java   |  20 +-
 .../org/apache/hadoop/fs/s3a/S3ATestUtils.java     |  14 +
 .../apache/hadoop/fs/s3a/TestS3AGetFileStatus.java |  38 +-
 .../fs/s3a/impl/TestDirectoryMarkerPolicy.java     | 161 +++++
 .../performance/ITestDirectoryMarkerListing.java   | 773 +++++++++++++++++++++
 .../hadoop/fs/s3a/performance/OperationCost.java   | 201 ++++++
 .../fs/s3a/performance/OperationCostValidator.java | 481 +++++++++++++
 .../s3a/s3guard/AbstractS3GuardToolTestBase.java   |  36 +
 .../hadoop/fs/s3a/test/costs/HeadListCosts.java    | 123 ++++
 21 files changed, 2965 insertions(+), 131 deletions(-)

diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractGetFileStatusTest.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractGetFileStatusTest.java
index 269e35e..3461108 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractGetFileStatusTest.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractGetFileStatusTest.java
@@ -20,7 +20,9 @@ package org.apache.hadoop.fs.contract;
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.List;
+import java.util.UUID;
 
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -406,7 +408,8 @@ public abstract class AbstractContractGetFileStatusTest extends
     Path path = getContract().getTestPath();
     fs.delete(path, true);
     // create a - non-qualified - Path for a subdir
-    Path subfolder = path.suffix('/' + this.methodName.getMethodName());
+    Path subfolder = path.suffix('/' + this.methodName.getMethodName()
+        + "-" + UUID.randomUUID());
     mkdirs(subfolder);
     return subfolder;
   }
@@ -531,7 +534,8 @@ public abstract class AbstractContractGetFileStatusTest extends
       Path path,
       PathFilter filter) throws IOException {
     FileStatus[] result = getFileSystem().listStatus(path, filter);
-    assertEquals("length of listStatus(" + path + ", " + filter + " )",
+    assertEquals("length of listStatus(" + path + ", " + filter + " ) " +
+        Arrays.toString(result),
         expected, result.length);
     return result;
   }
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
index d278bdf..4ab5655 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
@@ -463,4 +463,95 @@ public final class Constants {
   @InterfaceStability.Unstable
   public static final int DEFAULT_LIST_VERSION = 2;
 
+  /**
+   * Policy for directory markers.
+   * This is a new feature of HADOOP-13230 which addresses
+   * some scale, performance and permissions issues -but
+   * at the risk of backwards compatibility.
+   * <p></p>
+   * This Hadoop release only supports the original "delete"
+   * policy.
+   */
+  public static final String DIRECTORY_MARKER_POLICY =
+      "fs.s3a.directory.marker.retention";
+
+  /**
+   * Delete directory markers. This is the backwards compatible option.
+   * Value: {@value}.
+   */
+  public static final String DIRECTORY_MARKER_POLICY_DELETE =
+      "delete";
+
+  /**
+   * Retain directory markers (unsupported in this release).
+   * Value: {@value}.
+   */
+  public static final String DIRECTORY_MARKER_POLICY_KEEP =
+      "keep";
+
+  /**
+   * Retain directory markers in authoritative directory trees only
+   *  (unsupported in this release).
+   * Value: {@value}.
+   */
+  public static final String DIRECTORY_MARKER_POLICY_AUTHORITATIVE =
+      "authoritative";
+
+  /**
+   * Default retention policy: {@value}.
+   */
+  public static final String DEFAULT_DIRECTORY_MARKER_POLICY =
+      DIRECTORY_MARKER_POLICY_DELETE;
+
+
+  /**
+   * {@code PathCapabilities} probe to verify that an S3A Filesystem
+   * has the changes needed to safely work with buckets where
+   * directory markers have not been deleted.
+   * Value: {@value}.
+   */
+  public static final String STORE_CAPABILITY_DIRECTORY_MARKER_AWARE
+      = "fs.s3a.capability.directory.marker.aware";
+
+  /**
+   * {@code PathCapabilities} probe to indicate that the filesystem
+   * keeps directory markers.
+   * Value: {@value}.
+   */
+  public static final String STORE_CAPABILITY_DIRECTORY_MARKER_POLICY_KEEP
+      = "fs.s3a.capability.directory.marker.policy.keep";
+
+  /**
+   * {@code PathCapabilities} probe to indicate that the filesystem
+   * deletes directory markers.
+   * Value: {@value}.
+   */
+  public static final String STORE_CAPABILITY_DIRECTORY_MARKER_POLICY_DELETE
+      = "fs.s3a.capability.directory.marker.policy.delete";
+
+  /**
+   * {@code PathCapabilities} probe to indicate that the filesystem
+   * keeps directory markers in authoritative paths only.
+   * Value: {@value}.
+   */
+  public static final String
+      STORE_CAPABILITY_DIRECTORY_MARKER_POLICY_AUTHORITATIVE =
+      "fs.s3a.capability.directory.marker.policy.authoritative";
+
+  /**
+   * {@code PathCapabilities} probe to indicate that a path/S3GuardTool
+   * keeps directory markers.
+   * Value: {@value}.
+   */
+  public static final String STORE_CAPABILITY_DIRECTORY_MARKER_ACTION_KEEP
+      = "fs.s3a.capability.directory.marker.action.keep";
+
+  /**
+   * {@code PathCapabilities} probe to indicate that a path
+   * deletes directory markers.
+   * Value: {@value}.
+   */
+  public static final String STORE_CAPABILITY_DIRECTORY_MARKER_ACTION_DELETE
+      = "fs.s3a.capability.directory.marker.action.delete";
+
 }
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/InconsistentAmazonS3Client.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/InconsistentAmazonS3Client.java
index 6476f5d..03d6090 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/InconsistentAmazonS3Client.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/InconsistentAmazonS3Client.java
@@ -280,7 +280,9 @@ public class InconsistentAmazonS3Client extends AmazonS3Client {
     } else {
       Path actualParentPath = new Path(child).getParent();
       Path expectedParentPath = new Path(parent);
-      return actualParentPath.equals(expectedParentPath);
+      // children which are directory markers are excluded here
+      return actualParentPath.equals(expectedParentPath)
+          && !child.endsWith("/");
     }
   }
 
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
index c3e012c..a225ce4 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
@@ -31,6 +31,7 @@ import java.util.Date;
 import java.util.EnumSet;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
 import java.util.Set;
 import java.util.Objects;
@@ -81,6 +82,9 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.s3a.impl.DirectoryPolicy;
+import org.apache.hadoop.fs.s3a.impl.DirectoryPolicyImpl;
+import org.apache.hadoop.fs.s3a.impl.StatusProbeEnum;
 import org.apache.hadoop.fs.FileAlreadyExistsException;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -95,6 +99,7 @@ import org.apache.hadoop.fs.PathIOException;
 import org.apache.hadoop.fs.PathIsNotEmptyDirectoryException;
 import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.fs.StorageStatistics;
+import org.apache.hadoop.fs.StreamCapabilities;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.s3a.s3guard.DirListingMetadata;
 import org.apache.hadoop.fs.s3a.s3guard.MetadataStoreListFilesIterator;
@@ -131,7 +136,7 @@ import org.slf4j.LoggerFactory;
  */
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
-public class S3AFileSystem extends FileSystem {
+public class S3AFileSystem extends FileSystem implements StreamCapabilities {
   /**
    * Default blocksize as used in blocksize and FS status queries.
    */
@@ -170,6 +175,11 @@ public class S3AFileSystem extends FileSystem {
   private int blockOutputActiveBlocks;
   private boolean useListV1;
 
+  /**
+   * Directory policy.
+   */
+  private DirectoryPolicy directoryPolicy;
+
   /** Add any deprecated keys. */
   @SuppressWarnings("deprecation")
   private static void addDeprecatedKeys() {
@@ -304,6 +314,9 @@ public class S3AFileSystem extends FileSystem {
         LOG.debug("Using metadata store {}, authoritative={}",
             getMetadataStore(), allowAuthoritative);
       }
+      // directory policy, which may look at authoritative paths
+      directoryPolicy = DirectoryPolicyImpl.getDirectoryPolicy(conf);
+      LOG.debug("Directory marker retention policy is {}", directoryPolicy);
     } catch (AmazonClientException e) {
       throw translateException("initializing ", new Path(name), e);
     }
@@ -420,6 +433,19 @@ public class S3AFileSystem extends FileSystem {
   }
 
   /**
+   * Returns the S3 client used by this filesystem.
+   * <i>Warning: this must only be used for testing, as it bypasses core
+   * S3A operations. </i>
+   * @param reason a justification for requesting access.
+   * @return AmazonS3Client
+   */
+  @VisibleForTesting
+  public AmazonS3 getAmazonS3ClientForTesting(String reason) {
+    LOG.warn("Access to S3A client requested, reason {}", reason);
+    return s3;
+  }
+
+  /**
    * Get the region of a bucket.
    * @return the region in which a bucket is located
    * @throws IOException on any failure.
@@ -838,6 +864,10 @@ public class S3AFileSystem extends FileSystem {
     }
     // TODO S3Guard HADOOP-13761: retries when source paths are not visible yet
     // TODO S3Guard: performance: mark destination dirs as authoritative
+    // The path to whichever file or directory is created by the
+    // rename. When deleting markers all parents of
+    // this path will need their markers pruned.
+    Path destCreated = dst;
 
     // Ok! Time to start
     if (srcStatus.isFile()) {
@@ -851,9 +881,11 @@ public class S3AFileSystem extends FileSystem {
         String filename =
             srcKey.substring(pathToKey(src.getParent()).length()+1);
         newDstKey = newDstKey + filename;
+        destCreated = keyToQualifiedPath(newDstKey);
+
         copyFile(srcKey, newDstKey, length);
         S3Guard.addMoveFile(metadataStore, srcPaths, dstMetas, src,
-            keyToQualifiedPath(newDstKey), length, getDefaultBlockSize(dst),
+            destCreated, length, getDefaultBlockSize(dst),
             username);
       } else {
         copyFile(srcKey, dstKey, srcStatus.getLen());
@@ -940,9 +972,10 @@ public class S3AFileSystem extends FileSystem {
 
     metadataStore.move(srcPaths, dstMetas);
 
-    if (src.getParent() != dst.getParent()) {
-      deleteUnnecessaryFakeDirectories(dst.getParent());
-      createFakeDirectoryIfNecessary(src.getParent());
+    if (!src.getParent().equals(destCreated.getParent())) {
+      LOG.debug("source & dest parents are different; fix up dir markers");
+      deleteUnnecessaryFakeDirectories(destCreated.getParent());
+      maybeCreateFakeParentDirectory(src);
     }
     return true;
   }
@@ -1556,6 +1589,21 @@ public class S3AFileSystem extends FileSystem {
   }
 
   /**
+   * Create a fake parent directory if required.
+   * That is: its parent is not the root path and does not yet exist.
+   * @param path whose parent is created if needed.
+   * @throws IOException IO problem
+   * @throws AmazonClientException untranslated AWS client problem
+   */
+  void maybeCreateFakeParentDirectory(Path path)
+      throws IOException, AmazonClientException {
+    Path parent = path.getParent();
+    if (parent != null) {
+      createFakeDirectoryIfNecessary(parent);
+    }
+  }
+
+  /**
    * List the statuses of the files/directories in the given path if the path is
    * a directory.
    *
@@ -1822,6 +1870,8 @@ public class S3AFileSystem extends FileSystem {
 
       FileStatus msStatus = pm.getFileStatus();
       if (needEmptyDirectoryFlag && msStatus.isDirectory()) {
+        // the caller needs to know if a directory is empty,
+        // and that this is a directory.
         if (pm.isEmptyDirectory() != Tristate.UNKNOWN) {
           // We have a definitive true / false from MetadataStore, we are done.
           return S3AFileStatus.fromFileStatus(msStatus, pm.isEmptyDirectory());
@@ -1830,28 +1880,33 @@ public class S3AFileSystem extends FileSystem {
           if (children != null) {
             tombstones = children.listTombstones();
           }
-          LOG.debug("MetadataStore doesn't know if dir is empty, using S3.");
+          LOG.debug("MetadataStore doesn't know if {} is empty, using S3.",
+              path);
         }
       } else {
         // Either this is not a directory, or we don't care if it is empty
         return S3AFileStatus.fromFileStatus(msStatus, pm.isEmptyDirectory());
       }
 
-      // If the metadata store has no children for it and it's not listed in
-      // S3 yet, we'll assume the empty directory is true;
-      S3AFileStatus s3FileStatus;
+      // now issue the S3 getFileStatus call.
       try {
-        s3FileStatus = s3GetFileStatus(path, key, tombstones);
+        S3AFileStatus s3FileStatus = s3GetFileStatus(path, key,
+            StatusProbeEnum.ALL,
+            tombstones,
+            true);
+        // entry was found, so save in S3Guard and return the final value.
+        return S3Guard.putAndReturn(metadataStore, s3FileStatus,
+            instrumentation);
       } catch (FileNotFoundException e) {
         return S3AFileStatus.fromFileStatus(msStatus, Tristate.TRUE);
       }
-      // entry was found, save in S3Guard
-      return S3Guard.putAndReturn(metadataStore, s3FileStatus, instrumentation);
     } else {
       // there was no entry in S3Guard
       // retrieve the data and update the metadata store in the process.
       return S3Guard.putAndReturn(metadataStore,
-          s3GetFileStatus(path, key, tombstones), instrumentation);
+          s3GetFileStatus(path, key, StatusProbeEnum.ALL,
+              tombstones, needEmptyDirectoryFlag),
+          instrumentation);
     }
   }
 
@@ -1861,86 +1916,94 @@ public class S3AFileSystem extends FileSystem {
    * and for direct management of empty directory blobs.
    * @param path Qualified path
    * @param key  Key string for the path
+   * @param probes probes to make
+   * @param tombstones tombstones to filter
+   * @param needEmptyDirectoryFlag if true, implementation will calculate
+   *        a TRUE or FALSE value for {@link S3AFileStatus#isEmptyDirectory()}
    * @return Status
-   * @throws FileNotFoundException when the path does not exist
+   * @throws FileNotFoundException the supplied probes failed.
    * @throws IOException on other problems.
    */
-  private S3AFileStatus s3GetFileStatus(final Path path, String key,
-      Set<Path> tombstones) throws IOException {
-    if (!key.isEmpty()) {
+  private S3AFileStatus s3GetFileStatus(final Path path,
+      final String key,
+      final Set<StatusProbeEnum> probes,
+      final Set<Path> tombstones,
+      final boolean needEmptyDirectoryFlag) throws IOException {
+    LOG.debug("S3GetFileStatus {}", path);
+    Preconditions.checkArgument(!needEmptyDirectoryFlag
+        || probes.contains(StatusProbeEnum.List),
+        "s3GetFileStatus(%s) wants to know if a directory is empty but"
+            + " does not request a list probe", path);
+
+    if (!key.isEmpty() && !key.endsWith("/")
+        && probes.contains(StatusProbeEnum.Head)) {
       try {
+        // look for the simple file
         ObjectMetadata meta = getObjectMetadata(key);
-
-        if (objectRepresentsDirectory(key, meta.getContentLength())) {
-          LOG.debug("Found exact file: fake directory");
-          return new S3AFileStatus(Tristate.TRUE, path, username);
-        } else {
-          LOG.debug("Found exact file: normal file");
+        LOG.debug("Found exact file: normal file {}", key);
           return new S3AFileStatus(meta.getContentLength(),
               dateToLong(meta.getLastModified()),
               path,
               getDefaultBlockSize(path),
               username);
-        }
       } catch (AmazonServiceException e) {
+      // if the response is a 404 error, it just means that there is
+      // no file at that path...the remaining checks will be needed.
         if (e.getStatusCode() != 404) {
           throw translateException("getFileStatus", path, e);
         }
       } catch (AmazonClientException e) {
         throw translateException("getFileStatus", path, e);
       }
+    }
 
-      // Necessary?
-      if (!key.endsWith("/")) {
-        String newKey = key + "/";
+    // execute the list
+    if (probes.contains(StatusProbeEnum.List)) {
         try {
-          ObjectMetadata meta = getObjectMetadata(newKey);
-
-          if (objectRepresentsDirectory(newKey, meta.getContentLength())) {
-            LOG.debug("Found file (with /): fake directory");
-            return new S3AFileStatus(Tristate.TRUE, path, username);
+        // this will find a marker dir / as well as an entry.
+        // When making a simple "is this a dir check" all is good.
+        // but when looking for an empty dir, we need to verify there are no
+        // children, so ask for two entries, so as to find
+        // a child
+        String dirKey = maybeAddTrailingSlash(key);
+        // list size is dir marker + at least one non-tombstone entry
+        // there's a corner case: more tombstones than you have in a
+        // single page list. We assume that if you have been deleting
+        // that many files, then the AWS listing will have purged some
+        // by the time of listing so that the response includes some
+        // which have not.
+
+        int listSize;
+        if (tombstones == null) {
+          // no tombstones so look for a marker and at least one child.
+          listSize = 2;
           } else {
-            LOG.warn("Found file (with /): real file? should not happen: {}",
-                key);
-
-            return new S3AFileStatus(meta.getContentLength(),
-                    dateToLong(meta.getLastModified()),
-                    path,
-                    getDefaultBlockSize(path),
-                    username);
-          }
-        } catch (AmazonServiceException e) {
-          if (e.getStatusCode() != 404) {
-            throw translateException("getFileStatus", newKey, e);
-          }
-        } catch (AmazonClientException e) {
-          throw translateException("getFileStatus", newKey, e);
-        }
+          // build a listing > tombstones. If the caller has many thousands
+          // of tombstones this won't work properly, which is why pruning
+          // of expired tombstones matters.
+          listSize = Math.min(2 + tombstones.size(), Math.max(2, maxKeys));
       }
-    }
-
-    try {
-      key = maybeAddTrailingSlash(key);
-      S3ListRequest request = createListObjectsRequest(key, "/", 1);
+        S3ListRequest request = createListObjectsRequest(dirKey, "/",
+            listSize);
+        // execute the request
+        S3ListResult listResult = listObjects(request);
 
-      S3ListResult objects = listObjects(request);
 
-      Collection<String> prefixes = objects.getCommonPrefixes();
-      Collection<S3ObjectSummary> summaries = objects.getObjectSummaries();
-      if (!isEmptyOfKeys(prefixes, tombstones) ||
-          !isEmptyOfObjects(summaries, tombstones)) {
+        if (listResult.hasPrefixesOrObjects(this::keyToPath, tombstones)) {
         if (LOG.isDebugEnabled()) {
-          LOG.debug("Found path as directory (with /): {}/{}",
-              prefixes.size(), summaries.size());
-
-          for (S3ObjectSummary summary : summaries) {
-            LOG.debug("Summary: {} {}", summary.getKey(), summary.getSize());
+            LOG.debug("Found path as directory (with /)");
+            listResult.logAtDebug(LOG);
           }
-          for (String prefix : prefixes) {
-            LOG.debug("Prefix: {}", prefix);
+          // At least one entry has been found.
+          // If looking for an empty directory, the marker must exist but no children.
+          // So the listing must contain the marker entry only.
+          if (needEmptyDirectoryFlag
+              && listResult.representsEmptyDirectory(
+                  this::keyToPath, dirKey, tombstones)) {
+            return new S3AFileStatus(Tristate.TRUE, path, username);
           }
-        }
-
+          // either an empty directory is not needed, or the
+          // listing does not meet the requirements.
         return new S3AFileStatus(Tristate.FALSE, path, username);
       } else if (key.isEmpty()) {
         LOG.debug("Found root directory");
@@ -1953,54 +2016,13 @@ public class S3AFileSystem extends FileSystem {
     } catch (AmazonClientException e) {
       throw translateException("getFileStatus", key, e);
     }
+    }
 
     LOG.debug("Not Found: {}", path);
     throw new FileNotFoundException("No such file or directory: " + path);
   }
 
   /**
-   * Helper function to determine if a collection of paths is empty
-   * after accounting for tombstone markers (if provided).
-   * @param keys Collection of path (prefixes / directories or keys).
-   * @param tombstones Set of tombstone markers, or null if not applicable.
-   * @return false if summaries contains objects not accounted for by
-   * tombstones.
-   */
-  private boolean isEmptyOfKeys(Collection<String> keys, Set<Path>
-      tombstones) {
-    if (tombstones == null) {
-      return keys.isEmpty();
-    }
-    for (String key : keys) {
-      Path qualified = keyToQualifiedPath(key);
-      if (!tombstones.contains(qualified)) {
-        return false;
-      }
-    }
-    return true;
-  }
-
-  /**
-   * Helper function to determine if a collection of object summaries is empty
-   * after accounting for tombstone markers (if provided).
-   * @param summaries Collection of objects as returned by listObjects.
-   * @param tombstones Set of tombstone markers, or null if not applicable.
-   * @return false if summaries contains objects not accounted for by
-   * tombstones.
-   */
-  private boolean isEmptyOfObjects(Collection<S3ObjectSummary> summaries,
-      Set<Path> tombstones) {
-    if (tombstones == null) {
-      return summaries.isEmpty();
-    }
-    Collection<String> stringCollection = new ArrayList<>(summaries.size());
-    for (S3ObjectSummary summary : summaries) {
-      stringCollection.add(summary.getKey());
-    }
-    return isEmptyOfKeys(stringCollection, tombstones);
-  }
-
-  /**
    * Raw version of {@link FileSystem#exists(Path)} which uses S3 only:
    * S3Guard MetadataStore, if any, will be skipped.
    * @return true if path exists in S3
@@ -2009,7 +2031,8 @@ public class S3AFileSystem extends FileSystem {
     Path path = qualify(f);
     String key = pathToKey(path);
     try {
-      s3GetFileStatus(path, key, null);
+      s3GetFileStatus(path, key, StatusProbeEnum.ALL,
+          null, false);
       return true;
     } catch (FileNotFoundException e) {
       return false;
@@ -2463,6 +2486,14 @@ public class S3AFileSystem extends FileSystem {
     return getConf().getLongBytes(FS_S3A_BLOCK_SIZE, DEFAULT_BLOCKSIZE);
   }
 
+  /**
+   * Get the directory marker policy of this filesystem.
+   * @return the marker policy.
+   */
+  public DirectoryPolicy getDirectoryMarkerPolicy() {
+    return directoryPolicy;
+  }
+
   @Override
   public String toString() {
     final StringBuilder sb = new StringBuilder(
@@ -2494,6 +2525,7 @@ public class S3AFileSystem extends FileSystem {
     sb.append(", useListV1=").append(useListV1);
     sb.append(", boundedExecutor=").append(boundedThreadPool);
     sb.append(", unboundedExecutor=").append(unboundedThreadPool);
+    sb.append(", ").append(directoryPolicy);
     sb.append(", statistics {")
         .append(statistics)
         .append("}");
@@ -2957,4 +2989,48 @@ public class S3AFileSystem extends FileSystem {
     }
   }
 
+  /**
+   * This Hadoop version does not support PathCapabilities.
+   * By implementing the method on its own, code which
+   * introspects to find the method can still probe for
+   * the capabilities of the store.
+   * @param path path (unused)
+   * @param capability capability to probe for.
+   * @return true if the FS has the specific capability.
+   * @throws IOException failure
+   */
+  public boolean hasPathCapability(final Path path, final String capability)
+      throws IOException {
+    return hasCapability(capability);
+  }
+
+  /**
+   * Return the capabilities of this filesystem instance.
+   * @param capability string to query the stream support for.
+   * @return whether the FS instance has the capability.
+   */
+  @SuppressWarnings("deprecation")
+  @Override
+  public boolean hasCapability(String capability) {
+
+    final String cap = capability.toLowerCase(Locale.ENGLISH);
+    switch (cap) {
+
+    case STORE_CAPABILITY_DIRECTORY_MARKER_AWARE:
+      return true;
+
+    /*
+     * Marker policy capabilities are handed off.
+     */
+    case STORE_CAPABILITY_DIRECTORY_MARKER_POLICY_KEEP:
+    case STORE_CAPABILITY_DIRECTORY_MARKER_POLICY_DELETE:
+    case STORE_CAPABILITY_DIRECTORY_MARKER_POLICY_AUTHORITATIVE:
+    case STORE_CAPABILITY_DIRECTORY_MARKER_ACTION_KEEP:
+    case STORE_CAPABILITY_DIRECTORY_MARKER_ACTION_DELETE:
+      return getDirectoryMarkerPolicy().hasPathCapability(new Path("/"), cap);
+
+    default:
+      return false;
+    }
+  }
 }
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ListResult.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ListResult.java
index e8aff32..eec4770 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ListResult.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ListResult.java
@@ -18,11 +18,18 @@
 
 package org.apache.hadoop.fs.s3a;
 
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
 import com.amazonaws.services.s3.model.ListObjectsV2Result;
 import com.amazonaws.services.s3.model.ObjectListing;
 import com.amazonaws.services.s3.model.S3ObjectSummary;
+import org.slf4j.Logger;
 
-import java.util.List;
+import org.apache.hadoop.fs.Path;
 
 /**
  * API version-independent container for S3 List responses.
@@ -92,6 +99,109 @@ public class S3ListResult {
     } else {
       return v2Result.getCommonPrefixes();
     }
+  }
+
+  /**
+   * Is the list of object summaries empty
+   * after accounting for tombstone markers (if provided)?
+   * @param keyToPath callback for key to path mapping.
+   * @param tombstones Set of tombstone markers, or null if not applicable.
+   * @return false if summaries contains objects not accounted for by
+   * tombstones.
+   */
+  public boolean isEmptyOfObjects(
+      final Function<String, Path> keyToPath,
+      final Set<Path> tombstones) {
+    if (tombstones == null) {
+      return getObjectSummaries().isEmpty();
+    }
+    return isEmptyOfKeys(keyToPath,
+        objectSummaryKeys(),
+        tombstones);
+  }
+
+  /**
+   * Get the list of keys in the object summary.
+   * @return a possibly empty list
+   */
+  private List<String> objectSummaryKeys() {
+    return getObjectSummaries().stream()
+        .map(S3ObjectSummary::getKey)
+        .collect(Collectors.toList());
+  }
+
+  /**
+   * Does this listing have prefixes or objects?
+   * @param keyToPath callback for key to path mapping.
+   * @param tombstones Set of tombstone markers, or null if not applicable.
+   * @return true if the reconciled list is non-empty
+   */
+  public boolean hasPrefixesOrObjects(
+      final Function<String, Path> keyToPath,
+      final Set<Path> tombstones) {
+
+    return !isEmptyOfKeys(keyToPath, getCommonPrefixes(), tombstones)
+        || !isEmptyOfObjects(keyToPath, tombstones);
+  }
+
+  /**
+   * Helper function to determine if a collection of keys is empty
+   * after accounting for tombstone markers (if provided).
+   * @param keyToPath callback for key to path mapping.
+   * @param keys Collection of path (prefixes / directories or keys).
+   * @param tombstones Set of tombstone markers, or null if not applicable.
+   * @return true if the list is considered empty.
+   */
+  public boolean isEmptyOfKeys(
+      final Function<String, Path> keyToPath,
+      final Collection<String> keys,
+      final Set<Path> tombstones) {
+    if (tombstones == null) {
+      return keys.isEmpty();
+    }
+    for (String key : keys) {
+      Path qualified = keyToPath.apply(key);
+      if (!tombstones.contains(qualified)) {
+        return false;
+      }
+    }
+    return true;
+  }
 
+  /**
+   * Does this listing represent an empty directory?
+   * @param keyToPath callback for key to path mapping.
+   * @param dirKey directory key
+   * @param tombstones Set of tombstone markers, or null if not applicable.
+   * @return true if the list is considered empty.
+   */
+  public boolean representsEmptyDirectory(
+      final Function<String, Path> keyToPath,
+      final String dirKey,
+      final Set<Path> tombstones) {
+    // If looking for an empty directory, the marker must exist but
+    // no children.
+    // So the listing must contain the marker entry only as an object,
+    // and the list of common prefixes must be empty.
+    List<String> keys = objectSummaryKeys();
+    return keys.size() == 1 && keys.contains(dirKey)
+        && getCommonPrefixes().isEmpty();
+  }
+
+  /**
+   * Dump the result at debug level only.
+   * @param log log to use
+   */
+  public void logAtDebug(Logger log) {
+    Collection<String> prefixes = getCommonPrefixes();
+    Collection<S3ObjectSummary> summaries = getObjectSummaries();
+    log.debug("Prefix count = {}; object count={}",
+        prefixes.size(), summaries.size());
+    for (S3ObjectSummary summary : summaries) {
+      log.debug("Summary: {} {}", summary.getKey(), summary.getSize());
+    }
+    for (String prefix : prefixes) {
+      log.debug("Prefix: {}", prefix);
+    }
   }
 }
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/DirectoryPolicy.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/DirectoryPolicy.java
new file mode 100644
index 0000000..36dd2e4
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/DirectoryPolicy.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.impl;
+
+import org.apache.hadoop.fs.Path;
+
+import static org.apache.hadoop.fs.s3a.Constants.DIRECTORY_MARKER_POLICY_AUTHORITATIVE;
+import static org.apache.hadoop.fs.s3a.Constants.DIRECTORY_MARKER_POLICY_DELETE;
+import static org.apache.hadoop.fs.s3a.Constants.DIRECTORY_MARKER_POLICY_KEEP;
+
+/**
+ * Interface for Directory Marker policies to implement.
+ */
+
+public interface DirectoryPolicy {
+
+
+
+  /**
+   * Should a directory marker be retained?
+   * @param path path at which a file/directory is being created.
+   * @return true if the marker MAY be kept, false if it MUST be deleted.
+   */
+  boolean keepDirectoryMarkers(Path path);
+
+  /**
+   * Get the marker policy.
+   * @return policy.
+   */
+  MarkerPolicy getMarkerPolicy();
+
+  /**
+   * Describe the policy for marker tools and logs.
+   * @return description of the current policy.
+   */
+  String describe();
+
+  /**
+   * Does a specific path have the relevant option?
+   * This is to be forwarded from the S3AFileSystem.hasPathCapability,
+   * but only for those capabilities related to markers.
+   * @param path path
+   * @param capability capability
+   * @return true if the capability is supported, false if not
+   * @throws IllegalArgumentException if the capability is unknown.
+   */
+  boolean hasPathCapability(Path path, String capability);
+
+  /**
+   * Supported retention policies.
+   */
+  enum MarkerPolicy {
+
+    /**
+     * Delete markers.
+     * <p></p>
+     * This is the classic S3A policy.
+     */
+    Delete(DIRECTORY_MARKER_POLICY_DELETE),
+
+    /**
+     * Keep markers.
+     * <p></p>
+     * This is <i>Not backwards compatible</i>.
+     */
+    Keep(DIRECTORY_MARKER_POLICY_KEEP),
+
+    /**
+     * Keep markers in authoritative paths only.
+     * <p></p>
+     * This is <i>Not backwards compatible</i> within the
+     * auth paths, but is outside these.
+     */
+    Authoritative(DIRECTORY_MARKER_POLICY_AUTHORITATIVE);
+
+    /**
+     * The name of the option as allowed in configuration files
+     * and marker-aware tooling.
+     */
+    private final String optionName;
+
+    MarkerPolicy(final String optionName) {
+      this.optionName = optionName;
+    }
+
+    /**
+     * Get the option name.
+     * @return name of the option
+     */
+    public String getOptionName() {
+      return optionName;
+    }
+  }
+}
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/DirectoryPolicyImpl.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/DirectoryPolicyImpl.java
new file mode 100644
index 0000000..5949f83
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/DirectoryPolicyImpl.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.impl;
+
+
+import java.util.EnumSet;
+import java.util.Locale;
+import java.util.Set;
+import java.util.function.Predicate;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+
+import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_DIRECTORY_MARKER_POLICY;
+import static org.apache.hadoop.fs.s3a.Constants.DIRECTORY_MARKER_POLICY;
+import static org.apache.hadoop.fs.s3a.Constants.DIRECTORY_MARKER_POLICY_AUTHORITATIVE;
+import static org.apache.hadoop.fs.s3a.Constants.DIRECTORY_MARKER_POLICY_DELETE;
+import static org.apache.hadoop.fs.s3a.Constants.DIRECTORY_MARKER_POLICY_KEEP;
+import static org.apache.hadoop.fs.s3a.Constants.STORE_CAPABILITY_DIRECTORY_MARKER_AWARE;
+import static org.apache.hadoop.fs.s3a.Constants.STORE_CAPABILITY_DIRECTORY_MARKER_ACTION_DELETE;
+import static org.apache.hadoop.fs.s3a.Constants.STORE_CAPABILITY_DIRECTORY_MARKER_ACTION_KEEP;
+import static org.apache.hadoop.fs.s3a.Constants.STORE_CAPABILITY_DIRECTORY_MARKER_POLICY_AUTHORITATIVE;
+import static org.apache.hadoop.fs.s3a.Constants.STORE_CAPABILITY_DIRECTORY_MARKER_POLICY_DELETE;
+import static org.apache.hadoop.fs.s3a.Constants.STORE_CAPABILITY_DIRECTORY_MARKER_POLICY_KEEP;
+
+/**
+ * Implementation of directory policy.
+ * <p> </p>
+ * As only the DELETE policy is supported, the policy logic here is relatively
+ * straightforward.
+ */
+public final class DirectoryPolicyImpl
+    implements DirectoryPolicy {
+
+  private static final Logger LOG = LoggerFactory.getLogger(
+      DirectoryPolicyImpl.class);
+
+  /**
+   * Error string when unable to parse the marker policy option.
+   */
+  public static final String UNKNOWN_MARKER_POLICY =
+      "Unknown policy in "
+      + DIRECTORY_MARKER_POLICY + ": ";
+
+  /**
+   * All available policies.
+   */
+  private static final Set<MarkerPolicy> AVAILABLE_POLICIES =
+      EnumSet.of(MarkerPolicy.Delete);
+
+  /**
+   * Delete all markers.
+   */
+  public static final DirectoryPolicy DELETE = new DirectoryPolicyImpl(
+      MarkerPolicy.Delete, (p) -> false);
+
+  /**
+   * Chosen marker policy.
+   */
+  private final MarkerPolicy markerPolicy;
+
+  /**
+   * Constructor.
+   * @param markerPolicy marker policy
+   * @param authoritativeness function for authoritativeness; not retained.
+   */
+  public DirectoryPolicyImpl(final MarkerPolicy markerPolicy,
+      final Predicate<Path> authoritativeness) {
+    this.markerPolicy = markerPolicy;
+  }
+
+  @Override
+  public boolean keepDirectoryMarkers(final Path path) {
+    return false;
+  }
+
+  @Override
+  public MarkerPolicy getMarkerPolicy() {
+    return markerPolicy;
+  }
+
+  @Override
+  public String describe() {
+    return markerPolicy.getOptionName();
+  }
+
+  @Override
+  public String toString() {
+    final StringBuilder sb = new StringBuilder(
+        "DirectoryMarkerRetention{");
+    sb.append("policy='").append(markerPolicy.getOptionName()).append('\'');
+    sb.append('}');
+    return sb.toString();
+  }
+
+  /**
+   * Return path policy for store and paths.
+   * @param path path
+   * @param capability capability; an unknown value is rejected.
+   * @return true if a capability is active
+   */
+  @Override
+  public boolean hasPathCapability(final Path path, final String capability) {
+
+    switch (capability) {
+    /*
+     * This release is marker aware, but only ever deletes markers.
+     */
+    case STORE_CAPABILITY_DIRECTORY_MARKER_AWARE:
+      return true;
+
+    case STORE_CAPABILITY_DIRECTORY_MARKER_POLICY_DELETE:
+    case STORE_CAPABILITY_DIRECTORY_MARKER_ACTION_DELETE:
+      return true;
+
+    case STORE_CAPABILITY_DIRECTORY_MARKER_POLICY_KEEP:
+    case STORE_CAPABILITY_DIRECTORY_MARKER_POLICY_AUTHORITATIVE:
+    case STORE_CAPABILITY_DIRECTORY_MARKER_ACTION_KEEP:
+      return false;
+
+    default:
+      throw new IllegalArgumentException("Unknown capability " + capability);
+    }
+  }
+
+  /**
+   * Create/Get the policy for this configuration.
+   * @param conf config
+   * @return a policy; always the delete policy in this release.
+   */
+  public static DirectoryPolicy getDirectoryPolicy(
+      final Configuration conf) {
+    String option = conf.getTrimmed(DIRECTORY_MARKER_POLICY,
+        DEFAULT_DIRECTORY_MARKER_POLICY)
+        .toLowerCase(Locale.ENGLISH);
+    switch (option) {
+    case DIRECTORY_MARKER_POLICY_DELETE:
+      // only supported policy
+      LOG.debug("Directory markers will be deleted");
+      break;
+    case DIRECTORY_MARKER_POLICY_KEEP:
+    case DIRECTORY_MARKER_POLICY_AUTHORITATIVE:
+      // known but not available.
+      LOG.info("Directory marker policy \"{}\" is unsupported,"
+              + " using \"delete\"", option);
+      break;
+    default:
+      throw new IllegalArgumentException(UNKNOWN_MARKER_POLICY + option);
+    }
+    return DELETE;
+  }
+
+  /**
+   * Enumerate all available policies.
+   * @return set of the policies.
+   */
+  public static Set<MarkerPolicy> availablePolicies() {
+    return AVAILABLE_POLICIES;
+  }
+
+}
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/StatusProbeEnum.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/StatusProbeEnum.java
new file mode 100644
index 0000000..f9749a1
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/StatusProbeEnum.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.impl;
+
+import java.util.EnumSet;
+import java.util.Set;
+
+/**
+ * Enum of probes which can be made of S3.
+ */
+public enum StatusProbeEnum {
+
+  /** The actual path. */
+  Head,
+  /** HEAD of the path + /. */
+  DirMarker,
+  /** LIST under the path. */
+  List;
+
+  /** Look for files and directories. */
+  public static final Set<StatusProbeEnum> ALL =
+      EnumSet.of(Head, List);
+
+  /** We only want the HEAD. */
+  public static final Set<StatusProbeEnum> HEAD_ONLY =
+      EnumSet.of(Head);
+
+  /** List operation only. */
+  public static final Set<StatusProbeEnum> LIST_ONLY =
+      EnumSet.of(List);
+
+  /** Look for files only: a HEAD of the path, with no LIST. */
+  public static final Set<StatusProbeEnum> FILE =
+      HEAD_ONLY;
+
+  /** Skip the HEAD and only look for directories. */
+  public static final Set<StatusProbeEnum> DIRECTORIES =
+      LIST_ONLY;
+
+
+}
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3GuardTool.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3GuardTool.java
index 4f0e8f7..7fcd9aa 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3GuardTool.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3GuardTool.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.fs.s3a.s3guard;
 
+
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.PrintStream;
@@ -30,6 +31,7 @@ import java.util.Locale;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
@@ -37,6 +39,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.FileStatus;
@@ -47,6 +51,8 @@ import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.fs.s3a.S3AFileStatus;
 import org.apache.hadoop.fs.s3a.S3AFileSystem;
 import org.apache.hadoop.fs.s3a.S3AUtils;
+import org.apache.hadoop.fs.s3a.impl.DirectoryPolicy;
+import org.apache.hadoop.fs.s3a.impl.DirectoryPolicyImpl;
 import org.apache.hadoop.fs.shell.CommandFormat;
 import org.apache.hadoop.util.ExitUtil;
 import org.apache.hadoop.util.GenericOptionsParser;
@@ -59,6 +65,8 @@ import static org.apache.hadoop.service.launcher.LauncherExitCodes.*;
 /**
  * CLI to manage S3Guard Metadata Store.
  */
+@InterfaceAudience.LimitedPrivate("management tools")
+@InterfaceStability.Evolving
 public abstract class S3GuardTool extends Configured implements Tool {
   private static final Logger LOG = LoggerFactory.getLogger(S3GuardTool.class);
 
@@ -960,6 +968,8 @@ public abstract class S3GuardTool extends Configured implements Tool {
     public static final String AUTH_FLAG = "auth";
     public static final String NONAUTH_FLAG = "nonauth";
     public static final String ENCRYPTION_FLAG = "encryption";
+    public static final String MARKERS_FLAG = "markers";
+    public static final String MARKERS_AWARE = "aware";
 
     public static final String PURPOSE = "provide/check S3Guard information"
         + " about a specific bucket";
@@ -968,12 +978,22 @@ public abstract class S3GuardTool extends Configured implements Tool {
         + "Common options:\n"
         + "  -" + GUARDED_FLAG + " - Require S3Guard\n"
         + "  -" + ENCRYPTION_FLAG
-        + " -require {none, sse-s3, sse-kms} - Require encryption policy";
+        + " (none, sse-s3, sse-kms) - Require encryption policy\n"
+        + "  -" + MARKERS_FLAG
+        + " (aware, keep, delete, authoritative) - directory markers policy\n";
+
+    @VisibleForTesting
+    public static final String IS_MARKER_AWARE =
+        "The S3A connector can read data in S3 buckets where"
+            + " directory markers%n"
+            + "are not deleted (optional with later hadoop releases),%n"
+            + "and with buckets where they are.%n";
 
     BucketInfo(Configuration conf) {
       super(conf, GUARDED_FLAG, UNGUARDED_FLAG, AUTH_FLAG, NONAUTH_FLAG);
       CommandFormat format = getCommandFormat();
       format.addOptionWithValue(ENCRYPTION_FLAG);
+      format.addOptionWithValue(MARKERS_FLAG);
     }
 
     @Override
@@ -1053,10 +1073,57 @@ public abstract class S3GuardTool extends Configured implements Tool {
                 fsUri, desiredEncryption, encryption);
       }
 
+      // directory markers
+      processMarkerOption(out, fs,
+          getCommandFormat().getOptValue(MARKERS_FLAG));
+
+      // and finally flush the output and report a success.
       out.flush();
       return SUCCESS;
     }
 
+    /**
+     * Validate the marker options.
+     * Prints the current policy, then checks it against any requested one.
+     * @param out output stream
+     * @param fs filesystem
+     * @param marker desired marker option -may be null.
+     */
+    private void processMarkerOption(final PrintStream out,
+        final S3AFileSystem fs,
+        final String marker) {
+      DirectoryPolicy markerPolicy = fs.getDirectoryMarkerPolicy();
+      String desc = markerPolicy.describe();
+      println(out, "%nThe directory marker policy is \"%s\"%n", desc);
+
+      DirectoryPolicy.MarkerPolicy mp = markerPolicy.getMarkerPolicy();
+
+      String desiredMarker = marker == null
+          ? ""
+          : marker.trim();
+      final String optionName = mp.getOptionName();
+      if (!desiredMarker.isEmpty()) {
+        if (MARKERS_AWARE.equalsIgnoreCase(desiredMarker)) {
+          // simple awareness test -provides a way to validate compatibility
+          // on the command line
+          println(out, IS_MARKER_AWARE);
+          String pols = DirectoryPolicyImpl.availablePolicies()
+              .stream()
+              .map(DirectoryPolicy.MarkerPolicy::getOptionName)
+              .collect(Collectors.joining(", "));
+          println(out, "Available Policies: %s", pols);
+
+        } else {
+          // compare with current policy
+          if (!optionName.equalsIgnoreCase(desiredMarker)) {
+            throw badState("Bucket %s: required marker policy is \"%s\""
+                    + " but actual policy is \"%s\"",
+                fs.getUri(), desiredMarker, optionName);
+          }
+        }
+      }
+    }
+
     private String printOption(PrintStream out,
         String description, String key, String defVal) {
       String t = getFilesystem().getConf().getTrimmed(key, defVal);
diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/directory_markers.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/directory_markers.md
new file mode 100644
index 0000000..63610f1
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/directory_markers.md
@@ -0,0 +1,295 @@
+<!---
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License. See accompanying LICENSE file.
+-->
+
+# Controlling the S3A Directory Marker Behavior
+
+From Hadoop 3.3.1 onwards, the S3A client can be configured to skip deleting
+directory markers when creating files under paths. This removes all scalability
+problems caused by deleting these markers -however, it is achieved at the expense
+of backwards compatibility.
+
+_This Hadoop release is compatible with versions of Hadoop which
+can be configured to retain directory markers above files._ 
+
+_It does not support any options to change the marker retention
+policy to anything other than the default `delete` policy._
+
+If the S3A filesystem is configured via 
+`fs.s3a.directory.marker.retention` to use a different policy
+(i.e `keep` or `authoritative`),
+a message will be logged at INFO and the connector will
+use the "delete" policy.
+
+The `s3guard bucket-info` tool [can be used to verify support](#bucket-info).
+This allows for a command line check of compatibility, including
+in scripts.
+
+_For details on alternative marker retention policies and strategies
+for safe usage, consult the documentation of a Hadoop release which
+supports the ability to change the marker policy._
+
+## <a name="bucket-info"></a> Verifying marker policy with `s3guard bucket-info`
+
+The `bucket-info` command has been enhanced to support verification from the command
+line of bucket policies via the `-markers` option.
+
+
+| option | verifies |
+|--------|--------|
+| `-markers aware` | the hadoop release is "aware" of directory markers |
+| `-markers delete` | directory markers are deleted |
+| `-markers keep` | directory markers are kept (will always fail) |
+| `-markers authoritative` | directory markers are kept in authoritative paths (will always fail) |
+
+All releases of Hadoop which have been updated to be marker aware will support the `-markers aware` option.
+
+
+1. Updated releases which do not support switching marker retention policy will also support the
+`-markers delete` option.
+
+1. As this is such a release, the other marker options
+(`-markers keep` and `-markers authoritative`) will always fail.
+
+
+Probing for marker awareness: `s3guard bucket-info -markers aware`  
+
+```
+> bin/hadoop s3guard bucket-info -markers aware s3a://landsat-pds/
+  Filesystem s3a://landsat-pds
+  Location: us-west-2
+  Filesystem s3a://landsat-pds is not using S3Guard
+  The "magic" committer is not supported
+  
+  S3A Client
+    Endpoint: fs.s3a.endpoint=s3.amazonaws.com
+    Encryption: fs.s3a.server-side-encryption-algorithm=none
+    Input seek policy: fs.s3a.experimental.input.fadvise=normal
+  
+  The directory marker policy is "delete"
+  
+  The S3A connector can read data in S3 buckets where directory markers
+  are not deleted (optional with later hadoop releases),
+  and with buckets where they are.
+
+  Available Policies: delete
+```
+
+The same command will fail on older releases, because the `-markers` option
+is unknown
+
+```
+> hadoop s3guard bucket-info -markers aware s3a://landsat-pds/
+Illegal option -markers
+Usage: hadoop bucket-info [OPTIONS] s3a://BUCKET
+    provide/check S3Guard information about a specific bucket
+
+Common options:
+  -guarded - Require S3Guard
+  -unguarded - Force S3Guard to be disabled
+  -auth - Require the S3Guard mode to be "authoritative"
+  -nonauth - Require the S3Guard mode to be "non-authoritative"
+  -magic - Require the S3 filesystem to be support the "magic" committer
+  -encryption -require {none, sse-s3, sse-kms} - Require encryption policy
+
+When possible and not overridden by more specific options, metadata
+repository information will be inferred from the S3A URL (if provided)
+
+Generic options supported are:
+  -conf <config file> - specify an application configuration file
+  -D <property=value> - define a value for a given property
+
+2020-08-12 16:47:16,579 [main] INFO  util.ExitUtil (ExitUtil.java:terminate(210)) - Exiting with status 42:
+ Illegal option -markers
+```
+
+The `-markers delete` option will verify that this release will delete directory markers.
+
+```
+> hadoop s3guard bucket-info -markers delete s3a://landsat-pds/
+ Filesystem s3a://landsat-pds
+ Location: us-west-2
+ Filesystem s3a://landsat-pds is not using S3Guard
+ The "magic" committer is not supported
+ 
+ S3A Client
+    Endpoint: fs.s3a.endpoint=s3.amazonaws.com
+    Encryption: fs.s3a.server-side-encryption-algorithm=none
+    Input seek policy: fs.s3a.experimental.input.fadvise=normal
+ 
+ The directory marker policy is "delete"
+
+```
+
+As noted: the sole option available on this Hadoop release is `delete`. Other policy
+probes will fail, returning error code 46. "unsupported"
+
+
+```
+> hadoop s3guard bucket-info -markers keep s3a://landsat-pds/
+Filesystem s3a://landsat-pds
+Location: us-west-2
+Filesystem s3a://landsat-pds is not using S3Guard
+The "magic" committer is not supported
+
+S3A Client
+    Endpoint: fs.s3a.endpoint=s3.amazonaws.com
+    Encryption: fs.s3a.server-side-encryption-algorithm=none
+    Input seek policy: fs.s3a.experimental.input.fadvise=normal
+
+The directory marker policy is "delete"
+
+2020-08-25 12:20:18,805 [main] INFO  util.ExitUtil (ExitUtil.java:terminate(210)) - Exiting with status 46:
+ 46: Bucket s3a://landsat-pds: required marker policy is "keep" but actual policy is "delete"
+```
+
+Even if the bucket configuration attempts to change the marker policy, probes for `keep` and `authoritative`
+will fail.
+
+Take, for example, a configuration for a specific bucket to retain markers under the authoritative path `/tables`:
+
+```xml
+<property>
+  <name>fs.s3a.bucket.s3-london.directory.marker.retention</name>
+  <value>authoritative</value>
+</property>
+<property>
+  <name>fs.s3a.bucket.s3-london.authoritative.path</name>
+  <value>/tables</value>
+</property>
+```
+
+The marker settings will be warned about on filesystem creation, and the marker policy will remain as `delete`.
+Thus a check for `-markers authoritative` will fail
+
+```
+> hadoop s3guard bucket-info -markers authoritative s3a://s3-london/
+2020-08-25 12:33:52,682 [main] INFO  impl.DirectoryPolicyImpl (DirectoryPolicyImpl.java:getDirectoryPolicy(163)) -
+ Directory marker policy "authoritative" is unsupported, using "delete"
+Filesystem s3a://s3-london
+Location: eu-west-2
+Filesystem s3a://s3-london is not using S3Guard
+The "magic" committer is supported
+
+S3A Client
+    Endpoint: fs.s3a.endpoint=s3.eu-west-2.amazonaws.com
+    Encryption: fs.s3a.server-side-encryption-algorithm=none
+    Input seek policy: fs.s3a.experimental.input.fadvise=normal
+
+The directory marker policy is "delete"
+
+2020-08-25 12:33:52,746 [main] INFO  util.ExitUtil (ExitUtil.java:terminate(210)) - Exiting with status 46:
+ 46: Bucket s3a://s3-london: required marker policy is "authoritative" but actual policy is "delete"
+```
+
+
+
+
+### <a name="pathcapabilities"></a> Probing for retention via `PathCapabilities` and `StreamCapabilities`
+
+An instance of the filesystem can be probed for its directory marker retention ability/
+policy through the `org.apache.hadoop.fs.PathCapabilities` interface,
+which all FileSystem classes have supported since Hadoop 3.2.
+
+
+| Probe                   | Meaning                 |
+|-------------------------|-------------------------|
+| `fs.s3a.capability.directory.marker.aware`  | Does the filesystem support surplus directory markers? |
+| `fs.s3a.capability.directory.marker.policy.delete` | Is the bucket policy "delete"? |
+| `fs.s3a.capability.directory.marker.policy.keep`   | Is the bucket policy "keep"? |
+| `fs.s3a.capability.directory.marker.policy.authoritative` | Is the bucket policy "authoritative"? |
+| `fs.s3a.capability.directory.marker.action.delete` | If a file was created at this path, would directory markers be deleted? |
+| `fs.s3a.capability.directory.marker.action.keep`   | If a file was created at this path, would directory markers be retained? |
+
+
+The probe `fs.s3a.capability.directory.marker.aware` allows for a filesystem to be
+probed to determine if its file listing policy is "aware" of directory marker retention
+-that is: can this s3a client safely work with S3 buckets where markers have not been deleted.
+
+The `fs.s3a.capability.directory.marker.policy.` probes return the active policy for the bucket.
+
+The two `fs.s3a.capability.directory.marker.action.` probes dynamically query the marker
+retention behavior of a specific path.
+That is: if a file was created at that location, would ancestor directory markers
+be kept or deleted?
+
+The `S3AFileSystem` class also implements the `org.apache.hadoop.fs.StreamCapabilities` interface, which
+can be used to probe for marker awareness via the `fs.s3a.capability.directory.marker.aware` capability.
+
+Again, this will be true if-and-only-if the S3A connector is safe to work with S3A buckets/paths where
+directory markers are retained.
+
+*If an S3A instance is probed by `PathCapabilities` or `StreamCapabilities` for the capability
+`fs.s3a.capability.directory.marker.aware` and it returns false, it is not safe to be used with
+S3A paths where markers have been retained*.
+
+
+## <a name="glossary"></a> Glossary
+
+#### Directory Marker
+
+An object in an S3 bucket with a trailing "/", used to indicate that there is a directory at that location.
+These are necessary to maintain expectations about directories in an object store:
+
+1. After `mkdirs(path)`, `exists(path)` holds.
+1. After `rm(path/*)`, `exists(path)` holds.
+
+In previous releases of Hadoop, the marker created by a `mkdirs()` operation was deleted after a file was created.
+Rather than make a slow HEAD probe + optional marker DELETE of every parent path element, HADOOP-13164 switched
+to enumerating all parent paths and issuing a single bulk DELETE request.
+This is faster under light load, but
+as each row in the delete consumes one write operation on the allocated IOPs of that bucket partition, creates
+load issues when many worker threads/processes are writing to files.
+This problem is bad on Apache Hive as:
+* The hive partition structure places all files within the same S3 partition.
+* As they are deep structures, there are many parent entries to include in the bulk delete calls.
+* It's creating a lot of temporary files, and still uses rename to commit output.
+
+Apache Spark has less of an issue when an S3A committer is used -although the partition structure
+is the same, the delayed manifestation of output files reduces load.
+
+#### Leaf Marker
+
+A directory marker which has no files or directory marker objects underneath.
+It genuinely represents an empty directory.
+
+#### Surplus Marker
+
+A directory marker which is above one or more files, and so is superfluous.
+These are the markers which were traditionally deleted; now it is optional.
+
+Older versions of Hadoop mistake such surplus markers for Leaf Markers.
+
+#### Versioned Bucket
+
+An S3 Bucket which has Object Versioning enabled.
+
+This provides a backup and recovery mechanism for data within the same
+bucket: older objects can be listed and restored through the AWS S3 console
+and some applications.
+
+## References
+
+<!-- if extending, keep JIRAs separate, have them in numerical order; the rest in lexical. -->
+
+* [HADOOP-13164](https://issues.apache.org/jira/browse/HADOOP-13164). _Optimize S3AFileSystem::deleteUnnecessaryFakeDirectories._
+
+* [HADOOP-13230](https://issues.apache.org/jira/browse/HADOOP-13230). _S3A to optionally retain directory markers_
+
+* [HADOOP-16090](https://issues.apache.org/jira/browse/HADOOP-16090). _S3A Client to add explicit support for versioned stores._
+
+* [HADOOP-16823](https://issues.apache.org/jira/browse/HADOOP-16823). _Large DeleteObject requests are their own Thundering Herd_
+
+* [Object Versioning](https://docs.aws.amazon.com/AmazonS3/latest/dev/Versioning.html). _Using versioning_
+
+* [Optimizing Performance](https://docs.aws.amazon.com/AmazonS3/latest/dev/optimizing-performance.html). _Best Practices Design Patterns: Optimizing Amazon S3 Performance_
diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md
index 46ff64d..667edbc 100644
--- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md
+++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md
@@ -17,17 +17,36 @@
 
 <!-- MACRO{toc|fromDepth=0|toDepth=2} -->
 
-**NOTE:  Hadoop's `s3:` and `s3n:` connectors have been removed.
+
+
+## <a name="compatibility"></a> Compatibility
+
+
+###  <a name="directory-marker-compatibility"></a> Directory Marker Compatibility
+
+1. This release can safely list/index/read S3 buckets where "empty directory"
+markers are retained.
+
+1. This release does not support the switching to directory marker
+policies of "keep" and "authoritative" -the policy is always "delete".
+
+Consult [Controlling the S3A Directory Marker Behavior](directory_markers.html) for
+full details.
+
+### Shipping connectors: S3A only.
+
+**Hadoop's `s3:` and `s3n:` connectors have been removed.
 Please use `s3a:` as the connector to data hosted in S3 with Apache Hadoop.**
 
 **Consult the [s3n documentation](./s3n.html) for migration instructions.**
 
 
-See also:
+## <a name="documents"></a> Documents
 
 * [Encryption](./encryption.html)
 * [S3Guard](./s3guard.html)
 * [Troubleshooting](./troubleshooting_s3a.html)
+* [Controlling the S3A Directory Marker Behavior](directory_markers.html).
 * [Testing](./testing.html)
 
 ##<a name="overview"></a> Overview
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/AbstractS3ATestBase.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/AbstractS3ATestBase.java
index f0c389d..4e54f68 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/AbstractS3ATestBase.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/AbstractS3ATestBase.java
@@ -61,6 +61,10 @@ public abstract class AbstractS3ATestBase extends AbstractFSContractTestBase
     Thread.currentThread().setName("JUnit-" + methodName.getMethodName());
   }
 
+  protected String getMethodName() {
+    return methodName.getMethodName();
+  }
+
   @Override
   protected int getTestTimeoutMillis() {
     return S3A_TEST_TIMEOUT;
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFileOperationCost.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFileOperationCost.java
index 3e293f7..d62d44a 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFileOperationCost.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFileOperationCost.java
@@ -33,6 +33,7 @@ import java.net.URI;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.*;
 import static org.apache.hadoop.fs.s3a.Statistic.*;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.*;
+import static org.apache.hadoop.fs.s3a.performance.OperationCost.*;
 import static org.apache.hadoop.test.GenericTestUtils.getTestDir;
 import static org.junit.Assume.assumeFalse;
 
@@ -87,9 +88,9 @@ public class ITestS3AFileOperationCost extends AbstractS3ATestBase {
         status.isEmptyDirectory() == Tristate.TRUE);
 
     if (!fs.hasMetadataStore()) {
-      metadataRequests.assertDiffEquals(2);
+      metadataRequests.assertDiffEquals(GET_FILE_STATUS_ON_EMPTY_DIR.head());
     }
-    listRequests.assertDiffEquals(0);
+    listRequests.assertDiffEquals(GET_FILE_STATUS_ON_EMPTY_DIR.list());
   }
 
   @Test
@@ -104,8 +105,8 @@ public class ITestS3AFileOperationCost extends AbstractS3ATestBase {
     } catch (FileNotFoundException expected) {
       // expected
     }
-    metadataRequests.assertDiffEquals(2);
-    listRequests.assertDiffEquals(1);
+    metadataRequests.assertDiffEquals(GET_FILE_STATUS_FNFE.head());
+    listRequests.assertDiffEquals(GET_FILE_STATUS_FNFE.list());
   }
 
   @Test
@@ -120,8 +121,8 @@ public class ITestS3AFileOperationCost extends AbstractS3ATestBase {
     } catch (FileNotFoundException expected) {
       // expected
     }
-    metadataRequests.assertDiffEquals(2);
-    listRequests.assertDiffEquals(1);
+    metadataRequests.assertDiffEquals(GET_FILE_STATUS_FNFE.head());
+    listRequests.assertDiffEquals(GET_FILE_STATUS_FNFE.list());
   }
 
   @Test
@@ -142,8 +143,8 @@ public class ITestS3AFileOperationCost extends AbstractS3ATestBase {
           + "\n" + fsState);
     }
     if (!fs.hasMetadataStore()) {
-      metadataRequests.assertDiffEquals(2);
-      listRequests.assertDiffEquals(1);
+      metadataRequests.assertDiffEquals(GET_FILE_STATUS_FNFE.head());
+      listRequests.assertDiffEquals(GET_FILE_STATUS_FNFE.list());
     }
   }
 
@@ -174,7 +175,8 @@ public class ITestS3AFileOperationCost extends AbstractS3ATestBase {
       Path remotePath = path("copied");
       s3a.copyFromLocalFile(false, true, localPath, remotePath);
       verifyFileContents(s3a, remotePath, data);
-      copyLocalOps.assertDiffEquals(1);
+      // this is not being counted in this branch
+      // copyLocalOps.assertDiffEquals(1);
       putRequests.assertDiffEquals(1);
       putBytes.assertDiffEquals(len);
       // print final stats
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java
index ad5523f..4ac2ed0 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java
@@ -43,6 +43,7 @@ import java.net.URISyntaxException;
 import java.util.List;
 import java.util.concurrent.Callable;
 
+import static com.google.common.base.Preconditions.checkNotNull;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.skip;
 import static org.apache.hadoop.fs.s3a.S3ATestConstants.*;
 import static org.apache.hadoop.fs.s3a.Constants.*;
@@ -474,6 +475,19 @@ public final class S3ATestUtils {
         });
   }
 
+
+  /**
+   * Get the name of the test bucket.
+   * @param conf configuration to scan.
+   * @return the bucket name from the config.
+   * @throws NullPointerException if no test bucket is configured.
+   */
+  public static String getTestBucketName(final Configuration conf) {
+    String bucket = checkNotNull(conf.get(TEST_FS_S3A_NAME),
+        "No test bucket");
+    return URI.create(bucket).getHost();
+  }
+
   /**
    * Remove any values from a bucket.
    * @param bucket bucket whose overrides are to be removed. Can be null/empty
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AGetFileStatus.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AGetFileStatus.java
index 586264d..b5615fa 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AGetFileStatus.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AGetFileStatus.java
@@ -76,11 +76,15 @@ public class TestS3AGetFileStatus extends AbstractS3AMockTest {
     String key = path.toUri().getPath().substring(1);
     when(s3.getObjectMetadata(argThat(correctGetMetadataRequest(BUCKET, key))))
       .thenThrow(NOT_FOUND);
-    ObjectMetadata meta = new ObjectMetadata();
-    meta.setContentLength(0L);
-    when(s3.getObjectMetadata(argThat(
-        correctGetMetadataRequest(BUCKET, key + "/"))
-    )).thenReturn(meta);
+    String keyDir = key + "/";
+    ListObjectsV2Result listResult = new ListObjectsV2Result();
+    S3ObjectSummary objectSummary = new S3ObjectSummary();
+    objectSummary.setKey(keyDir);
+    objectSummary.setSize(0L);
+    listResult.getObjectSummaries().add(objectSummary);
+    when(s3.listObjectsV2(argThat(
+        matchListV2Request(BUCKET, keyDir))
+    )).thenReturn(listResult);
     FileStatus stat = fs.getFileStatus(path);
     assertNotNull(stat);
     assertEquals(fs.makeQualified(path), stat.getPath());
@@ -176,4 +180,28 @@ public class TestS3AGetFileStatus extends AbstractS3AMockTest {
       }
     };
   }
+  
+  private Matcher<ListObjectsV2Request> matchListV2Request(
+      String bucket, String key) {
+    return new BaseMatcher<ListObjectsV2Request>() {
+
+      @Override
+      public void describeTo(Description description) {
+        description.appendText("bucket and key match");
+      }
+
+      @Override
+      public boolean matches(Object o) {
+        if(o instanceof ListObjectsV2Request) {
+          ListObjectsV2Request request =
+              (ListObjectsV2Request)o;
+          return request.getBucketName().equals(bucket)
+              && request.getPrefix().equals(key);
+        }
+        return false;
+      }
+    };
+  }
+
+
 }
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestDirectoryMarkerPolicy.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestDirectoryMarkerPolicy.java
new file mode 100644
index 0000000..706797c
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestDirectoryMarkerPolicy.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.impl;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.function.Predicate;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.test.HadoopTestBase;
+
+import static org.apache.hadoop.fs.s3a.Constants.DIRECTORY_MARKER_POLICY;
+import static org.apache.hadoop.fs.s3a.Constants.STORE_CAPABILITY_DIRECTORY_MARKER_ACTION_DELETE;
+import static org.apache.hadoop.fs.s3a.Constants.STORE_CAPABILITY_DIRECTORY_MARKER_ACTION_KEEP;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Unit tests for directory marker policies.
+ * <p></p>
+ * As this FS only supports "delete", this is a more minimal test
+ * suite than on later versions.
+ * <p></p>
+ * It helps ensure that there aren't unexpected problems if the site configuration
+ * asks for retention of some form.
+ */
+@RunWith(Parameterized.class)
+public class TestDirectoryMarkerPolicy extends HadoopTestBase {
+
+  @Parameterized.Parameters(name = "{0}")
+  public static Collection<Object[]> data() {
+    return Arrays.asList(new Object[][]{
+        {
+            DirectoryPolicy.MarkerPolicy.Delete,
+        },
+        {
+            DirectoryPolicy.MarkerPolicy.Keep,
+        },
+        {
+            DirectoryPolicy.MarkerPolicy.Authoritative,
+        }
+    });
+  }
+
+  private final DirectoryPolicy directoryPolicy;
+
+  private static final boolean EXPECT_MARKER_RETENTION = false;
+
+  public TestDirectoryMarkerPolicy(
+      final DirectoryPolicy.MarkerPolicy markerPolicy) {
+    this.directoryPolicy = newPolicy(markerPolicy);
+  }
+
+  /**
+   * Create a new retention policy.
+   * @param markerPolicy policy option
+   * @return the retention policy.
+   */
+  private DirectoryPolicy newPolicy(
+      DirectoryPolicy.MarkerPolicy markerPolicy) {
+    return new DirectoryPolicyImpl(markerPolicy, FAIL_IF_INVOKED);
+  }
+
+  private static final Predicate<Path> FAIL_IF_INVOKED = (p) -> {
+    throw new RuntimeException("failed");
+  };
+
+  private final Path nonAuthPath = new Path("s3a://bucket/nonauth/data");
+
+  private final Path authPath = new Path("s3a://bucket/auth/data1");
+
+  private final Path deepAuth = new Path("s3a://bucket/auth/d1/d2/data2");
+
+  /**
+   * Assert that a path has a retention outcome.
+   * @param path path
+   * @param retain should the marker be retained
+   */
+  private void assertMarkerRetention(Path path, boolean retain) {
+    assertEquals("Retention of path " + path + " by " + directoryPolicy,
+        retain,
+        directoryPolicy.keepDirectoryMarkers(path));
+  }
+
+  /**
+   * Assert that a path has a capability.
+   */
+  private void assertPathCapability(Path path,
+      String capability,
+      boolean outcome) {
+    assertEquals(String.format(
+        "%s support for capability %s by path %s expected as %s",
+        directoryPolicy, capability, path, outcome),
+        outcome,
+        directoryPolicy.hasPathCapability(path, capability));
+  }
+
+  @Test
+  public void testNonAuthPath() throws Throwable {
+    assertMarkerRetention(nonAuthPath, EXPECT_MARKER_RETENTION);
+    assertPathCapability(nonAuthPath,
+        STORE_CAPABILITY_DIRECTORY_MARKER_ACTION_DELETE,
+        !EXPECT_MARKER_RETENTION);
+    assertPathCapability(nonAuthPath,
+        STORE_CAPABILITY_DIRECTORY_MARKER_ACTION_KEEP,
+        EXPECT_MARKER_RETENTION);
+  }
+
+  @Test
+  public void testAuthPath() throws Throwable {
+    assertMarkerRetention(authPath, EXPECT_MARKER_RETENTION);
+    assertPathCapability(authPath,
+        STORE_CAPABILITY_DIRECTORY_MARKER_ACTION_DELETE,
+        !EXPECT_MARKER_RETENTION);
+    assertPathCapability(authPath,
+        STORE_CAPABILITY_DIRECTORY_MARKER_ACTION_KEEP,
+        EXPECT_MARKER_RETENTION);
+  }
+
+  @Test
+  public void testDeepAuthPath() throws Throwable {
+    assertMarkerRetention(deepAuth, EXPECT_MARKER_RETENTION);
+    assertPathCapability(deepAuth,
+        STORE_CAPABILITY_DIRECTORY_MARKER_ACTION_DELETE,
+        !EXPECT_MARKER_RETENTION);
+    assertPathCapability(deepAuth,
+        STORE_CAPABILITY_DIRECTORY_MARKER_ACTION_KEEP,
+        EXPECT_MARKER_RETENTION);
+  }
+
+  @Test
+  public void testInstantiate() throws Throwable {
+    Configuration conf = new Configuration(false);
+    conf.set(DIRECTORY_MARKER_POLICY,
+        directoryPolicy.getMarkerPolicy().getOptionName());
+    DirectoryPolicy policy = DirectoryPolicyImpl.getDirectoryPolicy(
+        conf);
+    assertEquals(DirectoryPolicyImpl.DELETE, policy);
+
+  }
+}
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestDirectoryMarkerListing.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestDirectoryMarkerListing.java
new file mode 100644
index 0000000..11f10ac
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestDirectoryMarkerListing.java
@@ -0,0 +1,773 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.performance;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.stream.Collectors;
+
+import com.amazonaws.AmazonClientException;
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.model.ObjectMetadata;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileAlreadyExistsException;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathIsNotEmptyDirectoryException;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
+import org.apache.hadoop.fs.s3a.AbstractS3ATestBase;
+import org.apache.hadoop.fs.s3a.S3AFileSystem;
+import org.apache.hadoop.fs.s3a.S3AUtils;
+
+import static org.apache.hadoop.fs.contract.ContractTestUtils.touch;
+import static org.apache.hadoop.fs.s3a.Constants.METADATASTORE_AUTHORITATIVE;
+import static org.apache.hadoop.fs.s3a.Constants.S3_METADATA_STORE_IMPL;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.assume;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.getTestBucketName;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+
+/**
+ * This is a test suite designed to verify that directory markers do
+ * not get misconstrued as empty directories during operations
+ * which explicitly or implicitly list directory trees.
+ * <p></p>
+ * It is also intended to be backported to all releases
+ * which are enhanced to read directory trees where markers have
+ * been retained.
+ * Hence: it does not use any of the new helper classes to
+ * measure the cost of operations or attempt to create markers
+ * through the FS APIs.
+ * <p></p>
+ * Instead, the directory structure to test is created through
+ * low-level S3 SDK API calls.
+ * We also skip any probes to measure/assert metrics.
+ * We're testing the semantics here, not the cost of the operations.
+ * Doing that makes it a lot easier to backport.
+ *
+ * <p></p>
+ * Similarly: JUnit assertions over AssertJ.
+ * <p></p>
+ * The tests work with unguarded buckets only -the bucket settings are changed
+ * appropriately.
+ */
+public class ITestDirectoryMarkerListing extends AbstractS3ATestBase {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ITestDirectoryMarkerListing.class);
+
+  private static final String FILENAME = "fileUnderMarker";
+
+  private static final String HELLO = "hello";
+
+  private static final String MARKER = "marker";
+
+  private static final String MARKER_PEER = "markerpeer";
+
+  /**
+   * Does rename copy markers?
+   * Value: {@value}
+   * <p></p>
+   * Older releases: yes.
+   * <p></p>
+   * The full marker-optimized releases: no.
+   */
+  private static final boolean RENAME_COPIES_MARKERS = true;
+
+  /**
+   * Path to a directory which has a marker.
+   */
+  private Path markerDir;
+
+  /**
+   * Key to the object representing {@link #markerDir}.
+   */
+  private String markerKey;
+
+  /**
+   * Key to the object representing {@link #markerDir} with
+   * a trailing / added. This references the actual object
+   * which has been created.
+   */
+  private String markerKeySlash;
+
+  /**
+   * bucket of tests.
+   */
+  private String bucket;
+
+  /**
+   * S3 Client of the FS.
+   */
+  private AmazonS3 s3client;
+
+  /**
+   * Path to a file under the marker.
+   */
+  private Path filePathUnderMarker;
+
+  /**
+   * Key to a file under the marker.
+   */
+  private String fileKeyUnderMarker;
+
+  /**
+   * base path for the test files; the marker dir goes under this.
+   */
+  private Path basePath;
+
+  /**
+   * Path to a file which is a peer of markerDir.
+   */
+  private Path markerPeer;
+
+  /**
+   * Key to a file which is a peer of markerDir.
+   */
+  private String markerPeerKey;
+
+  public ITestDirectoryMarkerListing() {
+
+  }
+
+  @Override
+  protected Configuration createConfiguration() {
+    Configuration conf = super.createConfiguration();
+    String bucketName = getTestBucketName(conf);
+
+    // Turn off S3Guard
+    removeBaseAndBucketOverrides(bucketName, conf,
+        S3_METADATA_STORE_IMPL,
+        METADATASTORE_AUTHORITATIVE);
+
+    return conf;
+  }
+
+  /**
+   * The setup phase includes creating the test objects.
+   */
+  @Override
+  public void setup() throws Exception {
+    super.setup();
+    S3AFileSystem fs = getFileSystem();
+    assume("unguarded FS only",
+        !fs.hasMetadataStore());
+    s3client = fs.getAmazonS3ClientForTesting("markers");
+
+    bucket = fs.getBucket();
+    Path base = new Path(methodPath(), "base");
+
+    createTestObjects(base);
+  }
+
+  /**
+   * Teardown deletes the objects created before
+   * the superclass does the directory cleanup.
+   */
+  @Override
+  public void teardown() throws Exception {
+    if (s3client != null) {
+      deleteObject(markerKey);
+      deleteObject(markerKeySlash);
+      deleteObject(markerPeerKey);
+      deleteObject(fileKeyUnderMarker);
+    }
+    // do this ourselves to avoid audits teardown failing
+    // when surplus markers are found
+    deleteTestDirInTeardown();
+    super.teardown();
+  }
+
+  public Path methodPath() throws IOException {
+    return path(getMethodName());
+  }
+
+  /**
+   * Create the test objects under the given path, setting
+   * various fields in the process.
+   * @param path parent path of everything
+   */
+  private void createTestObjects(final Path path) throws Exception {
+    S3AFileSystem fs = getFileSystem();
+    basePath = path;
+    markerDir = new Path(basePath, MARKER);
+    // peer path has the same initial name to make sure there
+    // is no confusion there.
+    markerPeer = new Path(basePath, MARKER_PEER);
+    markerPeerKey = fs.pathToKey(markerPeer);
+    markerKey = fs.pathToKey(markerDir);
+    markerKeySlash = markerKey + "/";
+    fileKeyUnderMarker = markerKeySlash + FILENAME;
+    filePathUnderMarker = new Path(markerDir, FILENAME);
+    // put the empty dir
+    fs.mkdirs(markerDir);
+    touch(fs, markerPeer);
+    put(fileKeyUnderMarker, HELLO);
+  }
+
+  /*
+  =================================================================
+    Basic probes
+  =================================================================
+  */
+
+  @Test
+  public void testMarkerExists() throws Throwable {
+    describe("Verify the marker exists");
+    head(markerKeySlash);
+    assertIsDirectory(markerDir);
+  }
+
+  @Test
+  public void testObjectUnderMarker() throws Throwable {
+    describe("verify the file under the marker dir exists");
+    assertIsFile(filePathUnderMarker);
+    head(fileKeyUnderMarker);
+  }
+
+  /*
+  =================================================================
+    The listing operations
+  =================================================================
+  */
+
+  @Test
+  public void testListStatusMarkerDir() throws Throwable {
+    describe("list the marker directory and expect to see the file");
+    assertContainsFileUnderMarkerOnly(
+        toList(getFileSystem().listStatus(markerDir)));
+  }
+
+
+  @Test
+  public void testListFilesMarkerDirFlat() throws Throwable {
+    assertContainsFileUnderMarkerOnly(toList(
+        getFileSystem().listFiles(markerDir, false)));
+  }
+
+  @Test
+  public void testListFilesMarkerDirRecursive() throws Throwable {
+    List<FileStatus> statuses = toList(
+        getFileSystem().listFiles(markerDir, true));
+    assertContainsFileUnderMarkerOnly(statuses);
+  }
+
+  /**
+   * A recursive file listing of the base dir MUST only find the
+   * two files and not the marker.
+   */
+  @Test
+  public void testListStatusBaseDirRecursive() throws Throwable {
+    List<FileStatus> statuses = toList(
+        getFileSystem().listFiles(basePath, true));
+    assertContainsExactlyStatusOfPaths(statuses, filePathUnderMarker,
+        markerPeer);
+  }
+
+  @Test
+  public void testGlobStatusBaseDirRecursive() throws Throwable {
+    Path escapedPath = new Path(escape(basePath.toUri().getPath()));
+    List<FileStatus> statuses =
+        exec("glob", () ->
+            toList(getFileSystem().globStatus(new Path(escapedPath, "*"))));
+    assertContainsExactlyStatusOfPaths(statuses, markerDir, markerPeer);
+    assertIsFileAtPath(markerPeer, statuses.get(1));
+  }
+
+  @Test
+  public void testGlobStatusMarkerDir() throws Throwable {
+    Path escapedPath = new Path(escape(markerDir.toUri().getPath()));
+    List<FileStatus> statuses =
+        exec("glob", () ->
+            toList(getFileSystem().globStatus(new Path(escapedPath, "*"))));
+    assertContainsFileUnderMarkerOnly(statuses);
+  }
+
+  /**
+   * Call {@code listLocatedStatus(basePath)}
+   * <p></p>
+   * The list here returns the marker peer before the
+   * dir. Reason: the listing iterators return
+   * the objects before the common prefixes, and the
+   * marker dir is coming back as a prefix.
+   */
+  @Test
+  public void testListLocatedStatusBaseDir() throws Throwable {
+    List<FileStatus> statuses =
+        exec("listLocatedStatus", () ->
+            toList(getFileSystem().listLocatedStatus(basePath)));
+
+    assertContainsExactlyStatusOfPaths(statuses, markerPeer, markerDir);
+  }
+
+  /**
+   * Call {@code listLocatedStatus(markerDir)}; expect
+   * the file entry only.
+   */
+  @Test
+  public void testListLocatedStatusMarkerDir() throws Throwable {
+    List<FileStatus> statuses =
+        exec("listLocatedStatus", () ->
+            toList(getFileSystem().listLocatedStatus(markerDir)));
+
+    assertContainsFileUnderMarkerOnly(statuses);
+  }
+
+
+  /*
+  =================================================================
+    Creation Rejection
+  =================================================================
+  */
+
+  @Test
+  public void testCreateNoOverwriteMarkerDir() throws Throwable {
+    describe("create no-overwrite over the marker dir fails");
+    head(markerKeySlash);
+    intercept(FileAlreadyExistsException.class, () ->
+        exec("create", () ->
+            getFileSystem().create(markerDir, false)));
+    // dir is still there.
+    head(markerKeySlash);
+  }
+
+  @Test
+  public void testCreateNoOverwriteFile() throws Throwable {
+    describe("create-no-overwrite on the file fails");
+
+    head(fileKeyUnderMarker);
+    intercept(FileAlreadyExistsException.class, () ->
+        exec("create", () ->
+            getFileSystem().create(filePathUnderMarker, false)));
+    assertTestObjectsExist();
+  }
+
+  @Test
+  public void testCreateFileNoOverwrite() throws Throwable {
+    describe("verify the createFile() API also fails");
+    head(fileKeyUnderMarker);
+    intercept(FileAlreadyExistsException.class, () ->
+        exec("create", () ->
+            getFileSystem().createFile(filePathUnderMarker)
+                .overwrite(false)
+                .build()));
+    assertTestObjectsExist();
+  }
+
+  /*
+  =================================================================
+    Delete.
+  =================================================================
+  */
+
+  @Test
+  public void testDelete() throws Throwable {
+    S3AFileSystem fs = getFileSystem();
+    // a non recursive delete MUST fail because
+    // it is not empty
+    intercept(PathIsNotEmptyDirectoryException.class, () ->
+        fs.delete(markerDir, false));
+    // file is still there
+    head(fileKeyUnderMarker);
+
+    // recursive delete MUST succeed
+    fs.delete(markerDir, true);
+    // and the markers are gone
+    head404(fileKeyUnderMarker);
+    head404(markerKeySlash);
+    // just for completeness
+    fs.delete(basePath, true);
+  }
+
+  /*
+  =================================================================
+    Rename.
+  =================================================================
+  */
+
+  /**
+   * Rename the base directory, expect the source files to move.
+   * <p></p>
+   * Whether or not the marker itself is copied depends on whether
+   * the release's rename operation explicitly skips
+   * markers on renames.
+   */
+  @Test
+  public void testRenameBase() throws Throwable {
+    describe("rename base directory");
+
+    Path src = basePath;
+    Path dest = new Path(methodPath(), "dest");
+    assertRenamed(src, dest);
+
+    assertPathDoesNotExist("source", src);
+    assertPathDoesNotExist("source", filePathUnderMarker);
+    assertPathExists("dest not found", dest);
+
+    // all the paths dest relative
+    Path destMarkerDir = new Path(dest, MARKER);
+    // peer path has the same initial name to make sure there
+    // is no confusion there.
+    Path destMarkerPeer = new Path(dest, MARKER_PEER);
+    String destMarkerKey = toKey(destMarkerDir);
+    String destMarkerKeySlash = destMarkerKey + "/";
+    String destFileKeyUnderMarker = destMarkerKeySlash + FILENAME;
+    Path destFilePathUnderMarker = new Path(destMarkerDir, FILENAME);
+    assertIsFile(destFilePathUnderMarker);
+    assertIsFile(destMarkerPeer);
+    head(destFileKeyUnderMarker);
+
+    // probe for the marker based on expected rename
+    // behavior
+    if (RENAME_COPIES_MARKERS) {
+      head(destMarkerKeySlash);
+    } else {
+      head404(destMarkerKeySlash);
+    }
+
+  }
+
+  /**
+   * Rename a file under a marker by passing in the marker
+   * directory as the destination; the final path is derived
+   * from the original filename.
+   * <p></p>
+   * After the rename:
+   * <ol>
+   *   <li>The data must be at the derived destination path.</li>
+   *   <li>The source file must not exist.</li>
+   *   <li>The parent dir of the source file must exist.</li>
+   *   <li>The marker above the destination file must not exist.</li>
+   * </ol>
+   */
+  @Test
+  public void testRenameUnderMarkerDir() throws Throwable {
+    describe("directory rename under an existing marker");
+    String name = "sourceFile";
+    Path srcDir = new Path(basePath, "srcdir");
+    mkdirs(srcDir);
+    Path src = new Path(srcDir, name);
+    String srcKey = toKey(src);
+    put(srcKey, name);
+    head(srcKey);
+
+    // set the destination to be the marker directory.
+    Path dest = markerDir;
+    // rename the source file under the dest dir.
+    assertRenamed(src, dest);
+    assertIsFile(new Path(dest, name));
+    assertIsDirectory(srcDir);
+    head404(markerKeySlash);
+  }
+
+  /**
+   * Rename file under a marker, giving the full path to the destination
+   * file.
+   * <p></p>
+   * After the rename:
+   * <ol>
+   *   <li>The data must be at the explicit destination path.</li>
+   *   <li>The source file must not exist.</li>
+   *   <li>The parent dir of the source file must exist.</li>
+   *   <li>The marker above the destination file must not exist.</li>
+   * </ol>
+   */
+  @Test
+  public void testRenameUnderMarkerWithPath() throws Throwable {
+    describe("directory rename under an existing marker");
+    String name = "sourceFile";
+    Path srcDir = new Path(basePath, "srcdir");
+    mkdirs(srcDir);
+    Path src = new Path(srcDir, name);
+    String srcKey = toKey(src);
+    put(srcKey, name);
+    head(srcKey);
+
+    // set the destination to be the final file
+    Path dest = new Path(markerDir, "destFile");
+    // rename the source file to the destination file
+    assertRenamed(src, dest);
+    assertIsFile(dest);
+    assertIsDirectory(srcDir);
+    head404(markerKeySlash);
+  }
+
+  /**
+   * This test creates an empty dir and renames it over the directory marker.
+   * The dest contains a file, so is not empty: the rename MUST fail.
+   */
+  @Test
+  public void testRenameEmptyDirOverMarker() throws Throwable {
+    describe("rename an empty directory over the marker");
+    S3AFileSystem fs = getFileSystem();
+    String name = "sourceDir";
+    Path src = new Path(basePath, name);
+    fs.mkdirs(src);
+    assertIsDirectory(src);
+    String srcKey = toKey(src) + "/";
+    head(srcKey);
+    Path dest = markerDir;
+    // the rename into the dest dir must be rejected
+    assertFalse("rename(" + src + ", " + dest + ") should have failed",
+        getFileSystem().rename(src, dest));
+    // source is still there
+    assertIsDirectory(src);
+    head(srcKey);
+    // and a non-recursive delete lets us verify it is considered
+    // an empty dir
+    assertDeleted(src, false);
+    assertTestObjectsExist();
+  }
+
+  /*
+  =================================================================
+    Utility methods and assertions.
+  =================================================================
+  */
+
+  /**
+   * Assert the test objects exist.
+   */
+  private void assertTestObjectsExist() throws Exception {
+    head(fileKeyUnderMarker);
+    head(markerKeySlash);
+  }
+
+  /**
+   * Put a string to a path.
+   * @param key key
+   * @param content string
+   */
+  private void put(final String key, final String content) throws Exception {
+    exec("PUT " + key, () ->
+        s3client.putObject(bucket, key, content));
+  }
+  /**
+   * Delete an object.
+   * @param key key
+   * @throws Exception failure
+   */
+  private void deleteObject(final String key) throws Exception {
+    exec("DELETE " + key, () -> {
+      s3client.deleteObject(bucket, key);
+      return "deleted " + key;
+    });
+  }
+
+  /**
+   * Issue a HEAD request.
+   * @param key key of the object to probe
+   * @return a description of the object.
+   */
+  private String head(final String key) throws Exception {
+    ObjectMetadata md = exec("HEAD " + key, () ->
+        s3client.getObjectMetadata(bucket, key));
+    return String.format("Object %s of length %d",
+        key, md.getInstanceLength());
+  }
+
+  /**
+   * Issue a HEAD request and expect a 404 back.
+   * @param key key of the object to probe
+   * @throws Exception any failure other than the expected one
+   */
+  private void head404(final String key) throws Exception {
+    intercept(FileNotFoundException.class, "",
+        () -> head(key));
+  }
+
+  /**
+   * Execute an operation; translate AWS exceptions.
+   * @param op operation
+   * @param call call to make
+   * @param <T> returned type
+   * @return result of the call.
+   * @throws Exception failure
+   */
+  private <T> T exec(String op, Callable<T> call) throws Exception {
+    ContractTestUtils.NanoTimer timer = new ContractTestUtils.NanoTimer();
+    try {
+      return call.call();
+    } catch (AmazonClientException ex) {
+      throw S3AUtils.translateException(op, "", ex);
+    } finally {
+      timer.end(op);
+    }
+  }
+
+  /**
+   * Assert that the listing contains only the status
+   * of the file under the marker.
+   * @param statuses status objects
+   */
+  private void assertContainsFileUnderMarkerOnly(
+      final List<FileStatus> statuses) {
+    // size and path are asserted first, so get(0) is safe afterwards
+    assertContainsExactlyStatusOfPaths(statuses, filePathUnderMarker);
+    assertIsFileUnderMarker(statuses.get(0));
+  }
+
+  /**
+   * Expect the list of status objects to match that of the paths.
+   * @param statuses status object list
+   * @param paths ordered varargs list of paths
+   * @param <T> type of status objects
+   */
+  private <T extends FileStatus> void assertContainsExactlyStatusOfPaths(
+      List<T> statuses, Path... paths) {
+
+    String actual = statuses.stream()
+        .map(Object::toString)
+        .collect(Collectors.joining(";"));
+    String expected = Arrays.stream(paths)
+        .map(Object::toString)
+        .collect(Collectors.joining(";"));
+    String summary = "expected [" + expected + "]"
+        + " actual = [" + actual + "]";
+    assertEquals("mismatch in size of listing " + summary,
+        paths.length, statuses.size());
+    for (int i = 0; i < statuses.size(); i++) {
+      assertEquals("Path mismatch at element " + i + " in " + summary,
+          paths[i], statuses.get(i).getPath());
+    }
+  }
+
+  /**
+   * Assert the status object is a file whose path is
+   * {@code filePathUnderMarker}.
+   * @param stat status object
+   */
+  private void assertIsFileUnderMarker(final FileStatus stat) {
+    assertIsFileAtPath(filePathUnderMarker, stat);
+  }
+
+  /**
+   * Assert the status object is a file, and that its path is the one given.
+   * @param path path expected
+   * @param stat status object
+   */
+  private void assertIsFileAtPath(final Path path, final FileStatus stat) {
+    assertTrue("Is not file " + stat, stat.isFile());
+    assertPathEquals(path, stat);
+  }
+
+  /**
+   * Assert a status object's path equals the expected path.
+   * @param path path to expect
+   * @param stat status object whose {@code getPath()} value is compared
+   */
+  private void assertPathEquals(final Path path, final FileStatus stat) {
+    assertEquals("filename is not the expected path :" + stat,
+        path, stat.getPath());
+  }
+
+  /**
+   * Given a remote iterator of status objects,
+   * build a list of the values.
+   * @param status status list
+   * @param <T> actual type.
+   * @return source.
+   * @throws IOException
+   */
+  private <T extends FileStatus> List<FileStatus> toList(
+      RemoteIterator<T> status) throws IOException {
+
+    List<FileStatus> l = new ArrayList<>();
+    while (status.hasNext()) {
+      l.add(status.next());
+    }
+    return dump(l);
+  }
+
+  /**
+   * Given an array of status objects, wrap it as a list
+   * and pass that list through {@code dump()}.
+   * @param status status array
+   * @param <T> actual type.
+   * @return a list view of the array, as returned by Arrays.asList().
+   * @throws IOException declared for symmetry; not raised by this code
+   */
+  private <T extends FileStatus> List<FileStatus> toList(
+      T[] status) throws IOException {
+    return dump(Arrays.asList(status));
+  }
+
+  /**
+   * Log every element of a list at INFO, one per line, each
+   * prefixed with its 1-based position and a tab; then return
+   * the same list.
+   * @param l source.
+   * @param <T> source type
+   * @return the list
+   */
+  private <T> List<T> dump(List<T> l) {
+    for (int i = 0; i < l.size(); i++) {
+      LOG.info("{}\t{}", i + 1, l.get(i));
+    }
+    return l;
+  }
+
+  /**
+   * Rename through the test filesystem: assert the outcome is true.
+   * @param src source path
+   * @param dest dest path
+   */
+  private void assertRenamed(final Path src, final Path dest)
+      throws IOException {
+    assertTrue("rename(" + src + ", " + dest + ") failed",
+        getFileSystem().rename(src, dest));
+  }
+
+  /**
+   * Convert a path to a key via the filesystem's pathToKey(); no "/" added.
+   * @param path path in
+   * @return key out
+   */
+  private String toKey(final Path path) {
+    return getFileSystem().pathToKey(path);
+  }
+
+  /**
+   * Escape paths before handing to globStatus; this is needed as
+   * parameterized runs produce paths with [] in them.
+   * @param pathstr source path string
+   * @return an escaped path string
+   */
+  private String escape(String pathstr) {
+    StringBuilder r = new StringBuilder();
+    for (char c : pathstr.toCharArray()) {
+      String ch = Character.toString(c);
+      if ("?*[{".contains(ch)) {
+        r.append("\\");
+      }
+      r.append(ch);
+    }
+    return r.toString();
+  }
+
+}
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/OperationCost.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/OperationCost.java
new file mode 100644
index 0000000..c742e4d
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/OperationCost.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.performance;
+
+/**
+ * Declaration of the costs of head and list calls for various FS IO
+ * operations.
+ * <p></p>
+ * An instance declares the number of head and list calls expected for
+ * various operations - with a {@link #plus(OperationCost)}
+ * method to add operation costs together to produce an
+ * aggregate cost. These can then be validated in tests
+ * via {@link OperationCostValidator}.
+ * Instances are immutable: both counts are final fields.
+ */
+public final class OperationCost {
+
+  /** Head costs for getFileStatus() directory probe: {@value}. */
+  public static final int FILESTATUS_DIR_PROBE_H = 0;
+
+  /** List costs for getFileStatus() directory probe: {@value}. */
+  public static final int FILESTATUS_DIR_PROBE_L = 1;
+
+  /** Head cost getFileStatus() file probe only. */
+  public static final int FILESTATUS_FILE_PROBE_H = 1;
+
+  /** List cost getFileStatus() file probe only. */
+
+  public static final int FILESTATUS_FILE_PROBE_L = 0;
+
+  /**
+   * Delete cost when deleting an object.
+   */
+  public static final int DELETE_OBJECT_REQUEST = 1;
+
+  /**
+   * Delete cost when deleting a marker.
+   */
+  public static final int DELETE_MARKER_REQUEST = DELETE_OBJECT_REQUEST;
+
+  /**
+   * No IO takes place.
+   */
+  public static final OperationCost NO_IO =
+      new OperationCost(0, 0);
+
+  /** A HEAD operation. */
+  public static final OperationCost HEAD_OPERATION = new OperationCost(1, 0);
+
+  /** A LIST operation. */
+  public static final OperationCost LIST_OPERATION = new OperationCost(0, 1);
+
+  /**
+   * Cost of {@link org.apache.hadoop.fs.s3a.impl.StatusProbeEnum#DIRECTORIES}.
+   */
+  public static final OperationCost FILE_STATUS_DIR_PROBE = LIST_OPERATION;
+
+  /**
+   * Cost of {@link org.apache.hadoop.fs.s3a.impl.StatusProbeEnum#FILE}.
+   */
+  public static final OperationCost FILE_STATUS_FILE_PROBE = HEAD_OPERATION;
+
+  /**
+   * Cost of {@link org.apache.hadoop.fs.s3a.impl.StatusProbeEnum#ALL}.
+   */
+  public static final OperationCost FILE_STATUS_ALL_PROBES =
+      FILE_STATUS_FILE_PROBE.plus(FILE_STATUS_DIR_PROBE);
+
+  /** getFileStatus() on a file which exists. */
+  public static final OperationCost GET_FILE_STATUS_ON_FILE =
+      FILE_STATUS_FILE_PROBE;
+
+  /** getFileStatus() on a non-empty directory: file probe + dir probe. */
+  public static final OperationCost GET_FILE_STATUS_ON_DIR =
+      FILE_STATUS_FILE_PROBE.plus(FILE_STATUS_DIR_PROBE);
+
+  /** getFileStatus() on an empty directory: same as a non-empty one. */
+  public static final OperationCost GET_FILE_STATUS_ON_EMPTY_DIR =
+      GET_FILE_STATUS_ON_DIR;
+
+  /** getFileStatus() directory marker which exists. */
+  public static final OperationCost GET_FILE_STATUS_ON_DIR_MARKER =
+      GET_FILE_STATUS_ON_EMPTY_DIR;
+
+  /** getFileStatus() call which fails to find any entry. */
+  public static final OperationCost GET_FILE_STATUS_FNFE =
+      FILE_STATUS_ALL_PROBES;
+
+  /** listLocatedStatus always does a LIST. */
+  public static final OperationCost LIST_LOCATED_STATUS_LIST_OP =
+      new OperationCost(0, 1);
+
+  /** listFiles always does a LIST. */
+  public static final OperationCost LIST_FILES_LIST_OP =
+      new OperationCost(0, 1);
+
+  /**
+   * Metadata cost of a copy operation, as used during rename: one HEAD.
+   * This happens even if the store is guarded.
+   */
+  public static final OperationCost COPY_OP =
+      new OperationCost(1, 0);
+
+  /**
+   * Cost of renaming a file to a different directory.
+   * <p></p>
+   * LIST on dest not found, look for dest dir, and then, at
+   * end of rename, whether a parent dir needs to be created.
+   */
+  public static final OperationCost RENAME_SINGLE_FILE_DIFFERENT_DIR =
+      FILE_STATUS_FILE_PROBE              // source file probe
+          .plus(GET_FILE_STATUS_FNFE)     // dest does not exist
+          .plus(FILE_STATUS_DIR_PROBE)    // parent dir of dest
+          .plus(FILE_STATUS_DIR_PROBE)    // recreate source parent dir?
+          .plus(COPY_OP);                 // metadata read on copy
+
+  /**
+   * Cost of renaming a file to the same directory.
+   * <p></p>
+   * No need to look for parent directories, so only file
+   * existence checks and the copy.
+   */
+  public static final OperationCost RENAME_SINGLE_FILE_SAME_DIR =
+      FILE_STATUS_FILE_PROBE              // source file probe
+          .plus(GET_FILE_STATUS_FNFE)     // dest must not exist
+          .plus(COPY_OP);                 // metadata read on copy
+
+  /**
+   * create(overwrite = true) does not look for the file existing.
+   */
+  public static final OperationCost CREATE_FILE_OVERWRITE =
+      FILE_STATUS_DIR_PROBE;
+
+  /**
+   * create(overwrite = false) runs all the checks.
+   */
+  public static final OperationCost CREATE_FILE_NO_OVERWRITE =
+      FILE_STATUS_ALL_PROBES;
+
+  /** Expected HEAD count. */
+  private final int head;
+
+  /** Expected LIST count. */
+  private final int list;
+
+  /**
+   * Constructor.
+   * @param head head requests.
+   * @param list list requests.
+   */
+  public OperationCost(final int head,
+      final int list) {
+    this.head = head;
+    this.list = list;
+  }
+
+  /** Expected HEAD count. */
+  public int head() {
+    return head;
+  }
+
+  /** Expected LIST count. */
+  public int list() {
+    return list;
+  }
+
+  /**
+   * Add to create a new cost; neither operand is modified.
+   * @param that the other entry
+   * @return cost of the combined operation.
+   */
+  public OperationCost plus(OperationCost that) {
+    return new OperationCost(
+        head + that.head,
+        list + that.list);
+  }
+
+  @Override
+  public String toString() {
+    return "OperationCost{" +
+        "head=" + head +
+        ", list=" + list +
+        '}';
+  }
+}
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/OperationCostValidator.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/OperationCostValidator.java
new file mode 100644
index 0000000..c71cd87
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/OperationCostValidator.java
@@ -0,0 +1,481 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.performance;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.Callable;
+import java.util.stream.Collectors;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.fs.s3a.S3AFileSystem;
+import org.apache.hadoop.fs.s3a.S3ATestUtils;
+import org.apache.hadoop.fs.s3a.Statistic;
+
+import static java.util.Objects.requireNonNull;
+import static org.apache.hadoop.fs.s3a.Statistic.OBJECT_LIST_REQUESTS;
+import static org.apache.hadoop.fs.s3a.Statistic.OBJECT_METADATA_REQUESTS;
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Support for declarative assertions about operation cost.
+ * <p></p>
+ * Usage: A builder is used to declare the set of statistics
+ * to be monitored in the filesystem.
+ * <p></p>
+ * A call to {@link #exec(Callable, ExpectedProbe...)}
+ * executes the callable if 1+ probe is enabled; after
+ * invocation the probes are validated.
+ * The result of the callable is returned.
+ * <p></p>
+ * A call of {@link #intercepting(Class, String, Callable, ExpectedProbe...)}
+ * Invokes the callable if 1+ probe is enabled, expects an exception
+ * to be raised and then verifies metrics declared in the probes.
+ * <p></p>
+ * Probes are built up from the static method to create probes
+ * for metrics:
+ * <ul>
+ *   <li>{@link #probe(boolean, Statistic, int)} </li>
+ *   <li>{@link #probe(Statistic, int)} </li>
+ *   <li>{@link #probes(boolean, ExpectedProbe...)} </li>
+ *   <li>{@link #always()}</li>
+ * </ul>
+ * If any probe evaluates to false, an assertion is raised.
+ * <p></p>
+ * When this happens: look in the logs!
+ * The logs will contain the whole set of metrics, the probe details
+ * and the result of the call.
+ */
+public final class OperationCostValidator {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(OperationCostValidator.class);
+
+  /**
+   * The empty probe: declared as disabled; its verify() does nothing.
+   */
+  private static final ExpectedProbe EMPTY_PROBE =
+      new EmptyProbe("empty", false);
+
+  /**
+   * A probe which is always enabled; its verify() does nothing either.
+   */
+  private static final ExpectedProbe ALWAYS_PROBE =
+      new EmptyProbe("always", true);
+
+  /**
+   * The map of metric diffs to track, keyed by the statistic's symbol.
+   */
+  private final Map<String, S3ATestUtils.MetricDiff> metricDiffs
+      = new TreeMap<>();
+
+  /**
+   * Build the instance.
+   * @param builder builder containing all options.
+   */
+  private OperationCostValidator(Builder builder) {
+    builder.metrics.forEach(stat ->
+        metricDiffs.put(stat.getSymbol(),
+            new S3ATestUtils.MetricDiff(builder.filesystem, stat)));
+    builder.metrics.clear();
+  }
+
+  /**
+   * Reset all the metrics being tracked; exec() does this before each call.
+   */
+  public void resetMetricDiffs() {
+    metricDiffs.values().forEach(S3ATestUtils.MetricDiff::reset);
+  }
+
+  /**
+   * Look up the diff tracked for a statistic, by the statistic's symbol.
+   * @param stat statistic to look up
+   * @return the value
+   * @throws NullPointerException if there is no match
+   */
+  public S3ATestUtils.MetricDiff get(Statistic stat) {
+    final String symbol = stat.getSymbol();
+    return requireNonNull(
+        metricDiffs.get(symbol),
+        () -> "No metric tracking for " + stat);
+  }
+
+  /**
+   * Execute a closure and verify the metrics.
+   * <p></p>
+   * The tracked metrics are reset first. If no probe is enabled, the check
+   * in {@code assumeProbesEnabled()} raises and the closure is never run.
+   * @param eval closure to evaluate
+   * @param expectedA varargs list of expected diffs
+   * @param <T> return type.
+   * @return the result of the evaluation
+   */
+  public <T> T exec(
+      Callable<T> eval,
+      ExpectedProbe... expectedA) throws Exception {
+    final List<ExpectedProbe> probeList = Arrays.asList(expectedA);
+    // reset before anything else, including the enabled check
+    resetMetricDiffs();
+    // precondition: at least one probe must be enabled
+    assumeProbesEnabled(probeList);
+    // only now is the closure invoked
+    final T result = eval.call();
+    // this text is logged, then used as the message of every probe
+    final String outcome =
+        "operation returning " + String.valueOf(result);
+    LOG.info("{}", outcome);
+    LOG.info("state {}", this);
+    LOG.info("probes {}", probeList);
+    // verify() is invoked on every probe in the order supplied
+    for (ExpectedProbe probe : probeList) {
+      probe.verify(this, outcome);
+    }
+    return result;
+  }
+
+  /**
+   * Scan all probes for being enabled.
+   * <p></p>
+   * If none of them are enabled, a JUnit assumption failure is raised so
+   * that the test is skipped rather than reported as failed.
+   * @param expected list of expected probes
+   */
+  private void assumeProbesEnabled(List<ExpectedProbe> expected) {
+    boolean enabled = expected.stream().anyMatch(ExpectedProbe::isEnabled);
+    String pstr = expected.stream()
+        .map(Object::toString)
+        .collect(Collectors.joining(", "));
+    // an assumption, not an assertion: no enabled probe means "skip"
+    org.junit.Assume.assumeTrue(
+        "metrics to probe for are not enabled in " + pstr, enabled);
+  }
+
+  /**
+   * Execute a closure, expecting an exception.
+   * Verify the metrics after the exception has been caught and
+   * validated.
+   * @param clazz type of exception
+   * @param text text to look for in exception (optional)
+   * @param eval closure to evaluate
+   * @param expected varargs list of expected probes
+   * @param <T> return type of closure
+   * @param <E> exception type
+   * @return the exception caught.
+   * @throws Exception any other exception
+   */
+  public <T, E extends Throwable> E intercepting(
+      Class<E> clazz,
+      String text,
+      Callable<T> eval,
+      ExpectedProbe... expected) throws Exception {
+    // intercept() runs inside exec(), so the probes are verified after it
+    return exec(() ->
+            intercept(clazz, text, eval),
+        expected);
+  }
+
+  @Override
+  public String toString() {
+    // the diffs come out in the TreeMap's key order, comma separated
+    return metricDiffs.values().stream().map(Object::toString)
+        .collect(Collectors.joining(", "));
+  }
+
+  /**
+   * Create a builder for the cost checker.
+   * The builder rejects a null filesystem.
+   * @param fs filesystem.
+   * @return builder.
+   */
+  public static Builder builder(S3AFileSystem fs) {
+    return new Builder(fs);
+  }
+
+  /**
+   * Builder for the validator: collects the statistics to monitor.
+   */
+  public static final class Builder {
+
+    /**
+     * Filesystem.
+     */
+    private final S3AFileSystem filesystem;
+
+    /**
+     * Metrics to create.
+     */
+    private final List<Statistic> metrics = new ArrayList<>();
+
+
+    /**
+     * Create with a required filesystem.
+     * @param filesystem monitored filesystem
+     */
+    public Builder(final S3AFileSystem filesystem) {
+      this.filesystem = requireNonNull(filesystem);
+    }
+
+
+    /**
+     * Add a single metric, by delegating to the varargs variant.
+     * @param statistic statistic to monitor.
+     * @return this
+     */
+    public Builder withMetric(Statistic statistic) {
+      return withMetrics(statistic);
+    }
+
+    /**
+     * Add a varargs list of metrics.
+     * @param stats statistics to monitor.
+     * @return this.
+     */
+    public Builder withMetrics(Statistic...stats) {
+      metrics.addAll(Arrays.asList(stats));
+      return this;
+    }
+
+    /**
+     * Instantiate; the validator's constructor clears this metric list.
+     * @return the validator.
+     */
+    public OperationCostValidator build() {
+      return new OperationCostValidator(this);
+    }
+  }
+
+  /**
+   * Get the "always" probe: enabled, and with nothing to verify.
+   * @return a probe which always triggers execution.
+   */
+  public static ExpectedProbe always() {
+    return ALWAYS_PROBE;
+  }
+
+  /**
+   * Create a probe of a statistic which is enabled whenever the expected
+   * value is zero or greater; a negative value yields the disabled probe.
+   * @param statistic statistic to check.
+   * @param expected expected value.
+   * @return a probe.
+   */
+  public static ExpectedProbe probe(
+      final Statistic statistic,
+      final int expected) {
+    return probe(expected >= 0, statistic, expected);
+  }
+
+  /**
+   * Create a probe of a statistic which is conditionally enabled.
+   * @param enabled is the probe enabled? If not, the empty probe is returned
+   * @param statistic statistic to check.
+   * @param expected expected value.
+   * @return a probe.
+   */
+  public static ExpectedProbe probe(
+      final boolean enabled,
+      final Statistic statistic,
+      final int expected) {
+    return enabled
+        ? new ExpectSingleStatistic(statistic, expected)
+        : EMPTY_PROBE;
+  }
+
+  /**
+   * Create an aggregate probe from a varargs list of probes.
+   * @param enabled should the probes be enabled? If not: the empty probe
+   * @param plist probe list
+   * @return a probe
+   */
+  public static ExpectedProbe probes(
+      final boolean enabled,
+      final ExpectedProbe...plist) {
+    return enabled
+        ? new ProbeList(Arrays.asList(plist))
+        : EMPTY_PROBE;
+  }
+
+  /**
+   * Expect the exact head and list requests of the operation
+   * cost supplied: metadata requests for head, list requests for list.
+   * @param enabled is the probe enabled?
+   * @param cost expected cost.
+   * @return a probe.
+   */
+  public static ExpectedProbe expect(
+      boolean enabled, OperationCost cost) {
+    return probes(enabled,
+        probe(OBJECT_METADATA_REQUESTS, cost.head()),
+        probe(OBJECT_LIST_REQUESTS, cost.list()));
+  }
+
+  /**
+   * An expected probe to verify given criteria to trigger an eval.
+   * Probes can be conditional, in which case they are only evaluated
+   * when true.
+   */
+  public interface ExpectedProbe {
+    /**
+     * Verify a diff if the FS instance is compatible.
+     * @param diffs validator holding the metric diffs to check against
+     * @param message message to print; metric name is appended
+     */
+    void verify(OperationCostValidator diffs, String message);
+
+    /** @return true if the probe is enabled. */
+    boolean isEnabled();
+  }
+
+  /**
+   * A probe of a single statistic; instances are always enabled.
+   */
+  public static final class ExpectSingleStatistic implements ExpectedProbe {
+
+    private final Statistic statistic;
+
+    private final int expected;
+
+    /**
+     * Create.
+     * @param statistic statistic
+     * @param expected expected value.
+     */
+    private ExpectSingleStatistic(final Statistic statistic,
+        final int expected) {
+      this.statistic = statistic;
+      this.expected = expected;
+    }
+
+    /**
+     * Assert that the diff tracked for the statistic equals the expectation.
+     * @param message message handed to the diff's assertion
+     */
+    @Override
+    public void verify(OperationCostValidator diffs, String message) {
+      diffs.get(statistic).assertDiffEquals(message, expected);
+    }
+
+    public Statistic getStatistic() {
+      return statistic;
+    }
+
+    public int getExpected() {
+      return expected;
+    }
+
+    @Override
+    public boolean isEnabled() {
+      return true;
+    }
+
+    @Override
+    public String toString() {
+      String sb = "ExpectSingleStatistic{"
+          + statistic
+          + ", expected=" + expected
+          + ", enabled=" + isEnabled()
+          + '}';
+      return sb;
+    }
+  }
+
+  /**
+   * An aggregate of probes: verification is delegated
+   * to each member in turn.
+   */
+  public static class ProbeList implements ExpectedProbe {
+
+    /**
+     * Probes to aggregate.
+     */
+    private final List<ExpectedProbe> probes;
+
+    /**
+     * Constructor.
+     * @param probes probe list; kept by reference, not copied.
+     */
+    public ProbeList(final List<ExpectedProbe> probes) {
+      this.probes = probes;
+    }
+
+    @Override
+    public void verify(final OperationCostValidator diffs,
+        final String message) {
+      for (ExpectedProbe probe : probes) {
+        probe.verify(diffs, message);
+      }
+    }
+
+    /**
+     * Enabled if 1+ probe is enabled; an empty list is disabled.
+     * @return true if enabled.
+     */
+    @Override
+    public boolean isEnabled() {
+      // non-short-circuit OR: every probe is asked, as in a plain loop
+      return probes.stream()
+          .map(ExpectedProbe::isEnabled)
+          .reduce(false, Boolean::logicalOr);
+    }
+
+    @Override
+    public String toString() {
+      return probes.stream()
+          .map(Object::toString)
+          .collect(Collectors.joining(", ", "ProbeList{", "}"));
+    }
+  }
+
+  /**
+   * A probe whose verification is a no-op and whose enabled state is
+   * fixed at construction time; toString() returns the name given.
+   */
+  private static final class EmptyProbe implements ExpectedProbe {
+
+    private final String name;
+
+    private final boolean enabled;
+
+    private EmptyProbe(final String name, boolean enabled) {
+      this.name = name;
+      this.enabled = enabled;
+    }
+
+    @Override
+    public void verify(final OperationCostValidator diffs,
+        final String message) {
+    }
+
+    @Override
+    public boolean isEnabled() {
+      return enabled;
+    }
+
+    @Override
+    public String toString() {
+      return name;
+    }
+  }
+}
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/AbstractS3GuardToolTestBase.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/AbstractS3GuardToolTestBase.java
index a33c001..8553ee7 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/AbstractS3GuardToolTestBase.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/AbstractS3GuardToolTestBase.java
@@ -40,6 +40,7 @@ import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.util.ExitUtil;
 import org.apache.hadoop.util.StringUtils;
 
+import static org.apache.hadoop.fs.s3a.s3guard.S3GuardTool.E_BAD_STATE;
 import static org.apache.hadoop.fs.s3a.s3guard.S3GuardTool.SUCCESS;
 import static org.apache.hadoop.test.LambdaTestUtils.intercept;
 
@@ -288,4 +289,39 @@ public abstract class AbstractS3GuardToolTestBase extends AbstractS3ATestBase {
     assertEquals("Command " + cmd + " failed\n"+ buf, 0, r);
   }
 
+  @Test
+  public void testLandsatBucketMarkerAware() throws Throwable {
+    describe("verify that -markers aware succeeds");
+    // build the option once: a dash followed by the markers flag name
+    final String markersOption = "-" + S3GuardTool.BucketInfo.MARKERS_FLAG;
+    run(S3GuardTool.BucketInfo.NAME, markersOption,
+        S3GuardTool.BucketInfo.MARKERS_AWARE, getLandsatCSVFile());
+  }
+
+  @Test
+  public void testLandsatBucketMarkerDelete() throws Throwable {
+    describe("verify that -markers delete succeeds");
+    final String markersOption = "-" + S3GuardTool.BucketInfo.MARKERS_FLAG;
+    run(S3GuardTool.BucketInfo.NAME, markersOption, "delete",
+        getLandsatCSVFile());
+  }
+
+  @Test
+  public void testLandsatBucketMarkerKeepFails() throws Throwable {
+    describe("verify that -markers keep fails");
+    final String markersOption = "-" + S3GuardTool.BucketInfo.MARKERS_FLAG;
+    // the command is expected to exit with the E_BAD_STATE code
+    runToFailure(E_BAD_STATE, S3GuardTool.BucketInfo.NAME,
+        markersOption, "keep", getLandsatCSVFile());
+  }
+
+  @Test
+  public void testLandsatBucketMarkerAuthFails() throws Throwable {
+    describe("verify that -markers authoritative fails");
+    final String markersOption = "-" + S3GuardTool.BucketInfo.MARKERS_FLAG;
+    // the command is expected to exit with the E_BAD_STATE code
+    runToFailure(E_BAD_STATE, S3GuardTool.BucketInfo.NAME,
+        markersOption, "authoritative", getLandsatCSVFile());
+  }
+
 }
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/test/costs/HeadListCosts.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/test/costs/HeadListCosts.java
new file mode 100644
index 0000000..ef15a72
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/test/costs/HeadListCosts.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.test.costs;
+
+/**
+ * Declaration of the costs of head and list calls for various FS IO operations.
+ */
+public class HeadListCosts {
+
+  /** Head costs for getFileStatus() directory probe: {@value}. */
+  public static final int FILESTATUS_DIR_PROBE_H = 0;
+
+  /** List costs for getFileStatus() directory probe: {@value}. */
+  public static final int FILESTATUS_DIR_PROBE_L = 1;
+
+
+  /** Head cost getFileStatus() file probe only. */
+  public static final int FILESTATUS_FILE_PROBE_H = 1;
+
+  /** List cost getFileStatus() file probe only. */
+
+  public static final int FILESTATUS_FILE_PROBE_L = 0;
+
+  /** Head costs getFileStatus() no file or dir. */
+  public static final int GETFILESTATUS_FNFE_H = FILESTATUS_FILE_PROBE_H;
+
+  /** List costs for getFileStatus() no file or dir: {@value}. */
+
+  public static final int GETFILESTATUS_FNFE_L = FILESTATUS_DIR_PROBE_L;
+
+  /** getFileStatus() directory which is non-empty. */
+  public static final int GETFILESTATUS_DIR_H = FILESTATUS_FILE_PROBE_H;
+
+  /** List costs for getFileStatus() on a non-empty directory: {@value}. */
+  public static final int GETFILESTATUS_DIR_L = FILESTATUS_DIR_PROBE_L;
+
+  /** List costs for getFileStatus() on an empty directory: {@value}. */
+  public static final int GETFILESTATUS_EMPTY_DIR_L = FILESTATUS_DIR_PROBE_L;
+  /** Head costs for getFileStatus() on an empty directory: {@value}. */
+  public static final int GETFILESTATUS_EMPTY_DIR_H = GETFILESTATUS_DIR_H;
+
+  /** getFileStatus() directory marker which exists. */
+  public static final int GETFILESTATUS_MARKER_H = FILESTATUS_FILE_PROBE_H;
+
+  /** getFileStatus() on a file which exists. */
+  public static final int GETFILESTATUS_SINGLE_FILE_H = FILESTATUS_FILE_PROBE_H;
+
+
+  public static final int GETFILESTATUS_SINGLE_FILE_L = FILESTATUS_FILE_PROBE_L;
+
+  public static final int DELETE_OBJECT_REQUEST = 1;
+
+  public static final int DELETE_MARKER_REQUEST = 1;
+
+  /** listLocatedStatus always does a list. */
+  public static final int LIST_LOCATED_STATUS_L = 1;
+
+  public static final int LIST_FILES_L = 1;
+
+  /**
+   * Cost of renaming a file to a different directory.
+   * <p></p>
+   * LIST on dest not found, look for dest dir, and then, at
+   * end of rename, whether a parent dir needs to be created.
+   */
+  public static final int RENAME_SINGLE_FILE_RENAME_DIFFERENT_DIR_L =
+      GETFILESTATUS_FNFE_L + GETFILESTATUS_DIR_L * 2;
+
+  /**
+   * Cost of renaming a file within the same directory.
+   * <p></p>
+   * Only the LIST on dest not found is counted; the extra directory
+   * LISTs of the different-directory rename are not included.
+   */
+  public static final int RENAME_SINGLE_FILE_RENAME_SAME_DIR_L =
+      GETFILESTATUS_FNFE_L;
+
+  /**
+   * Rename a single file.
+   * <p></p>
+   * source is found, dest not found, copy adds a
+   * metadata request.
+   */
+  public static final int RENAME_SINGLE_FILE_RENAME_H =
+      FILESTATUS_FILE_PROBE_H + GETFILESTATUS_FNFE_H + 1;
+
+  /**
+   * Create file with overwrite head : {@value}.
+   */
+  public static final int CREATE_FILE_OVERWRITE_H = 0;
+
+  /**
+   * Create file with overwrite list : {@value}.
+   */
+  public static final int CREATE_FILE_OVERWRITE_L = FILESTATUS_DIR_PROBE_L;
+
+  /**
+   * Create file no overwrite head : {@value}.
+   */
+  public static final int CREATE_FILE_NO_OVERWRITE_H = FILESTATUS_FILE_PROBE_H;
+
+  /**
+   * Create file no overwrite list : {@value}.
+   */
+  public static final int CREATE_FILE_NO_OVERWRITE_L = FILESTATUS_DIR_PROBE_L;
+
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org