You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by st...@apache.org on 2020/11/18 13:27:10 UTC

[hadoop] branch trunk updated: HADOOP-17244. S3A directory delete tombstones dir markers prematurely. (#2310)

This is an automated email from the ASF dual-hosted git repository.

stevel pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new e3c08f2  HADOOP-17244. S3A directory delete tombstones dir markers prematurely. (#2310)
e3c08f2 is described below

commit e3c08f285a6ac02f51ddf0007db76fc3da7ec372
Author: Steve Loughran <st...@cloudera.com>
AuthorDate: Wed Nov 18 12:15:52 2020 +0000

    HADOOP-17244. S3A directory delete tombstones dir markers prematurely. (#2310)
    
    This fixes the S3Guard/Directory Marker Retention integration so that when
    fs.s3a.directory.marker.retention=keep, failures during multipart delete
    are handled correctly, as are incremental deletes during
    directory tree operations.
    
    In both cases, when a directory marker with children is deleted from
    S3, the directory entry in S3Guard is not deleted, because it is still
    critical to representing the structure of the store.
    
    Contributed by Steve Loughran.
    
    Change-Id: I4ca133a23ea582cd42ec35dbf2dc85b286297d2f
---
 .../AbstractContractRootDirectoryTest.java         |   2 +-
 .../org/apache/hadoop/fs/s3a/S3AFileSystem.java    |  20 +-
 .../apache/hadoop/fs/s3a/S3AInstrumentation.java   |   1 +
 .../java/org/apache/hadoop/fs/s3a/Statistic.java   |   2 +
 .../apache/hadoop/fs/s3a/impl/DeleteOperation.java | 104 +++++--
 .../fs/s3a/impl/MultiObjectDeleteSupport.java      | 231 ++++++++++++---
 .../hadoop/fs/s3a/impl/OperationCallbacks.java     |   4 +-
 .../apache/hadoop/fs/s3a/impl/RenameOperation.java |  20 +-
 .../fs/s3a/s3guard/DynamoDBMetadataStore.java      |   2 +-
 .../markdown/tools/hadoop-aws/directory_markers.md |  66 ++++-
 .../hadoop/fs/s3a/ITestS3AFailureHandling.java     |  12 +-
 .../hadoop/fs/s3a/ITestS3GuardEmptyDirs.java       |  24 ++
 .../org/apache/hadoop/fs/s3a/S3ATestUtils.java     | 314 +--------------------
 .../fs/s3a/impl/ITestPartialRenamesDeletes.java    | 157 +++++++++--
 .../fs/s3a/impl/TestPartialDeleteFailures.java     | 102 +++++--
 .../fs/s3a/performance/AbstractS3ACostTest.java    |   6 +-
 .../fs/s3a/performance/ITestS3ADeleteCost.java     |  71 ++++-
 .../s3a/test/MinimalListingOperationCallbacks.java |  85 ++++++
 .../fs/s3a/test/MinimalOperationCallbacks.java     | 128 +++++++++
 .../hadoop/fs/s3a/test/OperationTrackingStore.java | 189 +++++++++++++
 20 files changed, 1100 insertions(+), 440 deletions(-)

diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractRootDirectoryTest.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractRootDirectoryTest.java
index 6eaa56b..4b5af02 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractRootDirectoryTest.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractRootDirectoryTest.java
@@ -265,7 +265,7 @@ public abstract class AbstractContractRootDirectoryTest extends AbstractFSContra
         fs.listFiles(root, true));
     describe("verifying consistency with treewalk's files");
     ContractTestUtils.TreeScanResults treeWalk = treeWalk(fs, root);
-    treeWalk.assertFieldsEquivalent("files", listing,
+    treeWalk.assertFieldsEquivalent("treewalk vs listFiles(/, true)", listing,
         treeWalk.getFiles(),
         listing.getFiles());
   }
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
index ae94d21..7379f9a 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
@@ -1576,7 +1576,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
 
     @Override
     @Retries.RetryTranslated
-    public RemoteIterator<S3ALocatedFileStatus> listFilesAndEmptyDirectories(
+    public RemoteIterator<S3ALocatedFileStatus> listFilesAndDirectoryMarkers(
         final Path path,
         final S3AFileStatus status,
         final boolean collectTombstones,
@@ -2081,6 +2081,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
           DELETE_CONSIDERED_IDEMPOTENT,
           ()-> {
             incrementStatistic(OBJECT_DELETE_REQUESTS);
+            incrementStatistic(OBJECT_DELETE_OBJECTS);
             s3.deleteObject(bucket, key);
             return null;
           });
@@ -2127,9 +2128,14 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
   }
 
   /**
-   * Perform a bulk object delete operation.
+   * Perform a bulk object delete operation against S3; leaves S3Guard
+   * alone.
    * Increments the {@code OBJECT_DELETE_REQUESTS} and write
-   * operation statistics.
+   * operation statistics
+   * <p></p>
+   * {@code OBJECT_DELETE_OBJECTS} is updated with the actual number
+   * of objects deleted in the request.
+   * <p></p>
    * Retry policy: retry untranslated; delete considered idempotent.
    * If the request is throttled, this is logged in the throttle statistics,
    * with the counter set to the number of keys, rather than the number
@@ -2150,9 +2156,10 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
     incrementWriteOperations();
     BulkDeleteRetryHandler retryHandler =
         new BulkDeleteRetryHandler(createStoreContext());
+    int keyCount = deleteRequest.getKeys().size();
     try(DurationInfo ignored =
             new DurationInfo(LOG, false, "DELETE %d keys",
-                deleteRequest.getKeys().size())) {
+                keyCount)) {
       return invoker.retryUntranslated("delete",
           DELETE_CONSIDERED_IDEMPOTENT,
           (text, e, r, i) -> {
@@ -2161,6 +2168,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
           },
           () -> {
             incrementStatistic(OBJECT_DELETE_REQUESTS, 1);
+            incrementStatistic(OBJECT_DELETE_OBJECTS, keyCount);
             return s3.deleteObjects(deleteRequest);
           });
     } catch (MultiObjectDeleteException e) {
@@ -2550,8 +2558,8 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
         // entries so we only process these failures on "real" deletes.
         Triple<List<Path>, List<Path>, List<Pair<Path, IOException>>> results =
             new MultiObjectDeleteSupport(createStoreContext(), operationState)
-                .processDeleteFailure(ex, keysToDelete);
-        undeletedObjectsOnFailure.addAll(results.getMiddle());
+                .processDeleteFailure(ex, keysToDelete, new ArrayList<Path>());
+        undeletedObjectsOnFailure.addAll(results.getLeft());
       }
       throw ex;
     } catch (AmazonClientException | IOException ex) {
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java
index f442358..19f42cf 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java
@@ -156,6 +156,7 @@ public class S3AInstrumentation implements Closeable, MetricsSource {
       INVOCATION_RENAME,
       OBJECT_COPY_REQUESTS,
       OBJECT_DELETE_REQUESTS,
+      OBJECT_DELETE_OBJECTS,
       OBJECT_LIST_REQUESTS,
       OBJECT_CONTINUE_LIST_REQUESTS,
       OBJECT_METADATA_REQUESTS,
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java
index 8153169..1addfbe 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java
@@ -85,6 +85,8 @@ public enum Statistic {
       "Calls of rename()"),
   OBJECT_COPY_REQUESTS("object_copy_requests", "Object copy requests"),
   OBJECT_DELETE_REQUESTS("object_delete_requests", "Object delete requests"),
+  OBJECT_DELETE_OBJECTS("object_delete_objects",
+      "Objects deleted in delete requests"),
   OBJECT_LIST_REQUESTS("object_list_requests",
       "Number of object listings made"),
   OBJECT_CONTINUE_LIST_REQUESTS("object_continue_list_requests",
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/DeleteOperation.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/DeleteOperation.java
index a2deaa4..b47c7ad 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/DeleteOperation.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/DeleteOperation.java
@@ -23,6 +23,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
 
 import com.amazonaws.services.s3.model.DeleteObjectsRequest;
 import com.amazonaws.services.s3.model.DeleteObjectsResult;
@@ -152,10 +153,13 @@ public class DeleteOperation extends ExecutingStoreOperation<Boolean> {
   /**
    * List of keys built up for the next delete batch.
    */
-  private List<DeleteObjectsRequest.KeyVersion> keys;
+  private List<DeleteEntry> keys;
 
   /**
-   * List of paths built up for deletion.
+   * List of paths built up for incremental deletion on tree delete.
+   * At the end of the entire delete the full tree is scanned in S3Guard
+   * and tombstones added. For this reason this list of paths <i>must not</i>
+   * include directory markers, as that will break the scan.
    */
   private List<Path> paths;
 
@@ -279,7 +283,7 @@ public class DeleteOperation extends ExecutingStoreOperation<Boolean> {
       LOG.debug("deleting simple file {}", path);
       deleteObjectAtPath(path, key, true);
     }
-    LOG.debug("Deleted {} files", filesDeleted);
+    LOG.debug("Deleted {} objects", filesDeleted);
     return true;
   }
 
@@ -323,7 +327,7 @@ public class DeleteOperation extends ExecutingStoreOperation<Boolean> {
       // list files including any under tombstones through S3Guard
       LOG.debug("Getting objects for directory prefix {} to delete", dirKey);
       final RemoteIterator<S3ALocatedFileStatus> locatedFiles =
-          callbacks.listFilesAndEmptyDirectories(path, status,
+          callbacks.listFilesAndDirectoryMarkers(path, status,
               false, true);
 
       // iterate through and delete. The next() call will block when a new S3
@@ -359,7 +363,10 @@ public class DeleteOperation extends ExecutingStoreOperation<Boolean> {
         while (objects.hasNext()) {
           // get the next entry in the listing.
           extraFilesDeleted++;
-          queueForDeletion(deletionKey(objects.next()), null);
+          S3AFileStatus next = objects.next();
+          LOG.debug("Found Unlisted entry {}", next);
+          queueForDeletion(deletionKey(next), null,
+              next.isDirectory());
         }
         if (extraFilesDeleted > 0) {
           LOG.debug("Raw S3 Scan found {} extra file(s) to delete",
@@ -402,7 +409,7 @@ public class DeleteOperation extends ExecutingStoreOperation<Boolean> {
    */
   private void queueForDeletion(
       final S3AFileStatus stat) throws IOException {
-    queueForDeletion(deletionKey(stat), stat.getPath());
+    queueForDeletion(deletionKey(stat), stat.getPath(), stat.isDirectory());
   }
 
   /**
@@ -413,14 +420,18 @@ public class DeleteOperation extends ExecutingStoreOperation<Boolean> {
    *
    * @param key key to delete
    * @param deletePath nullable path of the key
+   * @param isDirMarker is the entry a directory?
    * @throws IOException failure of the previous batch of deletions.
    */
   private void queueForDeletion(final String key,
-      @Nullable final Path deletePath) throws IOException {
+      @Nullable final Path deletePath,
+      boolean isDirMarker) throws IOException {
     LOG.debug("Adding object to delete: \"{}\"", key);
-    keys.add(new DeleteObjectsRequest.KeyVersion(key));
+    keys.add(new DeleteEntry(key, isDirMarker));
     if (deletePath != null) {
-      paths.add(deletePath);
+      if (!isDirMarker) {
+        paths.add(deletePath);
+      }
     }
 
     if (keys.size() == pageSize) {
@@ -484,7 +495,7 @@ public class DeleteOperation extends ExecutingStoreOperation<Boolean> {
    * @return the submitted future or null
    */
   private CompletableFuture<Void> submitDelete(
-      final List<DeleteObjectsRequest.KeyVersion> keyList,
+      final List<DeleteEntry> keyList,
       final List<Path> pathList) {
 
     if (keyList.isEmpty() && pathList.isEmpty()) {
@@ -514,31 +525,62 @@ public class DeleteOperation extends ExecutingStoreOperation<Boolean> {
   @Retries.RetryTranslated
   private void asyncDeleteAction(
       final BulkOperationState state,
-      final List<DeleteObjectsRequest.KeyVersion> keyList,
+      final List<DeleteEntry> keyList,
       final List<Path> pathList,
       final boolean auditDeletedKeys)
       throws IOException {
+    List<DeleteObjectsResult.DeletedObject> deletedObjects = new ArrayList<>();
     try (DurationInfo ignored =
-             new DurationInfo(LOG, false, "Delete page of keys")) {
+             new DurationInfo(LOG, false,
+                 "Delete page of %d keys", keyList.size())) {
       DeleteObjectsResult result = null;
       List<Path> undeletedObjects = new ArrayList<>();
       if (!keyList.isEmpty()) {
-        result = Invoker.once("Remove S3 Keys",
+        // first delete the files.
+        List<DeleteObjectsRequest.KeyVersion> files = keyList.stream()
+            .filter(e -> !e.isDirMarker)
+            .map(e -> e.keyVersion)
+            .collect(Collectors.toList());
+        LOG.debug("Deleting of {} file objects", files.size());
+        result = Invoker.once("Remove S3 Files",
             status.getPath().toString(),
             () -> callbacks.removeKeys(
-                keyList,
+                files,
                 false,
                 undeletedObjects,
                 state,
                 !auditDeletedKeys));
+        if (result != null) {
+          deletedObjects.addAll(result.getDeletedObjects());
+        }
+        // now the dirs
+        List<DeleteObjectsRequest.KeyVersion> dirs = keyList.stream()
+            .filter(e -> e.isDirMarker)
+            .map(e -> e.keyVersion)
+            .collect(Collectors.toList());
+        LOG.debug("Deleting of {} directory markers", dirs.size());
+        // This is invoked with deleteFakeDir = true, so
+        // S3Guard is not updated.
+        result = Invoker.once("Remove S3 Dir Markers",
+            status.getPath().toString(),
+            () -> callbacks.removeKeys(
+                dirs,
+                true,
+                undeletedObjects,
+                state,
+                !auditDeletedKeys));
+        if (result != null) {
+          deletedObjects.addAll(result.getDeletedObjects());
+        }
       }
       if (!pathList.isEmpty()) {
+        // delete file paths only. This stops tombstones
+        // being added until the final directory cleanup
+        // (HADOOP-17244)
         metadataStore.deletePaths(pathList, state);
       }
-      if (auditDeletedKeys && result != null) {
+      if (auditDeletedKeys) {
         // audit the deleted keys
-        List<DeleteObjectsResult.DeletedObject> deletedObjects =
-            result.getDeletedObjects();
         if (deletedObjects.size() != keyList.size()) {
           // size mismatch
           LOG.warn("Size mismatch in deletion operation. "
@@ -549,7 +591,7 @@ public class DeleteOperation extends ExecutingStoreOperation<Boolean> {
           for (DeleteObjectsResult.DeletedObject del : deletedObjects) {
             keyList.removeIf(kv -> kv.getKey().equals(del.getKey()));
           }
-          for (DeleteObjectsRequest.KeyVersion kv : keyList) {
+          for (DeleteEntry kv : keyList) {
             LOG.debug("{}", kv.getKey());
           }
         }
@@ -557,5 +599,31 @@ public class DeleteOperation extends ExecutingStoreOperation<Boolean> {
     }
   }
 
+  /**
+   * Deletion entry; dir marker state is tracked to control S3Guard
+   * update policy.
+   */
+  private static final class DeleteEntry {
+    private final DeleteObjectsRequest.KeyVersion keyVersion;
+
+    private final boolean isDirMarker;
+
+    private DeleteEntry(final String key, final boolean isDirMarker) {
+      this.keyVersion = new DeleteObjectsRequest.KeyVersion(key);
+      this.isDirMarker = isDirMarker;
+    }
+
+    public String getKey() {
+      return keyVersion.getKey();
+    }
+
+    @Override
+    public String toString() {
+      return "DeleteEntry{" +
+          "key='" + getKey() + '\'' +
+          ", isDirMarker=" + isDirMarker +
+          '}';
+    }
+  }
 
 }
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/MultiObjectDeleteSupport.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/MultiObjectDeleteSupport.java
index ff6280d..0a83389 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/MultiObjectDeleteSupport.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/MultiObjectDeleteSupport.java
@@ -23,6 +23,7 @@ import java.nio.file.AccessDeniedException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
+import java.util.Objects;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
@@ -37,8 +38,10 @@ import org.apache.commons.lang3.tuple.Triple;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.s3a.AWSS3IOException;
 import org.apache.hadoop.fs.s3a.S3AFileSystem;
+import org.apache.hadoop.fs.s3a.Tristate;
 import org.apache.hadoop.fs.s3a.s3guard.BulkOperationState;
 import org.apache.hadoop.fs.s3a.s3guard.MetadataStore;
+import org.apache.hadoop.fs.s3a.s3guard.PathMetadata;
 
 import static org.apache.hadoop.thirdparty.com.google.common.base.Preconditions.checkNotNull;
 
@@ -84,15 +87,25 @@ public final class MultiObjectDeleteSupport extends AbstractStoreOperation {
   public static IOException translateDeleteException(
       final String message,
       final MultiObjectDeleteException deleteException) {
+    List<MultiObjectDeleteException.DeleteError> errors
+        = deleteException.getErrors();
+    LOG.warn("Bulk delete operation failed to delete all objects;"
+            + " failure count = {}",
+        errors.size());
     final StringBuilder result = new StringBuilder(
-        deleteException.getErrors().size() * 256);
+        errors.size() * 256);
     result.append(message).append(": ");
     String exitCode = "";
     for (MultiObjectDeleteException.DeleteError error :
         deleteException.getErrors()) {
       String code = error.getCode();
-      result.append(String.format("%s: %s: %s%n", code, error.getKey(),
-          error.getMessage()));
+      String item = String.format("%s: %s%s: %s%n", code, error.getKey(),
+          (error.getVersionId() != null
+              ? (" (" + error.getVersionId() + ")")
+              : ""),
+          error.getMessage());
+      LOG.warn(item);
+      result.append(item);
       if (exitCode.isEmpty() || ACCESS_DENIED.equals(code)) {
         exitCode = code;
       }
@@ -113,7 +126,7 @@ public final class MultiObjectDeleteSupport extends AbstractStoreOperation {
    * @param keysToDelete the keys in the delete request
    * @return tuple of (undeleted, deleted) paths.
    */
-  public Pair<List<Path>, List<Path>> splitUndeletedKeys(
+  public Pair<List<KeyPath>, List<KeyPath>> splitUndeletedKeys(
       final MultiObjectDeleteException deleteException,
       final Collection<DeleteObjectsRequest.KeyVersion> keysToDelete) {
     LOG.debug("Processing delete failure; keys to delete count = {};"
@@ -122,11 +135,11 @@ public final class MultiObjectDeleteSupport extends AbstractStoreOperation {
         deleteException.getErrors().size(),
         deleteException.getDeletedObjects().size());
     // convert the collection of keys being deleted into paths
-    final List<Path> pathsBeingDeleted = keysToPaths(keysToDelete);
-    // Take this is list of paths
+    final List<KeyPath> pathsBeingDeleted = keysToKeyPaths(keysToDelete);
+    // Take this ist of paths
     // extract all undeleted entries contained in the exception and
-    // then removes them from the original list.
-    List<Path> undeleted = removeUndeletedPaths(deleteException,
+    // then remove them from the original list.
+    List<KeyPath> undeleted = removeUndeletedPaths(deleteException,
         pathsBeingDeleted,
         getStoreContext()::keyToPath);
     return Pair.of(undeleted, pathsBeingDeleted);
@@ -139,7 +152,17 @@ public final class MultiObjectDeleteSupport extends AbstractStoreOperation {
    */
   public List<Path> keysToPaths(
       final Collection<DeleteObjectsRequest.KeyVersion> keysToDelete) {
-    return convertToPaths(keysToDelete,
+    return toPathList(keysToKeyPaths(keysToDelete));
+  }
+
+  /**
+   * Given a list of delete requests, convert them all to keypaths.
+   * @param keysToDelete list of keys for the delete operation.
+   * @return list of keypath entries
+   */
+  public List<KeyPath> keysToKeyPaths(
+      final Collection<DeleteObjectsRequest.KeyVersion> keysToDelete) {
+    return convertToKeyPaths(keysToDelete,
         getStoreContext()::keyToPath);
   }
 
@@ -149,13 +172,17 @@ public final class MultiObjectDeleteSupport extends AbstractStoreOperation {
    * @param qualifier path qualifier
    * @return the paths.
    */
-  public static List<Path> convertToPaths(
+  public static List<KeyPath> convertToKeyPaths(
       final Collection<DeleteObjectsRequest.KeyVersion> keysToDelete,
       final Function<String, Path> qualifier) {
-    return keysToDelete.stream()
-        .map((keyVersion) ->
-          qualifier.apply(keyVersion.getKey()))
-        .collect(Collectors.toList());
+    List<KeyPath> l = new ArrayList<>(keysToDelete.size());
+    for (DeleteObjectsRequest.KeyVersion kv : keysToDelete) {
+      String key = kv.getKey();
+      Path p = qualifier.apply(key);
+      boolean isDir = key.endsWith("/");
+      l.add(new KeyPath(key, p, isDir));
+    }
+    return l;
   }
 
   /**
@@ -164,27 +191,59 @@ public final class MultiObjectDeleteSupport extends AbstractStoreOperation {
    * and the original list of files to delete declares to have been deleted.
    * @param deleteException the delete exception.
    * @param keysToDelete collection of keys which had been requested.
+   * @param retainedMarkers list built up of retained markers.
    * @return a tuple of (undeleted, deleted, failures)
    */
   public Triple<List<Path>, List<Path>, List<Pair<Path, IOException>>>
       processDeleteFailure(
       final MultiObjectDeleteException deleteException,
-      final List<DeleteObjectsRequest.KeyVersion> keysToDelete) {
+      final List<DeleteObjectsRequest.KeyVersion> keysToDelete,
+      final List<Path> retainedMarkers) {
     final MetadataStore metadataStore =
         checkNotNull(getStoreContext().getMetadataStore(),
             "context metadatastore");
     final List<Pair<Path, IOException>> failures = new ArrayList<>();
-    final Pair<List<Path>, List<Path>> outcome =
+    final Pair<List<KeyPath>, List<KeyPath>> outcome =
         splitUndeletedKeys(deleteException, keysToDelete);
-    List<Path> deleted = outcome.getRight();
-    List<Path> undeleted = outcome.getLeft();
-    // delete the paths but recover
-    // TODO: handle the case where a parent path is deleted but not a child.
-    // TODO: in a fake object delete, we don't actually want to delete
-    //  metastore entries
-    deleted.forEach(path -> {
-      try {
-        metadataStore.delete(path, operationState);
+    List<KeyPath> deleted = outcome.getRight();
+    List<Path> deletedPaths = new ArrayList<>();
+    List<KeyPath> undeleted = outcome.getLeft();
+    retainedMarkers.clear();
+    List<Path> undeletedPaths = toPathList((List<KeyPath>) undeleted);
+    // sort shorter keys first,
+    // so that if the left key is longer than the first it is considered
+    // smaller, so appears in the list first.
+    // thus when we look for a dir being empty, we know it holds
+    deleted.sort((l, r) -> r.getKey().length() - l.getKey().length());
+
+    // now go through and delete from S3Guard all paths listed in
+    // the result which are either files or directories with
+    // no children.
+    deleted.forEach(kp -> {
+      Path path = kp.getPath();
+      try{
+        boolean toDelete = true;
+        if (kp.isDirectoryMarker()) {
+          // its a dir marker, which could be an empty dir
+          // (which is then tombstoned), or a non-empty dir, which
+          // is not tombstoned.
+          // for this to be handled, we have to have removed children
+          // from the store first, which relies on the sort
+          PathMetadata pmentry = metadataStore.get(path, true);
+          if (pmentry != null && !pmentry.isDeleted()) {
+            toDelete = pmentry.getFileStatus().isEmptyDirectory()
+                == Tristate.TRUE;
+          } else {
+            toDelete = false;
+          }
+        }
+        if (toDelete) {
+          LOG.debug("Removing deleted object from S3Guard Store {}", path);
+          metadataStore.delete(path, operationState);
+        } else {
+          LOG.debug("Retaining S3Guard directory entry {}", path);
+          retainedMarkers.add(path);
+        }
       } catch (IOException e) {
         // trouble: we failed to delete the far end entry
         // try with the next one.
@@ -192,11 +251,25 @@ public final class MultiObjectDeleteSupport extends AbstractStoreOperation {
         LOG.warn("Failed to update S3Guard store with deletion of {}", path);
         failures.add(Pair.of(path, e));
       }
+      // irrespective of the S3Guard outcome, it is declared as deleted, as
+      // it is no longer in the S3 store.
+      deletedPaths.add(path);
     });
     if (LOG.isDebugEnabled()) {
       undeleted.forEach(p -> LOG.debug("Deleted {}", p));
     }
-    return Triple.of(undeleted, deleted, failures);
+    return Triple.of(undeletedPaths, deletedPaths, failures);
+  }
+
+  /**
+   * Given a list of keypaths, convert to a list of paths.
+   * @param keyPaths source list
+   * @return a listg of paths
+   */
+  public static List<Path> toPathList(final List<KeyPath> keyPaths) {
+    return keyPaths.stream()
+        .map(KeyPath::getPath)
+        .collect(Collectors.toList());
   }
 
   /**
@@ -211,8 +284,31 @@ public final class MultiObjectDeleteSupport extends AbstractStoreOperation {
   public static List<Path> extractUndeletedPaths(
       final MultiObjectDeleteException deleteException,
       final Function<String, Path> qualifierFn) {
-    return deleteException.getErrors().stream()
-        .map((e) -> qualifierFn.apply(e.getKey()))
+    return toPathList(extractUndeletedKeyPaths(deleteException, qualifierFn));
+  }
+
+  /**
+   * Build a list of undeleted paths from a {@code MultiObjectDeleteException}.
+   * Outside of unit tests, the qualifier function should be
+   * {@link S3AFileSystem#keyToQualifiedPath(String)}.
+   * @param deleteException the delete exception.
+   * @param qualifierFn function to qualify paths
+   * @return the possibly empty list of paths.
+   */
+  @VisibleForTesting
+  public static List<KeyPath> extractUndeletedKeyPaths(
+      final MultiObjectDeleteException deleteException,
+      final Function<String, Path> qualifierFn) {
+
+    List<MultiObjectDeleteException.DeleteError> errors
+        = deleteException.getErrors();
+    return errors.stream()
+        .map((error) -> {
+          String key = error.getKey();
+          Path path = qualifierFn.apply(key);
+          boolean isDir = key.endsWith("/");
+          return new KeyPath(key, path, isDir);
+        })
         .collect(Collectors.toList());
   }
 
@@ -227,12 +323,17 @@ public final class MultiObjectDeleteSupport extends AbstractStoreOperation {
    * @return the list of undeleted entries
    */
   @VisibleForTesting
-  static List<Path> removeUndeletedPaths(
+  static List<KeyPath> removeUndeletedPaths(
       final MultiObjectDeleteException deleteException,
-      final Collection<Path> pathsBeingDeleted,
+      final Collection<KeyPath> pathsBeingDeleted,
       final Function<String, Path> qualifier) {
-    List<Path> undeleted = extractUndeletedPaths(deleteException, qualifier);
-    pathsBeingDeleted.removeAll(undeleted);
+    // get the undeleted values
+    List<KeyPath> undeleted = extractUndeletedKeyPaths(deleteException,
+        qualifier);
+    // and remove them from the undeleted list, matching on key
+    for (KeyPath undel : undeleted) {
+      pathsBeingDeleted.removeIf(kp -> kp.getPath().equals(undel.getPath()));
+    }
     return undeleted;
   }
 
@@ -247,4 +348,70 @@ public final class MultiObjectDeleteSupport extends AbstractStoreOperation {
       final List<DeleteObjectsRequest.KeyVersion> keysToDelete) {
     return keysToPaths(keysToDelete);
   }
+
+  /**
+   * Representation of a (key, path) which couldn't be deleted;
+   * the dir marker flag is inferred from the key suffix.
+   * <p>
+   * Added because Pairs of Lists of Triples was just too complex
+   * for Java code.
+   * </p>
+   */
+  public static final class KeyPath {
+    /** Key in bucket. */
+    private final String key;
+    /** Full path. */
+    private final Path path;
+    /** Is this a directory marker? */
+    private final boolean directoryMarker;
+
+    public KeyPath(final String key,
+        final Path path,
+        final boolean directoryMarker) {
+      this.key = key;
+      this.path = path;
+      this.directoryMarker = directoryMarker;
+    }
+
+    public String getKey() {
+      return key;
+    }
+
+    public Path getPath() {
+      return path;
+    }
+
+    public boolean isDirectoryMarker() {
+      return directoryMarker;
+    }
+
+    @Override
+    public String toString() {
+      return "KeyPath{" +
+          "key='" + key + '\'' +
+          ", path=" + path +
+          ", directoryMarker=" + directoryMarker +
+          '}';
+    }
+
+    /**
+     * Equals test is on key alone.
+     */
+    @Override
+    public boolean equals(final Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      KeyPath keyPath = (KeyPath) o;
+      return key.equals(keyPath.key);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(key);
+    }
+  }
 }
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/OperationCallbacks.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/OperationCallbacks.java
index 0fcf645..3391097 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/OperationCallbacks.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/OperationCallbacks.java
@@ -105,7 +105,7 @@ public interface OperationCallbacks {
       throws IOException;
 
   /**
-   * Recursive list of files and empty directories.
+   * Recursive list of files and directory markers.
    *
    * @param path path to list from
    * @param status optional status of path to list.
@@ -115,7 +115,7 @@ public interface OperationCallbacks {
    * @throws IOException failure
    */
   @Retries.RetryTranslated
-  RemoteIterator<S3ALocatedFileStatus> listFilesAndEmptyDirectories(
+  RemoteIterator<S3ALocatedFileStatus> listFilesAndDirectoryMarkers(
       Path path,
       S3AFileStatus status,
       boolean collectTombstones,
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RenameOperation.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RenameOperation.java
index bd277b7..5890ac0 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RenameOperation.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RenameOperation.java
@@ -211,14 +211,23 @@ public class RenameOperation extends ExecutingStoreOperation<Long> {
    *     Only queuing objects here whose copy operation has
    *     been submitted and so is in that thread pool.
    *   </li>
+   *   <li>
+   *     If a path is supplied, then after the delete is executed
+   *     (and completes) the rename tracker from S3Guard will be
+   *     told of its deletion. Do not set this for directory
+   *     markers with children, as it may mistakenly add
+   *     tombstones into the table.
+   *   </li>
    * </ol>
    * This method must only be called from the primary thread.
-   * @param path path to the object
+   * @param path path to the object.
    * @param key key of the object.
    */
   private void queueToDelete(Path path, String key) {
     LOG.debug("Queueing to delete {}", path);
-    pathsToDelete.add(path);
+    if (path != null) {
+      pathsToDelete.add(path);
+    }
     keysToDelete.add(new DeleteObjectsRequest.KeyVersion(key));
   }
 
@@ -234,7 +243,9 @@ public class RenameOperation extends ExecutingStoreOperation<Long> {
    */
   private void queueToDelete(
       List<DirMarkerTracker.Marker> markersToDelete) {
-    markersToDelete.forEach(this::queueToDelete);
+    markersToDelete.forEach(m -> queueToDelete(
+        null,
+        m.getKey()));
   }
 
   /**
@@ -397,6 +408,7 @@ Are   * @throws IOException failure
           destStatus.getPath());
       // Although the dir marker policy doesn't always need to do this,
       // it's simplest just to be consistent here.
+      // note: updates the metastore as well a S3.
       callbacks.deleteObjectAtPath(destStatus.getPath(), dstKey, false, null);
     }
 
@@ -408,7 +420,7 @@ Are   * @throws IOException failure
         false);
 
     final RemoteIterator<S3ALocatedFileStatus> iterator =
-        callbacks.listFilesAndEmptyDirectories(parentPath,
+        callbacks.listFilesAndDirectoryMarkers(parentPath,
             sourceStatus,
             true,
             true);
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStore.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStore.java
index 33d0124..8ec8488 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStore.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStore.java
@@ -717,7 +717,7 @@ public class DynamoDBMetadataStore implements MetadataStore,
   public DDBPathMetadata get(Path path, boolean wantEmptyDirectoryFlag)
       throws IOException {
     checkPath(path);
-    LOG.debug("Get from table {} in region {}: {}. wantEmptyDirectory={}",
+    LOG.debug("Get from table {} in region {}: {} ; wantEmptyDirectory={}",
         tableName, region, path, wantEmptyDirectoryFlag);
     DDBPathMetadata result = innerGet(path, wantEmptyDirectoryFlag);
     LOG.debug("result of get {} is: {}", path, result);
diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/directory_markers.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/directory_markers.md
index 53030d6..65fcb65 100644
--- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/directory_markers.md
+++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/directory_markers.md
@@ -12,7 +12,11 @@
   limitations under the License. See accompanying LICENSE file.
 -->
 
-# Controlling the S3A Directory Marker Behavior
+# Experimental: Controlling the S3A Directory Marker Behavior
+
+This document discusses an experimental feature of the S3A
+connector since Hadoop 3.3.1: the ability to retain directory
+marker objects above paths containing files or subdirectories.
 
 ## <a name="compatibility"></a> Critical: this is not backwards compatible!
 
@@ -26,15 +30,40 @@ Versions of Hadoop which are incompatible with other marker retention policies,
 as of August 2020.
 
 -------------------------------------------------------
-|  Branch    | Compatible Since | Future Fix Planned? |
+|  Branch    | Compatible Since | Supported           |
 |------------|------------------|---------------------|
-| Hadoop 2.x |                  | NO                  |
-| Hadoop 3.0 |                  | NO                  |
-| Hadoop 3.1 |      check       | Yes                 |
-| Hadoop 3.2 |      check       | Yes                 |
+| Hadoop 2.x |       n/a        | WONTFIX             |
+| Hadoop 3.0 |      check       | Read-only           |
+| Hadoop 3.1 |      check       | Read-only           |
+| Hadoop 3.2 |      check       | Read-only           |
 | Hadoop 3.3 |      3.3.1       | Done                |
 -------------------------------------------------------
 
+*WONTFIX*
+
+The Hadoop branch-2 line will *not* be patched.
+
+*Read-only*
+
+These branches have read-only compatibility.
+
+* They may list directories with directory markers, and correctly identify when
+  such directories have child entries.
+* They will open files under directories with such markers.
+
+However, they have limitations when writing/deleting directories.
+
+Specifically: S3Guard tables may not be correctly updated in
+all conditions, especially on the partial failure of delete
+operations. Specifically: they may mistakenly add a tombstone in
+the dynamoDB table and so future directory/directory tree listings
+will consider the directory to be nonexistent.
+
+_It is not safe for Hadoop releases before Hadoop 3.3.1 to write
+to S3 buckets which have directory markers when S3Guard is enabled_
+
+## Verifying read compatibility.
+
 The `s3guard bucket-info` tool [can be used to verify support](#bucket-info).
 This allows for a command line check of compatibility, including
 in scripts.
@@ -49,6 +78,7 @@ It is only safe change the directory marker policy if the following
    (including backing up) an S3 bucket.
 2. You know all applications which read data from the bucket are compatible.
 
+
 ### <a name="backups"></a> Applications backing up data.
 
 It is not enough to have a version of Apache Hadoop which is compatible, any
@@ -240,7 +270,7 @@ can switch to the higher-performance mode for those specific directories.
 Only the default setting, `fs.s3a.directory.marker.retention = delete` is compatible with
 every shipping Hadoop releases.
 
-##  <a name="authoritative"></a> Directory Markers and S3Guard
+##  <a name="s3guard"></a> Directory Markers and S3Guard
 
 Applications which interact with S3A in S3A clients with S3Guard enabled still
 create and delete markers. There's no attempt to skip operations, such as by having
@@ -256,6 +286,28 @@ then an S3A connector with a retention policy of `fs.s3a.directory.marker.retent
 only use in managed applications where all clients are using the same version of
 hadoop, and configured consistently.
 
+After the directory marker feature [HADOOP-13230](https://issues.apache.org/jira/browse/HADOOP-13230)
+was added, issues related to S3Guard integration surfaced:
+
+1. The incremental update of the S3Guard table was inserting tombstones
+   over directories as the markers were deleted, hiding files underneath.
+   This happened during directory `rename()` and `delete()`.
+1. The update of the S3Guard table after a partial failure of a bulk delete
+   operation would insert tombstones in S3Guard records of successfully
+   deleted markers, irrespective of the directory status.
+
+Issue #1 is unique to Hadoop branch 3.3; however issue #2 is s critical
+part of the S3Guard consistency handling.
+
+Both issues have been fixed in Hadoop 3.3.x,
+in [HADOOP-17244](https://issues.apache.org/jira/browse/HADOOP-17244)
+
+Issue #2, delete failure handling, is not easily backported and is
+not likely to be backported.
+
+Accordingly: Hadoop releases with read-only compatibility must not be used
+to rename or delete directories where markers are retained *when S3Guard is enabled.*
+
 ## <a name="bucket-info"></a> Verifying marker policy with `s3guard bucket-info`
 
 The `bucket-info` command has been enhanced to support verification from the command
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFailureHandling.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFailureHandling.java
index 9253ee3..c0feb85 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFailureHandling.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFailureHandling.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.fs.s3a;
 import com.amazonaws.services.s3.model.DeleteObjectsRequest;
 import com.amazonaws.services.s3.model.MultiObjectDeleteException;
 import org.apache.hadoop.thirdparty.com.google.common.collect.Lists;
+import org.assertj.core.api.Assertions;
 import org.junit.Assume;
 
 import org.apache.hadoop.conf.Configuration;
@@ -141,13 +142,14 @@ public class ITestS3AFailureHandling extends AbstractS3ATestBase {
     Path markerPath = fs.keyToQualifiedPath(marker);
     keys.add(new DeleteObjectsRequest.KeyVersion(marker));
 
-    Pair<List<Path>, List<Path>> pair =
+    Pair<List<KeyPath>, List<KeyPath>> pair =
         new MultiObjectDeleteSupport(fs.createStoreContext(), null)
         .splitUndeletedKeys(ex, keys);
-    assertEquals(undeleted, pair.getLeft());
-    List<Path> right = pair.getRight();
-    assertEquals("Wrong size for " + join(right), 1, right.size());
-    assertEquals(markerPath, right.get(0));
+    assertEquals(undeleted, toPathList(pair.getLeft()));
+    List<KeyPath> right = pair.getRight();
+    Assertions.assertThat(right)
+        .hasSize(1);
+    assertEquals(markerPath, right.get(0).getPath());
   }
 
   /**
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardEmptyDirs.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardEmptyDirs.java
index db3c2b6..41110b9 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardEmptyDirs.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardEmptyDirs.java
@@ -32,6 +32,7 @@ import com.amazonaws.services.s3.model.S3ObjectSummary;
 import org.assertj.core.api.Assertions;
 import org.junit.Test;
 
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.s3a.impl.StatusProbeEnum;
 import org.apache.hadoop.fs.s3a.impl.StoreContext;
@@ -286,4 +287,27 @@ public class ITestS3GuardEmptyDirs extends AbstractS3ATestBase {
     s3.putObject(putObjectRequest);
   }
 
+  @Test
+  public void testDirMarkerDelete() throws Throwable {
+    S3AFileSystem fs = getFileSystem();
+    assumeFilesystemHasMetadatastore(getFileSystem());
+    Path baseDir = methodPath();
+    Path subFile = new Path(baseDir, "subdir/file.txt");
+    // adds the s3guard entry
+    fs.mkdirs(baseDir);
+    touch(fs, subFile);
+    // PUT a marker
+    createEmptyObject(fs, fs.pathToKey(baseDir) + "/");
+    fs.delete(baseDir, true);
+    assertPathDoesNotExist("Should have been deleted", baseDir);
+
+    // now create the dir again
+    fs.mkdirs(baseDir);
+    FileStatus fileStatus = fs.getFileStatus(baseDir);
+    Assertions.assertThat(fileStatus)
+        .matches(FileStatus::isDirectory, "Not a directory");
+    Assertions.assertThat(fs.listStatus(baseDir))
+        .describedAs("listing of %s", baseDir)
+        .isEmpty();
+  }
 }
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java
index 103d179..4423060 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java
@@ -37,19 +37,13 @@ import org.apache.hadoop.fs.s3a.commit.CommitConstants;
 
 import org.apache.hadoop.fs.s3a.impl.ChangeDetectionPolicy;
 import org.apache.hadoop.fs.s3a.impl.ContextAccessors;
-import org.apache.hadoop.fs.s3a.impl.ListingOperationCallbacks;
-import org.apache.hadoop.fs.s3a.impl.OperationCallbacks;
 import org.apache.hadoop.fs.s3a.impl.StatusProbeEnum;
 import org.apache.hadoop.fs.s3a.impl.StoreContext;
 import org.apache.hadoop.fs.s3a.impl.StoreContextBuilder;
-import org.apache.hadoop.fs.s3a.s3guard.BulkOperationState;
-import org.apache.hadoop.fs.s3a.s3guard.DirListingMetadata;
-import org.apache.hadoop.fs.s3a.s3guard.ITtlTimeProvider;
 import org.apache.hadoop.fs.s3a.s3guard.MetadataStore;
 import org.apache.hadoop.fs.s3a.s3guard.MetadataStoreCapabilities;
-import org.apache.hadoop.fs.s3a.s3guard.PathMetadata;
-import org.apache.hadoop.fs.s3a.s3guard.RenameTracker;
 import org.apache.hadoop.fs.s3a.s3guard.S3Guard;
+import org.apache.hadoop.fs.s3a.test.OperationTrackingStore;
 import org.apache.hadoop.fs.s3native.S3xLoginHelper;
 import org.apache.hadoop.io.DataInputBuffer;
 import org.apache.hadoop.io.DataOutputBuffer;
@@ -61,13 +55,7 @@ import org.apache.hadoop.service.ServiceOperations;
 import org.apache.hadoop.util.BlockingThreadPoolExecutorService;
 import org.apache.hadoop.util.ReflectionUtils;
 
-import com.amazonaws.AmazonClientException;
 import com.amazonaws.auth.AWSCredentialsProvider;
-import com.amazonaws.services.s3.model.DeleteObjectsRequest;
-import com.amazonaws.services.s3.model.DeleteObjectsResult;
-import com.amazonaws.services.s3.model.MultiObjectDeleteException;
-import com.amazonaws.services.s3.transfer.model.CopyResult;
-import javax.annotation.Nullable;
 import org.hamcrest.core.Is;
 import org.junit.Assert;
 import org.junit.Assume;
@@ -82,8 +70,6 @@ import java.net.URI;
 import java.net.URISyntaxException;
 import java.text.DateFormat;
 import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -1024,8 +1010,14 @@ public final class S3ATestUtils {
      * @param expected expected value.
      */
     public void assertDiffEquals(String message, long expected) {
-      Assert.assertEquals(message + ": " + statistic.getSymbol(),
-          expected, diff());
+      String text = message + ": " + statistic.getSymbol();
+      long diff = diff();
+      if (expected != diff) {
+        // Log in error ensures that the details appear in the test output
+        LOG.error(text + " expected {}, actual {}", expected, diff);
+      }
+      Assert.assertEquals(text,
+          expected, diff);
     }
 
     /**
@@ -1540,292 +1532,4 @@ public final class S3ATestUtils {
         probes);
   }
 
-  public static class MinimalOperationCallbacks
-          implements OperationCallbacks {
-    @Override
-    public S3ObjectAttributes createObjectAttributes(
-            Path path,
-            String eTag,
-            String versionId,
-            long len) {
-      return null;
-    }
-
-    @Override
-    public S3ObjectAttributes createObjectAttributes(
-            S3AFileStatus fileStatus) {
-      return null;
-    }
-
-    @Override
-    public S3AReadOpContext createReadContext(
-            FileStatus fileStatus) {
-      return null;
-    }
-
-    @Override
-    public void finishRename(
-            Path sourceRenamed,
-            Path destCreated)
-            throws IOException {
-
-    }
-
-    @Override
-    public void deleteObjectAtPath(
-            Path path,
-            String key,
-            boolean isFile,
-            BulkOperationState operationState)
-            throws IOException {
-
-    }
-
-    @Override
-    public RemoteIterator<S3ALocatedFileStatus> listFilesAndEmptyDirectories(
-            Path path,
-            S3AFileStatus status,
-            boolean collectTombstones,
-            boolean includeSelf)
-            throws IOException {
-      return null;
-    }
-
-    @Override
-    public CopyResult copyFile(
-            String srcKey,
-            String destKey,
-            S3ObjectAttributes srcAttributes,
-            S3AReadOpContext readContext)
-            throws IOException {
-      return null;
-    }
-
-    @Override
-    public DeleteObjectsResult removeKeys(
-            List<DeleteObjectsRequest.KeyVersion> keysToDelete,
-            boolean deleteFakeDir,
-            List<Path> undeletedObjectsOnFailure,
-            BulkOperationState operationState,
-            boolean quiet)
-            throws MultiObjectDeleteException, AmazonClientException,
-            IOException {
-      return null;
-    }
-
-    @Override
-    public boolean allowAuthoritative(Path p) {
-      return false;
-    }
-
-    @Override
-    public RemoteIterator<S3AFileStatus> listObjects(
-            Path path,
-            String key)
-            throws IOException {
-      return null;
-    }
-  }
-
-  /**
-   * MetadataStore which tracks what is deleted and added.
-   */
-  public static class OperationTrackingStore implements MetadataStore {
-
-    private final List<Path> deleted = new ArrayList<>();
-
-    private final List<Path> created = new ArrayList<>();
-
-    @Override
-    public void initialize(final FileSystem fs,
-        ITtlTimeProvider ttlTimeProvider) {
-    }
-
-    @Override
-    public void initialize(final Configuration conf,
-        ITtlTimeProvider ttlTimeProvider) {
-    }
-
-    @Override
-    public void forgetMetadata(final Path path) {
-    }
-
-    @Override
-    public PathMetadata get(final Path path) {
-      return null;
-    }
-
-    @Override
-    public PathMetadata get(final Path path,
-        final boolean wantEmptyDirectoryFlag) {
-      return null;
-    }
-
-    @Override
-    public DirListingMetadata listChildren(final Path path) {
-      return null;
-    }
-
-    @Override
-    public void put(final PathMetadata meta) {
-      put(meta, null);
-    }
-
-    @Override
-    public void put(final PathMetadata meta,
-        final BulkOperationState operationState) {
-      created.add(meta.getFileStatus().getPath());
-    }
-
-    @Override
-    public void put(final Collection<? extends PathMetadata> metas,
-        final BulkOperationState operationState) {
-      metas.stream().forEach(meta -> put(meta, null));
-    }
-
-    @Override
-    public void put(final DirListingMetadata meta,
-        final List<Path> unchangedEntries,
-        final BulkOperationState operationState) {
-      created.add(meta.getPath());
-    }
-
-    @Override
-    public void destroy() {
-    }
-
-    @Override
-    public void delete(final Path path,
-        final BulkOperationState operationState) {
-      deleted.add(path);
-    }
-
-    @Override
-    public void deletePaths(final Collection<Path> paths,
-        @Nullable final BulkOperationState operationState)
-            throws IOException {
-      deleted.addAll(paths);
-    }
-
-    @Override
-    public void deleteSubtree(final Path path,
-        final BulkOperationState operationState) {
-
-    }
-
-    @Override
-    public void move(@Nullable final Collection<Path> pathsToDelete,
-        @Nullable final Collection<PathMetadata> pathsToCreate,
-        @Nullable final BulkOperationState operationState) {
-    }
-
-    @Override
-    public void prune(final PruneMode pruneMode, final long cutoff) {
-    }
-
-    @Override
-    public long prune(final PruneMode pruneMode,
-        final long cutoff,
-        final String keyPrefix) {
-      return 0;
-    }
-
-    @Override
-    public BulkOperationState initiateBulkWrite(
-        final BulkOperationState.OperationType operation,
-        final Path dest) {
-      return null;
-    }
-
-    @Override
-    public void setTtlTimeProvider(ITtlTimeProvider ttlTimeProvider) {
-    }
-
-    @Override
-    public Map<String, String> getDiagnostics() {
-      return null;
-    }
-
-    @Override
-    public void updateParameters(final Map<String, String> parameters) {
-    }
-
-    @Override
-    public void close() {
-    }
-
-    public List<Path> getDeleted() {
-      return deleted;
-    }
-
-    public List<Path> getCreated() {
-      return created;
-    }
-
-    @Override
-    public RenameTracker initiateRenameOperation(
-        final StoreContext storeContext,
-        final Path source,
-        final S3AFileStatus sourceStatus,
-        final Path dest) {
-      throw new UnsupportedOperationException("unsupported");
-    }
-
-    @Override
-    public void addAncestors(final Path qualifiedPath,
-        @Nullable final BulkOperationState operationState) {
-
-    }
-  }
-
-  public static class MinimalListingOperationCallbacks
-          implements ListingOperationCallbacks {
-    @Override
-    public CompletableFuture<S3ListResult> listObjectsAsync(
-            S3ListRequest request)
-            throws IOException {
-      return null;
-    }
-
-    @Override
-    public CompletableFuture<S3ListResult> continueListObjectsAsync(
-            S3ListRequest request,
-            S3ListResult prevResult)
-            throws IOException {
-      return null;
-    }
-
-    @Override
-    public S3ALocatedFileStatus toLocatedFileStatus(
-            S3AFileStatus status) throws IOException {
-      return null;
-    }
-
-    @Override
-    public S3ListRequest createListObjectsRequest(
-            String key,
-            String delimiter) {
-      return null;
-    }
-
-    @Override
-    public long getDefaultBlockSize(Path path) {
-      return 0;
-    }
-
-    @Override
-    public int getMaxKeys() {
-      return 0;
-    }
-
-    @Override
-    public ITtlTimeProvider getUpdatedTtlTimeProvider() {
-      return null;
-    }
-
-    @Override
-    public boolean allowAuthoritative(Path p) {
-      return false;
-    }
-  }
 }
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestPartialRenamesDeletes.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestPartialRenamesDeletes.java
index 8c8ca37..d6c0b1d 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestPartialRenamesDeletes.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestPartialRenamesDeletes.java
@@ -42,7 +42,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.contract.ContractTestUtils;
@@ -58,6 +57,7 @@ import static org.apache.hadoop.fs.s3a.S3ATestUtils.MetricDiff;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.*;
 import static org.apache.hadoop.fs.s3a.S3AUtils.applyLocatedFiles;
 import static org.apache.hadoop.fs.s3a.Statistic.FILES_DELETE_REJECTED;
+import static org.apache.hadoop.fs.s3a.Statistic.OBJECT_DELETE_OBJECTS;
 import static org.apache.hadoop.fs.s3a.Statistic.OBJECT_DELETE_REQUESTS;
 import static org.apache.hadoop.fs.s3a.auth.RoleModel.Effects;
 import static org.apache.hadoop.fs.s3a.auth.RoleModel.Statement;
@@ -72,6 +72,7 @@ import static org.apache.hadoop.fs.s3a.impl.CallableSupplier.submit;
 import static org.apache.hadoop.fs.s3a.impl.CallableSupplier.waitForCompletion;
 import static org.apache.hadoop.fs.s3a.impl.MultiObjectDeleteSupport.extractUndeletedPaths;
 import static org.apache.hadoop.fs.s3a.impl.MultiObjectDeleteSupport.removeUndeletedPaths;
+import static org.apache.hadoop.fs.s3a.impl.MultiObjectDeleteSupport.toPathList;
 import static org.apache.hadoop.fs.s3a.test.ExtraAssertions.assertFileCount;
 import static org.apache.hadoop.fs.s3a.test.ExtraAssertions.extractCause;
 import static org.apache.hadoop.io.IOUtils.cleanupWithLogger;
@@ -331,27 +332,37 @@ public class ITestPartialRenamesDeletes extends AbstractS3ATestBase {
     removeBucketOverrides(bucketName, conf,
         MAX_THREADS,
         MAXIMUM_CONNECTIONS,
-        S3GUARD_DDB_BACKGROUND_SLEEP_MSEC_KEY);
+        S3GUARD_DDB_BACKGROUND_SLEEP_MSEC_KEY,
+        DIRECTORY_MARKER_POLICY,
+        BULK_DELETE_PAGE_SIZE);
     conf.setInt(MAX_THREADS, EXECUTOR_THREAD_COUNT);
     conf.setInt(MAXIMUM_CONNECTIONS, EXECUTOR_THREAD_COUNT * 2);
     // turn off prune delays, so as to stop scale tests creating
     // so much cruft that future CLI prune commands take forever
     conf.setInt(S3GUARD_DDB_BACKGROUND_SLEEP_MSEC_KEY, 0);
+    // use the keep policy to ensure that surplus markers exist
+    // to complicate failures
+    conf.set(DIRECTORY_MARKER_POLICY, DIRECTORY_MARKER_POLICY_KEEP);
+    // set the delete page size to its maximum to ensure that all
+    // entries are included in the same large delete, even on
+    // scale runs. This is needed for assertions on the result.
+    conf.setInt(BULK_DELETE_PAGE_SIZE, 1_000);
     return conf;
   }
 
   /**
    * Create a unique path, which includes method name,
-   * multidelete flag and a random UUID.
+   * multidelete flag and a timestamp.
    * @return a string to use for paths.
    * @throws IOException path creation failure.
    */
   private Path uniquePath() throws IOException {
+    long now = System.currentTimeMillis();
     return path(
-        String.format("%s-%s-%04d",
+        String.format("%s-%s-%06d.%03d",
             getMethodName(),
             multiDelete ? "multi" : "single",
-            System.currentTimeMillis() % 10000));
+            now / 1000, now % 1000));
   }
 
   /**
@@ -477,8 +488,11 @@ public class ITestPartialRenamesDeletes extends AbstractS3ATestBase {
 
     // create a set of files
     // this is done in parallel as it is 10x faster on a long-haul test run.
-    List<Path> createdFiles = createFiles(fs, readOnlyDir, dirDepth, fileCount,
-        dirCount);
+    List<Path> dirs = new ArrayList<>(dirCount);
+    List<Path> createdFiles = createDirsAndFiles(fs, readOnlyDir, dirDepth,
+        fileCount, dirCount,
+        new ArrayList<>(fileCount),
+        dirs);
     // are they all there?
     int expectedFileCount = createdFiles.size();
     assertFileCount("files ready to rename", roleFS,
@@ -495,26 +509,36 @@ public class ITestPartialRenamesDeletes extends AbstractS3ATestBase {
           MultiObjectDeleteException.class, deniedException);
       final List<Path> undeleted
           = extractUndeletedPaths(mde, fs::keyToQualifiedPath);
+
+      List<Path> expectedUndeletedFiles = new ArrayList<>(createdFiles);
+      if (getFileSystem().getDirectoryMarkerPolicy()
+          .keepDirectoryMarkers(readOnlyDir)) {
+        // directory markers are being retained,
+        // so will also be in the list of undeleted files
+        expectedUndeletedFiles.addAll(dirs);
+      }
       Assertions.assertThat(undeleted)
           .as("files which could not be deleted")
-          .hasSize(expectedFileCount)
-          .containsAll(createdFiles)
-          .containsExactlyInAnyOrderElementsOf(createdFiles);
+          .containsExactlyInAnyOrderElementsOf(expectedUndeletedFiles);
     }
     LOG.info("Result of renaming read-only files is as expected",
         deniedException);
     assertFileCount("files in the source directory", roleFS,
         readOnlyDir, expectedFileCount);
     // now lets look at the destination.
-    // even with S3Guard on, we expect the destination to match that of our
+    // even with S3Guard on, we expect the destination to match that of
     // the remote state.
     // the test will exist
     describe("Verify destination directory exists");
-    FileStatus st = roleFS.getFileStatus(writableDir);
-    assertTrue("Not a directory: " + st,
-        st.isDirectory());
+    assertIsDirectory(writableDir);
     assertFileCount("files in the dest directory", roleFS,
         writableDir, expectedFileCount);
+    // all directories in the source tree must still exist,
+    // which for S3Guard means no tombstone markers were added
+    LOG.info("Verifying all directories still exist");
+    for (Path dir : dirs) {
+      assertIsDirectory(dir);
+    }
   }
 
   @Test
@@ -611,9 +635,14 @@ public class ITestPartialRenamesDeletes extends AbstractS3ATestBase {
 
     // the full FS
     S3AFileSystem fs = getFileSystem();
-
-    List<Path> readOnlyFiles = createFiles(fs, readOnlyDir,
-        dirDepth, fileCount, dirCount);
+    StoreContext storeContext = fs.createStoreContext();
+
+    List<Path> dirs = new ArrayList<>(dirCount);
+    List<Path> readOnlyFiles = createDirsAndFiles(
+        fs, readOnlyDir, dirDepth,
+        fileCount, dirCount,
+        new ArrayList<>(fileCount),
+        dirs);
     List<Path> deletableFiles = createFiles(fs,
         writableDir, dirDepth, fileCount, dirCount);
 
@@ -625,20 +654,31 @@ public class ITestPartialRenamesDeletes extends AbstractS3ATestBase {
         readOnlyFiles.stream(),
         deletableFiles.stream())
         .collect(Collectors.toList());
+    List<MultiObjectDeleteSupport.KeyPath> keyPaths = allFiles.stream()
+        .map(path ->
+            new MultiObjectDeleteSupport.KeyPath(
+                storeContext.pathToKey(path),
+                path,
+                false))
+        .collect(Collectors.toList());
 
     // this set can be deleted by the role FS
     MetricDiff rejectionCount = new MetricDiff(roleFS, FILES_DELETE_REJECTED);
     MetricDiff deleteVerbCount = new MetricDiff(roleFS, OBJECT_DELETE_REQUESTS);
+    MetricDiff deleteObjectCount = new MetricDiff(roleFS,
+        OBJECT_DELETE_OBJECTS);
 
     describe("Trying to delete read only directory");
     AccessDeniedException ex = expectDeleteForbidden(readOnlyDir);
     if (multiDelete) {
       // multi-delete status checks
       extractCause(MultiObjectDeleteException.class, ex);
+      deleteVerbCount.assertDiffEquals("Wrong delete request count", 1);
+      deleteObjectCount.assertDiffEquals("Number of keys in delete request",
+          readOnlyFiles.size());
       rejectionCount.assertDiffEquals("Wrong rejection count",
           readOnlyFiles.size());
-      deleteVerbCount.assertDiffEquals("Wrong delete count", 1);
-      reset(rejectionCount, deleteVerbCount);
+      reset(rejectionCount, deleteVerbCount, deleteObjectCount);
     }
     // all the files are still there? (avoid in scale test due to cost)
     if (!scaleTest) {
@@ -649,16 +689,20 @@ public class ITestPartialRenamesDeletes extends AbstractS3ATestBase {
     ex = expectDeleteForbidden(basePath);
     if (multiDelete) {
       // multi-delete status checks
-      extractCause(MultiObjectDeleteException.class, ex);
       deleteVerbCount.assertDiffEquals("Wrong delete count", 1);
       MultiObjectDeleteException mde = extractCause(
           MultiObjectDeleteException.class, ex);
-      final List<Path> undeleted
-          = removeUndeletedPaths(mde, allFiles, fs::keyToQualifiedPath);
+      List<MultiObjectDeleteSupport.KeyPath> undeletedKeyPaths =
+          removeUndeletedPaths(mde, keyPaths, storeContext::keyToPath);
+      final List<Path> undeleted = toPathList(
+          undeletedKeyPaths);
+      deleteObjectCount.assertDiffEquals(
+          "Wrong count of objects in delete request",
+          allFiles.size());
       Assertions.assertThat(undeleted)
           .as("files which could not be deleted")
           .containsExactlyInAnyOrderElementsOf(readOnlyFiles);
-      Assertions.assertThat(allFiles)
+      Assertions.assertThat(toPathList(keyPaths))
           .as("files which were deleted")
           .containsExactlyInAnyOrderElementsOf(deletableFiles);
       rejectionCount.assertDiffEquals("Wrong rejection count",
@@ -677,7 +721,26 @@ public class ITestPartialRenamesDeletes extends AbstractS3ATestBase {
 
     Assertions.assertThat(readOnlyListing)
         .as("ReadOnly directory " + directoryList)
-        .containsAll(readOnlyFiles);
+        .containsExactlyInAnyOrderElementsOf(readOnlyFiles);
+  }
+
+  /**
+   * Verifies the logic of handling directory markers in
+   * delete operations, specifically:
+   * <ol>
+   *   <li>all markers above empty directories MUST be deleted</li>
+   *   <li>all markers above non-empty directories MUST NOT be deleted</li>
+   * </ol>
+   * As the delete list may include subdirectories, we need to work up from
+   * the bottom of the list of deleted files before probing the parents,
+   * that being done by a s3guard get(path, need-empty-directory) call.
+   * <p></p>
+   * This is pretty sensitive code.
+   */
+  @Test
+  public void testSubdirDeleteFailures() throws Throwable {
+    describe("Multiobject delete handling of directorYesFory markers");
+    assume("Multiobject delete only", multiDelete);
   }
 
   /**
@@ -771,7 +834,7 @@ public class ITestPartialRenamesDeletes extends AbstractS3ATestBase {
   }
 
   /**
-   * Parallel-touch a set of files in the destination directory.
+   * Build a set of files in a directory tree.
    * @param fs filesystem
    * @param destDir destination
    * @param depth file depth
@@ -784,12 +847,48 @@ public class ITestPartialRenamesDeletes extends AbstractS3ATestBase {
       final int depth,
       final int fileCount,
       final int dirCount) throws IOException {
-    List<CompletableFuture<Path>> futures = new ArrayList<>(fileCount);
-    List<Path> paths = new ArrayList<>(fileCount);
-    List<Path> dirs = new ArrayList<>(fileCount);
+    return createDirsAndFiles(fs, destDir, depth, fileCount, dirCount,
+        new ArrayList<Path>(fileCount),
+        new ArrayList<Path>(dirCount));
+  }
+
+  /**
+   * Build a set of files in a directory tree.
+   * @param fs filesystem
+   * @param destDir destination
+   * @param depth file depth
+   * @param fileCount number of files to create.
+   * @param dirCount number of dirs to create at each level
+   * @param paths [out] list of file paths created
+   * @param dirs [out] list of directory paths created.
+   * @return the list of files created.
+   */
+  public static List<Path> createDirsAndFiles(final FileSystem fs,
+      final Path destDir,
+      final int depth,
+      final int fileCount,
+      final int dirCount,
+      final List<Path> paths,
+      final List<Path> dirs) throws IOException {
     buildPaths(paths, dirs, destDir, depth, fileCount, dirCount);
+    List<CompletableFuture<Path>> futures = new ArrayList<>(paths.size()
+        + dirs.size());
+
+    // create directories. With dir marker retention, that adds more entries
+    // to cause deletion issues
+    try (DurationInfo ignore =
+             new DurationInfo(LOG, "Creating %d directories", dirs.size())) {
+      for (Path path : dirs) {
+        futures.add(submit(EXECUTOR, () ->{
+          fs.mkdirs(path);
+          return path;
+        }));
+      }
+      waitForCompletion(futures);
+    }
+
     try (DurationInfo ignore =
-            new DurationInfo(LOG, "Creating %d files", fileCount)) {
+            new DurationInfo(LOG, "Creating %d files", paths.size())) {
       for (Path path : paths) {
         futures.add(put(fs, path, path.getName()));
       }
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestPartialDeleteFailures.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestPartialDeleteFailures.java
index a282b87..a2e7031 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestPartialDeleteFailures.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestPartialDeleteFailures.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.fs.s3a.impl;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
@@ -36,9 +37,11 @@ import org.apache.commons.lang3.tuple.Pair;
 import org.apache.commons.lang3.tuple.Triple;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.s3a.S3ATestUtils;
+import org.apache.hadoop.fs.s3a.test.OperationTrackingStore;
 
 import static org.apache.hadoop.fs.s3a.impl.MultiObjectDeleteSupport.ACCESS_DENIED;
 import static org.apache.hadoop.fs.s3a.impl.MultiObjectDeleteSupport.removeUndeletedPaths;
+import static org.apache.hadoop.fs.s3a.impl.MultiObjectDeleteSupport.toPathList;
 import static org.junit.Assert.assertEquals;
 
 /**
@@ -56,36 +59,42 @@ public class TestPartialDeleteFailures {
     return new Path("s3a://bucket/" + k);
   }
 
+  private static String toKey(Path path) {
+    return path.toUri().getPath();
+  }
+
   @Before
   public void setUp() throws Exception {
     context = S3ATestUtils.createMockStoreContext(true,
-        new S3ATestUtils.OperationTrackingStore(), CONTEXT_ACCESSORS);
+        new OperationTrackingStore(), CONTEXT_ACCESSORS);
   }
 
   @Test
   public void testDeleteExtraction() {
-    List<Path> src = pathList("a", "a/b", "a/c");
-    List<Path> rejected = pathList("a/b");
+    List<MultiObjectDeleteSupport.KeyPath> src = pathList("a", "a/b", "a/c");
+    List<MultiObjectDeleteSupport.KeyPath> rejected = pathList("a/b");
     MultiObjectDeleteException ex = createDeleteException(ACCESS_DENIED,
         rejected);
-    List<Path> undeleted = removeUndeletedPaths(ex, src,
-        TestPartialDeleteFailures::qualifyKey);
+    List<MultiObjectDeleteSupport.KeyPath> undeleted =
+        removeUndeletedPaths(ex, src,
+            TestPartialDeleteFailures::qualifyKey);
     assertEquals("mismatch of rejected and undeleted entries",
         rejected, undeleted);
   }
 
   @Test
   public void testSplitKeysFromResults() throws Throwable {
-    List<Path> src = pathList("a", "a/b", "a/c");
-    List<Path> rejected = pathList("a/b");
-    List<DeleteObjectsRequest.KeyVersion> keys = keysToDelete(src);
+    List<MultiObjectDeleteSupport.KeyPath> src = pathList("a", "a/b", "a/c");
+    List<MultiObjectDeleteSupport.KeyPath> rejected = pathList("a/b");
+    List<DeleteObjectsRequest.KeyVersion> keys = keysToDelete(toPathList(src));
     MultiObjectDeleteException ex = createDeleteException(ACCESS_DENIED,
         rejected);
-    Pair<List<Path>, List<Path>> pair =
+    Pair<List<MultiObjectDeleteSupport.KeyPath>,
+        List<MultiObjectDeleteSupport.KeyPath>> pair =
         new MultiObjectDeleteSupport(context, null)
           .splitUndeletedKeys(ex, keys);
-    List<Path> undeleted = pair.getLeft();
-    List<Path> deleted = pair.getRight();
+    List<MultiObjectDeleteSupport.KeyPath> undeleted = pair.getLeft();
+    List<MultiObjectDeleteSupport.KeyPath> deleted = pair.getRight();
     assertEquals(rejected, undeleted);
     // now check the deleted list to verify that it is valid
     src.remove(rejected.get(0));
@@ -97,9 +106,12 @@ public class TestPartialDeleteFailures {
    * @param paths paths to qualify and then convert to a lst.
    * @return same paths as a list.
    */
-  private List<Path> pathList(String... paths) {
+  private List<MultiObjectDeleteSupport.KeyPath> pathList(String... paths) {
     return Arrays.stream(paths)
-        .map(TestPartialDeleteFailures::qualifyKey)
+        .map(k->
+            new MultiObjectDeleteSupport.KeyPath(k,
+                qualifyKey(k),
+                k.endsWith("/")))
         .collect(Collectors.toList());
   }
 
@@ -111,12 +123,13 @@ public class TestPartialDeleteFailures {
    */
   private MultiObjectDeleteException createDeleteException(
       final String code,
-      final List<Path> rejected) {
+      final List<MultiObjectDeleteSupport.KeyPath> rejected) {
     List<MultiObjectDeleteException.DeleteError> errors = rejected.stream()
-        .map((p) -> {
+        .map((kp) -> {
+          Path p = kp.getPath();
           MultiObjectDeleteException.DeleteError e
               = new MultiObjectDeleteException.DeleteError();
-          e.setKey(p.toUri().getPath());
+          e.setKey(kp.getKey());
           e.setCode(code);
           e.setMessage("forbidden");
           return e;
@@ -125,14 +138,33 @@ public class TestPartialDeleteFailures {
   }
 
   /**
-   * From a list of paths, build up the list of keys for a delete request.
+   * From a list of paths, build up the list of KeyVersion records
+   * for a delete request.
+   * All the entries will be files (i.e. no trailing /)
    * @param paths path list
    * @return a key list suitable for a delete request.
    */
   public static List<DeleteObjectsRequest.KeyVersion> keysToDelete(
       List<Path> paths) {
     return paths.stream()
-        .map((p) -> p.toUri().getPath())
+        .map(p -> {
+          String uripath = p.toUri().getPath();
+          return uripath.substring(1);
+        })
+        .map(DeleteObjectsRequest.KeyVersion::new)
+        .collect(Collectors.toList());
+  }
+
+  /**
+   * From a list of keys, build up the list of keys for a delete request.
+   * If a key has a trailing /, that will be retained, so it will be
+   * considered a directory during multi-object delete failure handling
+   * @param keys key list
+   * @return a key list suitable for a delete request.
+   */
+  public static List<DeleteObjectsRequest.KeyVersion> toDeleteRequests(
+      List<String> keys) {
+    return keys.stream()
         .map(DeleteObjectsRequest.KeyVersion::new)
         .collect(Collectors.toList());
   }
@@ -143,23 +175,33 @@ public class TestPartialDeleteFailures {
    */
   @Test
   public void testProcessDeleteFailure() throws Throwable {
-    Path pathA = qualifyKey("/a");
-    Path pathAB = qualifyKey("/a/b");
-    Path pathAC = qualifyKey("/a/c");
+    String keyA = "/a/";
+    String keyAB = "/a/b";
+    String keyAC = "/a/c";
+    Path pathA = qualifyKey(keyA);
+    Path pathAB = qualifyKey(keyAB);
+    Path pathAC = qualifyKey(keyAC);
+    List<String> srcKeys = Lists.newArrayList(keyA, keyAB, keyAC);
     List<Path> src = Lists.newArrayList(pathA, pathAB, pathAC);
-    List<DeleteObjectsRequest.KeyVersion> keyList = keysToDelete(src);
+    List<DeleteObjectsRequest.KeyVersion> keyList = toDeleteRequests(srcKeys);
     List<Path> deleteForbidden = Lists.newArrayList(pathAB);
     final List<Path> deleteAllowed = Lists.newArrayList(pathA, pathAC);
+    List<MultiObjectDeleteSupport.KeyPath> forbiddenKP =
+        Lists.newArrayList(
+            new MultiObjectDeleteSupport.KeyPath(keyAB, pathAB, true));
     MultiObjectDeleteException ex = createDeleteException(ACCESS_DENIED,
-        deleteForbidden);
-    S3ATestUtils.OperationTrackingStore store
-        = new S3ATestUtils.OperationTrackingStore();
+        forbiddenKP);
+    OperationTrackingStore store
+        = new OperationTrackingStore();
     StoreContext storeContext = S3ATestUtils
             .createMockStoreContext(true, store, CONTEXT_ACCESSORS);
     MultiObjectDeleteSupport deleteSupport
         = new MultiObjectDeleteSupport(storeContext, null);
+    List<Path> retainedMarkers = new ArrayList<>();
     Triple<List<Path>, List<Path>, List<Pair<Path, IOException>>>
-        triple = deleteSupport.processDeleteFailure(ex, keyList);
+        triple = deleteSupport.processDeleteFailure(ex,
+        keyList,
+        retainedMarkers);
     Assertions.assertThat(triple.getRight())
         .as("failure list")
         .isEmpty();
@@ -173,6 +215,14 @@ public class TestPartialDeleteFailures {
         as("undeleted store entries")
         .containsAll(deleteForbidden)
         .doesNotContainAnyElementsOf(deleteAllowed);
+    // because dir marker retention is on, we expect at least one retained
+    // marker
+    Assertions.assertThat(retainedMarkers).
+        as("Retained Markers")
+        .containsExactly(pathA);
+    Assertions.assertThat(store.getDeleted()).
+        as("List of tombstoned records")
+        .doesNotContain(pathA);
   }
 
 
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/AbstractS3ACostTest.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/AbstractS3ACostTest.java
index db0542d..d7e277f 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/AbstractS3ACostTest.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/AbstractS3ACostTest.java
@@ -111,7 +111,10 @@ public class AbstractS3ACostTest extends AbstractS3ATestBase {
         keepMarkers
             ? DIRECTORY_MARKER_POLICY_KEEP
             : DIRECTORY_MARKER_POLICY_DELETE);
-    conf.setBoolean(METADATASTORE_AUTHORITATIVE, authoritative);
+    if (isGuarded()) {
+      conf.set(S3_METADATA_STORE_IMPL, S3GUARD_METASTORE_DYNAMO);
+      conf.setBoolean(METADATASTORE_AUTHORITATIVE, authoritative);
+    }
     disableFilesystemCaching(conf);
     return conf;
   }
@@ -148,6 +151,7 @@ public class AbstractS3ACostTest extends AbstractS3ATestBase {
             INVOCATION_COPY_FROM_LOCAL_FILE,
             OBJECT_COPY_REQUESTS,
             OBJECT_DELETE_REQUESTS,
+            OBJECT_DELETE_OBJECTS,
             OBJECT_LIST_REQUESTS,
             OBJECT_METADATA_REQUESTS,
             OBJECT_PUT_BYTES,
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3ADeleteCost.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3ADeleteCost.java
index d3d976e..618b491 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3ADeleteCost.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3ADeleteCost.java
@@ -19,16 +19,20 @@
 package org.apache.hadoop.fs.s3a.performance;
 
 
+import java.io.FileNotFoundException;
 import java.util.Arrays;
 import java.util.Collection;
 
+import org.assertj.core.api.Assertions;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
 import org.apache.hadoop.fs.s3a.S3AFileStatus;
 import org.apache.hadoop.fs.s3a.S3AFileSystem;
 import org.apache.hadoop.fs.s3a.Tristate;
@@ -37,6 +41,7 @@ import org.apache.hadoop.fs.s3a.impl.StatusProbeEnum;
 import static org.apache.hadoop.fs.s3a.Statistic.*;
 import static org.apache.hadoop.fs.s3a.performance.OperationCost.*;
 import static org.apache.hadoop.fs.s3a.performance.OperationCostValidator.probe;
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
 
 /**
  * Use metrics to assert about the cost of file API calls.
@@ -58,7 +63,9 @@ public class ITestS3ADeleteCost extends AbstractS3ACostTest {
         {"raw-keep-markers", false, true, false},
         {"raw-delete-markers", false, false, false},
         {"nonauth-keep-markers", true, true, false},
-        {"auth-delete-markers", true, false, true}
+        {"nonauth-delete-markers", true, false, false},
+        {"auth-delete-markers", true, false, true},
+        {"auth-keep-markers", true, true, true}
     });
   }
 
@@ -145,7 +152,7 @@ public class ITestS3ADeleteCost extends AbstractS3ACostTest {
     boolean rawAndDeleting = isRaw() && isDeleting();
     verifyMetrics(() -> {
       fs.delete(file1, false);
-      return "after fs.delete(file1simpleFile) " + getMetricSummary();
+      return "after fs.delete(file1) " + getMetricSummary();
     },
         // delete file. For keeping: that's it
         probe(rawAndKeeping, OBJECT_METADATA_REQUESTS,
@@ -173,7 +180,24 @@ public class ITestS3ADeleteCost extends AbstractS3ACostTest {
   public void testDirMarkersSubdir() throws Throwable {
     describe("verify cost of deep subdir creation");
 
-    Path subDir = new Path(methodPath(), "1/2/3/4/5/6");
+    Path methodPath = methodPath();
+    Path parent = new Path(methodPath, "parent");
+    Path subDir = new Path(parent, "1/2/3/4/5/6");
+    S3AFileSystem fs = getFileSystem();
+    // this creates a peer of the parent dir, so ensures
+    // that when parent dir is deleted, no markers need to
+    // be recreated...that complicates all the metrics which
+    // are measured
+    Path sibling = new Path(methodPath, "sibling");
+    ContractTestUtils.touch(fs, sibling);
+
+    int dirsCreated = 2;
+    fs.delete(parent, true);
+
+    LOG.info("creating parent dir {}", parent);
+    fs.mkdirs(parent);
+
+    LOG.info("creating sub directory {}", subDir);
     // one dir created, possibly a parent removed
     verifyMetrics(() -> {
       mkdirs(subDir);
@@ -187,6 +211,47 @@ public class ITestS3ADeleteCost extends AbstractS3ACostTest {
         // delete all possible fake dirs above the subdirectory
         withWhenDeleting(FAKE_DIRECTORIES_DELETED,
             directoriesInPath(subDir) - 1));
+
+    int dirDeleteRequests  = 1;
+    int fileDeleteRequests  = 0;
+    int totalDeleteRequests = dirDeleteRequests + fileDeleteRequests;
+
+    LOG.info("About to delete {}", parent);
+    // now delete the deep tree.
+    verifyMetrics(() -> {
+      fs.delete(parent, true);
+      return "deleting parent dir " + parent + " " + getMetricSummary();
+    },
+
+        // two directory markers will be deleted in a single request
+        with(OBJECT_DELETE_REQUESTS, totalDeleteRequests),
+        // keeping: the parent dir marker needs deletion alongside
+        // the subdir one.
+        withWhenKeeping(OBJECT_DELETE_OBJECTS, dirsCreated),
+
+        // deleting: only the marker at the bottom needs deleting
+        withWhenDeleting(OBJECT_DELETE_OBJECTS, 1));
+
+    // followup with list calls to make sure all is clear.
+    verifyNoListing(parent);
+    verifyNoListing(subDir);
+    // now reinstate the directory, which in HADOOP-17244 hitting problems
+    fs.mkdirs(parent);
+    FileStatus[] children = fs.listStatus(parent);
+    Assertions.assertThat(children)
+        .describedAs("Children of %s", parent)
+        .isEmpty();
+  }
+
+  /**
+   * List a path, verify that there are no direct child entries.
+   * @param path path to scan
+   */
+  protected void verifyNoListing(final Path path) throws Exception {
+    intercept(FileNotFoundException.class, () -> {
+      FileStatus[] statuses = getFileSystem().listStatus(path);
+      return Arrays.deepToString(statuses);
+    });
   }
 
   @Test
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/test/MinimalListingOperationCallbacks.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/test/MinimalListingOperationCallbacks.java
new file mode 100644
index 0000000..bff9e91
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/test/MinimalListingOperationCallbacks.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.test;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.S3AFileStatus;
+import org.apache.hadoop.fs.s3a.S3ALocatedFileStatus;
+import org.apache.hadoop.fs.s3a.S3ListRequest;
+import org.apache.hadoop.fs.s3a.S3ListResult;
+import org.apache.hadoop.fs.s3a.impl.ListingOperationCallbacks;
+import org.apache.hadoop.fs.s3a.s3guard.ITtlTimeProvider;
+
+/**
+ * Stub implementation of {@link ListingOperationCallbacks}.
+ */
+public class MinimalListingOperationCallbacks
+    implements ListingOperationCallbacks {
+
+  @Override
+  public CompletableFuture<S3ListResult> listObjectsAsync(
+      S3ListRequest request)
+      throws IOException {
+    return null;
+  }
+
+  @Override
+  public CompletableFuture<S3ListResult> continueListObjectsAsync(
+      S3ListRequest request,
+      S3ListResult prevResult)
+      throws IOException {
+    return null;
+  }
+
+  @Override
+  public S3ALocatedFileStatus toLocatedFileStatus(
+      S3AFileStatus status) throws IOException {
+    return null;
+  }
+
+  @Override
+  public S3ListRequest createListObjectsRequest(
+      String key,
+      String delimiter) {
+    return null;
+  }
+
+  @Override
+  public long getDefaultBlockSize(Path path) {
+    return 0;
+  }
+
+  @Override
+  public int getMaxKeys() {
+    return 0;
+  }
+
+  @Override
+  public ITtlTimeProvider getUpdatedTtlTimeProvider() {
+    return null;
+  }
+
+  @Override
+  public boolean allowAuthoritative(Path p) {
+    return false;
+  }
+}
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/test/MinimalOperationCallbacks.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/test/MinimalOperationCallbacks.java
new file mode 100644
index 0000000..a50b944
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/test/MinimalOperationCallbacks.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.test;
+
+import java.io.IOException;
+import java.util.List;
+
+import com.amazonaws.AmazonClientException;
+import com.amazonaws.services.s3.model.DeleteObjectsRequest;
+import com.amazonaws.services.s3.model.DeleteObjectsResult;
+import com.amazonaws.services.s3.model.MultiObjectDeleteException;
+import com.amazonaws.services.s3.transfer.model.CopyResult;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.s3a.S3AFileStatus;
+import org.apache.hadoop.fs.s3a.S3ALocatedFileStatus;
+import org.apache.hadoop.fs.s3a.S3AReadOpContext;
+import org.apache.hadoop.fs.s3a.S3ObjectAttributes;
+import org.apache.hadoop.fs.s3a.impl.OperationCallbacks;
+import org.apache.hadoop.fs.s3a.s3guard.BulkOperationState;
+
+/**
+ * Stub implementation of {@link OperationCallbacks}.
+ */
+public class MinimalOperationCallbacks
+    implements OperationCallbacks {
+
+  @Override
+  public S3ObjectAttributes createObjectAttributes(
+      Path path,
+      String eTag,
+      String versionId,
+      long len) {
+    return null;
+  }
+
+  @Override
+  public S3ObjectAttributes createObjectAttributes(
+      S3AFileStatus fileStatus) {
+    return null;
+  }
+
+  @Override
+  public S3AReadOpContext createReadContext(
+      FileStatus fileStatus) {
+    return null;
+  }
+
+  @Override
+  public void finishRename(
+      Path sourceRenamed,
+      Path destCreated)
+      throws IOException {
+
+  }
+
+  @Override
+  public void deleteObjectAtPath(
+      Path path,
+      String key,
+      boolean isFile,
+      BulkOperationState operationState)
+      throws IOException {
+
+  }
+
+  @Override
+  public RemoteIterator<S3ALocatedFileStatus> listFilesAndDirectoryMarkers(
+      final Path path,
+      final S3AFileStatus status,
+      final boolean collectTombstones,
+      final boolean includeSelf) throws IOException {
+    return null;
+  }
+
+  @Override
+  public CopyResult copyFile(
+      String srcKey,
+      String destKey,
+      S3ObjectAttributes srcAttributes,
+      S3AReadOpContext readContext)
+      throws IOException {
+    return null;
+  }
+
+  @Override
+  public DeleteObjectsResult removeKeys(
+      List<DeleteObjectsRequest.KeyVersion> keysToDelete,
+      boolean deleteFakeDir,
+      List<Path> undeletedObjectsOnFailure,
+      BulkOperationState operationState,
+      boolean quiet)
+      throws MultiObjectDeleteException, AmazonClientException,
+             IOException {
+    return null;
+  }
+
+  @Override
+  public boolean allowAuthoritative(Path p) {
+    return false;
+  }
+
+  @Override
+  public RemoteIterator<S3AFileStatus> listObjects(
+      Path path,
+      String key)
+      throws IOException {
+    return null;
+  }
+}
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/test/OperationTrackingStore.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/test/OperationTrackingStore.java
new file mode 100644
index 0000000..1bf0c3e
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/test/OperationTrackingStore.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.test;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.S3AFileStatus;
+import org.apache.hadoop.fs.s3a.impl.StoreContext;
+import org.apache.hadoop.fs.s3a.s3guard.BulkOperationState;
+import org.apache.hadoop.fs.s3a.s3guard.DirListingMetadata;
+import org.apache.hadoop.fs.s3a.s3guard.ITtlTimeProvider;
+import org.apache.hadoop.fs.s3a.s3guard.MetadataStore;
+import org.apache.hadoop.fs.s3a.s3guard.PathMetadata;
+import org.apache.hadoop.fs.s3a.s3guard.RenameTracker;
+
+/**
+ * MetadataStore which tracks what is deleted and added.
+ */
+public class OperationTrackingStore implements MetadataStore {
+
+  private final List<Path> deleted = new ArrayList<>();
+
+  private final List<Path> created = new ArrayList<>();
+
+  @Override
+  public void initialize(final FileSystem fs,
+      ITtlTimeProvider ttlTimeProvider) {
+  }
+
+  @Override
+  public void initialize(final Configuration conf,
+      ITtlTimeProvider ttlTimeProvider) {
+  }
+
+  @Override
+  public void forgetMetadata(final Path path) {
+  }
+
+  @Override
+  public PathMetadata get(final Path path) {
+    return null;
+  }
+
+  @Override
+  public PathMetadata get(final Path path,
+      final boolean wantEmptyDirectoryFlag) {
+    return null;
+  }
+
+  @Override
+  public DirListingMetadata listChildren(final Path path) {
+    return null;
+  }
+
+  @Override
+  public void put(final PathMetadata meta) {
+    put(meta, null);
+  }
+
+  @Override
+  public void put(final PathMetadata meta,
+      final BulkOperationState operationState) {
+    created.add(meta.getFileStatus().getPath());
+  }
+
+  @Override
+  public void put(final Collection<? extends PathMetadata> metas,
+      final BulkOperationState operationState) {
+    metas.stream().forEach(meta -> put(meta, null));
+  }
+
+  @Override
+  public void put(final DirListingMetadata meta,
+      final List<Path> unchangedEntries,
+      final BulkOperationState operationState) {
+    created.add(meta.getPath());
+  }
+
+  @Override
+  public void destroy() {
+  }
+
+  @Override
+  public void delete(final Path path,
+      final BulkOperationState operationState) {
+    deleted.add(path);
+  }
+
+  @Override
+  public void deletePaths(final Collection<Path> paths,
+      @Nullable final BulkOperationState operationState)
+      throws IOException {
+    deleted.addAll(paths);
+  }
+
+  @Override
+  public void deleteSubtree(final Path path,
+      final BulkOperationState operationState) {
+
+  }
+
+  @Override
+  public void move(@Nullable final Collection<Path> pathsToDelete,
+      @Nullable final Collection<PathMetadata> pathsToCreate,
+      @Nullable final BulkOperationState operationState) {
+  }
+
+  @Override
+  public void prune(final PruneMode pruneMode, final long cutoff) {
+  }
+
+  @Override
+  public long prune(final PruneMode pruneMode,
+      final long cutoff,
+      final String keyPrefix) {
+    return 0;
+  }
+
+  @Override
+  public BulkOperationState initiateBulkWrite(
+      final BulkOperationState.OperationType operation,
+      final Path dest) {
+    return null;
+  }
+
+  @Override
+  public void setTtlTimeProvider(ITtlTimeProvider ttlTimeProvider) {
+  }
+
+  @Override
+  public Map<String, String> getDiagnostics() {
+    return null;
+  }
+
+  @Override
+  public void updateParameters(final Map<String, String> parameters) {
+  }
+
+  @Override
+  public void close() {
+  }
+
+  public List<Path> getDeleted() {
+    return deleted;
+  }
+
+  public List<Path> getCreated() {
+    return created;
+  }
+
+  @Override
+  public RenameTracker initiateRenameOperation(
+      final StoreContext storeContext,
+      final Path source,
+      final S3AFileStatus sourceStatus,
+      final Path dest) {
+    throw new UnsupportedOperationException("unsupported");
+  }
+
+  @Override
+  public void addAncestors(final Path qualifiedPath,
+      @Nullable final BulkOperationState operationState) {
+
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org