You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by st...@apache.org on 2019/09/05 13:25:43 UTC

[hadoop] branch trunk updated: HADOOP-16430. S3AFilesystem.delete to incrementally update s3guard with deletions

This is an automated email from the ASF dual-hosted git repository.

stevel pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 511df1e  HADOOP-16430. S3AFilesystem.delete to incrementally update s3guard with deletions
511df1e is described below

commit 511df1e837b19ccb9271520589452d82d50ac69d
Author: Steve Loughran <st...@cloudera.com>
AuthorDate: Thu Sep 5 14:25:15 2019 +0100

    HADOOP-16430. S3AFilesystem.delete to incrementally update s3guard with deletions
    
    Contributed by Steve Loughran.
    
    This overlaps the scanning for directory entries with batched calls to S3 DELETE and updates of the S3Guard tables.
    It also uses S3Guard to list the files to delete, so find newly created files even when S3 listings are not use consistent.
    
    For path which the client considers S3Guard to be authoritative, we also do a recursive LIST of the store and delete files; this is to find unindexed files and do guarantee that the delete(path, true) call really does delete everything underneath.
    
    Change-Id: Ice2f6e940c506e0b3a78fa534a99721b1698708e
---
 .../org/apache/hadoop/fs/impl/FutureIOSupport.java |   8 +-
 .../org/apache/hadoop/fs/FileContextURIBase.java   |  19 +-
 .../AbstractContractGetFileStatusTest.java         |  12 +-
 .../hadoop/fs/contract/ContractTestUtils.java      |  18 +-
 .../hadoop/fs/s3a/InconsistentAmazonS3Client.java  |  32 +-
 .../apache/hadoop/fs/s3a/InternalConstants.java    |  53 --
 .../java/org/apache/hadoop/fs/s3a/Listing.java     |   8 +-
 .../org/apache/hadoop/fs/s3a/S3AFileStatus.java    |  56 +-
 .../org/apache/hadoop/fs/s3a/S3AFileSystem.java    | 364 ++++++-------
 .../apache/hadoop/fs/s3a/S3ALocatedFileStatus.java |  28 +-
 .../apache/hadoop/fs/s3a/WriteOperationHelper.java |   6 +-
 .../hadoop/fs/s3a/commit/CommitOperations.java     |   8 +-
 .../apache/hadoop/fs/s3a/impl/DeleteOperation.java | 577 +++++++++++++++++++++
 .../fs/s3a/impl/ExecutingStoreOperation.java       |  69 +++
 .../hadoop/fs/s3a/impl/InternalConstants.java      |  26 +
 .../fs/s3a/impl/MultiObjectDeleteSupport.java      |  10 +-
 .../hadoop/fs/s3a/impl/OperationCallbacks.java     | 198 +++++++
 .../apache/hadoop/fs/s3a/impl/RenameOperation.java | 164 +-----
 .../apache/hadoop/fs/s3a/impl/StoreContext.java    |  19 +
 .../fs/s3a/s3guard/DelayedUpdateRenameTracker.java |   4 +-
 .../fs/s3a/s3guard/DynamoDBMetadataStore.java      |  77 ++-
 .../hadoop/fs/s3a/s3guard/InternalIterators.java   |  91 ++++
 .../hadoop/fs/s3a/s3guard/LocalMetadataStore.java  |  16 +-
 .../hadoop/fs/s3a/s3guard/MetadataStore.java       |  27 +-
 .../hadoop/fs/s3a/s3guard/NullMetadataStore.java   |  12 +-
 .../fs/s3a/s3guard/ProgressiveRenameTracker.java   |   3 +-
 .../org/apache/hadoop/fs/s3a/s3guard/S3Guard.java  |   9 +
 .../fs/s3a/select/InternalSelectConstants.java     |   2 +-
 .../fs/contract/s3a/ITestS3AContractRootDir.java   |   3 +-
 .../hadoop/fs/s3a/ITestS3AFailureHandling.java     |   8 +-
 .../s3a/ITestS3AMetadataPersistenceException.java  |  42 +-
 .../hadoop/fs/s3a/ITestS3GuardListConsistency.java | 126 +++--
 .../fs/s3a/ITestS3GuardOutOfBandOperations.java    |  63 ++-
 .../apache/hadoop/fs/s3a/MockS3AFileSystem.java    |   5 +-
 .../org/apache/hadoop/fs/s3a/S3ATestConstants.java |  11 +
 .../fs/s3a/commit/ITestCommitOperations.java       |   4 +-
 .../fs/s3a/impl/ITestPartialRenamesDeletes.java    |   4 +-
 .../fs/s3a/impl/TestPartialDeleteFailures.java     |  16 +-
 .../s3guard/ITestDynamoDBMetadataStoreScale.java   |   4 +-
 .../fs/s3a/s3guard/MetadataStoreTestBase.java      |  16 +-
 .../scale/AbstractITestS3AMetadataStoreScale.java  |   2 +-
 .../fs/s3a/scale/ITestS3ADeleteManyFiles.java      |  94 ++--
 .../hadoop/fs/s3a/scale/S3AScaleTestBase.java      |   4 +
 43 files changed, 1740 insertions(+), 578 deletions(-)

diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/FutureIOSupport.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/FutureIOSupport.java
index 9fe4023..26856e5 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/FutureIOSupport.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/FutureIOSupport.java
@@ -144,13 +144,13 @@ public final class FutureIOSupport {
     Throwable cause = e.getCause();
     if (cause instanceof IOException) {
       return (IOException) cause;
-    } else if (cause instanceof WrappedIOException){
+    } else if (cause instanceof WrappedIOException) {
       return ((WrappedIOException) cause).getCause();
-    } else if (cause instanceof CompletionException){
+    } else if (cause instanceof CompletionException) {
       return unwrapInnerException(cause);
-    } else if (cause instanceof ExecutionException){
+    } else if (cause instanceof ExecutionException) {
       return unwrapInnerException(cause);
-    } else if (cause instanceof RuntimeException){
+    } else if (cause instanceof RuntimeException) {
       throw (RuntimeException) cause;
     } else if (cause != null) {
       // other type: wrap with a new IOE
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/FileContextURIBase.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/FileContextURIBase.java
index a99f762..abb6d4f 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/FileContextURIBase.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/FileContextURIBase.java
@@ -130,7 +130,7 @@ public abstract class FileContextURIBase {
 
   @Test
   public void testCreateExistingFile() throws IOException {
-    String fileName = "testFile";
+    String fileName = "testCreateExistingFile";
     Path testPath = qualifiedPath(fileName, fc2);
 
     // Ensure file does not exist
@@ -153,7 +153,7 @@ public abstract class FileContextURIBase {
 
   @Test
   public void testCreateFileInNonExistingDirectory() throws IOException {
-    String fileName = "testDir/testFile";
+    String fileName = "testCreateFileInNonExistingDirectory/testFile";
 
     Path testPath = qualifiedPath(fileName, fc2);
 
@@ -165,7 +165,8 @@ public abstract class FileContextURIBase {
 
     // Ensure using fc2 that file is created
     Assert.assertTrue(isDir(fc2, testPath.getParent()));
-    Assert.assertEquals("testDir", testPath.getParent().getName());
+    Assert.assertEquals("testCreateFileInNonExistingDirectory",
+        testPath.getParent().getName());
     Assert.assertTrue(exists(fc2, testPath));
 
   }
@@ -293,7 +294,7 @@ public abstract class FileContextURIBase {
 
   @Test
   public void testDeleteFile() throws IOException {
-    Path testPath = qualifiedPath("testFile", fc2);
+    Path testPath = qualifiedPath("testDeleteFile", fc2);
 
     // Ensure file does not exist
     Assert.assertFalse(exists(fc2, testPath));
@@ -314,7 +315,7 @@ public abstract class FileContextURIBase {
 
   @Test
   public void testDeleteNonExistingFile() throws IOException {
-    String testFileName = "testFile";
+    String testFileName = "testDeleteNonExistingFile";
     Path testPath = qualifiedPath(testFileName, fc2);
 
     // TestCase1 : Test delete on file never existed
@@ -341,7 +342,7 @@ public abstract class FileContextURIBase {
 
   @Test
   public void testDeleteNonExistingFileInDir() throws IOException {
-    String testFileInDir = "testDir/testDir/TestFile";
+    String testFileInDir = "testDeleteNonExistingFileInDir/testDir/TestFile";
     Path testPath = qualifiedPath(testFileInDir, fc2);
 
     // TestCase1 : Test delete on file never existed
@@ -418,7 +419,7 @@ public abstract class FileContextURIBase {
 
   @Test
   public void testDeleteNonExistingDirectory() throws IOException {
-    String testDirName = "testFile";
+    String testDirName = "testDeleteNonExistingDirectory";
     Path testPath = qualifiedPath(testDirName, fc2);
 
     // TestCase1 : Test delete on directory never existed
@@ -445,7 +446,7 @@ public abstract class FileContextURIBase {
 
   @Test
   public void testModificationTime() throws IOException {
-    String testFile = "file1";
+    String testFile = "testModificationTime";
     long fc2ModificationTime, fc1ModificationTime;
 
     Path testPath = qualifiedPath(testFile, fc2);
@@ -461,7 +462,7 @@ public abstract class FileContextURIBase {
 
   @Test
   public void testFileStatus() throws IOException {
-    String fileName = "file1";
+    String fileName = "testModificationTime";
     Path path2 = fc2.makeQualified(new Path(BASE, fileName));
 
     // Create a file on fc2's file system using fc1
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractGetFileStatusTest.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractGetFileStatusTest.java
index eb1fd61..85bd137 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractGetFileStatusTest.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractGetFileStatusTest.java
@@ -112,9 +112,11 @@ public abstract class AbstractContractGetFileStatusTest extends
   private void listFilesOnEmptyDir(boolean recursive) throws IOException {
     describe("Invoke listFiles(recursive=" + recursive + ")" +
         " on empty directories, expect nothing found");
-    Path subfolder = createDirWithEmptySubFolder();
     FileSystem fs = getFileSystem();
-    new TreeScanResults(fs.listFiles(getContract().getTestPath(), recursive))
+    Path path = getContract().getTestPath();
+    fs.delete(path, true);
+    Path subfolder = createDirWithEmptySubFolder();
+    new TreeScanResults(fs.listFiles(path, recursive))
         .assertSizeEquals("listFiles(test dir, " + recursive + ")", 0, 0, 0);
     describe("Test on empty subdirectory");
     new TreeScanResults(fs.listFiles(subfolder, recursive))
@@ -126,9 +128,11 @@ public abstract class AbstractContractGetFileStatusTest extends
   public void testListLocatedStatusEmptyDirectory() throws IOException {
     describe("Invoke listLocatedStatus() on empty directories;" +
         " expect directories to be found");
-    Path subfolder = createDirWithEmptySubFolder();
     FileSystem fs = getFileSystem();
-    new TreeScanResults(fs.listLocatedStatus(getContract().getTestPath()))
+    Path path = getContract().getTestPath();
+    fs.delete(path, true);
+    Path subfolder = createDirWithEmptySubFolder();
+    new TreeScanResults(fs.listLocatedStatus(path))
       .assertSizeEquals("listLocatedStatus(test dir)", 0, 1, 0);
     describe("Test on empty subdirectory");
     new TreeScanResults(fs.listLocatedStatus(subfolder))
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java
index 7bd9840..64f9cb8 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java
@@ -1628,6 +1628,22 @@ public class ContractTestUtils extends Assert {
     }
 
     /**
+     * Dump the files and directories to a multi-line string for error
+     * messages and assertions.
+     * @return a dump of the internal state
+     */
+    private String dump() {
+      StringBuilder sb = new StringBuilder(toString());
+      sb.append("\nFiles:");
+      directories.forEach(p ->
+          sb.append("\n  \"").append(p.toString()));
+      sb.append("\nDirectories:");
+      files.forEach(p ->
+          sb.append("\n  \"").append(p.toString()));
+      return sb.toString();
+    }
+
+    /**
      * Equality check compares files and directory counts.
      * As these are non-final fields, this class cannot be used in
      * hash tables.
@@ -1667,7 +1683,7 @@ public class ContractTestUtils extends Assert {
      * @param o expected other entries.
      */
     public void assertSizeEquals(String text, long f, long d, long o) {
-      String self = toString();
+      String self = dump();
       Assert.assertEquals(text + ": file count in " + self,
           f, getFileCount());
       Assert.assertEquals(text + ": directory count in " + self,
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/InconsistentAmazonS3Client.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/InconsistentAmazonS3Client.java
index 2e77dc8..34c043b 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/InconsistentAmazonS3Client.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/InconsistentAmazonS3Client.java
@@ -199,6 +199,7 @@ public class InconsistentAmazonS3Client extends AmazonS3Client {
       deleteObjectsRequest)
       throws AmazonClientException, AmazonServiceException {
     maybeFail();
+    LOG.info("registering bulk delete of objects");
     for (DeleteObjectsRequest.KeyVersion keyVersion :
         deleteObjectsRequest.getKeys()) {
       registerDeleteObject(keyVersion.getKey(),
@@ -278,6 +279,7 @@ public class InconsistentAmazonS3Client extends AmazonS3Client {
     // Behavior of S3ObjectSummary
     String key = item.getKey();
     if (list.stream().noneMatch((member) -> member.getKey().equals(key))) {
+      LOG.debug("Reinstate summary {}", key);
       list.add(item);
     }
   }
@@ -302,6 +304,7 @@ public class InconsistentAmazonS3Client extends AmazonS3Client {
       if (nextParent.equals(ancestorPath)) {
         String prefix = prefixCandidate.toString();
         if (!prefixes.contains(prefix)) {
+          LOG.debug("Reinstate prefix {}", prefix);
           prefixes.add(prefix);
         }
         return;
@@ -401,6 +404,7 @@ public class InconsistentAmazonS3Client extends AmazonS3Client {
         }
       } else {
         // Clean up any expired entries
+        LOG.debug("Remove expired key {}", key);
         delayedDeletes.remove(key);
       }
     }
@@ -467,16 +471,24 @@ public class InconsistentAmazonS3Client extends AmazonS3Client {
 
   private void registerDeleteObject(String key, String bucket) {
     if (policy.shouldDelay(key)) {
-      // Record summary so we can add it back for some time post-deletion
-      ListObjectsRequest request = new ListObjectsRequest()
-              .withBucketName(bucket)
-              .withPrefix(key);
-      S3ObjectSummary summary = innerlistObjects(request).getObjectSummaries()
-          .stream()
-          .filter(result -> result.getKey().equals(key))
-          .findFirst()
-          .orElse(null);
-      delayedDeletes.put(key, new Delete(System.currentTimeMillis(), summary));
+      Delete delete = delayedDeletes.get(key);
+      if (delete != null && isKeyDelayed(delete.time(), key)) {
+        // there is already an entry in the delayed delete list,
+        // so ignore the operation
+        LOG.debug("Ignoring delete of already deleted object");
+      } else {
+        // Record summary so we can add it back for some time post-deletion
+        ListObjectsRequest request = new ListObjectsRequest()
+            .withBucketName(bucket)
+            .withPrefix(key);
+        S3ObjectSummary summary = innerlistObjects(request).getObjectSummaries()
+            .stream()
+            .filter(result -> result.getKey().equals(key))
+            .findFirst()
+            .orElse(null);
+        delayedDeletes.put(key, new Delete(System.currentTimeMillis(),
+            summary));
+      }
     }
   }
 
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/InternalConstants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/InternalConstants.java
deleted file mode 100644
index 509217f..0000000
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/InternalConstants.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.fs.s3a;
-
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Set;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-
-/**
- * Constants for internal use in the org.apache.hadoop.fs.s3a module itself.
- * Please don't refer to these outside of this module &amp; its tests.
- * If you find you need to then either the code is doing something it
- * should not, or these constants need to be uprated to being
- * public and stable entries.
- */
-@InterfaceAudience.Private
-public final class InternalConstants {
-
-  private InternalConstants() {
-  }
-
-  /**
-   * The known keys used in a standard openFile call.
-   * if there's a select marker in there then the keyset
-   * used becomes that of the select operation.
-   */
-  @InterfaceStability.Unstable
-  public static final Set<String> STANDARD_OPENFILE_KEYS =
-      Collections.unmodifiableSet(
-          new HashSet<>(
-              Arrays.asList(Constants.INPUT_FADVISE,
-                  Constants.READAHEAD_RANGE)));
-}
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Listing.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Listing.java
index 54f8fc6..9c2f67d 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Listing.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Listing.java
@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.fs.s3a;
 
+import javax.annotation.Nullable;
+
 import com.amazonaws.AmazonClientException;
 import com.amazonaws.services.s3.model.S3ObjectSummary;
 import com.google.common.annotations.VisibleForTesting;
@@ -58,6 +60,9 @@ public class Listing {
   private final S3AFileSystem owner;
   private static final Logger LOG = S3AFileSystem.LOG;
 
+  static final FileStatusAcceptor ACCEPT_ALL_BUT_S3N =
+      new AcceptAllButS3nDirs();
+
   public Listing(S3AFileSystem owner) {
     this.owner = owner;
   }
@@ -339,7 +344,8 @@ public class Listing {
     FileStatusListingIterator(ObjectListingIterator source,
         PathFilter filter,
         FileStatusAcceptor acceptor,
-        RemoteIterator<S3AFileStatus> providedStatus) throws IOException {
+        @Nullable RemoteIterator<S3AFileStatus> providedStatus)
+        throws IOException {
       this.source = source;
       this.filter = filter;
       this.acceptor = acceptor;
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileStatus.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileStatus.java
index e8ff846..46a2346 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileStatus.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileStatus.java
@@ -31,6 +31,9 @@ import org.apache.hadoop.fs.Path;
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
 public class S3AFileStatus extends FileStatus {
+
+  private static final long serialVersionUID = -5955674081978903922L;
+
   private Tristate isEmptyDirectory;
   private String eTag;
   private String versionId;
@@ -56,12 +59,16 @@ public class S3AFileStatus extends FileStatus {
   public S3AFileStatus(Tristate isemptydir,
       Path path,
       String owner) {
-    super(0, true, 1, 0, 0, 0,
-        null, null, null, null,
-        path, false, true, false);
-    isEmptyDirectory = isemptydir;
-    setOwner(owner);
-    setGroup(owner);
+    this(path,
+        true,
+        isemptydir,
+        0,
+        0,
+        0,
+        owner,
+        null,
+        null
+    );
   }
 
   /**
@@ -76,10 +83,43 @@ public class S3AFileStatus extends FileStatus {
    */
   public S3AFileStatus(long length, long modification_time, Path path,
       long blockSize, String owner, String eTag, String versionId) {
-    super(length, false, 1, blockSize, modification_time,
+    this(path,
+        false,
+        Tristate.FALSE,
+        length,
+        modification_time,
+        blockSize,
+        owner,
+        eTag,
+        versionId
+    );
+  }
+
+  /**
+   * Either a file or directory.
+   * @param path path
+   * @param isDir is this a directory?
+   * @param isemptydir is this an empty directory?
+   * @param length file length
+   * @param modificationTime mod time
+   * @param blockSize block size
+   * @param owner owner
+   * @param eTag eTag of the S3 object if available, else null
+   * @param versionId versionId of the S3 object if available, else null
+   */
+  S3AFileStatus(Path path,
+      boolean isDir,
+      Tristate isemptydir,
+      long length,
+      long modificationTime,
+      long blockSize,
+      String owner,
+      String eTag,
+      String versionId) {
+    super(length, isDir, 1, blockSize, modificationTime,
         0, null, owner, owner, null,
         path, false, true, false);
-    isEmptyDirectory = Tristate.FALSE;
+    this.isEmptyDirectory = isemptydir;
     this.eTag = eTag;
     this.versionId = versionId;
   }
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
index aa00ae8..6edbed7 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
@@ -56,6 +56,7 @@ import com.amazonaws.services.s3.model.AbortMultipartUploadRequest;
 import com.amazonaws.services.s3.model.CannedAccessControlList;
 import com.amazonaws.services.s3.model.CopyObjectRequest;
 import com.amazonaws.services.s3.model.DeleteObjectsRequest;
+import com.amazonaws.services.s3.model.DeleteObjectsResult;
 import com.amazonaws.services.s3.model.GetObjectMetadataRequest;
 import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
 import com.amazonaws.services.s3.model.InitiateMultipartUploadResult;
@@ -96,7 +97,10 @@ import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.s3a.impl.ChangeDetectionPolicy;
 import org.apache.hadoop.fs.s3a.impl.ContextAccessors;
 import org.apache.hadoop.fs.s3a.impl.CopyOutcome;
+import org.apache.hadoop.fs.s3a.impl.DeleteOperation;
+import org.apache.hadoop.fs.s3a.impl.InternalConstants;
 import org.apache.hadoop.fs.s3a.impl.MultiObjectDeleteSupport;
+import org.apache.hadoop.fs.s3a.impl.OperationCallbacks;
 import org.apache.hadoop.fs.s3a.impl.RenameOperation;
 import org.apache.hadoop.fs.s3a.impl.StoreContext;
 import org.apache.hadoop.fs.s3a.s3guard.BulkOperationState;
@@ -115,7 +119,6 @@ import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.fs.PathIOException;
-import org.apache.hadoop.fs.PathIsNotEmptyDirectoryException;
 import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.fs.StreamCapabilities;
 import org.apache.hadoop.fs.permission.FsPermission;
@@ -245,8 +248,6 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
   /** Principal who created the FS; recorded during initialization. */
   private UserGroupInformation owner;
 
-  // The maximum number of entries that can be deleted in any call to s3
-  private static final int MAX_ENTRIES_TO_DELETE = 1000;
   private String blockOutputBuffer;
   private S3ADataBlocks.BlockFactory blockFactory;
   private int blockOutputActiveBlocks;
@@ -259,6 +260,12 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
 
   private ITtlTimeProvider ttlTimeProvider;
 
+  /**
+   * Specific operations used by rename and delete operations.
+   */
+  private final S3AFileSystem.OperationCallbacksImpl
+      operationCallbacks = new OperationCallbacksImpl();
+
   /** Add any deprecated keys. */
   @SuppressWarnings("deprecation")
   private static void addDeprecatedKeys() {
@@ -1309,17 +1316,16 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
         createStoreContext(),
         src, srcKey, p.getLeft(),
         dst, dstKey, p.getRight(),
-        new RenameOperationCallbacksImpl());
-    return renameOperation.executeRename();
+        operationCallbacks);
+    return renameOperation.execute();
   }
 
   /**
-   * All the callbacks made by the rename operation of the filesystem.
+   * The callbacks made by the rename and delete operations.
    * This separation allows the operation to be factored out and
    * still avoid knowledge of the S3AFilesystem implementation.
    */
-  private class RenameOperationCallbacksImpl implements
-      RenameOperation.RenameOperationCallbacks {
+  private class OperationCallbacksImpl implements OperationCallbacks {
 
     @Override
     public S3ObjectAttributes createObjectAttributes(final Path path,
@@ -1331,7 +1337,8 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
     }
 
     @Override
-    public S3ObjectAttributes createObjectAttributes(final S3AFileStatus fileStatus) {
+    public S3ObjectAttributes createObjectAttributes(
+        final S3AFileStatus fileStatus) {
       return S3AFileSystem.this.createObjectAttributes(fileStatus);
     }
 
@@ -1343,18 +1350,32 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
     }
 
     @Override
+    @Retries.RetryTranslated
     public void deleteObjectAtPath(final Path path,
         final String key,
-        final boolean isFile)
+        final boolean isFile,
+        final BulkOperationState operationState)
         throws IOException {
-      S3AFileSystem.this.deleteObjectAtPath(path, key, isFile);
+      once("delete", key, () ->
+          S3AFileSystem.this.deleteObjectAtPath(path, key, isFile,
+              operationState));
     }
 
     @Override
     @Retries.RetryTranslated
     public RemoteIterator<S3ALocatedFileStatus> listFilesAndEmptyDirectories(
-        final Path path) throws IOException {
-      return S3AFileSystem.this.listFilesAndEmptyDirectories(path, true);
+        final Path path,
+        final S3AFileStatus status,
+        final boolean collectTombstones,
+        final boolean includeSelf) throws IOException {
+      return innerListFiles(
+          path,
+          true,
+          includeSelf
+              ? Listing.ACCEPT_ALL_BUT_S3N
+              : new Listing.AcceptAllButSelfAndS3nDirs(path),
+          status,
+          collectTombstones);
     }
 
     @Override
@@ -1367,12 +1388,15 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
     }
 
     @Override
-    public void removeKeys(final List<DeleteObjectsRequest.KeyVersion> keysToDelete,
+    public DeleteObjectsResult removeKeys(
+        final List<DeleteObjectsRequest.KeyVersion> keysToDelete,
         final boolean deleteFakeDir,
-        final List<Path> undeletedObjectsOnFailure)
+        final List<Path> undeletedObjectsOnFailure,
+        final BulkOperationState operationState,
+        final boolean quiet)
         throws MultiObjectDeleteException, AmazonClientException, IOException {
-      S3AFileSystem.this.removeKeys(keysToDelete, deleteFakeDir,
-          undeletedObjectsOnFailure);
+      return S3AFileSystem.this.removeKeys(keysToDelete, deleteFakeDir,
+          undeletedObjectsOnFailure, operationState, quiet);
     }
 
     @Override
@@ -1385,6 +1409,25 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
         maybeCreateFakeParentDirectory(sourceRenamed);
       }
     }
+
+    @Override
+    public boolean allowAuthoritative(final Path p) {
+      return S3AFileSystem.this.allowAuthoritative(p);
+    }
+
+    @Override
+    @Retries.RetryTranslated
+    public RemoteIterator<S3AFileStatus> listObjects(
+        final Path path,
+        final String key)
+        throws IOException {
+      return once("listObjects", key, () ->
+          listing.createFileStatusListingIterator(path,
+              createListObjectsRequest(key, null),
+              ACCEPT_ALL,
+              Listing.ACCEPT_ALL_BUT_S3N,
+              null));
+    }
   }
 
   /**
@@ -1738,14 +1781,17 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
       throws AmazonClientException, IOException {
     blockRootDelete(key);
     incrementWriteOperations();
-    LOG.debug("DELETE {}", key);
-    invoker.retryUntranslated("Delete "+ bucket + ":/" + key,
-        DELETE_CONSIDERED_IDEMPOTENT,
-        ()-> {
-          incrementStatistic(OBJECT_DELETE_REQUESTS);
-          s3.deleteObject(bucket, key);
-          return null;
-        });
+    try (DurationInfo ignored =
+             new DurationInfo(LOG, false,
+                 "deleting %s", key)) {
+      invoker.retryUntranslated(String.format("Delete %s:/%s", bucket, key),
+          DELETE_CONSIDERED_IDEMPOTENT,
+          ()-> {
+            incrementStatistic(OBJECT_DELETE_REQUESTS);
+            s3.deleteObject(bucket, key);
+            return null;
+          });
+    }
   }
 
   /**
@@ -1755,11 +1801,15 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
    * @param f path path to delete
    * @param key key of entry
    * @param isFile is the path a file (used for instrumentation only)
+   * @param operationState (nullable) operational state for a bulk update
    * @throws AmazonClientException problems working with S3
    * @throws IOException IO failure in the metastore
    */
   @Retries.RetryMixed
-  void deleteObjectAtPath(Path f, String key, boolean isFile)
+  void deleteObjectAtPath(Path f,
+      String key,
+      boolean isFile,
+      @Nullable final BulkOperationState operationState)
       throws AmazonClientException, IOException {
     if (isFile) {
       instrumentation.fileDeleted(1);
@@ -1767,7 +1817,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
       instrumentation.directoryDeleted();
     }
     deleteObject(key);
-    metadataStore.delete(f);
+    metadataStore.delete(f, operationState);
   }
 
   /**
@@ -1789,18 +1839,19 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
    * operation statistics.
    * Retry policy: retry untranslated; delete considered idempotent.
    * @param deleteRequest keys to delete on the s3-backend
+   * @return the AWS response
    * @throws MultiObjectDeleteException one or more of the keys could not
    * be deleted.
    * @throws AmazonClientException amazon-layer failure.
    */
   @Retries.RetryRaw
-  private void deleteObjects(DeleteObjectsRequest deleteRequest)
+  private DeleteObjectsResult deleteObjects(DeleteObjectsRequest deleteRequest)
       throws MultiObjectDeleteException, AmazonClientException, IOException {
     incrementWriteOperations();
     try(DurationInfo ignored =
             new DurationInfo(LOG, false, "DELETE %d keys",
                 deleteRequest.getKeys().size())) {
-      invoker.retryUntranslated("delete",
+      return invoker.retryUntranslated("delete",
           DELETE_CONSIDERED_IDEMPOTENT,
           () -> {
             incrementStatistic(OBJECT_DELETE_REQUESTS, 1);
@@ -2043,6 +2094,10 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
    * @param keysToDelete collection of keys to delete on the s3-backend.
    *        if empty, no request is made of the object store.
    * @param deleteFakeDir indicates whether this is for deleting fake dirs
+   * @param quiet should a bulk query be quiet, or should its result list
+   * all deleted keys?
+   * @return the deletion result if a multi object delete was invoked
+   * and it returned without a failure.
    * @throws InvalidRequestException if the request was rejected due to
    * a mistaken attempt to delete the root directory.
    * @throws MultiObjectDeleteException one or more of the keys could not
@@ -2052,22 +2107,26 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
    * @throws AmazonClientException other amazon-layer failure.
    */
   @Retries.RetryRaw
-  private void removeKeysS3(List<DeleteObjectsRequest.KeyVersion> keysToDelete,
-      boolean deleteFakeDir)
+  private DeleteObjectsResult removeKeysS3(
+      List<DeleteObjectsRequest.KeyVersion> keysToDelete,
+      boolean deleteFakeDir,
+      boolean quiet)
       throws MultiObjectDeleteException, AmazonClientException,
       IOException {
+    DeleteObjectsResult result = null;
     if (keysToDelete.isEmpty()) {
       // exit fast if there are no keys to delete
-      return;
+      return result;
     }
     for (DeleteObjectsRequest.KeyVersion keyVersion : keysToDelete) {
       blockRootDelete(keyVersion.getKey());
     }
     try {
       if (enableMultiObjectsDelete) {
-        deleteObjects(new DeleteObjectsRequest(bucket)
-            .withKeys(keysToDelete)
-            .withQuiet(true));
+        result = deleteObjects(
+            new DeleteObjectsRequest(bucket)
+                .withKeys(keysToDelete)
+                .withQuiet(quiet));
       } else {
         for (DeleteObjectsRequest.KeyVersion keyVersion : keysToDelete) {
           deleteObject(keyVersion.getKey());
@@ -2083,6 +2142,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
       throw ex;
     }
     noteDeleted(keysToDelete.size(), deleteFakeDir);
+    return result;
   }
 
   /**
@@ -2105,6 +2165,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
    * @param keysToDelete collection of keys to delete on the s3-backend.
    *        if empty, no request is made of the object store.
    * @param deleteFakeDir indicates whether this is for deleting fake dirs
+   * @param operationState (nullable) operational state for a bulk update
    * @throws InvalidRequestException if the request was rejected due to
    * a mistaken attempt to delete the root directory.
    * @throws MultiObjectDeleteException one or more of the keys could not
@@ -2116,14 +2177,16 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
   @Retries.RetryMixed
   void removeKeys(
       final List<DeleteObjectsRequest.KeyVersion> keysToDelete,
-      final boolean deleteFakeDir)
+      final boolean deleteFakeDir,
+      final BulkOperationState operationState)
       throws MultiObjectDeleteException, AmazonClientException,
       IOException {
-    removeKeys(keysToDelete, deleteFakeDir, new ArrayList<>());
+    removeKeys(keysToDelete, deleteFakeDir, new ArrayList<>(), operationState,
+        true);
   }
 
   /**
-   * Invoke {@link #removeKeysS3(List, boolean)} with handling of
+   * Invoke {@link #removeKeysS3(List, boolean, boolean)} with handling of
    * {@code MultiObjectDeleteException} before the exception is rethrown.
    * Specifically:
    * <ol>
@@ -2138,6 +2201,11 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
    * @param undeletedObjectsOnFailure List which will be built up of all
    * files that were not deleted. This happens even as an exception
    * is raised.
+   * @param operationState (nullable) operational state for a bulk update
+   * @param quiet should a bulk query be quiet, or should its result list
+   * all deleted keys
+   * @return the deletion result if a multi object delete was invoked
+   * and it returned without a failure, else null.
    * @throws InvalidRequestException if the request was rejected due to
    * a mistaken attempt to delete the root directory.
    * @throws MultiObjectDeleteException one or more of the keys could not
@@ -2145,17 +2213,17 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
    * @throws AmazonClientException amazon-layer failure.
    * @throws IOException other IO Exception.
    */
-  @VisibleForTesting
   @Retries.RetryMixed
-  void removeKeys(
+  DeleteObjectsResult removeKeys(
       final List<DeleteObjectsRequest.KeyVersion> keysToDelete,
       final boolean deleteFakeDir,
-      final List<Path> undeletedObjectsOnFailure)
-      throws MultiObjectDeleteException, AmazonClientException,
-      IOException {
+      final List<Path> undeletedObjectsOnFailure,
+      final BulkOperationState operationState,
+      final boolean quiet)
+      throws MultiObjectDeleteException, AmazonClientException, IOException {
     undeletedObjectsOnFailure.clear();
-    try(DurationInfo ignored = new DurationInfo(LOG, false, "Deleting")) {
-      removeKeysS3(keysToDelete, deleteFakeDir);
+    try (DurationInfo ignored = new DurationInfo(LOG, false, "Deleting")) {
+      return removeKeysS3(keysToDelete, deleteFakeDir, quiet);
     } catch (MultiObjectDeleteException ex) {
       LOG.debug("Partial delete failure");
       // what to do if an IOE was raised? Given an exception was being
@@ -2164,13 +2232,15 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
         // when deleting fake directories we don't want to delete metastore
         // entries so we only process these failures on "real" deletes.
         Triple<List<Path>, List<Path>, List<Pair<Path, IOException>>> results =
-            new MultiObjectDeleteSupport(createStoreContext())
+            new MultiObjectDeleteSupport(createStoreContext(), operationState)
                 .processDeleteFailure(ex, keysToDelete);
         undeletedObjectsOnFailure.addAll(results.getMiddle());
       }
       throw ex;
     } catch (AmazonClientException | IOException ex) {
-      List<Path> paths = new MultiObjectDeleteSupport(createStoreContext())
+      List<Path> paths = new MultiObjectDeleteSupport(
+          createStoreContext(),
+          operationState)
           .processDeleteFailureGenericException(ex, keysToDelete);
       // other failures. Assume nothing was deleted
       undeletedObjectsOnFailure.addAll(paths);
@@ -2195,7 +2265,13 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
   public boolean delete(Path f, boolean recursive) throws IOException {
     try {
       entryPoint(INVOCATION_DELETE);
-      boolean outcome = innerDelete(innerGetFileStatus(f, true), recursive);
+      DeleteOperation deleteOperation = new DeleteOperation(
+          createStoreContext(),
+          innerGetFileStatus(f, true),
+          recursive,
+          operationCallbacks,
+          InternalConstants.MAX_ENTRIES_TO_DELETE);
+      boolean outcome = deleteOperation.execute();
       if (outcome) {
         try {
           maybeCreateFakeParentDirectory(f);
@@ -2216,110 +2292,6 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
   }
 
   /**
-   * Delete an object. See {@link #delete(Path, boolean)}.
-   * This call does not create any fake parent directory; that is
-   * left to the caller.
-   * @param status fileStatus object
-   * @param recursive if path is a directory and set to
-   * true, the directory is deleted else throws an exception. In
-   * case of a file the recursive can be set to either true or false.
-   * @return true, except in the corner cases of root directory deletion
-   * @throws IOException due to inability to delete a directory or file.
-   * @throws AmazonClientException on failures inside the AWS SDK
-   */
-  @Retries.RetryMixed
-  private boolean innerDelete(S3AFileStatus status, boolean recursive)
-      throws IOException, AmazonClientException {
-    Path f = status.getPath();
-    LOG.debug("Delete path {} - recursive {}", f, recursive);
-    LOG.debug("Type = {}",
-        status.isFile() ? "File"
-            : (status.isEmptyDirectory() == Tristate.TRUE
-                ? "Empty Directory"
-                : "Directory"));
-
-    String key = pathToKey(f);
-
-    if (status.isDirectory()) {
-      LOG.debug("delete: Path is a directory: {}", f);
-      Preconditions.checkArgument(
-          status.isEmptyDirectory() != Tristate.UNKNOWN,
-          "File status must have directory emptiness computed");
-
-      if (!key.endsWith("/")) {
-        key = key + "/";
-      }
-
-      if (key.equals("/")) {
-        return rejectRootDirectoryDelete(status, recursive);
-      }
-
-      if (!recursive && status.isEmptyDirectory() == Tristate.FALSE) {
-        throw new PathIsNotEmptyDirectoryException(f.toString());
-      }
-
-      if (status.isEmptyDirectory() == Tristate.TRUE) {
-        LOG.debug("Deleting fake empty directory {}", key);
-        // HADOOP-13761 s3guard: retries here
-        deleteObjectAtPath(f, key, false);
-      } else {
-        LOG.debug("Getting objects for directory prefix {} to delete", key);
-
-        S3ListRequest request = createListObjectsRequest(key, null);
-
-        S3ListResult objects = listObjects(request);
-        List<DeleteObjectsRequest.KeyVersion> keys =
-            new ArrayList<>(objects.getObjectSummaries().size());
-        while (true) {
-          for (S3ObjectSummary summary : objects.getObjectSummaries()) {
-            keys.add(new DeleteObjectsRequest.KeyVersion(summary.getKey()));
-            LOG.debug("Got object to delete {}", summary.getKey());
-
-            if (keys.size() == MAX_ENTRIES_TO_DELETE) {
-              // delete a single page of keys
-              removeKeys(keys, false);
-              keys.clear();
-            }
-          }
-
-          if (objects.isTruncated()) {
-            objects = continueListObjects(request, objects);
-          } else {
-            // there is no more data: delete the final set of entries.
-            removeKeys(keys, false);
-            break;
-          }
-        }
-      }
-      try(DurationInfo ignored =
-              new DurationInfo(LOG, false, "Delete metastore")) {
-        metadataStore.deleteSubtree(f);
-      }
-    } else {
-      LOG.debug("delete: Path is a file: {}", key);
-      deleteObjectAtPath(f, key, true);
-    }
-
-    return true;
-  }
-
-  /**
-   * Implements the specific logic to reject root directory deletion.
-   * The caller must return the result of this call, rather than
-   * attempt to continue with the delete operation: deleting root
-   * directories is never allowed.
-   * @param status filesystem status
-   * @param recursive recursive flag from command
-   * @return a return code for the operation
-   */
-  private boolean rejectRootDirectoryDelete(S3AFileStatus status,
-      boolean recursive) {
-    LOG.error("S3A: Cannot delete the {} root directory. Path: {}. Recursive: "
-            + "{}", bucket, status.getPath(), recursive);
-    return false;
-  }
-
-  /**
    * Create a fake directory if required.
    * That is: it is not the root path and the path does not exist.
    * Retry policy: retrying; untranslated.
@@ -2394,8 +2366,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
 
       DirListingMetadata dirMeta =
           S3Guard.listChildrenWithTtl(metadataStore, path, ttlTimeProvider);
-      boolean allowAuthoritative = S3Guard.allowAuthoritative(f, this,
-          allowAuthoritativeMetadataStore, allowAuthoritativePaths);
+      boolean allowAuthoritative = allowAuthoritative(f);
       if (allowAuthoritative && dirMeta != null && dirMeta.isAuthoritative()) {
         return S3Guard.dirMetaToStatuses(dirMeta);
       }
@@ -2424,6 +2395,19 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
   }
 
   /**
+   * Is a path to be considered as authoritative?
+   * True iff there is an authoritative metastore or if there
+   * is a non-auth store with the supplied path under
+   * one of the paths declared as authoritative.
+   * @param path path
+   * @return true if the path is auth
+   */
+  protected boolean allowAuthoritative(final Path path) {
+    return S3Guard.allowAuthoritative(path, this,
+        allowAuthoritativeMetadataStore, allowAuthoritativePaths);
+  }
+
+  /**
    * Create a {@code ListObjectsRequest} request against this bucket,
    * with the maximum keys returned in a query set by {@link #maxKeys}.
    * @param key key for request
@@ -2628,8 +2612,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
       // dest is also a directory, there's no difference.
       // TODO After HADOOP-16085 the modification detection can be done with
       //  etags or object version instead of modTime
-      boolean allowAuthoritative = S3Guard.allowAuthoritative(f, this,
-          allowAuthoritativeMetadataStore, allowAuthoritativePaths);
+      boolean allowAuthoritative = allowAuthoritative(f);
       if (!pm.getFileStatus().isDirectory() &&
           !allowAuthoritative) {
         LOG.debug("Metadata for {} found in the non-auth metastore.", path);
@@ -3399,7 +3382,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
       path = path.getParent();
     }
     try {
-      removeKeys(keysToRemove, true);
+      removeKeys(keysToRemove, true, null);
     } catch(AmazonClientException | IOException e) {
       instrumentation.errorIgnored();
       if (LOG.isDebugEnabled()) {
@@ -3728,8 +3711,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
    * These are then translated into {@link LocatedFileStatus} instances.
    *
    * This is essentially a nested and wrapped set of iterators, with some
-   * generator classes; an architecture which may become less convoluted
-   * using lambda-expressions.
+   * generator classes.
    * @param f a path
    * @param recursive if the subdirectories need to be traversed recursively
    *
@@ -3739,11 +3721,11 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
    * @throws IOException if any I/O error occurred
    */
   @Override
-  @Retries.OnceTranslated
+  @Retries.RetryTranslated
   public RemoteIterator<LocatedFileStatus> listFiles(Path f,
       boolean recursive) throws FileNotFoundException, IOException {
     return toLocatedFileStatusIterator(innerListFiles(f, recursive,
-        new Listing.AcceptFilesOnly(qualify(f))));
+        new Listing.AcceptFilesOnly(qualify(f)), null, true));
   }
 
   private static RemoteIterator<LocatedFileStatus> toLocatedFileStatusIterator(
@@ -3770,19 +3752,55 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
   @Retries.RetryTranslated
   public RemoteIterator<S3ALocatedFileStatus> listFilesAndEmptyDirectories(
       Path f, boolean recursive) throws IOException {
-    return invoker.retry("list", f.toString(), true,
-        () -> innerListFiles(f, recursive, new Listing.AcceptAllButS3nDirs()));
+    return innerListFiles(f, recursive, Listing.ACCEPT_ALL_BUT_S3N, null, true);
   }
 
-  @Retries.OnceTranslated
-  private RemoteIterator<S3ALocatedFileStatus> innerListFiles(Path f, boolean
-      recursive, Listing.FileStatusAcceptor acceptor) throws IOException {
+  /**
+   * List files under the path.
+   * <ol>
+   *   <li>
+   *     If the path is authoritative on the client,
+   *     only S3Guard will be queried.
+   *   </li>
+   *   <li>
+   *     Otherwise, the S3Guard values are returned first, then the S3
+   *     entries will be retrieved and returned if not already listed.</li>
+   *   <li>
+   *     when collectTombstones} is true, S3Guard tombstones will
+   *     be used to filter out deleted files.
+   *     They MUST be used for normal listings; it is only for
+   *     deletion and low-level operations that they MAY be bypassed.
+   *   </li>
+   *   <li>
+   *     The optional {@code status} parameter will be used to skip the
+   *     initial getFileStatus call.
+   *   </li>
+   * </ol>
+   *
+   * @param f path
+   * @param recursive recursive listing?
+   * @param acceptor file status filter
+   * @param status optional status of path to list.
+   * @param collectTombstones should tombstones be collected from S3Guard?
+   * @return an iterator over the listing.
+   * @throws IOException failure
+   */
+  @Retries.RetryTranslated
+  private RemoteIterator<S3ALocatedFileStatus> innerListFiles(
+      final Path f,
+      final boolean recursive,
+      final Listing.FileStatusAcceptor acceptor,
+      final S3AFileStatus status,
+      final boolean collectTombstones) throws IOException {
     entryPoint(INVOCATION_LIST_FILES);
     Path path = qualify(f);
     LOG.debug("listFiles({}, {})", path, recursive);
     try {
-      // lookup dir triggers existence check
-      final S3AFileStatus fileStatus = (S3AFileStatus) getFileStatus(path);
+      // if a status was given, that is used, otherwise
+      // call getFileStatus, which triggers an existence check
+      final S3AFileStatus fileStatus = status != null
+          ? status
+          : (S3AFileStatus) getFileStatus(path);
       if (fileStatus.isFile()) {
         // simple case: File
         LOG.debug("Path is a file");
@@ -3796,8 +3814,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
             key, delimiter);
         final RemoteIterator<S3AFileStatus> cachedFilesIterator;
         final Set<Path> tombstones;
-        boolean allowAuthoritative = S3Guard.allowAuthoritative(f, this,
-            allowAuthoritativeMetadataStore, allowAuthoritativePaths);
+        boolean allowAuthoritative = allowAuthoritative(f);
         if (recursive) {
           final PathMetadata pm = metadataStore.get(path, true);
           // shouldn't need to check pm.isDeleted() because that will have
@@ -3829,7 +3846,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
                     ACCEPT_ALL,
                     acceptor,
                     cachedFilesIterator)),
-            tombstones);
+            collectTombstones ? tombstones : null);
       }
     } catch (AmazonClientException e) {
       // TODO S3Guard: retry on file not found exception
@@ -3890,8 +3907,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
               final RemoteIterator<S3AFileStatus> cachedFileStatusIterator =
                   listing.createProvidedFileStatusIterator(
                       S3Guard.dirMetaToStatuses(meta), filter, acceptor);
-              boolean allowAuthoritative = S3Guard.allowAuthoritative(f, this,
-                  allowAuthoritativeMetadataStore, allowAuthoritativePaths);
+              boolean allowAuthoritative = allowAuthoritative(f);
               return (allowAuthoritative && meta != null
                   && meta.isAuthoritative())
                   ? listing.createLocatedFileStatusIterator(
@@ -3918,7 +3934,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
     return new S3ALocatedFileStatus(status,
         status.isFile() ?
           getFileBlockLocations(status, 0, status.getLen())
-          : null, status.getETag(), status.getVersionId());
+          : null);
   }
 
   /**
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ALocatedFileStatus.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ALocatedFileStatus.java
index f78e11c..ac85089 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ALocatedFileStatus.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ALocatedFileStatus.java
@@ -33,11 +33,13 @@ public class S3ALocatedFileStatus extends LocatedFileStatus {
   private final String eTag;
   private final String versionId;
 
-  public S3ALocatedFileStatus(S3AFileStatus status, BlockLocation[] locations,
-      String eTag, String versionId) {
+  private final Tristate isEmptyDirectory;
+
+  public S3ALocatedFileStatus(S3AFileStatus status, BlockLocation[] locations) {
     super(checkNotNull(status), locations);
-    this.eTag = eTag;
-    this.versionId = versionId;
+    this.eTag = status.getETag();
+    this.versionId = status.getVersionId();
+    isEmptyDirectory = status.isEmptyDirectory();
   }
 
   public String getETag() {
@@ -67,12 +69,28 @@ public class S3ALocatedFileStatus extends LocatedFileStatus {
    */
   public S3AFileStatus toS3AFileStatus() {
     return new S3AFileStatus(
+        getPath(),
+        isDirectory(),
+        isEmptyDirectory,
         getLen(),
         getModificationTime(),
-        getPath(),
         getBlockSize(),
         getOwner(),
         getETag(),
         getVersionId());
   }
+
+  @Override
+  public String toString() {
+    final StringBuilder sb = new StringBuilder(
+        super.toString());
+    sb.append("[eTag='").
+        append(eTag != null ? eTag : "")
+        .append('\'');
+    sb.append(", versionId='")
+        .append(versionId != null ? versionId: "")
+        .append('\'');
+    sb.append(']');
+    return sb.toString();
+  }
 }
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java
index 5b6dae7..570f037 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java
@@ -482,14 +482,16 @@ public class WriteOperationHelper {
    * Relies on retry code in filesystem
    * @throws IOException on problems
    * @param destKey destination key
+   * @param operationState operational state for a bulk update
    */
   @Retries.OnceTranslated
-  public void revertCommit(String destKey) throws IOException {
+  public void revertCommit(String destKey,
+      @Nullable BulkOperationState operationState) throws IOException {
     once("revert commit", destKey,
         () -> {
           Path destPath = owner.keyToQualifiedPath(destKey);
           owner.deleteObjectAtPath(destPath,
-              destKey, true);
+              destKey, true, operationState);
           owner.maybeCreateFakeParentDirectory(destPath);
         }
     );
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitOperations.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitOperations.java
index ea4eb59..75af919 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitOperations.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitOperations.java
@@ -417,12 +417,14 @@ public class CommitOperations {
   /**
    * Revert a pending commit by deleting the destination.
    * @param commit pending commit
+   * @param operationState nullable operational state for a bulk update
    * @throws IOException failure
    */
-  public void revertCommit(SinglePendingCommit commit) throws IOException {
+  public void revertCommit(SinglePendingCommit commit,
+      BulkOperationState operationState) throws IOException {
     LOG.info("Revert {}", commit);
     try {
-      writeOperations.revertCommit(commit.getDestinationKey());
+      writeOperations.revertCommit(commit.getDestinationKey(), operationState);
     } finally {
       statistics.commitReverted();
     }
@@ -620,7 +622,7 @@ public class CommitOperations {
      */
     public void revertCommit(final SinglePendingCommit commit)
         throws IOException {
-      CommitOperations.this.revertCommit(commit);
+      CommitOperations.this.revertCommit(commit, operationState);
     }
 
     /**
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/DeleteOperation.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/DeleteOperation.java
new file mode 100644
index 0000000..76b1e73
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/DeleteOperation.java
@@ -0,0 +1,577 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.impl;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+import com.amazonaws.services.s3.model.DeleteObjectsRequest;
+import com.amazonaws.services.s3.model.DeleteObjectsResult;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathIsNotEmptyDirectoryException;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.s3a.Invoker;
+import org.apache.hadoop.fs.s3a.Retries;
+import org.apache.hadoop.fs.s3a.S3AFileStatus;
+import org.apache.hadoop.fs.s3a.S3ALocatedFileStatus;
+import org.apache.hadoop.fs.s3a.Tristate;
+import org.apache.hadoop.fs.s3a.s3guard.BulkOperationState;
+import org.apache.hadoop.fs.s3a.s3guard.MetadataStore;
+import org.apache.hadoop.fs.s3a.s3guard.S3Guard;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.util.DurationInfo;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static org.apache.hadoop.fs.s3a.impl.CallableSupplier.submit;
+import static org.apache.hadoop.fs.s3a.impl.CallableSupplier.waitForCompletion;
+
+/**
+ * Implementation of the delete() operation.
+ * <p>
+ * How S3Guard/Store inconsistency is handled:
+ * <ol>
+ *   <li>
+ *     The list operation does not ask for tombstone markers; objects
+ *     under tombstones will be found and deleted.
+ *     The {@code extraFilesDeleted} counter will be incremented here.
+ *   </li>
+ *   <li>
+ *     That may result in recently deleted files being found and
+ *     duplicate delete requests issued. This is mostly harmless.
+ *   </li>
+ *   <li>
+ *     If a path is considered authoritative on the client, so only S3Guard
+ *     is used for listings, we wrap up the delete with a scan of raw S3.
+ *     This will find and eliminate OOB additions.
+ *   </li>
+ *   <li>
+ *     Exception 1: simple directory markers of the form PATH + "/".
+ *     These are treated as a signal that there are no children; no
+ *     listing is made.
+ *   </li>
+ *   <li>
+ *     Exception 2: delete(path, true) where path has a tombstone in S3Guard.
+ *     Here the delete is downgraded to a no-op even before this operation
+ *     is created. Thus: no listings of S3.
+ *   </li>
+ * </ol>
+ * If this class is logged at debug, requests will be audited:
+ * the response to a bulk delete call will be reviewed to see if there
+ * were fewer files deleted than requested; that will be printed
+ * at WARN level. This is independent of handling rejected delete
+ * requests which raise exceptions -those are processed lower down.
+ * <p>
+ * Performance tuning:
+ * <p>
+ * The operation to POST a delete request (or issue many individual
+ * DELETE calls) then update the S3Guard table is done in an async
+ * operation so that it can overlap with the LIST calls for data.
+ * However, only one single operation is queued at a time.
+ * <p>
+ * Executing more than one batch delete is possible, it just
+ * adds complexity in terms of error handling as well as in
+ * the datastructures used to track outstanding operations.
+ * If this is done, then it may be good to experiment with different
+ * page sizes. The default value is
+ * {@link InternalConstants#MAX_ENTRIES_TO_DELETE}, the maximum a single
+ * POST permits.
+ * <p>
+ * 1. Smaller pages executed in parallel may have different
+ * performance characteristics when deleting very large directories,
+ * because it will be the DynamoDB calls which will come to dominate.
+ * Any exploration of options here MUST be done with performance
+ * measurements taken from test runs in EC2 against local DDB and S3 stores,
+ * so as to ensure network latencies do not skew the results.
+ * <p>
+ * 2. Note that as the DDB thread/connection pools will be shared across
+ * all active delete operations, speedups will be minimal unless
+ * those pools are large enough to cope the extra load.
+ * <p>
+ * There are also some opportunities to explore in
+ * {@code DynamoDBMetadataStore} with batching delete requests
+ * in the DDB APIs.
+ */
+public class DeleteOperation extends ExecutingStoreOperation<Boolean> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(
+      DeleteOperation.class);
+
+  /**
+   * Pre-fetched source status.
+   */
+  private final S3AFileStatus status;
+
+  /**
+   * Recursive delete?
+   */
+  private final boolean recursive;
+
+  /**
+   * Callback provider.
+   */
+  private final OperationCallbacks callbacks;
+
+  /**
+   * Number of entries in a page.
+   */
+  private final int pageSize;
+
+  /**
+   * Metastore -never null but may be the NullMetadataStore.
+   */
+  private final MetadataStore metadataStore;
+
+  /**
+   * Executor for async operations.
+   */
+  private final ListeningExecutorService executor;
+
+  /**
+   * List of keys built up for the next delete batch.
+   */
+  private List<DeleteObjectsRequest.KeyVersion> keys;
+
+  /**
+   * List of paths built up for deletion.
+   */
+  private List<Path> paths;
+
+  /**
+   * The single async delete operation, or null.
+   */
+  private CompletableFuture<Void> deleteFuture;
+
+  /**
+   * Bulk Operation state if this is a bulk operation.
+   */
+  private BulkOperationState operationState;
+
+  /**
+   * Counter of deleted files.
+   */
+  private long filesDeleted;
+
+  /**
+   * Counter of files found in the S3 Store during a raw scan of the store
+   * after the previous listing was in auth-mode.
+   */
+  private long extraFilesDeleted;
+
+  /**
+   * Constructor.
+   * @param context store context
+   * @param status  pre-fetched source status
+   * @param recursive recursive delete?
+   * @param callbacks callback provider
+   * @param pageSize number of entries in a page
+   */
+  public DeleteOperation(final StoreContext context,
+      final S3AFileStatus status,
+      final boolean recursive,
+      final OperationCallbacks callbacks,
+      final int pageSize) {
+
+    super(context);
+    this.status = status;
+    this.recursive = recursive;
+    this.callbacks = callbacks;
+    checkArgument(pageSize > 0
+            && pageSize <= InternalConstants.MAX_ENTRIES_TO_DELETE,
+        "page size out of range: %d", pageSize);
+    this.pageSize = pageSize;
+    metadataStore = context.getMetadataStore();
+    executor = context.createThrottledExecutor(1);
+  }
+
+  public long getFilesDeleted() {
+    return filesDeleted;
+  }
+
+  public long getExtraFilesDeleted() {
+    return extraFilesDeleted;
+  }
+
+  /**
+   * Delete a file or directory tree.
+   * <p>
+   * This call does not create any fake parent directory; that is
+   * left to the caller.
+   * The actual delete call is done in a separate thread.
+   * Only one delete at a time is submitted, however, to reduce the
+   * complexity of recovering from failures.
+   * <p>
+   * The DynamoDB store deletes paths in parallel itself, so that
+   * potentially slow part of the process is somewhat speeded up.
+   * The extra parallelization here is to list files from the store/DDB while
+   * that delete operation is in progress.
+   *
+   * @return true, except in the corner cases of root directory deletion
+   * @throws PathIsNotEmptyDirectoryException if the path is a dir and this
+   * is not a recursive delete.
+   * @throws IOException list failures or an inability to delete a file.
+   */
+  @Retries.RetryTranslated
+  public Boolean execute() throws IOException {
+    executeOnlyOnce();
+
+    StoreContext context = getStoreContext();
+    Path path = status.getPath();
+    LOG.debug("Delete path {} - recursive {}", path, recursive);
+    LOG.debug("Type = {}",
+        status.isFile() ? "File"
+            : (status.isEmptyDirectory() == Tristate.TRUE
+                ? "Empty Directory"
+                : "Directory"));
+
+    String key = context.pathToKey(path);
+    if (status.isDirectory()) {
+      LOG.debug("delete: Path is a directory: {}", path);
+      checkArgument(
+          status.isEmptyDirectory() != Tristate.UNKNOWN,
+          "File status must have directory emptiness computed");
+
+      if (!key.endsWith("/")) {
+        key = key + "/";
+      }
+
+      if ("/".equals(key)) {
+        LOG.error("S3A: Cannot delete the root directory."
+                + " Path: {}. Recursive: {}",
+            status.getPath(), recursive);
+        return false;
+      }
+
+      if (!recursive && status.isEmptyDirectory() == Tristate.FALSE) {
+        throw new PathIsNotEmptyDirectoryException(path.toString());
+      }
+      if (status.isEmptyDirectory() == Tristate.TRUE) {
+        LOG.debug("deleting empty directory {}", path);
+        deleteObjectAtPath(path, key, false);
+      } else {
+        deleteDirectoryTree(path, key);
+      }
+
+    } else {
+      // simple file.
+      LOG.debug("deleting simple file {}", path);
+      deleteObjectAtPath(path, key, true);
+    }
+    LOG.debug("Deleted {} files", filesDeleted);
+    return true;
+  }
+
+  /**
+   * Delete a directory tree.
+   * <p>
+   * This is done by asking the filesystem for a list of all objects under
+   * the directory path, without using any S3Guard tombstone markers to hide
+   * objects which may be returned in S3 listings but which are considered
+   * deleted.
+   * <p>
+   * Once the first {@link #pageSize} worth of objects has been listed, a batch
+   * delete is queued for execution in a separate thread; subsequent batches
+   * block waiting for the first call to complete or fail before again,
+   * being deleted in the separate thread.
+   * <p>
+   * After all listed objects are queued for deletion,
+   * if the path is considered authoritative in the client, a final scan
+   * of S3 <i>without S3Guard</i> is executed, so as to find and delete
+   * any out-of-band objects in the tree.
+   * @param path directory path
+   * @param dirKey directory key
+   * @throws IOException failure
+   */
+  @Retries.RetryTranslated
+  protected void deleteDirectoryTree(final Path path,
+      final String dirKey) throws IOException {
+    // create an operation state so that the store can manage the bulk
+    // operation if it needs to
+    operationState = S3Guard.initiateBulkWrite(
+        metadataStore,
+        BulkOperationState.OperationType.Delete,
+        path);
+    try (DurationInfo ignored =
+             new DurationInfo(LOG, false, "deleting %s", dirKey)) {
+
+      // init the lists of keys and paths to delete
+      resetDeleteList();
+      deleteFuture = null;
+
+      // list files including any under tombstones through S3Guard
+      LOG.debug("Getting objects for directory prefix {} to delete", dirKey);
+      final RemoteIterator<S3ALocatedFileStatus> locatedFiles =
+          callbacks.listFilesAndEmptyDirectories(path, status, false, true);
+
+      // iterate through and delete. The next() call will block when a new S3
+      // page is required; this any active delete submitted to the executor
+      // will run in parallel with this.
+      while (locatedFiles.hasNext()) {
+        // get the next entry in the listing.
+        S3AFileStatus child = locatedFiles.next().toS3AFileStatus();
+        queueForDeletion(child);
+      }
+      LOG.debug("Deleting final batch of listed files");
+      submitNextBatch();
+      maybeAwaitCompletion(deleteFuture);
+
+      // if s3guard is authoritative we follow up with a bulk list and
+      // delete process on S3 this helps recover from any situation where S3
+      // and S3Guard have become inconsistent.
+      // This is only needed for auth paths; by performing the previous listing
+      // without tombstone filtering, any files returned by the non-auth
+      // S3 list which were hidden under tombstones will have been found
+      // and deleted.
+
+      if (callbacks.allowAuthoritative(path)) {
+        LOG.debug("Path is authoritatively guarded;"
+            + " listing files on S3 for completeness");
+        // let the ongoing delete finish to avoid duplicates
+        final RemoteIterator<S3AFileStatus> objects =
+            callbacks.listObjects(path, dirKey);
+
+        // iterate through and delete. The next() call will block when a new S3
+        // page is required; this any active delete submitted to the executor
+        // will run in parallel with this.
+        while (objects.hasNext()) {
+          // get the next entry in the listing.
+          extraFilesDeleted++;
+          queueForDeletion(deletionKey(objects.next()), null);
+        }
+        if (extraFilesDeleted > 0) {
+          LOG.debug("Raw S3 Scan found {} extra file(s) to delete",
+              extraFilesDeleted);
+          // there is no more data:
+          // await any ongoing operation
+          submitNextBatch();
+          maybeAwaitCompletion(deleteFuture);
+        }
+      }
+
+      // final cleanup of the directory tree in the metastore, including the
+      // directory entry itself.
+      try (DurationInfo ignored2 =
+               new DurationInfo(LOG, false, "Delete metastore")) {
+        metadataStore.deleteSubtree(path, operationState);
+      }
+    } finally {
+      IOUtils.cleanupWithLogger(LOG, operationState);
+    }
+    LOG.debug("Delete \"{}\" completed; deleted {} objects", path,
+        filesDeleted);
+  }
+
+  /**
+   * Build an S3 key for a delete request,
+   * possibly adding a "/" if it represents directory and it does
+   * not have a trailing slash already.
+   * @param stat status to build the key from
+   * @return a key for a delete request
+   */
+  private String deletionKey(final S3AFileStatus stat) {
+    return getStoreContext().fullKey(stat);
+  }
+
+  /**
+   * Queue for deletion.
+   * @param stat status to queue
+   * @throws IOException failure of the previous batch of deletions.
+   */
+  private void queueForDeletion(
+      final S3AFileStatus stat) throws IOException {
+    queueForDeletion(deletionKey(stat), stat.getPath());
+  }
+
+  /**
+   * Queue keys for deletion.
+   * Once a page of keys are ready to delete this
+   * call is submitted to the executor, after waiting for the previous run to
+   * complete.
+   *
+   * @param key key to delete
+   * @param deletePath nullable path of the key
+   * @throws IOException failure of the previous batch of deletions.
+   */
+  private void queueForDeletion(final String key,
+      @Nullable final Path deletePath) throws IOException {
+    LOG.debug("Adding object to delete: \"{}\"", key);
+    keys.add(new DeleteObjectsRequest.KeyVersion(key));
+    if (deletePath != null) {
+      paths.add(deletePath);
+    }
+
+    if (keys.size() == pageSize) {
+      submitNextBatch();
+    }
+  }
+
+  /**
+   * Wait for the previous batch to finish then submit this page.
+   * The lists of keys and pages are reset here.
+   *
+   * @throws IOException failure of the previous batch of deletions.
+   */
+  private void submitNextBatch()
+      throws IOException {
+    // delete a single page of keys and the metadata.
+    // block for any previous batch.
+    maybeAwaitCompletion(deleteFuture);
+
+    // delete the current page of keys and paths
+    deleteFuture = submitDelete(keys, paths);
+    // reset the references so a new list can be built up.
+    resetDeleteList();
+  }
+
+  /**
+   * Reset the lists of keys and paths so that a new batch of
+   * entries can built up.
+   */
+  private void resetDeleteList() {
+    keys = new ArrayList<>(pageSize);
+    paths = new ArrayList<>(pageSize);
+  }
+
+  /**
+   * Delete a file or directory marker.
+   * @param path path
+   * @param key key
+   * @param isFile is this a file?
+   * @throws IOException failure
+   */
+  @Retries.RetryTranslated
+  private void deleteObjectAtPath(
+      final Path path,
+      final String key,
+      final boolean isFile)
+      throws IOException {
+    LOG.debug("delete: {} {}", (isFile ? "file" : "dir marker"), key);
+    filesDeleted++;
+    callbacks.deleteObjectAtPath(path, key, isFile, operationState);
+  }
+
+  /**
+   * Delete a single page of keys and optionally the metadata.
+   * For a large page, it is the metadata size which dominates.
+   * Its possible to invoke this with empty lists of keys or paths.
+   * If both lists are empty no work is submitted and null is returned.
+   *
+   * @param keyList keys to delete.
+   * @param pathList paths to update the metastore with.
+   * @return the submitted future or null
+   */
+  private CompletableFuture<Void> submitDelete(
+      final List<DeleteObjectsRequest.KeyVersion> keyList,
+      final List<Path> pathList) {
+
+    if (keyList.isEmpty() && pathList.isEmpty()) {
+      return null;
+    }
+    filesDeleted += keyList.size();
+    return submit(executor, () -> {
+      asyncDeleteAction(operationState,
+          keyList,
+          pathList,
+          LOG.isDebugEnabled());
+      return null;
+    });
+  }
+
+  /**
+   * The action called in the asynchronous thread to delete
+   * the keys from S3 and paths from S3Guard.
+   *
+   * @param state ongoing operation state
+   * @param keyList keys to delete.
+   * @param pathList paths to update the metastore with.
+   * @param auditDeletedKeys should the results be audited and undeleted
+   * entries logged?
+   * @throws IOException failure
+   */
+  @Retries.RetryTranslated
+  private void asyncDeleteAction(
+      final BulkOperationState state,
+      final List<DeleteObjectsRequest.KeyVersion> keyList,
+      final List<Path> pathList,
+      final boolean auditDeletedKeys)
+      throws IOException {
+    try (DurationInfo ignored =
+             new DurationInfo(LOG, false, "Delete page of keys")) {
+      DeleteObjectsResult result = null;
+      List<Path> undeletedObjects = new ArrayList<>();
+      if (!keyList.isEmpty()) {
+        result = Invoker.once("Remove S3 Keys",
+            status.getPath().toString(),
+            () -> callbacks.removeKeys(
+                keyList,
+                false,
+                undeletedObjects,
+                state,
+                !auditDeletedKeys));
+      }
+      if (!pathList.isEmpty()) {
+        metadataStore.deletePaths(pathList, state);
+      }
+      if (auditDeletedKeys && result != null) {
+        // audit the deleted keys
+        List<DeleteObjectsResult.DeletedObject> deletedObjects =
+            result.getDeletedObjects();
+        if (deletedObjects.size() != keyList.size()) {
+          // size mismatch
+          LOG.warn("Size mismatch in deletion operation. "
+                  + "Expected count of deleted files: {}; "
+                  + "actual: {}",
+              keyList.size(), deletedObjects.size());
+          // strip out the deleted keys
+          for (DeleteObjectsResult.DeletedObject del : deletedObjects) {
+            keyList.removeIf(kv -> kv.getKey().equals(del.getKey()));
+          }
+          for (DeleteObjectsRequest.KeyVersion kv : keyList) {
+            LOG.debug("{}", kv.getKey());
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * Block awaiting completion for any non-null future passed in;
+   * No-op if a null arg was supplied.
+   * @param future future
+   * @throws IOException if one of the called futures raised an IOE.
+   * @throws RuntimeException if one of the futures raised one.
+   */
+  private void maybeAwaitCompletion(
+      @Nullable final CompletableFuture<Void> future)
+      throws IOException {
+    if (future != null) {
+      try (DurationInfo ignored =
+               new DurationInfo(LOG, false, "delete completion")) {
+        waitForCompletion(future);
+      }
+    }
+  }
+
+}
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ExecutingStoreOperation.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ExecutingStoreOperation.java
new file mode 100644
index 0000000..ac6f6bf
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ExecutingStoreOperation.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.impl;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * A subclass of {@link AbstractStoreOperation} which
+ * provides a method {@link #execute()} that may be invoked
+ * exactly once.
+ * @param <T> return type of executed operation.
+ */
+public abstract class ExecutingStoreOperation<T>
+    extends AbstractStoreOperation {
+
+  /**
+   * Used to stop any re-entrancy of the rename.
+   * This is an execute-once operation.
+   */
+  private final AtomicBoolean executed = new AtomicBoolean(false);
+
+  /**
+   * constructor.
+   * @param storeContext store context.
+   */
+  protected ExecutingStoreOperation(final StoreContext storeContext) {
+    super(storeContext);
+  }
+
+  /**
+   * Execute the operation.
+   * Subclasses MUST call {@link #executeOnlyOnce()} so as to force
+   * the (atomic) re-entrancy check.
+   * @return the result.
+   * @throws IOException IO problem
+   */
+  public abstract T execute() throws IOException ;
+
+  /**
+   * Check that the operation has not been invoked twice.
+   * This is an atomic check.
+   * @throws IllegalStateException on a second invocation.
+   */
+  protected void executeOnlyOnce() {
+    Preconditions.checkState(
+        !executed.getAndSet(true),
+        "Operation attempted twice");
+  }
+
+}
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/InternalConstants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/InternalConstants.java
index 1b2a430..efef5cf 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/InternalConstants.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/InternalConstants.java
@@ -18,8 +18,20 @@
 
 package org.apache.hadoop.fs.s3a.impl;
 
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.s3a.Constants;
+
 /**
  * Internal constants private only to the S3A codebase.
+ * Please don't refer to these outside of this module &amp; its tests.
+ * If you find you need to then either the code is doing something it
+ * should not, or these constants need to be uprated to being
+ * public and stable entries.
  */
 public final class InternalConstants {
 
@@ -57,4 +69,18 @@ public final class InternalConstants {
    * Default blocksize as used in blocksize and FS status queries: {@value}.
    */
   public static final int DEFAULT_BLOCKSIZE = 32 * 1024 * 1024;
+
+
+  /**
+   * The known keys used in a standard openFile call.
+   * if there's a select marker in there then the keyset
+   * used becomes that of the select operation.
+   */
+  @InterfaceStability.Unstable
+  public static final Set<String> STANDARD_OPENFILE_KEYS =
+      Collections.unmodifiableSet(
+          new HashSet<>(
+              Arrays.asList(Constants.INPUT_FADVISE,
+                  Constants.READAHEAD_RANGE)));
+
 }
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/MultiObjectDeleteSupport.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/MultiObjectDeleteSupport.java
index 9943715..e79eeb8 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/MultiObjectDeleteSupport.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/MultiObjectDeleteSupport.java
@@ -37,6 +37,7 @@ import org.apache.commons.lang3.tuple.Triple;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.s3a.AWSS3IOException;
 import org.apache.hadoop.fs.s3a.S3AFileSystem;
+import org.apache.hadoop.fs.s3a.s3guard.BulkOperationState;
 import org.apache.hadoop.fs.s3a.s3guard.MetadataStore;
 
 import static com.google.common.base.Preconditions.checkNotNull;
@@ -49,12 +50,17 @@ public final class MultiObjectDeleteSupport extends AbstractStoreOperation {
   private static final Logger LOG = LoggerFactory.getLogger(
       MultiObjectDeleteSupport.class);
 
+  private final BulkOperationState operationState;
+
   /**
    * Initiate with a store context.
    * @param context store context.
+   * @param operationState any ongoing bulk operation.
    */
-  public MultiObjectDeleteSupport(final StoreContext context) {
+  public MultiObjectDeleteSupport(final StoreContext context,
+      final BulkOperationState operationState) {
     super(context);
+    this.operationState = operationState;
   }
 
   /**
@@ -178,7 +184,7 @@ public final class MultiObjectDeleteSupport extends AbstractStoreOperation {
     //  metastore entries
     deleted.forEach(path -> {
       try {
-        metadataStore.delete(path);
+        metadataStore.delete(path, operationState);
       } catch (IOException e) {
         // trouble: we failed to delete the far end entry
         // try with the next one.
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/OperationCallbacks.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/OperationCallbacks.java
new file mode 100644
index 0000000..0fcf645
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/OperationCallbacks.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.impl;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.util.List;
+
+import com.amazonaws.AmazonClientException;
+import com.amazonaws.services.s3.model.DeleteObjectsRequest;
+import com.amazonaws.services.s3.model.DeleteObjectsResult;
+import com.amazonaws.services.s3.model.MultiObjectDeleteException;
+import com.amazonaws.services.s3.transfer.model.CopyResult;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.InvalidRequestException;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.s3a.Retries;
+import org.apache.hadoop.fs.s3a.S3AFileStatus;
+import org.apache.hadoop.fs.s3a.S3ALocatedFileStatus;
+import org.apache.hadoop.fs.s3a.S3AReadOpContext;
+import org.apache.hadoop.fs.s3a.S3ObjectAttributes;
+import org.apache.hadoop.fs.s3a.s3guard.BulkOperationState;
+
+/**
+ * These are all the callbacks which the {@link RenameOperation}
+ * and {@link DeleteOperation } operations need,
+ * derived from the appropriate S3AFileSystem methods.
+ */
+public interface OperationCallbacks {
+
+  /**
+   * Create the attributes of an object for subsequent use.
+   * @param path path path of the request.
+   * @param eTag the eTag of the S3 object
+   * @param versionId S3 object version ID
+   * @param len length of the file
+   * @return attributes to use when building the query.
+   */
+  S3ObjectAttributes createObjectAttributes(
+      Path path,
+      String eTag,
+      String versionId,
+      long len);
+
+  /**
+   * Create the attributes of an object for subsequent use.
+   * @param fileStatus file status to build from.
+   * @return attributes to use when building the query.
+   */
+  S3ObjectAttributes createObjectAttributes(
+      S3AFileStatus fileStatus);
+
+  /**
+   * Create the read context for reading from the referenced file,
+   * using FS state as well as the status.
+   * @param fileStatus file status.
+   * @return a context for read and select operations.
+   */
+  S3AReadOpContext createReadContext(
+      FileStatus fileStatus);
+
+  /**
+   * The rename has finished; perform any store cleanup operations
+   * such as creating/deleting directory markers.
+   * @param sourceRenamed renamed source
+   * @param destCreated destination file created.
+   * @throws IOException failure
+   */
+  void finishRename(Path sourceRenamed, Path destCreated) throws IOException;
+
+  /**
+   * Delete an object, also updating the metastore.
+   * This call does <i>not</i> create any mock parent entries.
+   * Retry policy: retry untranslated; delete considered idempotent.
+   * @param path path to delete
+   * @param key key of entry
+   * @param isFile is the path a file (used for instrumentation only)
+   * @param operationState (nullable) operational state for a bulk update
+   * @throws AmazonClientException problems working with S3
+   * @throws IOException IO failure in the metastore
+   */
+  @Retries.RetryTranslated
+  void deleteObjectAtPath(Path path,
+      String key,
+      boolean isFile,
+      BulkOperationState operationState)
+      throws IOException;
+
+  /**
+   * Recursive list of files and empty directories.
+   *
+   * @param path path to list from
+   * @param status optional status of path to list.
+   * @param collectTombstones should tombstones be collected from S3Guard?
+   * @param includeSelf should the listing include this path if present?
+   * @return an iterator.
+   * @throws IOException failure
+   */
+  @Retries.RetryTranslated
+  RemoteIterator<S3ALocatedFileStatus> listFilesAndEmptyDirectories(
+      Path path,
+      S3AFileStatus status,
+      boolean collectTombstones,
+      boolean includeSelf) throws IOException;
+
+  /**
+   * Copy a single object in the bucket via a COPY operation.
+   * There's no update of metadata, directory markers, etc.
+   * Callers must implement.
+   * @param srcKey source object path
+   * @param srcAttributes S3 attributes of the source object
+   * @param readContext the read context
+   * @return the result of the copy
+   * @throws InterruptedIOException the operation was interrupted
+   * @throws IOException Other IO problems
+   */
+  @Retries.RetryTranslated
+  CopyResult copyFile(String srcKey,
+      String destKey,
+      S3ObjectAttributes srcAttributes,
+      S3AReadOpContext readContext)
+      throws IOException;
+
+  /**
+   * Remove keys from the store, updating the metastore on a
+   * partial delete represented as a MultiObjectDeleteException failure by
+   * deleting all those entries successfully deleted and then rethrowing
+   * the MultiObjectDeleteException.
+   * @param keysToDelete collection of keys to delete on the s3-backend.
+   *        if empty, no request is made of the object store.
+   * @param deleteFakeDir indicates whether this is for deleting fake dirs.
+   * @param undeletedObjectsOnFailure List which will be built up of all
+   * files that were not deleted. This happens even as an exception
+   * is raised.
+   * @param operationState bulk operation state
+   * @param quiet should a bulk query be quiet, or should its result list
+   * all deleted keys
+   * @return the deletion result if a multi object delete was invoked
+   * and it returned without a failure, else null.
+   * @throws InvalidRequestException if the request was rejected due to
+   * a mistaken attempt to delete the root directory.
+   * @throws MultiObjectDeleteException one or more of the keys could not
+   * be deleted in a multiple object delete operation.
+   * @throws AmazonClientException amazon-layer failure.
+   * @throws IOException other IO Exception.
+   */
+  @Retries.RetryMixed
+  DeleteObjectsResult removeKeys(
+      List<DeleteObjectsRequest.KeyVersion> keysToDelete,
+      boolean deleteFakeDir,
+      List<Path> undeletedObjectsOnFailure,
+      BulkOperationState operationState,
+      boolean quiet)
+      throws MultiObjectDeleteException, AmazonClientException,
+      IOException;
+
+  /**
+   * Is the path for this instance considered authoritative on the client,
+   * that is: will listing/status operations only be handled by the metastore,
+   * with no fallback to S3.
+   * @param p path
+   * @return true iff the path is authoritative on the client.
+   */
+  boolean allowAuthoritative(Path p);
+
+  /**
+   * Create an iterator over objects in S3 only; S3Guard
+   * is not involved.
+   * The listing includes the key itself, if found.
+   * @param path  path of the listing.
+   * @param key object key
+   * @return iterator with the first listing completed.
+   * @throws IOException failure.
+   */
+  @Retries.RetryTranslated
+  RemoteIterator<S3AFileStatus> listObjects(
+      Path path,
+      String key)
+      throws IOException;
+}
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RenameOperation.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RenameOperation.java
index 04784fb..a356107 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RenameOperation.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RenameOperation.java
@@ -19,24 +19,18 @@
 package org.apache.hadoop.fs.s3a.impl;
 
 import java.io.IOException;
-import java.io.InterruptedIOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
 import com.amazonaws.AmazonClientException;
 import com.amazonaws.services.s3.model.DeleteObjectsRequest;
-import com.amazonaws.services.s3.model.MultiObjectDeleteException;
 import com.amazonaws.services.s3.transfer.model.CopyResult;
-import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.InvalidRequestException;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.fs.s3a.RenameFailedException;
@@ -80,17 +74,11 @@ import static org.apache.hadoop.fs.s3a.impl.InternalConstants.RENAME_PARALLEL_LI
  * Callers are required to themselves verify that destination is not under
  * the source, above the source, the source itself, etc, etc.
  */
-public class RenameOperation extends AbstractStoreOperation {
+public class RenameOperation extends ExecutingStoreOperation<Long> {
 
   private static final Logger LOG = LoggerFactory.getLogger(
       RenameOperation.class);
 
-  /**
-   * Used to stop any re-entrancy of the rename.
-   * This is an execute-once operation.
-   */
-  private final AtomicBoolean executed = new AtomicBoolean(false);
-
   private final Path sourcePath;
 
   private final String sourceKey;
@@ -106,7 +94,7 @@ public class RenameOperation extends AbstractStoreOperation {
   /**
    * Callbacks into the filesystem.
    */
-  private final RenameOperationCallbacks callbacks;
+  private final OperationCallbacks callbacks;
 
   /**
    * Counter of bytes copied.
@@ -158,7 +146,7 @@ public class RenameOperation extends AbstractStoreOperation {
       final Path destPath,
       final String destKey,
       final S3AFileStatus destStatus,
-      final RenameOperationCallbacks callbacks) {
+      final OperationCallbacks callbacks) {
     super(storeContext);
     this.sourcePath = sourcePath;
     this.sourceKey = sourceKey;
@@ -174,7 +162,10 @@ public class RenameOperation extends AbstractStoreOperation {
   /**
    * Wait for the active copies to complete then reset the list.
    * @param reason for messages
+   * @throws IOException if one of the called futures raised an IOE.
+   * @throws RuntimeException if one of the futures raised one.
    */
+  @Retries.OnceTranslated
   private void completeActiveCopies(String reason) throws IOException {
     LOG.debug("Waiting for {} active copies to complete: {}",
         activeCopies.size(), reason);
@@ -183,7 +174,7 @@ public class RenameOperation extends AbstractStoreOperation {
   }
 
   /**
-   * Queue and object for deletion.
+   * Queue an object for deletion.
    * @param path path to the object
    * @param key key of the object.
    */
@@ -198,6 +189,7 @@ public class RenameOperation extends AbstractStoreOperation {
    * @param reason reason for logs
    * @throws IOException failure.
    */
+  @Retries.RetryTranslated
   private void completeActiveCopiesAndDeleteSources(String reason)
       throws IOException {
     completeActiveCopies(reason);
@@ -210,10 +202,8 @@ public class RenameOperation extends AbstractStoreOperation {
   }
 
   @Retries.RetryMixed
-  public long executeRename() throws IOException {
-    Preconditions.checkState(
-        !executed.getAndSet(true),
-        "Rename attempted twice");
+  public Long execute() throws IOException {
+    executeOnlyOnce();
     final StoreContext storeContext = getStoreContext();
     final MetadataStore metadataStore = checkNotNull(
         storeContext.getMetadataStore(),
@@ -294,7 +284,7 @@ public class RenameOperation extends AbstractStoreOperation {
         false);
     bytesCopied.addAndGet(sourceStatus.getLen());
     // delete the source
-    callbacks.deleteObjectAtPath(sourcePath, sourceKey, true);
+    callbacks.deleteObjectAtPath(sourcePath, sourceKey, true, null);
     // and update the tracker
     renameTracker.sourceObjectsDeleted(Lists.newArrayList(sourcePath));
   }
@@ -327,12 +317,15 @@ public class RenameOperation extends AbstractStoreOperation {
       // marker.
       LOG.debug("Deleting fake directory marker at destination {}",
           destStatus.getPath());
-      callbacks.deleteObjectAtPath(destStatus.getPath(), dstKey, false);
+      callbacks.deleteObjectAtPath(destStatus.getPath(), dstKey, false, null);
     }
 
     Path parentPath = storeContext.keyToPath(srcKey);
     final RemoteIterator<S3ALocatedFileStatus> iterator =
-        callbacks.listFilesAndEmptyDirectories(parentPath);
+        callbacks.listFilesAndEmptyDirectories(parentPath,
+            sourceStatus,
+            true,
+            true);
     while (iterator.hasNext()) {
       // get the next entry in the listing.
       S3ALocatedFileStatus child = iterator.next();
@@ -478,7 +471,7 @@ public class RenameOperation extends AbstractStoreOperation {
    * @param paths list of paths matching the keys to delete 1:1.
    * @throws IOException failure
    */
-  @Retries.RetryMixed
+  @Retries.RetryTranslated
   private void removeSourceObjects(
       final List<DeleteObjectsRequest.KeyVersion> keys,
       final List<Path> paths)
@@ -488,7 +481,12 @@ public class RenameOperation extends AbstractStoreOperation {
       // remove the keys
       // this will update the metastore on a failure, but on
       // a successful operation leaves the store as is.
-      callbacks.removeKeys(keys, false, undeletedObjects);
+      callbacks.removeKeys(
+          keys,
+          false,
+          undeletedObjects,
+          renameTracker.getOperationState(),
+          true);
       // and clear the list.
     } catch (AmazonClientException | IOException e) {
       // Failed.
@@ -496,7 +494,8 @@ public class RenameOperation extends AbstractStoreOperation {
       // removeKeys will have already purged the metastore of
       // all keys it has known to delete; this is just a final
       // bit of housekeeping and a chance to tune exception
-      // reporting
+      // reporting.
+      // The returned IOE is rethrown.
       throw renameTracker.deleteFailed(e, paths, undeletedObjects);
     }
     renameTracker.sourceObjectsDeleted(paths);
@@ -518,117 +517,4 @@ public class RenameOperation extends AbstractStoreOperation {
     }
   }
 
-  /**
-   * These are all the callbacks which the rename operation needs,
-   * derived from the appropriate S3AFileSystem methods.
-   */
-  public interface RenameOperationCallbacks {
-
-    /**
-     * Create the attributes of an object for subsequent use.
-     * @param path path path of the request.
-     * @param eTag the eTag of the S3 object
-     * @param versionId S3 object version ID
-     * @param len length of the file
-     * @return attributes to use when building the query.
-     */
-    S3ObjectAttributes createObjectAttributes(
-        Path path,
-        String eTag,
-        String versionId,
-        long len);
-
-    /**
-     * Create the attributes of an object for subsequent use.
-     * @param fileStatus file status to build from.
-     * @return attributes to use when building the query.
-     */
-    S3ObjectAttributes createObjectAttributes(
-        S3AFileStatus fileStatus);
-
-    /**
-     * Create the read context for reading from the referenced file,
-     * using FS state as well as the status.
-     * @param fileStatus file status.
-     * @return a context for read and select operations.
-     */
-    S3AReadOpContext createReadContext(
-        FileStatus fileStatus);
-
-    /**
-     * The rename has finished; perform any store cleanup operations
-     * such as creating/deleting directory markers.
-     * @param sourceRenamed renamed source
-     * @param destCreated destination file created.
-     * @throws IOException failure
-     */
-    void finishRename(Path sourceRenamed, Path destCreated) throws IOException;
-
-    /**
-     * Delete an object, also updating the metastore.
-     * This call does <i>not</i> create any mock parent entries.
-     * Retry policy: retry untranslated; delete considered idempotent.
-     * @param path path to delete
-     * @param key key of entry
-     * @param isFile is the path a file (used for instrumentation only)
-     * @throws AmazonClientException problems working with S3
-     * @throws IOException IO failure in the metastore
-     */
-    @Retries.RetryMixed
-    void deleteObjectAtPath(Path path, String key, boolean isFile)
-        throws IOException;
-
-    /**
-     * Recursive list of files and empty directories.
-     * @param path path to list from
-     * @return an iterator.
-     * @throws IOException failure
-     */
-    RemoteIterator<S3ALocatedFileStatus> listFilesAndEmptyDirectories(
-        Path path) throws IOException;
-
-    /**
-     * Copy a single object in the bucket via a COPY operation.
-     * There's no update of metadata, directory markers, etc.
-     * Callers must implement.
-     * @param srcKey source object path
-     * @param srcAttributes S3 attributes of the source object
-     * @param readContext the read context
-     * @return the result of the copy
-     * @throws InterruptedIOException the operation was interrupted
-     * @throws IOException Other IO problems
-     */
-    @Retries.RetryTranslated
-    CopyResult copyFile(String srcKey,
-        String destKey,
-        S3ObjectAttributes srcAttributes,
-        S3AReadOpContext readContext)
-        throws IOException;
-
-    /**
-     * Remove keys from the store, updating the metastore on a
-     * partial delete represented as a MultiObjectDeleteException failure by
-     * deleting all those entries successfully deleted and then rethrowing
-     * the MultiObjectDeleteException.
-     * @param keysToDelete collection of keys to delete on the s3-backend.
-     *        if empty, no request is made of the object store.
-     * @param deleteFakeDir indicates whether this is for deleting fake dirs.
-     * @param undeletedObjectsOnFailure List which will be built up of all
-     * files that were not deleted. This happens even as an exception
-     * is raised.
-     * @throws InvalidRequestException if the request was rejected due to
-     * a mistaken attempt to delete the root directory.
-     * @throws MultiObjectDeleteException one or more of the keys could not
-     * be deleted in a multiple object delete operation.
-     * @throws AmazonClientException amazon-layer failure.
-     * @throws IOException other IO Exception.
-     */
-    @Retries.RetryMixed
-    void removeKeys(
-        List<DeleteObjectsRequest.KeyVersion> keysToDelete,
-        boolean deleteFakeDir,
-        List<Path> undeletedObjectsOnFailure)
-        throws MultiObjectDeleteException, AmazonClientException,
-        IOException;
-  }
 }
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/StoreContext.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/StoreContext.java
index 28300c2..88480db 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/StoreContext.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/StoreContext.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.s3a.Invoker;
+import org.apache.hadoop.fs.s3a.S3AFileStatus;
 import org.apache.hadoop.fs.s3a.S3AInputPolicy;
 import org.apache.hadoop.fs.s3a.S3AInstrumentation;
 import org.apache.hadoop.fs.s3a.S3AStorageStatistics;
@@ -301,6 +302,10 @@ public class StoreContext {
     return createThrottledExecutor(executorCapacity);
   }
 
+  /**
+   * Get the owner of the filesystem.
+   * @return the user who created this filesystem.
+   */
   public UserGroupInformation getOwner() {
     return owner;
   }
@@ -332,4 +337,18 @@ public class StoreContext {
   public ITtlTimeProvider getTimeProvider() {
     return timeProvider;
   }
+
+  /**
+   * Build the full S3 key for a request from the status entry,
+   * possibly adding a "/" if it represents directory and it does
+   * not have a trailing slash already.
+   * @param stat status to build the key from
+   * @return a key for a delete request
+   */
+  public String fullKey(final S3AFileStatus stat) {
+    String k = pathToKey(stat.getPath());
+    return (stat.isDirectory() && !k.endsWith("/"))
+        ? k + "/"
+        : k;
+  }
 }
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DelayedUpdateRenameTracker.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DelayedUpdateRenameTracker.java
index 22c4434..51a9821 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DelayedUpdateRenameTracker.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DelayedUpdateRenameTracker.java
@@ -148,7 +148,7 @@ public class DelayedUpdateRenameTracker extends RenameTracker {
       metadataStore.move(new ArrayList<>(0), destMetas, getOperationState());
       for (Path deletedPath : deletedPaths) {
         // this is not ideal in that it may leave parent stuff around.
-        metadataStore.delete(deletedPath);
+        metadataStore.delete(deletedPath, getOperationState());
       }
       deleteParentPaths();
     } catch (IOException | SdkBaseException e) {
@@ -181,7 +181,7 @@ public class DelayedUpdateRenameTracker extends RenameTracker {
       PathMetadata md = metadataStore.get(parent, true);
       if (md != null && md.isEmptyDirectory() == Tristate.TRUE) {
         // if were confident that this is empty: delete it.
-        metadataStore.delete(parent);
+        metadataStore.delete(parent, getOperationState());
       }
     }
   }
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStore.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStore.java
index aec412a..92f04bf 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStore.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStore.java
@@ -88,6 +88,7 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathIOException;
+import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.fs.s3a.AWSClientIOException;
 import org.apache.hadoop.fs.s3a.AWSCredentialProviderList;
 import org.apache.hadoop.fs.s3a.AWSServiceThrottledException;
@@ -200,7 +201,7 @@ import static org.apache.hadoop.fs.s3a.s3guard.S3Guard.*;
  * sub-tree.
  *
  * Some mutating operations, notably
- * {@link MetadataStore#deleteSubtree(Path)} and
+ * {@link MetadataStore#deleteSubtree(Path, BulkOperationState)} and
  * {@link MetadataStore#move(Collection, Collection, BulkOperationState)}
  * are less efficient with this schema.
  * They require mutating multiple items in the DynamoDB table.
@@ -544,9 +545,12 @@ public class DynamoDBMetadataStore implements MetadataStore,
 
   @Override
   @Retries.RetryTranslated
-  public void delete(Path path)
+  public void delete(Path path,
+      final BulkOperationState operationState)
       throws IOException {
-    innerDelete(path, true, null);
+    innerDelete(path, true,
+        extractOrCreate(operationState,
+            BulkOperationState.OperationType.Delete));
   }
 
   @Override
@@ -562,7 +566,7 @@ public class DynamoDBMetadataStore implements MetadataStore,
    * There is no check as to whether the entry exists in the table first.
    * @param path path to delete
    * @param tombstone flag to create a tombstone marker
-   * @param ancestorState ancestor state for logging
+   * @param ancestorState ancestor state for context.
    * @throws IOException I/O error.
    */
   @Retries.RetryTranslated
@@ -615,7 +619,8 @@ public class DynamoDBMetadataStore implements MetadataStore,
 
   @Override
   @Retries.RetryTranslated
-  public void deleteSubtree(Path path)
+  public void deleteSubtree(Path path,
+      final BulkOperationState operationState)
       throws IOException {
     checkPath(path);
     LOG.debug("Deleting subtree from table {} in region {}: {}",
@@ -630,27 +635,51 @@ public class DynamoDBMetadataStore implements MetadataStore,
       LOG.debug("Subtree path {} is deleted; this will be a no-op", path);
       return;
     }
+    deleteEntries(new InternalIterators.PathFromRemoteStatusIterator(
+        new DescendantsIterator(this, meta)),
+        operationState);
+  }
 
-    try(AncestorState state = new AncestorState(this,
-        BulkOperationState.OperationType.Delete, path)) {
-      // Execute via the bounded threadpool.
-      final List<CompletableFuture<Void>> futures = new ArrayList<>();
-      for (DescendantsIterator desc = new DescendantsIterator(this, meta);
-          desc.hasNext();) {
-        final Path pathToDelete = desc.next().getPath();
-        futures.add(submit(executor, () -> {
-          innerDelete(pathToDelete, true, state);
-          return null;
-        }));
-        if (futures.size() > S3GUARD_DDB_SUBMITTED_TASK_LIMIT) {
-          // first batch done; block for completion.
-          waitForCompletion(futures);
-          futures.clear();
-        }
+  @Override
+  @Retries.RetryTranslated
+  public void deletePaths(Collection<Path> paths,
+      final BulkOperationState operationState)
+      throws IOException {
+    deleteEntries(
+        new InternalIterators.RemoteIteratorFromIterator<>(paths.iterator()),
+        operationState);
+  }
+
+  /**
+   * Delete the entries under an iterator.
+   * There's no attempt to order the paths: they are
+   * deleted in the order passed in.
+   * @param entries entries to delete.
+   * @param operationState Nullable operation state
+   * @throws IOException failure
+   */
+  @Retries.RetryTranslated
+  private void deleteEntries(RemoteIterator<Path> entries,
+      final BulkOperationState operationState)
+      throws IOException {
+    final List<CompletableFuture<Void>> futures = new ArrayList<>();
+    AncestorState state = extractOrCreate(operationState,
+        BulkOperationState.OperationType.Delete);
+
+    while (entries.hasNext()) {
+      final Path pathToDelete = entries.next();
+      futures.add(submit(executor, () -> {
+        innerDelete(pathToDelete, true, state);
+        return null;
+      }));
+      if (futures.size() > S3GUARD_DDB_SUBMITTED_TASK_LIMIT) {
+        // first batch done; block for completion.
+        waitForCompletion(futures);
+        futures.clear();
       }
-      // now wait for the final set.
-      waitForCompletion(futures);
     }
+    // now wait for the final set.
+    waitForCompletion(futures);
   }
 
   /**
@@ -2399,7 +2428,7 @@ public class DynamoDBMetadataStore implements MetadataStore,
       // log the operations
       String stateStr = AncestorState.stateAsString(state);
       for (Item item : items) {
-        boolean tombstone = itemExists(item);
+        boolean tombstone = !itemExists(item);
         OPERATIONS_LOG.debug("{} {} {}",
             stateStr,
             tombstone ? "TOMBSTONE" : "PUT",
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/InternalIterators.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/InternalIterators.java
new file mode 100644
index 0000000..dd6fb5f
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/InternalIterators.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.s3guard;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.s3a.S3AFileStatus;
+
+/**
+ * Internal iterators.
+ */
+final class InternalIterators {
+
+  private InternalIterators() {
+  }
+
+  /**
+   * From a remote status iterator, build a path iterator.
+   */
+  static final class PathFromRemoteStatusIterator implements
+      RemoteIterator<Path> {
+
+    private final RemoteIterator<S3AFileStatus> source;
+
+    /**
+     * Construct.
+     * @param source source iterator.
+     */
+    PathFromRemoteStatusIterator(final RemoteIterator<S3AFileStatus> source) {
+      this.source = source;
+    }
+
+    @Override
+    public boolean hasNext() throws IOException {
+      return source.hasNext();
+    }
+
+    @Override
+    public Path next() throws IOException {
+      return source.next().getPath();
+    }
+  }
+
+  /**
+   * From a classic java.util.Iterator, build a Hadoop remote iterator.
+   * @param <T> type of iterated value.
+   */
+  static final class RemoteIteratorFromIterator<T> implements
+      RemoteIterator<T> {
+
+    private final Iterator<T> source;
+
+    /**
+     * Construct.
+     * @param source source iterator.
+     */
+    RemoteIteratorFromIterator(final Iterator<T> source) {
+      this.source = source;
+    }
+
+    @Override
+    public boolean hasNext() {
+      return source.hasNext();
+    }
+
+    @Override
+    public T next() {
+      return source.next();
+    }
+  }
+
+}
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/LocalMetadataStore.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/LocalMetadataStore.java
index 1cc4a61..37534cf 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/LocalMetadataStore.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/LocalMetadataStore.java
@@ -127,7 +127,8 @@ public class LocalMetadataStore implements MetadataStore {
   }
 
   @Override
-  public void delete(Path p)
+  public void delete(Path p,
+      final BulkOperationState operationState)
       throws IOException {
     doDelete(p, false, true);
   }
@@ -138,7 +139,8 @@ public class LocalMetadataStore implements MetadataStore {
   }
 
   @Override
-  public void deleteSubtree(Path path)
+  public void deleteSubtree(Path path,
+      final BulkOperationState operationState)
       throws IOException {
     doDelete(path, true, true);
   }
@@ -158,6 +160,14 @@ public class LocalMetadataStore implements MetadataStore {
   }
 
   @Override
+  public void deletePaths(final Collection<Path> paths,
+      @Nullable final BulkOperationState operationState) throws IOException {
+    for (Path path : paths) {
+      doDelete(path, false, true);
+    }
+  }
+
+  @Override
   public synchronized PathMetadata get(Path p) throws IOException {
     return get(p, false);
   }
@@ -232,7 +242,7 @@ public class LocalMetadataStore implements MetadataStore {
       // 1. Delete pathsToDelete
       for (Path meta : pathsToDelete) {
         LOG.debug("move: deleting metadata {}", meta);
-        delete(meta);
+        delete(meta, null);
       }
 
       // 2. Create new destination path metadata
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/MetadataStore.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/MetadataStore.java
index 69427ef..748b7bb 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/MetadataStore.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/MetadataStore.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.Retries;
 import org.apache.hadoop.fs.s3a.Retries.RetryTranslated;
 import org.apache.hadoop.fs.s3a.S3AFileStatus;
 import org.apache.hadoop.fs.s3a.impl.StoreContext;
@@ -75,14 +76,16 @@ public interface MetadataStore extends Closeable {
    * the lastUpdated field of the record has to be updated to <pre>now</pre>.
    *
    * @param path the path to delete
+   * @param operationState (nullable) operational state for a bulk update
    * @throws IOException if there is an error
    */
-  void delete(Path path)
+  void delete(Path path,
+      @Nullable BulkOperationState operationState)
       throws IOException;
 
   /**
    * Removes the record of exactly one path.  Does not leave a tombstone (see
-   * {@link MetadataStore#delete(Path)}. It is currently
+   * {@link MetadataStore#delete(Path, BulkOperationState)}. It is currently
    * intended for testing only, and a need to use it as part of normal
    * FileSystem usage is not anticipated.
    *
@@ -105,9 +108,26 @@ public interface MetadataStore extends Closeable {
    * the lastUpdated field of all records have to be updated to <pre>now</pre>.
    *
    * @param path the root of the sub-tree to delete
+   * @param operationState (nullable) operational state for a bulk update
    * @throws IOException if there is an error
    */
-  void deleteSubtree(Path path)
+  @Retries.RetryTranslated
+  void deleteSubtree(Path path,
+      @Nullable BulkOperationState operationState)
+      throws IOException;
+
+  /**
+   * Delete the paths.
+   * There's no attempt to order the paths: they are
+   * deleted in the order passed in.
+   * @param paths paths to delete.
+   * @param operationState Nullable operation state
+   * @throws IOException failure
+   */
+
+  @RetryTranslated
+  void deletePaths(Collection<Path> paths,
+      @Nullable BulkOperationState operationState)
       throws IOException;
 
   /**
@@ -143,6 +163,7 @@ public interface MetadataStore extends Closeable {
    *     in the MetadataStore.
    * @throws IOException if there is an error
    */
+  @Retries.RetryTranslated
   DirListingMetadata listChildren(Path path) throws IOException;
 
   /**
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/NullMetadataStore.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/NullMetadataStore.java
index 8cc6370..22d9ff7 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/NullMetadataStore.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/NullMetadataStore.java
@@ -54,7 +54,8 @@ public class NullMetadataStore implements MetadataStore {
   }
 
   @Override
-  public void delete(Path path)
+  public void delete(Path path,
+      final BulkOperationState operationState)
       throws IOException {
   }
 
@@ -63,11 +64,18 @@ public class NullMetadataStore implements MetadataStore {
   }
 
   @Override
-  public void deleteSubtree(Path path)
+  public void deleteSubtree(Path path,
+      final BulkOperationState operationState)
       throws IOException {
   }
 
   @Override
+  public void deletePaths(final Collection<Path> paths,
+      @Nullable final BulkOperationState operationState) throws IOException {
+
+  }
+
+  @Override
   public PathMetadata get(Path path) throws IOException {
     return null;
   }
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/ProgressiveRenameTracker.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/ProgressiveRenameTracker.java
index 6254f4d..c745662 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/ProgressiveRenameTracker.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/ProgressiveRenameTracker.java
@@ -231,12 +231,13 @@ public class ProgressiveRenameTracker extends RenameTracker {
     try (DurationInfo ignored = new DurationInfo(LOG, false,
         "delete %s metastore entries", paths.size())) {
       getMetadataStore().move(paths, null, getOperationState());
+      getMetadataStore().deletePaths(paths, getOperationState());
     }
   }
 
   @Override
   public synchronized void completeRename() throws IOException {
-    // and finish off; by deleting source directories.
+    // and finish off by deleting source directories.
     sourceObjectsDeleted(pathsToDelete);
     super.completeRename();
   }
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3Guard.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3Guard.java
index 3fe458d..2e9ba66 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3Guard.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3Guard.java
@@ -760,6 +760,7 @@ public final class S3Guard {
    * @return the listing of entries under a path, or null if there as no entry.
    * @throws IOException failure.
    */
+  @Retries.RetryTranslated
   public static DirListingMetadata listChildrenWithTtl(MetadataStore ms,
       Path path, @Nullable ITtlTimeProvider timeProvider)
       throws IOException {
@@ -792,6 +793,14 @@ public final class S3Guard {
     return authoritativePaths;
   }
 
+  /**
+   * Is the path for the given FS instance authoritative?
+   * @param p path
+   * @param fs filesystem
+   * @param authMetadataStore is the MS authoritative.
+   * @param authPaths possibly empty list of authoritative paths
+   * @return true iff the path is authoritative
+   */
   public static boolean allowAuthoritative(Path p, S3AFileSystem fs,
       boolean authMetadataStore, Collection<String> authPaths) {
     String haystack = fs.maybeAddTrailingSlash(fs.qualify(p).toString());
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/select/InternalSelectConstants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/select/InternalSelectConstants.java
index 912b73b..4f387d8 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/select/InternalSelectConstants.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/select/InternalSelectConstants.java
@@ -24,7 +24,7 @@ import java.util.HashSet;
 import java.util.Set;
 
 import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.fs.s3a.InternalConstants;
+import org.apache.hadoop.fs.s3a.impl.InternalConstants;
 
 import static org.apache.hadoop.fs.s3a.select.SelectConstants.*;
 
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractRootDir.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractRootDir.java
index dc82f05..6377b65 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractRootDir.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractRootDir.java
@@ -85,7 +85,8 @@ public class ITestS3AContractRootDir extends
         if (attempt < maxAttempts) {
           LOG.info("Attempt {} of {} for empty root directory test failed.  "
               + "This is likely caused by eventual consistency of S3 "
-              + "listings.  Attempting retry.", attempt, maxAttempts);
+              + "listings.  Attempting retry.", attempt, maxAttempts,
+              e);
           try {
             Thread.sleep(1000);
           } catch (InterruptedException e2) {
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFailureHandling.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFailureHandling.java
index 55d396e..83deb11 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFailureHandling.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFailureHandling.java
@@ -77,7 +77,7 @@ public class ITestS3AFailureHandling extends AbstractS3ATestBase {
 
   private void removeKeys(S3AFileSystem fileSystem, String... keys)
       throws IOException {
-    fileSystem.removeKeys(buildDeleteRequest(keys), false);
+    fileSystem.removeKeys(buildDeleteRequest(keys), false, null);
   }
 
   private List<DeleteObjectsRequest.KeyVersion> buildDeleteRequest(
@@ -126,7 +126,7 @@ public class ITestS3AFailureHandling extends AbstractS3ATestBase {
             });
     MultiObjectDeleteException ex = intercept(
         MultiObjectDeleteException.class,
-        () -> fs.removeKeys(keys, false));
+        () -> fs.removeKeys(keys, false, null));
 
     final List<Path> undeleted
         = extractUndeletedPaths(ex, fs::keyToQualifiedPath);
@@ -142,7 +142,7 @@ public class ITestS3AFailureHandling extends AbstractS3ATestBase {
     keys.add(new DeleteObjectsRequest.KeyVersion(marker));
 
     Pair<List<Path>, List<Path>> pair =
-        new MultiObjectDeleteSupport(fs.createStoreContext())
+        new MultiObjectDeleteSupport(fs.createStoreContext(), null)
         .splitUndeletedKeys(ex, keys);
     assertEquals(undeleted, pair.getLeft());
     List<Path> right = pair.getRight();
@@ -161,7 +161,7 @@ public class ITestS3AFailureHandling extends AbstractS3ATestBase {
     S3AFileSystem fs = getFileSystem();
     List<DeleteObjectsRequest.KeyVersion> keys = keysToDelete(
         Lists.newArrayList(new Path(base, "1"), new Path(base, "2")));
-    fs.removeKeys(keys, false);
+    fs.removeKeys(keys, false, null);
   }
 
   private String join(final Iterable iterable) {
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMetadataPersistenceException.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMetadataPersistenceException.java
index 6ef1492..7a980a3 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMetadataPersistenceException.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMetadataPersistenceException.java
@@ -103,24 +103,30 @@ public class ITestS3AMetadataPersistenceException extends AbstractS3ATestBase {
   @Test
   public void testFailedMetadataUpdate() throws Throwable {
     // write a trivial file
-    Path testFile = path("testFile");
-    FSDataOutputStream outputStream = fs.create(testFile);
-    outputStream.write(1);
-
-    if (failOnError) {
-      // close should throw the expected exception
-      MetadataPersistenceException thrown =
-          intercept(
-              MetadataPersistenceException.class,
-              () -> { outputStream.close(); });
-      assertEquals("cause didn't match original exception",
-          ioException, thrown.getCause());
-    } else {
-      MetricDiff ignoredCount = new MetricDiff(fs, Statistic.IGNORED_ERRORS);
-
-      // close should merely log and increment the statistic
-      outputStream.close();
-      ignoredCount.assertDiffEquals("ignored errors", 1);
+    Path testFile = path("testFailedMetadataUpdate");
+    try {
+      FSDataOutputStream outputStream = fs.create(testFile);
+      outputStream.write(1);
+
+      if (failOnError) {
+        // close should throw the expected exception
+        MetadataPersistenceException thrown =
+            intercept(
+                MetadataPersistenceException.class,
+                outputStream::close);
+        assertEquals("cause didn't match original exception",
+            ioException, thrown.getCause());
+      } else {
+        MetricDiff ignoredCount = new MetricDiff(fs, Statistic.IGNORED_ERRORS);
+
+        // close should merely log and increment the statistic
+        outputStream.close();
+        ignoredCount.assertDiffEquals("ignored errors", 1);
+      }
+    } finally {
+      // turn off the store and forcibly delete from the raw bucket.
+      fs.setMetadataStore(new NullMetadataStore());
+      fs.delete(testFile, false);
     }
   }
 
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardListConsistency.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardListConsistency.java
index 1503fef..6e55796 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardListConsistency.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardListConsistency.java
@@ -87,7 +87,7 @@ public class ITestS3GuardListConsistency extends AbstractS3ATestBase {
     conf.setFloat(FAIL_INJECT_INCONSISTENCY_PROBABILITY, 1.0f);
     // this is a long value to guarantee that the inconsistency holds
     // even over long-haul connections, and in the debugger too/
-    conf.setLong(FAIL_INJECT_INCONSISTENCY_MSEC, (long) (60 * 1000));
+    conf.setLong(FAIL_INJECT_INCONSISTENCY_MSEC, 600_1000L);
     return new S3AContract(conf);
   }
 
@@ -151,13 +151,13 @@ public class ITestS3GuardListConsistency extends AbstractS3ATestBase {
   @Test
   public void testConsistentListAfterRename() throws Exception {
     Path d1f = path("d1/f");
-    Path d1f2 = path("d1/f" + DEFAULT_DELAY_KEY_SUBSTRING);
+    Path d1f2 = path("d1/f-" + DEFAULT_DELAY_KEY_SUBSTRING);
     Path[] mkdirs = {d1f, d1f2};
     Path d1 = path("d1");
     Path[] srcdirs = {d1};
     Path d2 = path("d2");
     Path[] dstdirs = {d2};
-    Path d2f2 = path("d2/f" + DEFAULT_DELAY_KEY_SUBSTRING);
+    Path d2f2 = path("d2/f-" + DEFAULT_DELAY_KEY_SUBSTRING);
     Path[] yesdirs = {d2, path("d2/f"), d2f2};
     Path[] nodirs = {
         d1, d1f, d1f2};
@@ -218,8 +218,11 @@ public class ITestS3GuardListConsistency extends AbstractS3ATestBase {
     Path inconsistentPath =
         path("a/b/dir3-" + DEFAULT_DELAY_KEY_SUBSTRING);
 
-    Path[] testDirs = {path("a/b/dir1"),
-        path("a/b/dir2"),
+    Path dir1 = path("a/b/dir1");
+    Path dir2 = path("a/b/dir2");
+    Path[] testDirs = {
+        dir1,
+        dir2,
         inconsistentPath};
 
     for (Path path : testDirs) {
@@ -235,13 +238,11 @@ public class ITestS3GuardListConsistency extends AbstractS3ATestBase {
     for (FileStatus fileState : paths) {
       list.add(fileState.getPath());
     }
-
-    assertFalse("This path should be deleted.",
-        list.contains(path("a/b/dir1")));
-    assertFalse("This path should be deleted.",
-        list.contains(path("a/b/dir2")));
-    assertFalse("This should fail without S3Guard, and succeed with it.",
-        list.contains(inconsistentPath));
+    Assertions.assertThat(list)
+        .describedAs("Expected deleted files to be excluded")
+        .doesNotContain(dir1)
+        .doesNotContain(dir2)
+        .doesNotContain(inconsistentPath);
   }
 
   /**
@@ -276,11 +277,11 @@ public class ITestS3GuardListConsistency extends AbstractS3ATestBase {
     for (FileStatus fileState : paths) {
       list.add(fileState.getPath());
     }
-    assertTrue(list.contains(path("a3/b/dir1")));
-    assertFalse(list.contains(path("a3/b/dir2")));
-    // This should fail without S3Guard, and succeed with it.
-    assertFalse(list.contains(path("a3/b/dir3-" +
-        DEFAULT_DELAY_KEY_SUBSTRING)));
+    Assertions.assertThat(list)
+        .contains(path("a3/b/dir1"))
+        .doesNotContain(path("a3/b/dir2"))
+        .doesNotContain(path("a3/b/dir3-" +
+            DEFAULT_DELAY_KEY_SUBSTRING));
 
     intercept(FileNotFoundException.class, "",
         "Recently renamed dir should not be visible",
@@ -312,10 +313,10 @@ public class ITestS3GuardListConsistency extends AbstractS3ATestBase {
     for (FileStatus fileState : paths) {
       list.add(fileState.getPath());
     }
-    assertTrue(list.contains(path("a/b/dir1")));
-    assertTrue(list.contains(path("a/b/dir2")));
-    // This should fail without S3Guard, and succeed with it.
-    assertTrue(list.contains(inconsistentPath));
+    Assertions.assertThat(list)
+        .contains(path("a/b/dir1"))
+        .contains(path("a/b/dir2"))
+        .contains(inconsistentPath);
   }
 
   /**
@@ -526,30 +527,35 @@ public class ITestS3GuardListConsistency extends AbstractS3ATestBase {
 
   @Test
   public void testInconsistentS3ClientDeletes() throws Throwable {
+    describe("Verify that delete adds tombstones which block entries"
+        + " returned in (inconsistent) listings");
     // Test only implemented for v2 S3 list API
-    Assume.assumeTrue(getConfiguration()
-        .getInt(LIST_VERSION, DEFAULT_LIST_VERSION) == 2);
+    assumeV2ListAPI();
 
     S3AFileSystem fs = getFileSystem();
-    Path root = path("testInconsistentClient" + DEFAULT_DELAY_KEY_SUBSTRING);
+    Path root = path("testInconsistentS3ClientDeletes-"
+        + DEFAULT_DELAY_KEY_SUBSTRING);
     for (int i = 0; i < 3; i++) {
-      fs.mkdirs(new Path(root, "dir" + i));
-      touch(fs, new Path(root, "file" + i));
+      fs.mkdirs(new Path(root, "dir-" + i));
+      touch(fs, new Path(root, "file-" + i));
       for (int j = 0; j < 3; j++) {
-        touch(fs, new Path(new Path(root, "dir" + i), "file" + i + "-" + j));
+        touch(fs, new Path(new Path(root, "dir-" + i), "file-" + i + "-" + j));
       }
     }
     clearInconsistency(fs);
 
     String key = fs.pathToKey(root) + "/";
 
+    LOG.info("Listing objects before executing delete()");
     ListObjectsV2Result preDeleteDelimited = listObjectsV2(fs, key, "/");
     ListObjectsV2Result preDeleteUndelimited = listObjectsV2(fs, key, null);
 
+    LOG.info("Deleting the directory {}", root);
     fs.delete(root, true);
+    LOG.info("Delete completed; listing results which must exclude deleted"
+        + " paths");
 
     ListObjectsV2Result postDeleteDelimited = listObjectsV2(fs, key, "/");
-    ListObjectsV2Result postDeleteUndelimited = listObjectsV2(fs, key, null);
     assertListSizeEqual(
         "InconsistentAmazonS3Client added back objects incorrectly " +
             "in a non-recursive listing",
@@ -561,6 +567,8 @@ public class ITestS3GuardListConsistency extends AbstractS3ATestBase {
         preDeleteDelimited.getCommonPrefixes(),
         postDeleteDelimited.getCommonPrefixes()
     );
+    LOG.info("Executing Deep listing");
+    ListObjectsV2Result postDeleteUndelimited = listObjectsV2(fs, key, null);
     assertListSizeEqual("InconsistentAmazonS3Client added back objects incorrectly " +
             "in a recursive listing",
         preDeleteUndelimited.getObjectSummaries(),
@@ -575,6 +583,60 @@ public class ITestS3GuardListConsistency extends AbstractS3ATestBase {
   }
 
   /**
+   * Require the v2 S3 list API.
+   */
+  protected void assumeV2ListAPI() {
+    Assume.assumeTrue(getConfiguration()
+        .getInt(LIST_VERSION, DEFAULT_LIST_VERSION) == 2);
+  }
+
+  /**
+   * Verify that delayed S3 listings doesn't stop the FS from deleting
+   * a directory tree. This has not always been the case; this test
+   * verifies the fix and prevents regression.
+   */
+  @Test
+  public void testDeleteUsesS3Guard() throws Throwable {
+    describe("Verify that delete() uses S3Guard to get a consistent"
+        + " listing of its directory structure");
+    assumeV2ListAPI();
+    S3AFileSystem fs = getFileSystem();
+    Path root = path(
+        "testDeleteUsesS3Guard-" + DEFAULT_DELAY_KEY_SUBSTRING);
+    for (int i = 0; i < 3; i++) {
+      Path path = new Path(root, "file-" + i);
+      touch(fs, path);
+    }
+    // we now expect the listing to miss these
+    String key = fs.pathToKey(root) + "/";
+
+    // verify that the inconsistent listing does not show these
+    LOG.info("Listing objects before executing delete()");
+    List<Path> preDeletePaths = objectsToPaths(listObjectsV2(fs, key, null));
+    Assertions.assertThat(preDeletePaths)
+        .isEmpty();
+    // do the delete
+    fs.delete(root, true);
+
+    // now go through every file and verify that it is not there.
+    // if you comment out the delete above and run this test case,
+    // the assertion will fail; this is how the validity of the assertions
+    // were verified.
+    clearInconsistency(fs);
+    List<Path> postDeletePaths =
+        objectsToPaths(listObjectsV2(fs, key, null));
+    Assertions.assertThat(postDeletePaths)
+        .isEmpty();
+  }
+
+  private List<Path> objectsToPaths(ListObjectsV2Result r) {
+    S3AFileSystem fs = getFileSystem();
+    return r.getObjectSummaries().stream()
+        .map(s -> fs.keyToQualifiedPath(s.getKey()))
+        .collect(Collectors.toList());
+  }
+
+  /**
    * Tests that the file's eTag and versionId are preserved in recursive
    * listings.
    */
@@ -633,7 +695,13 @@ public class ITestS3GuardListConsistency extends AbstractS3ATestBase {
         .map(n -> n.toString())
         .collect(Collectors.joining("\n"));
     String summary = "\nExpected:" + leftContents
-        + "\n-----------\nActual:" + rightContents;
+        + "\n-----------\n"
+        + "Actual:" + rightContents
+        + "\n-----------\n";
+
+    if (expected.size() != actual.size()) {
+      LOG.error(message + summary);
+    }
     assertEquals(message + summary, expected.size(), actual.size());
   }
 
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardOutOfBandOperations.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardOutOfBandOperations.java
index ae75f7b..f9e84e2 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardOutOfBandOperations.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardOutOfBandOperations.java
@@ -27,7 +27,7 @@ import java.util.UUID;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
 
-import org.apache.hadoop.test.LambdaTestUtils;
+import org.assertj.core.api.Assertions;
 import org.junit.Assume;
 import org.junit.Before;
 import org.junit.Test;
@@ -49,12 +49,11 @@ import org.apache.hadoop.fs.s3a.s3guard.MetadataStore;
 import org.apache.hadoop.fs.s3a.s3guard.PathMetadata;
 import org.apache.hadoop.fs.s3a.s3guard.ITtlTimeProvider;
 import org.apache.hadoop.fs.contract.ContractTestUtils;
+import org.apache.hadoop.test.LambdaTestUtils;
 
 import static org.apache.hadoop.fs.contract.ContractTestUtils.readBytesToString;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.touch;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.writeTextFile;
-import static org.apache.hadoop.fs.s3a.Constants.CHANGE_DETECT_MODE;
-import static org.apache.hadoop.fs.s3a.Constants.CHANGE_DETECT_SOURCE;
 import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_METADATASTORE_METADATA_TTL;
 import static org.apache.hadoop.fs.s3a.Constants.METADATASTORE_AUTHORITATIVE;
 import static org.apache.hadoop.fs.s3a.Constants.METADATASTORE_METADATA_TTL;
@@ -196,6 +195,7 @@ public class ITestS3GuardOutOfBandOperations extends AbstractS3ATestBase {
     rawFS = createUnguardedFS();
     assertFalse("Raw FS still has S3Guard " + rawFS,
         rawFS.hasMetadataStore());
+    nameThread();
   }
 
   @Override
@@ -722,7 +722,7 @@ public class ITestS3GuardOutOfBandOperations extends AbstractS3ATestBase {
       assertArraySize("Added one file to the new dir, so the number of "
               + "files in the dir should be one.", 1, origList);
       S3AFileStatus origGuardedFileStatus = origList[0];
-      assertNotNull("No etag in origGuardedFileStatus" + origGuardedFileStatus,
+      assertNotNull("No etag in origGuardedFileStatus " + origGuardedFileStatus,
           origGuardedFileStatus.getETag());
       final DirListingMetadata dirListingMetadata =
           realMs.listChildren(guardedFs.qualify(testDirPath));
@@ -992,6 +992,61 @@ public class ITestS3GuardOutOfBandOperations extends AbstractS3ATestBase {
         () -> fs.getFileStatus(testFilePath));
   }
 
+  @Test
+  public void testDeleteIgnoresTombstones() throws Throwable {
+    describe("Verify that directory delete goes behind tombstones");
+    Path dir = path("oobdir");
+    Path testFilePath = new Path(dir, "file");
+    // create a file under the store
+    createAndAwaitFs(guardedFs, testFilePath, "Original File is long");
+    // Delete the file leaving a tombstone in the metastore
+    LOG.info("Initial delete of guarded FS dir {}", dir);
+    guardedFs.delete(dir, true);
+//    deleteFile(guardedFs, testFilePath);
+    awaitDeletedFileDisappearance(guardedFs, testFilePath);
+    // add a new file in raw
+    createAndAwaitFs(rawFS, testFilePath, "hi!");
+    // now we need to be sure that the file appears in a listing
+    awaitListingContainsChild(rawFS, dir, testFilePath);
+
+    // now a hack to remove the empty dir up the tree
+    Path sibling = new Path(dir, "sibling");
+    guardedFs.mkdirs(sibling);
+    // now do a delete of the parent dir. This is required to also
+    // check the underlying fs.
+    LOG.info("Deleting guarded FS dir {} with OOB child", dir);
+    guardedFs.delete(dir, true);
+    LOG.info("Now waiting for child to be deleted in raw fs: {}", testFilePath);
+
+    // so eventually the file will go away.
+    // this is required to be true in auth as well as non-auth.
+
+    awaitDeletedFileDisappearance(rawFS, testFilePath);
+  }
+
+  /**
+   * Wait for a file to be visible.
+   * @param fs filesystem
+   * @param testFilePath path to query
+   * @throws Exception failure
+   */
+  private void awaitListingContainsChild(S3AFileSystem fs,
+      final Path dir,
+      final Path testFilePath)
+      throws Exception {
+    LOG.info("Awaiting list of {} to include {}", dir, testFilePath);
+    eventually(
+        STABILIZATION_TIME, PROBE_INTERVAL_MILLIS,
+        () -> {
+          FileStatus[] stats = fs.listStatus(dir);
+          Assertions.assertThat(stats)
+              .describedAs("listing of %s", dir)
+              .filteredOn(s -> s.getPath().equals(testFilePath))
+              .isNotEmpty();
+          return null;
+        });
+  }
+
   private FSDataOutputStream createNonRecursive(FileSystem fs, Path path)
       throws Exception {
     return fs
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/MockS3AFileSystem.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/MockS3AFileSystem.java
index d421118..8686e54 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/MockS3AFileSystem.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/MockS3AFileSystem.java
@@ -316,7 +316,10 @@ public class MockS3AFileSystem extends S3AFileSystem {
   }
 
   @Override
-  void deleteObjectAtPath(Path f, String key, boolean isFile)
+  void deleteObjectAtPath(Path f,
+      String key,
+      boolean isFile,
+      final BulkOperationState operationState)
       throws AmazonClientException, IOException {
     deleteObject(key);
   }
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestConstants.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestConstants.java
index 81db77c..118c9ee 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestConstants.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestConstants.java
@@ -71,6 +71,11 @@ public interface S3ATestConstants {
   String KEY_DIRECTORY_COUNT = SCALE_TEST + "directory.count";
 
   /**
+   * The file count to use in rename/delete tests: {@value}.
+   */
+  String KEY_FILE_COUNT = SCALE_TEST + "file.count";
+
+  /**
    * The readahead buffer: {@value}.
    */
   String KEY_READ_BUFFER_SIZE = S3A_SCALE_TEST + "read.buffer.size";
@@ -130,6 +135,12 @@ public interface S3ATestConstants {
   int DEFAULT_DIRECTORY_COUNT = 2;
 
   /**
+   * Default number of files to create when performing
+   * delete/rename tests.
+   */
+  int DEFAULT_FILE_COUNT = 50;
+
+  /**
    * Default policy on scale tests: {@value}.
    */
   boolean DEFAULT_SCALE_TESTS_ENABLED = false;
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestCommitOperations.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestCommitOperations.java
index d453715..b0e2b8e 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestCommitOperations.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestCommitOperations.java
@@ -510,7 +510,7 @@ public class ITestCommitOperations extends AbstractCommitITest {
     SinglePendingCommit commit = new SinglePendingCommit();
     commit.setDestinationKey(fs.pathToKey(destFile));
     fullThrottle();
-    actions.revertCommit(commit);
+    actions.revertCommit(commit, null);
     resetFailures();
     assertPathExists("parent of reverted commit", destFile.getParent());
   }
@@ -524,7 +524,7 @@ public class ITestCommitOperations extends AbstractCommitITest {
     SinglePendingCommit commit = new SinglePendingCommit();
     commit.setDestinationKey(fs.pathToKey(destFile));
     fullThrottle();
-    actions.revertCommit(commit);
+    actions.revertCommit(commit, null);
     assertPathExists("parent of reverted (nonexistent) commit",
         destFile.getParent());
   }
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestPartialRenamesDeletes.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestPartialRenamesDeletes.java
index 93b48e7..8530cb8 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestPartialRenamesDeletes.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestPartialRenamesDeletes.java
@@ -155,6 +155,8 @@ public class ITestPartialRenamesDeletes extends AbstractS3ATestBase {
   public static final int DEPTH = 2;
   public static final int DEPTH_SCALED = 2;
 
+  public static final String PREFIX = "file-";
+
   /**
    * A role FS; if non-null it is closed in teardown.
    */
@@ -814,7 +816,7 @@ public class ITestPartialRenamesDeletes extends AbstractS3ATestBase {
     }
     // create the file paths
     for (int i = 0; i < fileCount; i++) {
-      String name = "file-" + i;
+      String name = PREFIX + i;
       Path p = new Path(destDir, name);
       filePaths.add(p);
     }
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestPartialDeleteFailures.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestPartialDeleteFailures.java
index efd3ca4..88e3b73 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestPartialDeleteFailures.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestPartialDeleteFailures.java
@@ -106,7 +106,7 @@ public class TestPartialDeleteFailures {
     MultiObjectDeleteException ex = createDeleteException(ACCESS_DENIED,
         rejected);
     Pair<List<Path>, List<Path>> pair =
-        new MultiObjectDeleteSupport(context)
+        new MultiObjectDeleteSupport(context, null)
           .splitUndeletedKeys(ex, keys);
     List<Path> undeleted = pair.getLeft();
     List<Path> deleted = pair.getRight();
@@ -180,7 +180,7 @@ public class TestPartialDeleteFailures {
         = new OperationTrackingStore();
     StoreContext storeContext = createMockStoreContext(true, store);
     MultiObjectDeleteSupport deleteSupport
-        = new MultiObjectDeleteSupport(storeContext);
+        = new MultiObjectDeleteSupport(storeContext, null);
     Triple<List<Path>, List<Path>, List<Pair<Path, IOException>>>
         triple = deleteSupport.processDeleteFailure(ex, keyList);
     Assertions.assertThat(triple.getRight())
@@ -318,12 +318,20 @@ public class TestPartialDeleteFailures {
     }
 
     @Override
-    public void delete(final Path path) {
+    public void delete(final Path path,
+        final BulkOperationState operationState) {
       deleted.add(path);
     }
 
     @Override
-    public void deleteSubtree(final Path path) {
+    public void deletePaths(final Collection<Path> paths,
+        @Nullable final BulkOperationState operationState) throws IOException {
+      deleted.addAll(paths);
+    }
+
+    @Override
+    public void deleteSubtree(final Path path,
+        final BulkOperationState operationState) {
 
     }
 
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestDynamoDBMetadataStoreScale.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestDynamoDBMetadataStoreScale.java
index 2bdf51e..53df60f 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestDynamoDBMetadataStoreScale.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestDynamoDBMetadataStoreScale.java
@@ -337,7 +337,7 @@ public class ITestDynamoDBMetadataStoreScale
   private void retryingDelete(final Path path) {
     try {
       ddbms.getInvoker().retry("Delete ", path.toString(), true,
-          () -> ddbms.delete(path));
+          () -> ddbms.delete(path, null));
     } catch (IOException e) {
       LOG.warn("Failed to delete {}: ", path, e);
     }
@@ -432,7 +432,7 @@ public class ITestDynamoDBMetadataStoreScale
           OPERATIONS_PER_THREAD,
           expectThrottling(),
           () -> {
-            ddbms.delete(path);
+            ddbms.delete(path, null);
           });
     }
   }
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/MetadataStoreTestBase.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/MetadataStoreTestBase.java
index c71dc4d..198a2de 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/MetadataStoreTestBase.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/MetadataStoreTestBase.java
@@ -334,7 +334,7 @@ public abstract class MetadataStoreTestBase extends HadoopTestBase {
   public void testDelete() throws Exception {
     setUpDeleteTest();
 
-    ms.delete(strToPath("/ADirectory1/db1/file2"));
+    ms.delete(strToPath("/ADirectory1/db1/file2"), null);
 
     /* Ensure delete happened. */
     assertDirectorySize("/ADirectory1/db1", 1);
@@ -363,7 +363,7 @@ public abstract class MetadataStoreTestBase extends HadoopTestBase {
     if (!allowMissing()) {
       assertCached(p + "/ADirectory1/db1");
     }
-    ms.deleteSubtree(strToPath(p + "/ADirectory1/db1/"));
+    ms.deleteSubtree(strToPath(p + "/ADirectory1/db1/"), null);
 
     assertEmptyDirectory(p + "/ADirectory1");
     assertDeleted(p + "/ADirectory1/db1");
@@ -383,7 +383,7 @@ public abstract class MetadataStoreTestBase extends HadoopTestBase {
   public void testDeleteRecursiveRoot() throws Exception {
     setUpDeleteTest();
 
-    ms.deleteSubtree(strToPath("/"));
+    ms.deleteSubtree(strToPath("/"), null);
     assertDeleted("/ADirectory1");
     assertDeleted("/ADirectory2");
     assertDeleted("/ADirectory2/db1");
@@ -394,10 +394,10 @@ public abstract class MetadataStoreTestBase extends HadoopTestBase {
   @Test
   public void testDeleteNonExisting() throws Exception {
     // Path doesn't exist, but should silently succeed
-    ms.delete(strToPath("/bobs/your/uncle"));
+    ms.delete(strToPath("/bobs/your/uncle"), null);
 
     // Ditto.
-    ms.deleteSubtree(strToPath("/internets"));
+    ms.deleteSubtree(strToPath("/internets"), null);
   }
 
 
@@ -435,7 +435,7 @@ public abstract class MetadataStoreTestBase extends HadoopTestBase {
     }
 
     if (!(ms instanceof NullMetadataStore)) {
-      ms.delete(strToPath(filePath));
+      ms.delete(strToPath(filePath), null);
       meta = ms.get(strToPath(filePath));
       assertTrue("Tombstone not left for deleted file", meta.isDeleted());
     }
@@ -664,11 +664,11 @@ public abstract class MetadataStoreTestBase extends HadoopTestBase {
 
     // Make sure delete is correct as well
     if (!allowMissing()) {
-      ms.delete(new Path(p2));
+      ms.delete(new Path(p2), null);
       meta = ms.get(new Path(p1));
       assertNotNull("Path should not have been deleted", meta);
     }
-    ms.delete(new Path(p1));
+    ms.delete(new Path(p1), null);
   }
 
   @Test
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractITestS3AMetadataStoreScale.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractITestS3AMetadataStoreScale.java
index 0a6e17f..8bbec1f 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractITestS3AMetadataStoreScale.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractITestS3AMetadataStoreScale.java
@@ -215,7 +215,7 @@ public abstract class AbstractITestS3AMetadataStoreScale extends
       throws IOException {
     describe("Recursive deletion");
     NanoTimer deleteTimer = new NanoTimer();
-    ms.deleteSubtree(BUCKET_ROOT);
+    ms.deleteSubtree(BUCKET_ROOT, null);
     deleteTimer.end();
     printTiming(LOG, "delete", deleteTimer, count);
   }
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3ADeleteManyFiles.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3ADeleteManyFiles.java
index d4b6dd9..bc3224a 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3ADeleteManyFiles.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3ADeleteManyFiles.java
@@ -18,21 +18,18 @@
 
 package org.apache.hadoop.fs.s3a.scale;
 
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.contract.ContractTestUtils;
 import org.apache.hadoop.fs.s3a.S3AFileSystem;
+import org.apache.hadoop.fs.s3a.impl.ITestPartialRenamesDeletes;
+import org.apache.hadoop.util.DurationInfo;
 
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorCompletionService;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
+import static org.apache.hadoop.fs.s3a.impl.ITestPartialRenamesDeletes.createFiles;
 
 /**
  * Test some scalable operations related to file renaming and deletion.
@@ -41,6 +38,8 @@ public class ITestS3ADeleteManyFiles extends S3AScaleTestBase {
   private static final Logger LOG =
       LoggerFactory.getLogger(ITestS3ADeleteManyFiles.class);
 
+  public static final String PREFIX = ITestPartialRenamesDeletes.PREFIX;
+
   /**
    * CAUTION: If this test starts failing, please make sure that the
    * {@link org.apache.hadoop.fs.s3a.Constants#MAX_THREADS} configuration is not
@@ -55,65 +54,56 @@ public class ITestS3ADeleteManyFiles extends S3AScaleTestBase {
     final Path scaleTestDir = path("testBulkRenameAndDelete");
     final Path srcDir = new Path(scaleTestDir, "src");
     final Path finalDir = new Path(scaleTestDir, "final");
-    final long count = getOperationCount();
+    final int count = getConf().getInt(KEY_FILE_COUNT,
+        DEFAULT_FILE_COUNT);
     final S3AFileSystem fs = getFileSystem();
     ContractTestUtils.rm(fs, scaleTestDir, true, false);
     fs.mkdirs(srcDir);
-    fs.mkdirs(finalDir);
 
-    int testBufferSize = fs.getConf()
-        .getInt(ContractTestUtils.IO_CHUNK_BUFFER_SIZE,
-            ContractTestUtils.DEFAULT_IO_CHUNK_BUFFER_SIZE);
-    // use Executor to speed up file creation
-    ExecutorService exec = Executors.newFixedThreadPool(16);
-    final ExecutorCompletionService<Boolean> completionService =
-        new ExecutorCompletionService<>(exec);
-    try {
-      final byte[] data = ContractTestUtils.dataset(testBufferSize, 'a', 'z');
+    createFiles(fs, srcDir, 1, count, 0);
 
-      for (int i = 0; i < count; ++i) {
-        final String fileName = "foo-" + i;
-        completionService.submit(new Callable<Boolean>() {
-          @Override
-          public Boolean call() throws IOException {
-            ContractTestUtils.createFile(fs, new Path(srcDir, fileName),
-                false, data);
-            return fs.exists(new Path(srcDir, fileName));
-          }
-        });
-      }
-      for (int i = 0; i < count; ++i) {
-        final Future<Boolean> future = completionService.take();
-        try {
-          if (!future.get()) {
-            LOG.warn("cannot create file");
-          }
-        } catch (ExecutionException e) {
-          LOG.warn("Error while uploading file", e.getCause());
-          throw e;
-        }
-      }
-    } finally {
-      exec.shutdown();
+    FileStatus[] statuses = fs.listStatus(srcDir);
+    int nSrcFiles = statuses.length;
+    long sourceSize = 0;
+    for (FileStatus status : statuses) {
+      sourceSize += status.getLen();
     }
-
-    int nSrcFiles = fs.listStatus(srcDir).length;
-    fs.rename(srcDir, finalDir);
+    assertEquals("Source file Count", count, nSrcFiles);
+    ContractTestUtils.NanoTimer renameTimer = new ContractTestUtils.NanoTimer();
+    try (DurationInfo ignored = new DurationInfo(LOG,
+        "Rename %s to %s", srcDir, finalDir)) {
+      assertTrue("Rename failed", fs.rename(srcDir, finalDir));
+    }
+    renameTimer.end();
+    LOG.info("Effective rename bandwidth {} MB/s",
+        renameTimer.bandwidthDescription(sourceSize));
+    LOG.info(String.format(
+        "Time to rename a file: %,03f milliseconds",
+        (renameTimer.nanosPerOperation(count) * 1.0f) / 1.0e6));
     assertEquals(nSrcFiles, fs.listStatus(finalDir).length);
     ContractTestUtils.assertPathDoesNotExist(fs, "not deleted after rename",
-        new Path(srcDir, "foo-" + 0));
+        new Path(srcDir, PREFIX + 0));
     ContractTestUtils.assertPathDoesNotExist(fs, "not deleted after rename",
-        new Path(srcDir, "foo-" + count / 2));
+        new Path(srcDir, PREFIX + count / 2));
     ContractTestUtils.assertPathDoesNotExist(fs, "not deleted after rename",
-        new Path(srcDir, "foo-" + (count - 1)));
+        new Path(srcDir, PREFIX + (count - 1)));
     ContractTestUtils.assertPathExists(fs, "not renamed to dest dir",
-        new Path(finalDir, "foo-" + 0));
+        new Path(finalDir, PREFIX + 0));
     ContractTestUtils.assertPathExists(fs, "not renamed to dest dir",
-        new Path(finalDir, "foo-" + count/2));
+        new Path(finalDir, PREFIX + count/2));
     ContractTestUtils.assertPathExists(fs, "not renamed to dest dir",
-        new Path(finalDir, "foo-" + (count-1)));
+        new Path(finalDir, PREFIX + (count-1)));
 
-    ContractTestUtils.assertDeleted(fs, finalDir, true, false);
+    ContractTestUtils.NanoTimer deleteTimer =
+        new ContractTestUtils.NanoTimer();
+    try (DurationInfo ignored = new DurationInfo(LOG,
+        "Delete subtree %s", finalDir)) {
+      assertDeleted(finalDir, true);
+    }
+    deleteTimer.end();
+    LOG.info(String.format(
+        "Time to delete an object %,03f milliseconds",
+        (deleteTimer.nanosPerOperation(count) * 1.0f) / 1.0e6));
   }
 
 }
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/S3AScaleTestBase.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/S3AScaleTestBase.java
index b2a1aa0..eb80bc5 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/S3AScaleTestBase.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/S3AScaleTestBase.java
@@ -133,6 +133,10 @@ public class S3AScaleTestBase extends AbstractS3ATestBase {
     return testPath;
   }
 
+  /**
+   * Get the configured operation count.
+   * @return the number of times to perform the operation being tested
+   */
   protected long getOperationCount() {
     return getConf().getLong(KEY_OPERATION_COUNT, DEFAULT_OPERATION_COUNT);
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org