You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ma...@apache.org on 2017/05/25 12:16:47 UTC

[1/2] hadoop git commit: HADOOP-13760. S3Guard: add delete tracking [Forced Update!]

Repository: hadoop
Updated Branches:
  refs/heads/HADOOP-13345 c17759607 -> 8e257a406 (forced update)


http://git-wip-us.apache.org/repos/asf/hadoop/blob/8e257a40/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/MetadataStoreTestBase.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/MetadataStoreTestBase.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/MetadataStoreTestBase.java
index 99acf6e..dfa8a9e 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/MetadataStoreTestBase.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/MetadataStoreTestBase.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.fs.s3a.s3guard;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.s3a.S3ATestUtils;
 import org.apache.hadoop.fs.s3a.Tristate;
@@ -134,12 +135,52 @@ public abstract class MetadataStoreTestBase extends Assert {
   }
 
   /**
+   * Helper function for verifying DescendantsIterator and
+   * MetadataStoreListFilesIterator behavior.
+   * @param createNodes List of paths to create
+   * @param checkNodes List of paths that the iterator should return
+   * @throws Exception on any failure
+   */
+  private void doTestDescendantsIterator(
+      Class implementation, String[] createNodes,
+      String[] checkNodes) throws Exception {
+    // we set up the example file system tree in metadata store
+    for (String pathStr : createNodes) {
+      final FileStatus status = pathStr.contains("file")
+          ? basicFileStatus(strToPath(pathStr), 100, false)
+          : basicFileStatus(strToPath(pathStr), 0, true);
+      ms.put(new PathMetadata(status));
+    }
+
+    final PathMetadata rootMeta = new PathMetadata(makeDirStatus("/"));
+    RemoteIterator<FileStatus> iterator;
+    if (implementation == DescendantsIterator.class) {
+      iterator = new DescendantsIterator(ms, rootMeta);
+    } else if (implementation == MetadataStoreListFilesIterator.class) {
+      iterator = new MetadataStoreListFilesIterator(ms, rootMeta, false);
+    } else {
+      throw new UnsupportedOperationException("Unrecognized class");
+    }
+
+    final Set<String> actual = new HashSet<>();
+    while (iterator.hasNext()) {
+      final Path p = iterator.next().getPath();
+      actual.add(Path.getPathWithoutSchemeAndAuthority(p).toString());
+    }
+    LOG.info("We got {} by iterating DescendantsIterator", actual);
+
+    if (!allowMissing()) {
+      assertEquals(Sets.newHashSet(checkNodes), actual);
+    }
+  }
+
+  /**
    * Test that we can get the whole sub-tree by iterating DescendantsIterator.
    *
    * The tree is similar to or same as the example in code comment.
    */
   @Test
-  public void testDescendantsIterator() throws IOException {
+  public void testDescendantsIterator() throws Exception {
     final String[] tree = new String[] {
         "/dir1",
         "/dir1/dir2",
@@ -152,26 +193,38 @@ public abstract class MetadataStoreTestBase extends Assert {
         "/dir1/dir3/dir5/file4",
         "/dir1/dir3/dir6"
     };
-    // we set up the example file system tree in metadata store
-    for (String pathStr : tree) {
-      final FileStatus status = pathStr.contains("file")
-          ? basicFileStatus(strToPath(pathStr), 100, false)
-          : basicFileStatus(strToPath(pathStr), 0, true);
-      ms.put(new PathMetadata(status));
-    }
-
-    final Set<String> actual = new HashSet<>();
-    final PathMetadata rootMeta = new PathMetadata(makeDirStatus("/"));
-    for (DescendantsIterator desc = new DescendantsIterator(ms, rootMeta);
-         desc.hasNext();) {
-      final Path p = desc.next().getPath();
-      actual.add(Path.getPathWithoutSchemeAndAuthority(p).toString());
-    }
-    LOG.info("We got {} by iterating DescendantsIterator", actual);
+    doTestDescendantsIterator(DescendantsIterator.class,
+        tree, tree);
+  }
 
-    if (!allowMissing()) {
-      assertEquals(Sets.newHashSet(tree), actual);
-    }
+  /**
+   * Test that we can get the correct subset of the tree with
+   * MetadataStoreListFilesIterator.
+   *
+   * The tree is similar to or same as the example in code comment.
+   */
+  @Test
+  public void testMetadataStoreListFilesIterator() throws Exception {
+    final String[] wholeTree = new String[] {
+        "/dir1",
+        "/dir1/dir2",
+        "/dir1/dir3",
+        "/dir1/dir2/file1",
+        "/dir1/dir2/file2",
+        "/dir1/dir3/dir4",
+        "/dir1/dir3/dir5",
+        "/dir1/dir3/dir4/file3",
+        "/dir1/dir3/dir5/file4",
+        "/dir1/dir3/dir6"
+    };
+    final String[] leafNodes = new String[] {
+        "/dir1/dir2/file1",
+        "/dir1/dir2/file2",
+        "/dir1/dir3/dir4/file3",
+        "/dir1/dir3/dir5/file4"
+    };
+    doTestDescendantsIterator(MetadataStoreListFilesIterator.class, wholeTree,
+        leafNodes);
   }
 
   @Test
@@ -258,7 +311,7 @@ public abstract class MetadataStoreTestBase extends Assert {
     /* Ensure delete happened. */
     assertDirectorySize("/ADirectory1/db1", 1);
     PathMetadata meta = ms.get(strToPath("/ADirectory1/db1/file2"));
-    assertNull("File deleted", meta);
+    assertTrue("File deleted", meta == null || meta.isDeleted());
   }
 
   @Test
@@ -284,10 +337,10 @@ public abstract class MetadataStoreTestBase extends Assert {
     ms.deleteSubtree(strToPath(p + "/ADirectory1/db1/"));
 
     assertEmptyDirectory(p + "/ADirectory1");
-    assertNotCached(p + "/ADirectory1/db1");
-    assertNotCached(p + "/ADirectory1/file1");
-    assertNotCached(p + "/ADirectory1/file2");
-    assertNotCached(p + "/ADirectory1/db1/dc1/dd1/deepFile");
+    assertDeleted(p + "/ADirectory1/db1");
+    assertDeleted(p + "/ADirectory1/file1");
+    assertDeleted(p + "/ADirectory1/file2");
+    assertDeleted(p + "/ADirectory1/db1/dc1/dd1/deepFile");
     assertEmptyDirectory(p + "/ADirectory2");
   }
 
@@ -302,11 +355,11 @@ public abstract class MetadataStoreTestBase extends Assert {
     setUpDeleteTest();
 
     ms.deleteSubtree(strToPath("/"));
-    assertNotCached("/ADirectory1");
-    assertNotCached("/ADirectory2");
-    assertNotCached("/ADirectory2/db1");
-    assertNotCached("/ADirectory2/db1/file1");
-    assertNotCached("/ADirectory2/db1/file2");
+    assertDeleted("/ADirectory1");
+    assertDeleted("/ADirectory2");
+    assertDeleted("/ADirectory2/db1");
+    assertDeleted("/ADirectory2/db1/file1");
+    assertDeleted("/ADirectory2/db1/file2");
   }
 
   @Test
@@ -350,6 +403,12 @@ public abstract class MetadataStoreTestBase extends Assert {
       verifyFileStatus(meta.getFileStatus(), 100);
     }
 
+    if (!(ms instanceof NullMetadataStore)) {
+      ms.delete(strToPath(filePath));
+      meta = ms.get(strToPath(filePath));
+      assertTrue("Tombstone not left for deleted file", meta.isDeleted());
+    }
+
     meta = ms.get(strToPath(dirPath));
     if (!allowMissing() || meta != null) {
       assertNotNull("Get found file (dir)", meta);
@@ -441,6 +500,7 @@ public abstract class MetadataStoreTestBase extends Assert {
 
     dirMeta = ms.listChildren(strToPath("/a1"));
     if (!allowMissing() || dirMeta != null) {
+      dirMeta = dirMeta.withoutTombstones();
       assertListingsEqual(dirMeta.getListing(), "/a1/b1", "/a1/b2");
     }
 
@@ -486,6 +546,7 @@ public abstract class MetadataStoreTestBase extends Assert {
     Collection<PathMetadata> entries;
     DirListingMetadata dirMeta = ms.listChildren(strToPath("/"));
     if (!allowMissing() || dirMeta != null) {
+      dirMeta = dirMeta.withoutTombstones();
       assertNotNull("Listing root", dirMeta);
       entries = dirMeta.getListing();
       assertListingsEqual(entries, "/a1", "/a2", "/a3");
@@ -513,13 +574,12 @@ public abstract class MetadataStoreTestBase extends Assert {
     dirMeta = ms.listChildren(strToPath("/a1"));
     if (!allowMissing() || dirMeta != null) {
       assertNotNull("Listing /a1", dirMeta);
-      entries = dirMeta.getListing();
+      entries = dirMeta.withoutTombstones().getListing();
       assertListingsEqual(entries, "/a1/b2");
     }
 
     PathMetadata meta = ms.get(strToPath("/a1/b1/file1"));
-    // TODO allow return of PathMetadata with isDeleted == true
-    assertNull("Src path deleted", meta);
+    assertTrue("Src path deleted", meta == null || meta.isDeleted());
 
     // Assert dest looks right
     meta = ms.get(strToPath("/b1/file1"));
@@ -596,7 +656,7 @@ public abstract class MetadataStoreTestBase extends Assert {
     ms.prune(cutoff);
     ls = ms.listChildren(strToPath("/pruneFiles"));
     if (allowMissing()) {
-      assertNotCached("/pruneFiles/old");
+      assertDeleted("/pruneFiles/old");
     } else {
       assertListingsEqual(ls.getListing(), "/pruneFiles/new");
     }
@@ -625,7 +685,7 @@ public abstract class MetadataStoreTestBase extends Assert {
 
     ms.prune(cutoff);
 
-    assertNotCached("/pruneDirs/dir/file");
+    assertDeleted("/pruneDirs/dir/file");
   }
 
   /*
@@ -646,6 +706,7 @@ public abstract class MetadataStoreTestBase extends Assert {
         "file3"));
     DirListingMetadata dirMeta = ms.listChildren(strToPath(parent));
     if (!allowMissing() || dirMeta != null) {
+      dirMeta = dirMeta.withoutTombstones();
       assertNotNull("list after putListStatus", dirMeta);
       Collection<PathMetadata> entries = dirMeta.getListing();
       assertNotNull("listStatus has entries", entries);
@@ -700,6 +761,7 @@ public abstract class MetadataStoreTestBase extends Assert {
       assertNotNull("Directory " + pathStr + " in cache", dirMeta);
     }
     if (!allowMissing() || dirMeta != null) {
+      dirMeta = dirMeta.withoutTombstones();
       assertEquals("Number of entries in dir " + pathStr, size,
           nonDeleted(dirMeta.getListing()).size());
     }
@@ -708,21 +770,27 @@ public abstract class MetadataStoreTestBase extends Assert {
   /** @return only file statuses which are *not* marked deleted. */
   private Collection<PathMetadata> nonDeleted(
       Collection<PathMetadata> statuses) {
-    /* TODO: filter out paths marked for deletion. */
-    return statuses;
+    Collection<PathMetadata> currentStatuses = new ArrayList<>();
+    for (PathMetadata status : statuses) {
+      if (!status.isDeleted()) {
+        currentStatuses.add(status);
+      }
+    }
+    return currentStatuses;
   }
 
-  private void assertNotCached(String pathStr) throws IOException {
-    // TODO this should return an entry with deleted flag set
+  private void assertDeleted(String pathStr) throws IOException {
     Path path = strToPath(pathStr);
     PathMetadata meta = ms.get(path);
-    assertNull(pathStr + " should not be cached.", meta);
+    boolean cached = meta != null && !meta.isDeleted();
+    assertFalse(pathStr + " should not be cached.", cached);
   }
 
   protected void assertCached(String pathStr) throws IOException {
     Path path = strToPath(pathStr);
     PathMetadata meta = ms.get(path);
-    assertNotNull(pathStr + " should be cached.", meta);
+    boolean cached = meta != null && !meta.isDeleted();
+    assertTrue(pathStr + " should be cached.", cached);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8e257a40/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/TestDynamoDBMetadataStore.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/TestDynamoDBMetadataStore.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/TestDynamoDBMetadataStore.java
index 3584b54..27416bb 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/TestDynamoDBMetadataStore.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/TestDynamoDBMetadataStore.java
@@ -294,7 +294,7 @@ public class TestDynamoDBMetadataStore extends MetadataStoreTestBase {
     if (oldMetas != null) {
       // put all metadata of old paths and verify
       ms.put(new DirListingMetadata(oldDir, oldMetas, false));
-      assertEquals(0, ms.listChildren(newDir).numEntries());
+      assertEquals(0, ms.listChildren(newDir).withoutTombstones().numEntries());
       assertTrue(CollectionUtils.isEqualCollection(oldMetas,
           ms.listChildren(oldDir).getListing()));
 
@@ -306,7 +306,7 @@ public class TestDynamoDBMetadataStore extends MetadataStoreTestBase {
 
     // move the old paths to new paths and verify
     ms.move(pathsToDelete, newMetas);
-    assertEquals(0, ms.listChildren(oldDir).numEntries());
+    assertEquals(0, ms.listChildren(oldDir).withoutTombstones().numEntries());
     if (newMetas != null) {
       assertTrue(CollectionUtils.isEqualCollection(newMetas,
           ms.listChildren(newDir).getListing()));

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8e257a40/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/TestLocalMetadataStore.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/TestLocalMetadataStore.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/TestLocalMetadataStore.java
index 4cffc6f..89d0498 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/TestLocalMetadataStore.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/TestLocalMetadataStore.java
@@ -75,7 +75,7 @@ public class TestLocalMetadataStore extends MetadataStoreTestBase {
 
   @Test
   public void testClearByAncestor() {
-    Map<Path, String> map = new HashMap<>();
+    Map<Path, PathMetadata> map = new HashMap<>();
 
     // 1. Test paths without scheme/host
     assertClearResult(map, "", "/", 0);
@@ -90,21 +90,37 @@ public class TestLocalMetadataStore extends MetadataStoreTestBase {
     assertClearResult(map, p, "/invalid", 5);
   }
 
-  private static void populateMap(Map<Path, String> map, String prefix) {
-    String dummyVal = "dummy";
-    map.put(new Path(prefix + "/dirA/dirB/"), dummyVal);
-    map.put(new Path(prefix + "/dirA/dirB/dirC"), dummyVal);
-    map.put(new Path(prefix + "/dirA/dirB/dirC/file1"), dummyVal);
-    map.put(new Path(prefix + "/dirA/dirB/dirC/file2"), dummyVal);
-    map.put(new Path(prefix + "/dirA/file1"), dummyVal);
+  private static void populateMap(Map<Path, PathMetadata> map,
+      String prefix) {
+    populateEntry(map, new Path(prefix + "/dirA/dirB/"));
+    populateEntry(map, new Path(prefix + "/dirA/dirB/dirC"));
+    populateEntry(map, new Path(prefix + "/dirA/dirB/dirC/file1"));
+    populateEntry(map, new Path(prefix + "/dirA/dirB/dirC/file2"));
+    populateEntry(map, new Path(prefix + "/dirA/file1"));
   }
 
-  private static void assertClearResult(Map <Path, String> map,
+  private static void populateEntry(Map<Path, PathMetadata> map,
+      Path path) {
+    map.put(path, new PathMetadata(new FileStatus(0, true, 0, 0, 0, path)));
+  }
+
+  private static int sizeOfMap(Map<Path, PathMetadata> map) {
+    int count = 0;
+    for (PathMetadata meta : map.values()) {
+      if (!meta.isDeleted()) {
+        count++;
+      }
+    }
+    return count;
+  }
+
+  private static void assertClearResult(Map <Path, PathMetadata> map,
       String prefixStr, String pathStr, int leftoverSize) {
     populateMap(map, prefixStr);
-    LocalMetadataStore.clearHashByAncestor(new Path(prefixStr + pathStr), map);
+    LocalMetadataStore.deleteHashByAncestor(new Path(prefixStr + pathStr), map,
+        true);
     assertEquals(String.format("Map should have %d entries", leftoverSize),
-        leftoverSize, map.size());
+        leftoverSize, sizeOfMap(map));
     map.clear();
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8e257a40/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractITestS3AMetadataStoreScale.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractITestS3AMetadataStoreScale.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractITestS3AMetadataStoreScale.java
index 9b8e3c1..876cc80 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractITestS3AMetadataStoreScale.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractITestS3AMetadataStoreScale.java
@@ -114,7 +114,7 @@ public abstract class AbstractITestS3AMetadataStoreScale extends
         describe("Running move workload");
         NanoTimer moveTimer = new NanoTimer();
         LOG.info("Running {} moves of {} paths each", operations,
-            origPaths.size());
+            origMetas.size());
         for (int i = 0; i < operations; i++) {
           Collection<Path> toDelete;
           Collection<PathMetadata> toCreate;


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


[2/2] hadoop git commit: HADOOP-13760. S3Guard: add delete tracking

Posted by ma...@apache.org.
HADOOP-13760. S3Guard: add delete tracking


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/8e257a40
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/8e257a40
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/8e257a40

Branch: refs/heads/HADOOP-13345
Commit: 8e257a406201dcb3dec345884d1bfd613c0d9953
Parents: 80613da
Author: Sean Mackrory <ma...@apache.org>
Authored: Thu May 25 06:12:15 2017 -0600
Committer: Sean Mackrory <ma...@apache.org>
Committed: Thu May 25 06:16:20 2017 -0600

----------------------------------------------------------------------
 .../fs/s3a/InconsistentAmazonS3Client.java      | 125 +++++++-
 .../java/org/apache/hadoop/fs/s3a/Listing.java  |  91 ++++++
 .../org/apache/hadoop/fs/s3a/S3AFileSystem.java | 310 +++++++++++++------
 .../fs/s3a/s3guard/DescendantsIterator.java     |  14 +-
 .../fs/s3a/s3guard/DirListingMetadata.java      |  32 ++
 .../fs/s3a/s3guard/DynamoDBMetadataStore.java   |  86 +++--
 .../fs/s3a/s3guard/LocalMetadataStore.java      |  61 ++--
 .../hadoop/fs/s3a/s3guard/MetadataStore.java    |  19 +-
 .../fs/s3a/s3guard/NullMetadataStore.java       |   5 +
 .../hadoop/fs/s3a/s3guard/PathMetadata.java     |  27 +-
 .../PathMetadataDynamoDBTranslation.java        |   7 +-
 .../apache/hadoop/fs/s3a/s3guard/S3Guard.java   |  26 +-
 .../hadoop/fs/s3a/s3guard/S3GuardTool.java      |   5 +-
 .../hadoop/fs/s3a/ITestS3GuardEmptyDirs.java    |  62 ++--
 .../fs/s3a/ITestS3GuardListConsistency.java     | 213 ++++++++++++-
 .../fs/s3a/s3guard/MetadataStoreTestBase.java   | 150 ++++++---
 .../s3a/s3guard/TestDynamoDBMetadataStore.java  |   4 +-
 .../fs/s3a/s3guard/TestLocalMetadataStore.java  |  38 ++-
 .../AbstractITestS3AMetadataStoreScale.java     |   2 +-
 19 files changed, 1001 insertions(+), 276 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/8e257a40/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/InconsistentAmazonS3Client.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/InconsistentAmazonS3Client.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/InconsistentAmazonS3Client.java
index ebca268..98ea16a 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/InconsistentAmazonS3Client.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/InconsistentAmazonS3Client.java
@@ -23,6 +23,7 @@ import com.amazonaws.AmazonServiceException;
 import com.amazonaws.ClientConfiguration;
 import com.amazonaws.auth.AWSCredentialsProvider;
 import com.amazonaws.services.s3.AmazonS3Client;
+import com.amazonaws.services.s3.model.DeleteObjectRequest;
 import com.amazonaws.services.s3.model.ListObjectsRequest;
 import com.amazonaws.services.s3.model.ObjectListing;
 import com.amazonaws.services.s3.model.PutObjectRequest;
@@ -33,6 +34,7 @@ import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 
@@ -57,14 +59,50 @@ public class InconsistentAmazonS3Client extends AmazonS3Client {
   private static final Logger LOG =
       LoggerFactory.getLogger(InconsistentAmazonS3Client.class);
 
+  /**
+   * Composite of data we need to track about recently deleted objects:
+   * when it was deleted (same as with recently put objects) and the object
+   * summary (since we should keep returning it for some time after its
+   * deletion).
+   */
+  private static class Delete {
+    private Long time;
+    private S3ObjectSummary summary;
+
+    Delete(Long time, S3ObjectSummary summary) {
+      this.time = time;
+      this.summary = summary;
+    }
+
+    public Long time() {
+      return time;
+    }
+
+    public S3ObjectSummary summary() {
+      return summary;
+    }
+  }
+
+  /** Map of key to delay -> time it was deleted + object summary (object
+   * summary is null for prefixes). */
+  private Map<String, Delete> delayedDeletes = new HashMap<>();
+
   /** Map of key to delay -> time it was created. */
-  private Map<String, Long> delayedKeys = new HashMap<>();
+  private Map<String, Long> delayedPutKeys = new HashMap<>();
 
   public InconsistentAmazonS3Client(AWSCredentialsProvider credentials,
       ClientConfiguration clientConfiguration) {
     super(credentials, clientConfiguration);
   }
 
+  @Override
+  public void deleteObject(DeleteObjectRequest deleteObjectRequest)
+      throws AmazonClientException, AmazonServiceException {
+    LOG.debug("key {}", deleteObjectRequest.getKey());
+    registerDeleteObject(deleteObjectRequest);
+    super.deleteObject(deleteObjectRequest);
+  }
+
   /* We should only need to override this version of putObject() */
   @Override
   public PutObjectResult putObject(PutObjectRequest putObjectRequest)
@@ -80,10 +118,49 @@ public class InconsistentAmazonS3Client extends AmazonS3Client {
       throws AmazonClientException, AmazonServiceException {
     LOG.debug("prefix {}", listObjectsRequest.getPrefix());
     ObjectListing listing = super.listObjects(listObjectsRequest);
-    return filterListObjects(listObjectsRequest,
-        listing);
+    listing = filterListObjects(listObjectsRequest, listing);
+    listing = restoreListObjects(listObjectsRequest, listing);
+    return listing;
   }
 
+  private boolean addIfNotPresent(List<S3ObjectSummary> list,
+      S3ObjectSummary item) {
+    // Match on key rather than relying on S3ObjectSummary equality
+    String key = item.getKey();
+    for (S3ObjectSummary member : list) {
+      if (member.getKey().equals(key)) {
+        return false;
+      }
+    }
+    return list.add(item);
+  }
+
+  private ObjectListing restoreListObjects(ListObjectsRequest request,
+                                           ObjectListing rawListing) {
+    List<S3ObjectSummary> outputList = rawListing.getObjectSummaries();
+    List<String> outputPrefixes = rawListing.getCommonPrefixes();
+    for (String key : new HashSet<>(delayedDeletes.keySet())) {
+      Delete delete = delayedDeletes.get(key);
+      if (isKeyDelayed(delete.time(), key)) {
+        // TODO this works fine for flat directories but:
+        // if you have a delayed key /a/b/c/d and you are listing /a/b,
+        // this incorrectly will add /a/b/c/d to the listing for b
+        if (key.startsWith(request.getPrefix())) {
+          if (delete.summary == null) {
+            if (!outputPrefixes.contains(key)) {
+              outputPrefixes.add(key);
+            }
+          } else {
+            addIfNotPresent(outputList, delete.summary());
+          }
+        }
+      } else {
+        delayedDeletes.remove(key);
+      }
+    }
+
+    return new CustomObjectListing(rawListing, outputList, outputPrefixes);
+  }
 
   private ObjectListing filterListObjects(ListObjectsRequest request,
       ObjectListing rawListing) {
@@ -91,7 +168,8 @@ public class InconsistentAmazonS3Client extends AmazonS3Client {
     // Filter object listing
     List<S3ObjectSummary> outputList = new ArrayList<>();
     for (S3ObjectSummary s : rawListing.getObjectSummaries()) {
-      if (!isVisibilityDelayed(s.getKey())) {
+      String key = s.getKey();
+      if (!isKeyDelayed(delayedPutKeys.get(key), key)) {
         outputList.add(s);
       }
     }
@@ -99,7 +177,7 @@ public class InconsistentAmazonS3Client extends AmazonS3Client {
     // Filter prefixes (directories)
     List<String> outputPrefixes = new ArrayList<>();
     for (String key : rawListing.getCommonPrefixes()) {
-      if (!isVisibilityDelayed(key)) {
+      if (!isKeyDelayed(delayedPutKeys.get(key), key)) {
         outputPrefixes.add(key);
       }
     }
@@ -107,28 +185,43 @@ public class InconsistentAmazonS3Client extends AmazonS3Client {
     return new CustomObjectListing(rawListing, outputList, outputPrefixes);
   }
 
-  private boolean isVisibilityDelayed(String key) {
-    Long createTime = delayedKeys.get(key);
-    if (createTime == null) {
+  private boolean isKeyDelayed(Long enqueueTime, String key) {
+    if (enqueueTime == null) {
       LOG.debug("no delay for key {}", key);
       return false;
     }
     long currentTime = System.currentTimeMillis();
-    long deadline = createTime + DELAY_KEY_MILLIS;
+    long deadline = enqueueTime + DELAY_KEY_MILLIS;
     if (currentTime >= deadline) {
-      delayedKeys.remove(key);
-      LOG.debug("{} no longer delayed", key);
+      delayedDeletes.remove(key);
+      LOG.debug("no longer delaying {}", key);
       return false;
     } else  {
-      LOG.info("{} delaying visibility", key);
+      LOG.info("delaying {}", key);
       return true;
     }
   }
 
+  private void registerDeleteObject(DeleteObjectRequest req) {
+    String key = req.getKey();
+    if (shouldDelay(key)) {
+      // Record summary so we can add it back for some time post-deletion
+      S3ObjectSummary summary = null;
+      ObjectListing list = listObjects(req.getBucketName(), key);
+      for (S3ObjectSummary result : list.getObjectSummaries()) {
+        if (result.getKey().equals(key)) {
+          summary = result;
+          break;
+        }
+      }
+      delayedDeletes.put(key, new Delete(System.currentTimeMillis(), summary));
+    }
+  }
+
   private void registerPutObject(PutObjectRequest req) {
     String key = req.getKey();
     if (shouldDelay(key)) {
-      enqueueDelayKey(key);
+      enqueueDelayedPut(key);
     }
   }
 
@@ -148,9 +241,9 @@ public class InconsistentAmazonS3Client extends AmazonS3Client {
    * listObject replies for a while, to simulate eventual list consistency.
    * @param key key to delay visibility of
    */
-  private void enqueueDelayKey(String key) {
-    LOG.debug("key {}", key);
-    delayedKeys.put(key, System.currentTimeMillis());
+  private void enqueueDelayedPut(String key) {
+    LOG.debug("delaying put of {}", key);
+    delayedPutKeys.put(key, System.currentTimeMillis());
   }
 
   /** Since ObjectListing is immutable, we just override it with wrapper. */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8e257a40/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Listing.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Listing.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Listing.java
index e91f2ec..b45fc43 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Listing.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Listing.java
@@ -22,6 +22,7 @@ import com.amazonaws.AmazonClientException;
 import com.amazonaws.services.s3.model.ListObjectsRequest;
 import com.amazonaws.services.s3.model.ObjectListing;
 import com.amazonaws.services.s3.model.S3ObjectSummary;
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
@@ -33,6 +34,7 @@ import org.slf4j.Logger;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
@@ -125,12 +127,27 @@ public class Listing {
    * @param statusIterator an iterator over the remote status entries
    * @return a new remote iterator
    */
+  @VisibleForTesting
   LocatedFileStatusIterator createLocatedFileStatusIterator(
       RemoteIterator<FileStatus> statusIterator) {
     return new LocatedFileStatusIterator(statusIterator);
   }
 
   /**
+   * Create a located status iterator that wraps another to filter out a set
+   * of recently deleted items.
+   * @param iterator an iterator over the remote located status entries.
+   * @param tombstones set of paths that are recently deleted and should be
+   *                   filtered.
+   * @return a new remote iterator.
+   */
+  @VisibleForTesting
+  TombstoneReconcilingIterator createTombstoneReconcilingIterator(
+      RemoteIterator<LocatedFileStatus> iterator, Set<Path> tombstones) {
+    return new TombstoneReconcilingIterator(iterator, tombstones);
+  }
+
+  /**
    * Interface to implement by the logic deciding whether to accept a summary
    * entry or path as a valid file or directory.
    */
@@ -668,6 +685,80 @@ public class Listing {
   }
 
   /**
+   * Wraps another iterator and filters out files that appear in the provided
+   * set of tombstones.  Will read ahead in the iterator when necessary to
+   * ensure that emptiness is detected early enough if only deleted objects
+   * remain in the source iterator.
+   */
+  static class TombstoneReconcilingIterator implements
+      RemoteIterator<LocatedFileStatus> {
+    private LocatedFileStatus next = null;
+    private final RemoteIterator<LocatedFileStatus> iterator;
+    private final Set<Path> tombstones;
+
+    /**
+     * @param iterator Source iterator to filter
+     * @param tombstones set of tombstone markers to filter out of results
+     */
+    TombstoneReconcilingIterator(RemoteIterator<LocatedFileStatus>
+        iterator, Set<Path> tombstones) {
+      this.iterator = iterator;
+      if (tombstones != null) {
+        this.tombstones = tombstones;
+      } else {
+        this.tombstones = Collections.EMPTY_SET;
+      }
+    }
+
+    private boolean fetch() throws IOException {
+      while (next == null && iterator.hasNext()) {
+        LocatedFileStatus candidate = iterator.next();
+        if (!tombstones.contains(candidate.getPath())) {
+          next = candidate;
+          return true;
+        }
+      }
+      return false;
+    }
+
+    public boolean hasNext() throws IOException {
+      if (next != null) {
+        return true;
+      }
+      return fetch();
+    }
+
+    public LocatedFileStatus next() throws IOException {
+      if (hasNext()) {
+        LocatedFileStatus result = next;
+        next = null;
+        fetch();
+        return result;
+      }
+      throw new NoSuchElementException();
+    }
+  }
+
+  /**
+   * Accept all entries except those which map to S3N pseudo directory markers.
+   */
+  static class AcceptAllButS3nDirs implements FileStatusAcceptor {
+
+    public boolean accept(Path keyPath, S3ObjectSummary summary) {
+      return !summary.getKey().endsWith(S3N_FOLDER_SUFFIX);
+    }
+
+    public boolean accept(Path keyPath, String prefix) {
+      return !keyPath.toString().endsWith(S3N_FOLDER_SUFFIX);
+    }
+
+    public boolean accept(FileStatus status) {
+      return !status.getPath().toString().endsWith(S3N_FOLDER_SUFFIX);
+    }
+
+  }
+
+  /**
    * Accept all entries except the base path and those which map to S3N
    * pseudo directory markers.
    */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8e257a40/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
index 78b3970..7de94cc 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
@@ -26,11 +26,13 @@ import java.io.InterruptedIOException;
 import java.net.URI;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.Date;
 import java.util.EnumSet;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.Objects;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -93,8 +95,8 @@ import org.apache.hadoop.fs.PathIsNotEmptyDirectoryException;
 import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.fs.StorageStatistics;
 import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.fs.s3a.s3guard.DescendantsIterator;
 import org.apache.hadoop.fs.s3a.s3guard.DirListingMetadata;
+import org.apache.hadoop.fs.s3a.s3guard.MetadataStoreListFilesIterator;
 import org.apache.hadoop.fs.s3a.s3guard.MetadataStore;
 import org.apache.hadoop.fs.s3a.s3guard.PathMetadata;
 import org.apache.hadoop.fs.s3a.s3guard.S3Guard;
@@ -878,51 +880,44 @@ public class S3AFileSystem extends FileSystem {
         keysToDelete.add(new DeleteObjectsRequest.KeyVersion(dstKey));
       }
 
-      ListObjectsRequest request = new ListObjectsRequest();
-      request.setBucketName(bucket);
-      request.setPrefix(srcKey);
-      request.setMaxKeys(maxKeys);
-
-      ObjectListing objects = listObjects(request);
-
-      while (true) {
-        for (S3ObjectSummary summary : objects.getObjectSummaries()) {
-          long length = summary.getSize();
-          keysToDelete
-              .add(new DeleteObjectsRequest.KeyVersion(summary.getKey()));
-          String newDstKey =
-              dstKey + summary.getKey().substring(srcKey.length());
-          copyFile(summary.getKey(), newDstKey, length);
-
-          if (hasMetadataStore()) {
-            Path childSrc = keyToQualifiedPath(summary.getKey());
-            Path childDst = keyToQualifiedPath(newDstKey);
-            if (objectRepresentsDirectory(summary.getKey(), length)) {
-              S3Guard.addMoveDir(metadataStore, srcPaths, dstMetas, childSrc,
-                  childDst, username);
-            } else {
-              S3Guard.addMoveFile(metadataStore, srcPaths, dstMetas, childSrc,
-                  childDst, length, getDefaultBlockSize(childDst), username);
-            }
-            // Ancestor directories may not be listed, so we explicitly add them
-            S3Guard.addMoveAncestors(metadataStore, srcPaths, dstMetas,
-                keyToQualifiedPath(srcKey), childSrc, childDst, username);
-          }
-
-          if (keysToDelete.size() == MAX_ENTRIES_TO_DELETE) {
-            removeKeys(keysToDelete, true, false);
+      Path parentPath = keyToPath(srcKey);
+      RemoteIterator<LocatedFileStatus> iterator = listFilesAndEmptyDirectories(
+          parentPath, true);
+      while (iterator.hasNext()) {
+        LocatedFileStatus status = iterator.next();
+        long length = status.getLen();
+        String key = pathToKey(status.getPath());
+        if (status.isDirectory() && !key.endsWith("/")) {
+          key += "/";
+        }
+        keysToDelete
+            .add(new DeleteObjectsRequest.KeyVersion(key));
+        String newDstKey =
+            dstKey + key.substring(srcKey.length());
+        copyFile(key, newDstKey, length);
+
+        if (hasMetadataStore()) {
+          Path childSrc = keyToQualifiedPath(key);
+          Path childDst = keyToQualifiedPath(newDstKey);
+          if (objectRepresentsDirectory(key, length)) {
+            S3Guard.addMoveDir(metadataStore, srcPaths, dstMetas, childSrc,
+                childDst, username);
+          } else {
+            S3Guard.addMoveFile(metadataStore, srcPaths, dstMetas, childSrc,
+                childDst, length, getDefaultBlockSize(childDst), username);
           }
+          // Ancestor directories may not be listed, so we explicitly add them
+          S3Guard.addMoveAncestors(metadataStore, srcPaths, dstMetas,
+              keyToQualifiedPath(srcKey), childSrc, childDst, username);
         }
 
-        if (objects.isTruncated()) {
-          objects = continueListObjects(objects);
-        } else {
-          if (!keysToDelete.isEmpty()) {
-            removeKeys(keysToDelete, false, false);
-          }
-          break;
+        if (keysToDelete.size() == MAX_ENTRIES_TO_DELETE) {
+          removeKeys(keysToDelete, true, false);
         }
       }
+      if (!keysToDelete.isEmpty()) {
+        removeKeys(keysToDelete, false, false);
+      }
 
       // We moved all the children, now move the top-level dir
       // Empty directory should have been added as the object summary
@@ -1632,6 +1627,42 @@ public class S3AFileSystem extends FileSystem {
       throw translateException("innerMkdirs", path, e);
     }
   }
+
+  private enum DirectoryStatus {
+    DOES_NOT_EXIST, EXISTS_AND_IS_DIRECTORY_ON_S3_ONLY,
+    EXISTS_AND_IS_DIRECTORY_ON_METADATASTORE, EXISTS_AND_IS_FILE
+  }
+
+  private DirectoryStatus checkPathForDirectory(Path path) throws
+      IOException {
+    try {
+      if (path.isRoot()) {
+        return DirectoryStatus.EXISTS_AND_IS_DIRECTORY_ON_METADATASTORE;
+      }
+      String key = pathToKey(path);
+
+      // Check MetadataStore, if any.
+      FileStatus status = null;
+      PathMetadata pm = metadataStore.get(path, false);
+      if (pm != null) {
+        if (pm.isDeleted()) {
+          return DirectoryStatus.DOES_NOT_EXIST;
+        }
+        status = pm.getFileStatus();
+        if (status != null && status.isDirectory()) {
+          return DirectoryStatus.EXISTS_AND_IS_DIRECTORY_ON_METADATASTORE;
+        }
+      }
+      status = s3GetFileStatus(path, key, null);
+      if (status.isDirectory()) {
+        return DirectoryStatus.EXISTS_AND_IS_DIRECTORY_ON_S3_ONLY;
+      }
+    } catch (FileNotFoundException e) {
+      return DirectoryStatus.DOES_NOT_EXIST;
+    }
+    return DirectoryStatus.EXISTS_AND_IS_FILE;
+  }
+
   /**
    *
    * Make the given path and all non-existent parents into
@@ -1639,64 +1670,70 @@ public class S3AFileSystem extends FileSystem {
    * See {@link #mkdirs(Path, FsPermission)}
    * @param p path to create
    * @param permission to apply to f
-   * @return true if a directory was created
+   * @return true if a directory was created or already existed
    * @throws FileAlreadyExistsException there is a file at the path specified
    * @throws IOException other IO problems
    * @throws AmazonClientException on failures inside the AWS SDK
    */
   private boolean innerMkdirs(Path p, FsPermission permission)
       throws IOException, FileAlreadyExistsException, AmazonClientException {
+    boolean createOnS3 = false;
     Path f = qualify(p);
     LOG.debug("Making directory: {}", f);
     incrementStatistic(INVOCATION_MKDIRS);
-    FileStatus fileStatus;
     List<Path> metadataStoreDirs = null;
     if (hasMetadataStore()) {
       metadataStoreDirs = new ArrayList<>();
     }
 
-    try {
-      fileStatus = getFileStatus(f);
-
-      if (fileStatus.isDirectory()) {
-        return true;
-      } else {
-        throw new FileAlreadyExistsException("Path is a file: " + f);
+    DirectoryStatus status = checkPathForDirectory(f);
+    if (status == DirectoryStatus.DOES_NOT_EXIST) {
+      createOnS3 = true;
+      if (metadataStoreDirs != null) {
+        metadataStoreDirs.add(f);
       }
-    } catch (FileNotFoundException e) {
-      // Walk path to root, ensuring closest ancestor is a directory, not file
-      Path fPart = f.getParent();
+    } else if (status == DirectoryStatus.EXISTS_AND_IS_DIRECTORY_ON_S3_ONLY) {
       if (metadataStoreDirs != null) {
         metadataStoreDirs.add(f);
       }
-      do {
-        try {
-          fileStatus = getFileStatus(fPart);
-          if (fileStatus.isDirectory()) {
-            break;
-          }
-          if (fileStatus.isFile()) {
-            throw new FileAlreadyExistsException(String.format(
-                "Can't make directory for path '%s' since it is a file.",
-                fPart));
-          }
-        } catch (FileNotFoundException fnfe) {
-          instrumentation.errorIgnored();
-          // We create all missing directories in MetadataStore; it does not
-          // infer directories exist by prefix like S3.
-          if (metadataStoreDirs != null) {
-            metadataStoreDirs.add(fPart);
-          }
+    } else if (status == DirectoryStatus
+        .EXISTS_AND_IS_DIRECTORY_ON_METADATASTORE) {
+      return true;
+    } else if (status == DirectoryStatus.EXISTS_AND_IS_FILE) {
+      throw new FileAlreadyExistsException("Path is a file: " + f);
+    }
+
+    // Walk path to root, ensuring closest ancestor is a directory, not file
+    Path fPart = f.getParent();
+    do {
+      status = checkPathForDirectory(fPart);
+      // The fake directory on S3 may not be visible immediately, but
+      // only the leaf node has a fake directory on S3 so we can treat both
+      // cases the same and just create a metadata store entry.
+      if (status == DirectoryStatus.DOES_NOT_EXIST || status ==
+          DirectoryStatus.EXISTS_AND_IS_DIRECTORY_ON_S3_ONLY) {
+        if (metadataStoreDirs != null) {
+          metadataStoreDirs.add(fPart);
         }
-        fPart = fPart.getParent();
-      } while (fPart != null);
+      } else if (status == DirectoryStatus
+          .EXISTS_AND_IS_DIRECTORY_ON_METADATASTORE) {
+        // do nothing - just break out of the loop, make whatever child
+        // directories are needed on the metadata store, and return the
+        // result.
+        break;
+      } else if (status == DirectoryStatus.EXISTS_AND_IS_FILE) {
+        throw new FileAlreadyExistsException("Path is a file: " + f);
+      }
+      fPart = fPart.getParent();
+    } while (fPart != null);
 
+    if (createOnS3) {
       String key = pathToKey(f);
       createFakeDirectory(key);
-      S3Guard.makeDirsOrdered(metadataStore, metadataStoreDirs, username);
-      deleteUnnecessaryFakeDirectories(f.getParent());
-      return true;
     }
+    S3Guard.makeDirsOrdered(metadataStore, metadataStoreDirs, username);
+    deleteUnnecessaryFakeDirectories(f.getParent());
+    return true;
   }
 
   /**
@@ -1729,8 +1766,12 @@ public class S3AFileSystem extends FileSystem {
 
     // Check MetadataStore, if any.
     PathMetadata pm = metadataStore.get(path, needEmptyDirectoryFlag);
+    Set<Path> tombstones = Collections.EMPTY_SET;
     if (pm != null) {
-      // HADOOP-13760: handle deleted files, i.e. PathMetadata#isDeleted() here
+      if (pm.isDeleted()) {
+        throw new FileNotFoundException("Path " + f + " is recorded as " +
+            "deleted by S3Guard");
+      }
 
       FileStatus msStatus = pm.getFileStatus();
       if (needEmptyDirectoryFlag && msStatus.isDirectory()) {
@@ -1738,15 +1779,29 @@ public class S3AFileSystem extends FileSystem {
           // We have a definitive true / false from MetadataStore, we are done.
           return S3AFileStatus.fromFileStatus(msStatus, pm.isEmptyDirectory());
         } else {
+          DirListingMetadata children = metadataStore.listChildren(path);
+          if (children != null) {
+            tombstones = children.listTombstones();
+          }
           LOG.debug("MetadataStore doesn't know if dir is empty, using S3.");
         }
       } else {
         // Either this is not a directory, or we don't care if it is empty
         return S3AFileStatus.fromFileStatus(msStatus, pm.isEmptyDirectory());
       }
+
+      // If the metadata store has no children for it and it's not listed in
+      // S3 yet, we'll assume the directory is empty.
+      S3AFileStatus s3FileStatus;
+      try {
+        s3FileStatus = s3GetFileStatus(path, key, tombstones);
+      } catch (FileNotFoundException e) {
+        return S3AFileStatus.fromFileStatus(msStatus, Tristate.TRUE);
+      }
+      return S3Guard.putAndReturn(metadataStore, s3FileStatus, instrumentation);
     }
-    return S3Guard.putAndReturn(metadataStore, s3GetFileStatus(path, key),
-        instrumentation);
+    return S3Guard.putAndReturn(metadataStore,
+        s3GetFileStatus(path, key, tombstones), instrumentation);
   }
 
   /**
@@ -1757,8 +1812,8 @@ public class S3AFileSystem extends FileSystem {
    * @return Status
    * @throws IOException
    */
-  private S3AFileStatus s3GetFileStatus(final Path path, String key)
-      throws IOException {
+  private S3AFileStatus s3GetFileStatus(final Path path, String key,
+      Set<Path> tombstones) throws IOException {
     if (!key.isEmpty()) {
       try {
         ObjectMetadata meta = getObjectMetadata(key);
@@ -1821,17 +1876,18 @@ public class S3AFileSystem extends FileSystem {
 
       ObjectListing objects = listObjects(request);
 
-      if (!objects.getCommonPrefixes().isEmpty()
-          || !objects.getObjectSummaries().isEmpty()) {
+      Collection<String> prefixes = objects.getCommonPrefixes();
+      Collection<S3ObjectSummary> summaries = objects.getObjectSummaries();
+      if (!isEmptyOfKeys(prefixes, tombstones) ||
+          !isEmptyOfObjects(summaries, tombstones)) {
         if (LOG.isDebugEnabled()) {
           LOG.debug("Found path as directory (with /): {}/{}",
-              objects.getCommonPrefixes().size() ,
-              objects.getObjectSummaries().size());
+              prefixes.size(), summaries.size());
 
-          for (S3ObjectSummary summary : objects.getObjectSummaries()) {
+          for (S3ObjectSummary summary : summaries) {
             LOG.debug("Summary: {} {}", summary.getKey(), summary.getSize());
           }
-          for (String prefix : objects.getCommonPrefixes()) {
+          for (String prefix : prefixes) {
             LOG.debug("Prefix: {}", prefix);
           }
         }
@@ -1853,6 +1909,48 @@ public class S3AFileSystem extends FileSystem {
     throw new FileNotFoundException("No such file or directory: " + path);
   }
 
+  /** Helper function to determine if a collection of paths is empty
+   * after accounting for tombstone markers (if provided).
+   * @param keys Collection of paths (prefixes / directories or keys).
+   * @param tombstones Set of tombstone markers, or null if not applicable.
+   * @return false if keys contains entries not accounted for by
+   * tombstones.
+   */
+  private boolean isEmptyOfKeys(Collection<String> keys, Set<Path>
+      tombstones) {
+    if (tombstones == null) {
+      return keys.isEmpty();
+    }
+    for (String key : keys) {
+      Path qualified = keyToQualifiedPath(key);
+      if (!tombstones.contains(qualified)) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /** Helper function to determine if a collection of object summaries is empty
+   * after accounting for tombstone markers (if provided).
+   * @param summaries Collection of objects as returned by listObjects.
+   * @param tombstones Set of tombstone markers, or null if not applicable.
+   * @return false if summaries contains objects not accounted for by
+   * tombstones.
+   */
+  private boolean isEmptyOfObjects(Collection<S3ObjectSummary> summaries,
+      Set<Path> tombstones) {
+    if (tombstones == null) {
+      return summaries.isEmpty();
+    }
+    Collection<String> stringCollection = new ArrayList<>(summaries.size());
+    for(S3ObjectSummary summary : summaries) {
+      stringCollection.add(summary.getKey());
+    }
+    return isEmptyOfKeys(stringCollection, tombstones);
+  }
+
+
+
   /**
    * Raw version of {@link FileSystem#exists(Path)} which uses S3 only:
    * S3Guard MetadataStore, if any, will be skipped.
@@ -1862,7 +1960,7 @@ public class S3AFileSystem extends FileSystem {
     Path path = qualify(f);
     String key = pathToKey(path);
     try {
-      return s3GetFileStatus(path, key) != null;
+      return s3GetFileStatus(path, key, null) != null;
     } catch (FileNotFoundException e) {
       return false;
     }
@@ -2420,10 +2518,10 @@ public class S3AFileSystem extends FileSystem {
         new Listing.AcceptFilesOnly(qualify(f)));
   }
 
-  public RemoteIterator<LocatedFileStatus> listFilesAndDirectories(Path f,
+  public RemoteIterator<LocatedFileStatus> listFilesAndEmptyDirectories(Path f,
       boolean recursive) throws IOException {
     return innerListFiles(f, recursive,
-        new Listing.AcceptAllButSelfAndS3nDirs(qualify(f)));
+        new Listing.AcceptAllButS3nDirs());
   }
 
   private RemoteIterator<LocatedFileStatus> innerListFiles(Path f, boolean
@@ -2446,23 +2544,37 @@ public class S3AFileSystem extends FileSystem {
         LOG.debug("Requesting all entries under {} with delimiter '{}'",
             key, delimiter);
         final RemoteIterator<FileStatus> cachedFilesIterator;
+        final Set<Path> tombstones;
         if (recursive) {
           final PathMetadata pm = metadataStore.get(path, true);
-          cachedFilesIterator = new DescendantsIterator(metadataStore, pm);
+          // shouldn't need to check pm.isDeleted() because that will have
+          // been caught by getFileStatus above.
+          MetadataStoreListFilesIterator metadataStoreListFilesIterator =
+              new MetadataStoreListFilesIterator(metadataStore, pm,
+                  allowAuthoritative);
+          tombstones = metadataStoreListFilesIterator.listTombstones();
+          cachedFilesIterator = metadataStoreListFilesIterator;
         } else {
-          final DirListingMetadata meta = metadataStore.listChildren(path);
+          DirListingMetadata meta = metadataStore.listChildren(path);
+          if (meta != null) {
+            tombstones = meta.listTombstones();
+          } else {
+            tombstones = null;
+          }
           cachedFilesIterator = listing.createProvidedFileStatusIterator(
               S3Guard.dirMetaToStatuses(meta), ACCEPT_ALL, acceptor);
           if (allowAuthoritative && meta != null && meta.isAuthoritative()) {
             return listing.createLocatedFileStatusIterator(cachedFilesIterator);
           }
         }
-        return listing.createLocatedFileStatusIterator(
-            listing.createFileStatusListingIterator(path,
-                createListObjectsRequest(key, delimiter),
-                ACCEPT_ALL,
-                acceptor,
-                cachedFilesIterator));
+        return listing.createTombstoneReconcilingIterator(
+            listing.createLocatedFileStatusIterator(
+                listing.createFileStatusListingIterator(path,
+                    createListObjectsRequest(key, delimiter),
+                    ACCEPT_ALL,
+                    acceptor,
+                    cachedFilesIterator)),
+            tombstones);
       }
     } catch (AmazonClientException e) {
       // TODO s3guard:
@@ -2514,7 +2626,7 @@ public class S3AFileSystem extends FileSystem {
         final String key = maybeAddTrailingSlash(pathToKey(path));
         final Listing.FileStatusAcceptor acceptor =
             new Listing.AcceptAllButSelfAndS3nDirs(path);
-        final DirListingMetadata meta = metadataStore.listChildren(path);
+        DirListingMetadata meta = metadataStore.listChildren(path);
         final RemoteIterator<FileStatus> cachedFileStatusIterator =
             listing.createProvidedFileStatusIterator(
                 S3Guard.dirMetaToStatuses(meta), filter, acceptor);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8e257a40/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DescendantsIterator.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DescendantsIterator.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DescendantsIterator.java
index d008972..262a6fa 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DescendantsIterator.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DescendantsIterator.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.fs.s3a.s3guard;
 
 import java.io.IOException;
+import java.util.Collection;
 import java.util.LinkedList;
 import java.util.NoSuchElementException;
 import java.util.Queue;
@@ -103,8 +104,9 @@ public class DescendantsIterator implements RemoteIterator<FileStatus> {
     if (meta != null) {
       final Path path = meta.getFileStatus().getPath();
       if (path.isRoot()) {
-        final DirListingMetadata rootListing = ms.listChildren(path);
+        DirListingMetadata rootListing = ms.listChildren(path);
         if (rootListing != null) {
+          rootListing = rootListing.withoutTombstones();
           queue.addAll(rootListing.getListing());
         }
       } else {
@@ -123,11 +125,17 @@ public class DescendantsIterator implements RemoteIterator<FileStatus> {
     if (!hasNext()) {
       throw new NoSuchElementException("No more descendants.");
     }
-    final PathMetadata next;
+    PathMetadata next;
     next = queue.poll();
     if (next.getFileStatus().isDirectory()) {
       final Path path = next.getFileStatus().getPath();
-      queue.addAll(metadataStore.listChildren(path).getListing());
+      DirListingMetadata meta = metadataStore.listChildren(path);
+      if (meta != null) {
+        Collection<PathMetadata> more = meta.withoutTombstones().getListing();
+        if (!more.isEmpty()) {
+          queue.addAll(more);
+        }
+      }
     }
     return next.getFileStatus();
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8e257a40/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DirListingMetadata.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DirListingMetadata.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DirListingMetadata.java
index f13b447..320fa8d 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DirListingMetadata.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DirListingMetadata.java
@@ -25,9 +25,12 @@ import org.apache.hadoop.fs.Path;
 
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 
 import com.google.common.base.Preconditions;
@@ -104,6 +107,26 @@ public class DirListingMetadata {
     return Collections.unmodifiableCollection(listMap.values());
   }
 
+  public Set<Path> listTombstones() {
+    Set<Path> tombstones = new HashSet<>();
+    for (PathMetadata meta : listMap.values()) {
+      if (meta.isDeleted()) {
+        tombstones.add(meta.getFileStatus().getPath());
+      }
+    }
+    return tombstones;
+  }
+
+  public DirListingMetadata withoutTombstones() {
+    Collection<PathMetadata> filteredList = new ArrayList<>();
+    for (PathMetadata meta : listMap.values()) {
+      if (!meta.isDeleted()) {
+        filteredList.add(meta);
+      }
+    }
+    return new DirListingMetadata(path, filteredList, isAuthoritative);
+  }
+
   /**
    * @return number of entries tracked.  This is not the same as the number
    * of entries in the actual directory unless {@link #isAuthoritative()} is
@@ -166,6 +189,15 @@ public class DirListingMetadata {
   }
 
   /**
+   * Replace an entry with a tombstone.
+   * @param childPath path of entry to replace.
+   */
+  public void markDeleted(Path childPath) {
+    checkChildPath(childPath);
+    listMap.put(childPath, PathMetadata.tombstone(childPath));
+  }
+
+  /**
    * Remove entry from this directory.
    *
    * @param childPath path of entry to remove.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8e257a40/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStore.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStore.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStore.java
index 302541c..784b815 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStore.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStore.java
@@ -185,6 +185,9 @@ public class DynamoDBMetadataStore implements MetadataStore {
    * DynamoDB. Value is {@value} msec. */
   public static final long MIN_RETRY_SLEEP_MSEC = 100;
 
+  private static ValueMap deleteTrackingValueMap =
+      new ValueMap().withBoolean(":false", false);
+
   private DynamoDB dynamoDB;
   private String region;
   private Table table;
@@ -286,6 +289,16 @@ public class DynamoDBMetadataStore implements MetadataStore {
 
   @Override
   public void delete(Path path) throws IOException {
+    innerDelete(path, true);
+  }
+
+  @Override
+  public void forgetMetadata(Path path) throws IOException {
+    innerDelete(path, false);
+  }
+
+  private void innerDelete(Path path, boolean tombstone)
+      throws IOException {
     path = checkPath(path);
     LOG.debug("Deleting from table {} in region {}: {}",
         tableName, region, path);
@@ -297,7 +310,13 @@ public class DynamoDBMetadataStore implements MetadataStore {
     }
 
     try {
-      table.deleteItem(pathToKey(path));
+      if (tombstone) {
+        Item item = PathMetadataDynamoDBTranslation.pathMetadataToItem(
+            PathMetadata.tombstone(path));
+        table.putItem(item);
+      } else {
+        table.deleteItem(pathToKey(path));
+      }
     } catch (AmazonClientException e) {
       throw translateException("delete", path, e);
     }
@@ -310,17 +329,24 @@ public class DynamoDBMetadataStore implements MetadataStore {
         tableName, region, path);
 
     final PathMetadata meta = get(path);
-    if (meta == null) {
+    if (meta == null || meta.isDeleted()) {
       LOG.debug("Subtree path {} does not exist; this will be a no-op", path);
       return;
     }
 
     for (DescendantsIterator desc = new DescendantsIterator(this, meta);
          desc.hasNext();) {
-      delete(desc.next().getPath());
+      innerDelete(desc.next().getPath(), true);
     }
   }
 
+  private Item getConsistentItem(PrimaryKey key) {
+    final GetItemSpec spec = new GetItemSpec()
+        .withPrimaryKey(key)
+        .withConsistentRead(true); // strictly consistent read
+    return table.getItem(spec);
+  }
+
   @Override
   public PathMetadata get(Path path) throws IOException {
     return get(path, false);
@@ -338,10 +364,7 @@ public class DynamoDBMetadataStore implements MetadataStore {
         // Root does not persist in the table
         meta = new PathMetadata(makeDirStatus(username, path));
       } else {
-        final GetItemSpec spec = new GetItemSpec()
-            .withPrimaryKey(pathToKey(path))
-            .withConsistentRead(true); // strictly consistent read
-        final Item item = table.getItem(spec);
+        final Item item = getConsistentItem(pathToKey(path));
         meta = itemToPathMetadata(item, username);
         LOG.debug("Get from table {} in region {} returning for {}: {}",
             tableName, region, path, meta);
@@ -354,7 +377,8 @@ public class DynamoDBMetadataStore implements MetadataStore {
           final QuerySpec spec = new QuerySpec()
               .withHashKey(pathToParentKeyAttribute(path))
               .withConsistentRead(true)
-              .withMaxResultSize(1); // limit 1
+              .withFilterExpression(IS_DELETED + " = :false")
+              .withValueMap(deleteTrackingValueMap);
           final ItemCollection<QueryOutcome> items = table.query(spec);
           boolean hasChildren = items.iterator().hasNext();
           // When this class has support for authoritative
@@ -398,7 +422,8 @@ public class DynamoDBMetadataStore implements MetadataStore {
 
       final List<PathMetadata> metas = new ArrayList<>();
       for (Item item : items) {
-        metas.add(itemToPathMetadata(item, username));
+        PathMetadata meta = itemToPathMetadata(item, username);
+        metas.add(meta);
       }
       LOG.trace("Listing table {} in region {} for {} returning {}",
           tableName, region, path, metas);
@@ -430,9 +455,9 @@ public class DynamoDBMetadataStore implements MetadataStore {
     // Following code is to maintain this invariant by putting all ancestor
     // directories of the paths to create.
     // ancestor paths that are not explicitly added to paths to create
-    Collection<PathMetadata> inferredPathsToCreate = null;
+    Collection<PathMetadata> newItems = new ArrayList<>();
     if (pathsToCreate != null) {
-      inferredPathsToCreate = new ArrayList<>(pathsToCreate);
+      newItems.addAll(pathsToCreate);
       // help set for fast look up; we should avoid putting duplicate paths
       final Collection<Path> fullPathsToCreate = new HashSet<>();
       for (PathMetadata meta : pathsToCreate) {
@@ -448,16 +473,20 @@ public class DynamoDBMetadataStore implements MetadataStore {
           LOG.debug("move: auto-create ancestor path {} for child path {}",
               parent, meta.getFileStatus().getPath());
           final FileStatus status = makeDirStatus(parent, username);
-          inferredPathsToCreate.add(new PathMetadata(status, Tristate.FALSE));
+          newItems.add(new PathMetadata(status, Tristate.FALSE, false));
           fullPathsToCreate.add(parent);
           parent = parent.getParent();
         }
       }
     }
+    if (pathsToDelete != null) {
+      for (Path meta : pathsToDelete) {
+        newItems.add(PathMetadata.tombstone(meta));
+      }
+    }
 
     try {
-      processBatchWriteRequest(pathToKey(pathsToDelete),
-          pathMetadataToItem(inferredPathsToCreate));
+      processBatchWriteRequest(null, pathMetadataToItem(newItems));
     } catch (AmazonClientException e) {
       throw translateException("move", (String) null, e);
     }
@@ -565,13 +594,10 @@ public class DynamoDBMetadataStore implements MetadataStore {
     // first existent ancestor
     Path path = meta.getFileStatus().getPath().getParent();
     while (path != null && !path.isRoot()) {
-      final GetItemSpec spec = new GetItemSpec()
-          .withPrimaryKey(pathToKey(path))
-          .withConsistentRead(true); // strictly consistent read
-      final Item item = table.getItem(spec);
-      if (item == null) {
+      final Item item = getConsistentItem(pathToKey(path));
+      if (!itemExists(item)) {
         final FileStatus status = makeDirStatus(path, username);
-        metasToPut.add(new PathMetadata(status, Tristate.FALSE));
+        metasToPut.add(new PathMetadata(status, Tristate.FALSE, false));
         path = path.getParent();
       } else {
         break;
@@ -580,6 +606,17 @@ public class DynamoDBMetadataStore implements MetadataStore {
     return metasToPut;
   }
 
+  private boolean itemExists(Item item) {
+    if (item == null) {
+      return false;
+    }
+    if (item.hasAttribute(IS_DELETED) &&
+        item.getBoolean(IS_DELETED)) {
+      return false;
+    }
+    return true;
+  }
+
   /** Create a directory FileStatus using current system time as mod time. */
   static FileStatus makeDirStatus(Path f, String owner) {
     return  new FileStatus(0, true, 1, 0, System.currentTimeMillis(), 0,
@@ -591,9 +628,8 @@ public class DynamoDBMetadataStore implements MetadataStore {
     LOG.debug("Saving to table {} in region {}: {}", tableName, region, meta);
 
     // directory path
-    PathMetadata p = new PathMetadata(
-        makeDirStatus(meta.getPath(), username),
-        meta.isEmpty());
+    PathMetadata p = new PathMetadata(makeDirStatus(meta.getPath(), username),
+        meta.isEmpty(), false);
 
     // First add any missing ancestors...
     final Collection<PathMetadata> metasToPut = fullPathsToPut(p);
@@ -670,13 +706,13 @@ public class DynamoDBMetadataStore implements MetadataStore {
         itemCount++;
         if (deletionBatch.size() == S3GUARD_DDB_BATCH_WRITE_REQUEST_LIMIT) {
           Thread.sleep(delay);
-          processBatchWriteRequest(pathToKey(deletionBatch), new Item[0]);
+          processBatchWriteRequest(pathToKey(deletionBatch), null);
           deletionBatch.clear();
         }
       }
       if (deletionBatch.size() > 0) {
         Thread.sleep(delay);
-        processBatchWriteRequest(pathToKey(deletionBatch), new Item[0]);
+        processBatchWriteRequest(pathToKey(deletionBatch), null);
       }
     } catch (InterruptedException e) {
       Thread.currentThread().interrupt();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8e257a40/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/LocalMetadataStore.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/LocalMetadataStore.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/LocalMetadataStore.java
index 52e5b2a..d7e256d 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/LocalMetadataStore.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/LocalMetadataStore.java
@@ -101,29 +101,33 @@ public class LocalMetadataStore implements MetadataStore {
   }
 
   @Override
-  public void delete(Path path) throws IOException {
-    doDelete(path, false);
+  public void delete(Path p) throws IOException {
+    doDelete(p, false, true);
+  }
+
+  @Override
+  public void forgetMetadata(Path p) throws IOException {
+    doDelete(p, false, false);
   }
 
   @Override
   public void deleteSubtree(Path path) throws IOException {
-    doDelete(path, true);
+    doDelete(path, true, true);
   }
 
-  private synchronized void doDelete(Path p, boolean recursive) {
+  private synchronized void doDelete(Path p, boolean recursive, boolean
+      tombstone) {
 
     Path path = standardize(p);
-    // We could implement positive hit for 'deleted' files.  For now we
-    // do not track them.
 
     // Delete entry from file cache, then from cached parent directory, if any
 
-    removeHashEntries(path);
+    deleteHashEntries(path, tombstone);
 
     if (recursive) {
       // Remove all entries that have this dir as path prefix.
-      clearHashByAncestor(path, dirHash);
-      clearHashByAncestor(path, fileHash);
+      deleteHashByAncestor(path, dirHash, tombstone);
+      deleteHashByAncestor(path, fileHash, tombstone);
     }
   }
 
@@ -157,7 +161,7 @@ public class LocalMetadataStore implements MetadataStore {
    */
   private Tristate isEmptyDirectory(Path p) {
     DirListingMetadata dirMeta = dirHash.get(p);
-    return dirMeta.isEmpty();
+    return dirMeta.withoutTombstones().isEmpty();
   }
 
   @Override
@@ -186,9 +190,9 @@ public class LocalMetadataStore implements MetadataStore {
     synchronized (this) {
 
       // 1. Delete pathsToDelete
-      for (Path p : pathsToDelete) {
-        LOG.debug("move: deleting metadata {}", p);
-        delete(p);
+      for (Path meta : pathsToDelete) {
+        LOG.debug("move: deleting metadata {}", meta);
+        delete(meta);
       }
 
       // 2. Create new destination path metadata
@@ -323,13 +327,25 @@ public class LocalMetadataStore implements MetadataStore {
   }
 
   @VisibleForTesting
-  static <T> void clearHashByAncestor(Path ancestor, Map<Path, T> hash) {
+  static <T> void deleteHashByAncestor(Path ancestor, Map<Path, T> hash,
+                                       boolean tombstone) {
     for (Iterator<Map.Entry<Path, T>> it = hash.entrySet().iterator();
          it.hasNext();) {
       Map.Entry<Path, T> entry = it.next();
       Path f = entry.getKey();
+      T meta = entry.getValue();
       if (isAncestorOf(ancestor, f)) {
-        it.remove();
+        if (tombstone) {
+          if (meta instanceof PathMetadata) {
+            entry.setValue((T) PathMetadata.tombstone(f));
+          } else if (meta instanceof DirListingMetadata) {
+            it.remove();
+          } else {
+            throw new IllegalStateException("Unknown type in hash");
+          }
+        } else {
+          it.remove();
+        }
       }
     }
   }
@@ -351,16 +367,21 @@ public class LocalMetadataStore implements MetadataStore {
    * Update fileHash and dirHash to reflect deletion of file 'f'.  Call with
    * lock held.
    */
-  private void removeHashEntries(Path path) {
+  private void deleteHashEntries(Path path, boolean tombstone) {
 
     // Remove target file/dir
     LOG.debug("delete file entry for {}", path);
-    fileHash.remove(path);
+    if (tombstone) {
+      fileHash.put(path, PathMetadata.tombstone(path));
+    } else {
+      fileHash.remove(path);
+    }
 
     // Update this and parent dir listing, if any
 
     /* If this path is a dir, remove its listing */
     LOG.debug("removing listing of {}", path);
+
     dirHash.remove(path);
 
     /* Remove this path from parent's dir listing */
@@ -369,7 +390,11 @@ public class LocalMetadataStore implements MetadataStore {
       DirListingMetadata dir = dirHash.get(parent);
       if (dir != null) {
         LOG.debug("removing parent's entry for {} ", path);
-        dir.remove(path);
+        if (tombstone) {
+          dir.markDeleted(path);
+        } else {
+          dir.remove(path);
+        }
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8e257a40/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/MetadataStore.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/MetadataStore.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/MetadataStore.java
index 5511532..9502a8a 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/MetadataStore.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/MetadataStore.java
@@ -22,6 +22,7 @@ import java.io.Closeable;
 import java.io.IOException;
 import java.util.Collection;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
@@ -55,7 +56,8 @@ public interface MetadataStore extends Closeable {
   void initialize(Configuration conf) throws IOException;
 
   /**
-   * Deletes exactly one path.
+   * Deletes exactly one path, leaving a tombstone to prevent lingering,
+   * inconsistent copies of it from being listed.
    *
    * @param path the path to delete
    * @throws IOException if there is an error
@@ -63,7 +65,20 @@ public interface MetadataStore extends Closeable {
   void delete(Path path) throws IOException;
 
   /**
-   * Deletes the entire sub-tree rooted at the given path.
+   * Removes the record of exactly one path.  Does not leave a tombstone (see
+   * {@link MetadataStore#delete(Path)}). It is currently intended for testing
+   * only, and a need to use it as part of normal FileSystem usage is not
+   * anticipated.
+   *
+   * @param path the path to delete
+   * @throws IOException if there is an error
+   */
+  @VisibleForTesting
+  void forgetMetadata(Path path) throws IOException;
+
+  /**
+   * Deletes the entire sub-tree rooted at the given path, leaving tombstones
+   * to prevent lingering, inconsistent copies of it from being listed.
    *
    * In addition to affecting future calls to {@link #get(Path)},
    * implementations must also update any stored {@code DirListingMetadata}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8e257a40/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/NullMetadataStore.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/NullMetadataStore.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/NullMetadataStore.java
index 3869d13..65019eb 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/NullMetadataStore.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/NullMetadataStore.java
@@ -52,6 +52,11 @@ public class NullMetadataStore implements MetadataStore {
   }
 
   @Override
+  public void forgetMetadata(Path path) throws IOException {
+    return;
+  }
+
+  @Override
   public void deleteSubtree(Path path) throws IOException {
     return;
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8e257a40/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/PathMetadata.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/PathMetadata.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/PathMetadata.java
index b5d4f04..d799e16 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/PathMetadata.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/PathMetadata.java
@@ -22,6 +22,7 @@ import com.google.common.base.Preconditions;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.s3a.Tristate;
 
 /**
@@ -34,6 +35,13 @@ public class PathMetadata {
 
   private final FileStatus fileStatus;
   private Tristate isEmptyDirectory;
+  private boolean isDeleted;
+
+  public static PathMetadata tombstone(Path path) {
+    long now = System.currentTimeMillis();
+    FileStatus status = new FileStatus(0, false, 0, 0, now, path);
+    return new PathMetadata(status, Tristate.UNKNOWN, true);
+  }
 
   /**
    * Creates a new {@code PathMetadata} containing given {@code FileStatus}.
@@ -44,6 +52,11 @@ public class PathMetadata {
   }
 
   public PathMetadata(FileStatus fileStatus, Tristate isEmptyDir) {
+    this(fileStatus, isEmptyDir, false);
+  }
+
+  public PathMetadata(FileStatus fileStatus, Tristate isEmptyDir, boolean
+      isDeleted) {
     Preconditions.checkNotNull(fileStatus, "fileStatus must be non-null");
     Preconditions.checkNotNull(fileStatus.getPath(), "fileStatus path must be" +
         " non-null");
@@ -51,6 +64,7 @@ public class PathMetadata {
         " be absolute");
     this.fileStatus = fileStatus;
     this.isEmptyDirectory = isEmptyDir;
+    this.isDeleted = isDeleted;
   }
 
   /**
@@ -73,6 +87,14 @@ public class PathMetadata {
     this.isEmptyDirectory = isEmptyDirectory;
   }
 
+  public boolean isDeleted() {
+    return isDeleted;
+  }
+
+  void setIsDeleted(boolean isDeleted) {
+    this.isDeleted = isDeleted;
+  }
+
   @Override
   public boolean equals(Object o) {
     if (!(o instanceof PathMetadata)) {
@@ -91,6 +113,7 @@ public class PathMetadata {
     return "PathMetadata{" +
         "fileStatus=" + fileStatus +
         "; isEmptyDirectory=" + isEmptyDirectory +
+        "; isDeleted=" + isDeleted +
         '}';
   }
 
@@ -99,10 +122,10 @@ public class PathMetadata {
    * @param sb target StringBuilder
    */
   public void prettyPrint(StringBuilder sb) {
-    sb.append(String.format("%-5s %-20s %-7d %s",
+    sb.append(String.format("%-5s %-20s %-7d %-8s %-6s",
         fileStatus.isDirectory() ? "dir" : "file",
         fileStatus.getPath().toString(), fileStatus.getLen(),
-        isEmptyDirectory.name()));
+        isEmptyDirectory.name(), isDeleted));
     sb.append(fileStatus);
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8e257a40/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/PathMetadataDynamoDBTranslation.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/PathMetadataDynamoDBTranslation.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/PathMetadataDynamoDBTranslation.java
index 1c04016..c206a00 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/PathMetadataDynamoDBTranslation.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/PathMetadataDynamoDBTranslation.java
@@ -39,6 +39,7 @@ import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.s3a.Constants;
+import org.apache.hadoop.fs.s3a.Tristate;
 
 /**
  * Defines methods for translating between domain model objects and their
@@ -62,6 +63,7 @@ final class PathMetadataDynamoDBTranslation {
   static final String FILE_LENGTH = "file_length";
   @VisibleForTesting
   static final String BLOCK_SIZE = "block_size";
+  static final String IS_DELETED = "is_deleted";
 
   /** Table version field {@value} in version marker item. */
   @VisibleForTesting
@@ -133,8 +135,10 @@ final class PathMetadataDynamoDBTranslation {
       fileStatus = new FileStatus(len, false, 1, block, modTime, 0, null,
           username, username, path);
     }
+    boolean isDeleted =
+        item.hasAttribute(IS_DELETED) ? item.getBoolean(IS_DELETED) : false;
 
-    return new PathMetadata(fileStatus);
+    return new PathMetadata(fileStatus, Tristate.UNKNOWN, isDeleted);
   }
 
   /**
@@ -154,6 +158,7 @@ final class PathMetadataDynamoDBTranslation {
           .withLong(MOD_TIME, status.getModificationTime())
           .withLong(BLOCK_SIZE, status.getBlockSize());
     }
+    item.withBoolean(IS_DELETED, meta.isDeleted());
     return item;
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8e257a40/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3Guard.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3Guard.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3Guard.java
index 53dc991..19f0e50 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3Guard.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3Guard.java
@@ -39,6 +39,7 @@ import java.net.URI;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
+import java.util.Set;
 
 import static org.apache.hadoop.fs.s3a.Constants.*;
 import static org.apache.hadoop.fs.s3a.Statistic.*;
@@ -143,7 +144,8 @@ public final class S3Guard {
 
   /**
    * Convert the data of a directory listing to an array of {@link FileStatus}
-   * entries. If the listing is null an empty array is returned.
+   * entries. Tombstones are filtered out at this point. If the listing is null
+   * an empty array is returned.
    * @param dirMeta directory listing -may be null
    * @return a possibly-empty array of file status entries
    */
@@ -153,15 +155,15 @@ public final class S3Guard {
     }
 
     Collection<PathMetadata> listing = dirMeta.getListing();
-    FileStatus[] statuses = new FileStatus[listing.size()];
+    List<FileStatus> statuses = new ArrayList<>();
 
-    // HADOOP-13760: filter out deleted entries here
-    int i = 0;
     for (PathMetadata pm : listing) {
-      statuses[i++] = pm.getFileStatus();
+      if (!pm.isDeleted()) {
+        statuses.add(pm.getFileStatus());
+      }
     }
 
-    return statuses;
+    return statuses.toArray(new FileStatus[0]);
   }
 
   /**
@@ -196,6 +198,8 @@ public final class S3Guard {
           false);
     }
 
+    Set<Path> deleted = dirMeta.listTombstones();
+
     // Since we treat the MetadataStore as a "fresher" or "consistent" view
     // of metadata, we always use its metadata first.
 
@@ -203,10 +207,11 @@ public final class S3Guard {
     // we will basically start with the set of directory entries in the
     // DirListingMetadata, and add any that only exist in the backingStatuses.
 
-    // HADOOP-13760: filter out deleted files via PathMetadata#isDeleted() here
-
     boolean changed = false;
     for (FileStatus s : backingStatuses) {
+      if (deleted.contains(s.getPath())) {
+        continue;
+      }
 
       // Minor race condition here.  Multiple threads could add to this
       // mutable DirListingMetadata.  Since it is backed by a
@@ -245,7 +250,7 @@ public final class S3Guard {
    *              list will contain [/a/b/c/d, /a/b/c, /a/b].   /a/b/c/d is
    *              an empty, dir, and the other dirs only contain their child
    *              dir.
-   * @param owner Hadoop user name
+   * @param owner Hadoop user name.
    */
   public static void makeDirsOrdered(MetadataStore ms, List<Path> dirs,
       String owner) {
@@ -280,7 +285,8 @@ public final class S3Guard {
         children = new ArrayList<>(1);
         children.add(new PathMetadata(prevStatus));
       }
-      DirListingMetadata dirMeta = new DirListingMetadata(f, children, true);
+      DirListingMetadata dirMeta =
+          new DirListingMetadata(f, children, true);
       try {
         ms.put(dirMeta);
         ms.put(new PathMetadata(status));

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8e257a40/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3GuardTool.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3GuardTool.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3GuardTool.java
index 3d4f11d..9bd0cb8 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3GuardTool.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3GuardTool.java
@@ -446,7 +446,7 @@ public abstract class S3GuardTool extends Configured implements Tool {
     private long importDir(FileStatus status) throws IOException {
       Preconditions.checkArgument(status.isDirectory());
       RemoteIterator<LocatedFileStatus> it =
-          s3a.listFilesAndDirectories(status.getPath(), true);
+          s3a.listFilesAndEmptyDirectories(status.getPath(), true);
       long items = 0;
 
       while (it.hasNext()) {
@@ -686,7 +686,8 @@ public abstract class S3GuardTool extends Configured implements Tool {
       } catch (FileNotFoundException e) {
       }
       PathMetadata meta = ms.get(qualified);
-      FileStatus msStatus = meta != null ? meta.getFileStatus() : null;
+      FileStatus msStatus = (meta != null && !meta.isDeleted()) ?
+          meta.getFileStatus() : null;
       compareDir(msStatus, s3Status, out);
     }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8e257a40/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardEmptyDirs.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardEmptyDirs.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardEmptyDirs.java
index 55e310b..fb6e370 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardEmptyDirs.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardEmptyDirs.java
@@ -39,45 +39,47 @@ public class ITestS3GuardEmptyDirs extends AbstractS3ATestBase {
 
   @Test
   public void testEmptyDirs() throws Exception {
-
     S3AFileSystem fs = getFileSystem();
     Assume.assumeTrue(fs.hasMetadataStore());
     MetadataStore configuredMs = fs.getMetadataStore();
-
-    // 1. Simulate files already existing in the bucket before we started our
-    // cluster.  Temporarily disable the MetadataStore so it doesn't witness
-    // us creating these files.
-
-    fs.setMetadataStore(new NullMetadataStore());
-
     Path existingDir = path("existing-dir");
-    assertTrue(fs.mkdirs(existingDir));
     Path existingFile = path("existing-dir/existing-file");
-    touch(fs, existingFile);
+    try {
+      // 1. Simulate files already existing in the bucket before we started our
+      // cluster.  Temporarily disable the MetadataStore so it doesn't witness
+      // us creating these files.
+
+      fs.setMetadataStore(new NullMetadataStore());
+      assertTrue(fs.mkdirs(existingDir));
+      touch(fs, existingFile);
 
 
-    // 2. Simulate (from MetadataStore's perspective) starting our cluster and
-    // creating a file in an existing directory.
-    fs.setMetadataStore(configuredMs);  // "start cluster"
-    Path newFile = path("existing-dir/new-file");
-    touch(fs, newFile);
+      // 2. Simulate (from MetadataStore's perspective) starting our cluster and
+      // creating a file in an existing directory.
+      fs.setMetadataStore(configuredMs);  // "start cluster"
+      Path newFile = path("existing-dir/new-file");
+      touch(fs, newFile);
 
-    S3AFileStatus status = fs.innerGetFileStatus(existingDir, true);
-    assertEquals("Should not be empty dir", Tristate.FALSE,
-        status.isEmptyDirectory());
+      S3AFileStatus status = fs.innerGetFileStatus(existingDir, true);
+      assertEquals("Should not be empty dir", Tristate.FALSE,
+          status.isEmptyDirectory());
 
-    // 3. Assert that removing the only file the MetadataStore witnessed
-    // being created doesn't cause it to think the directory is now empty.
-    fs.delete(newFile, false);
-    status = fs.innerGetFileStatus(existingDir, true);
-    assertEquals("Should not be empty dir", Tristate.FALSE,
-        status.isEmptyDirectory());
+      // 3. Assert that removing the only file the MetadataStore witnessed
+      // being created doesn't cause it to think the directory is now empty.
+      fs.delete(newFile, false);
+      status = fs.innerGetFileStatus(existingDir, true);
+      assertEquals("Should not be empty dir", Tristate.FALSE,
+          status.isEmptyDirectory());
 
-    // 4. Assert that removing the final file, that existed "before"
-    // MetadataStore started, *does* cause the directory to be marked empty.
-    fs.delete(existingFile, false);
-    status = fs.innerGetFileStatus(existingDir, true);
-    assertEquals("Should be empty dir now", Tristate.TRUE,
-        status.isEmptyDirectory());
+      // 4. Assert that removing the final file, that existed "before"
+      // MetadataStore started, *does* cause the directory to be marked empty.
+      fs.delete(existingFile, false);
+      status = fs.innerGetFileStatus(existingDir, true);
+      assertEquals("Should be empty dir now", Tristate.TRUE,
+          status.isEmptyDirectory());
+    } finally {
+      configuredMs.forgetMetadata(existingFile);
+      configuredMs.forgetMetadata(existingDir);
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8e257a40/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardListConsistency.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardListConsistency.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardListConsistency.java
index 5e83906..8771fd2 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardListConsistency.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardListConsistency.java
@@ -56,8 +56,185 @@ public class ITestS3GuardListConsistency extends AbstractS3ATestBase {
     return new S3AContract(conf);
   }
 
+  /**
+   * Helper function for other test cases: does a single rename operation and
+   * validates the aftermath.
+   * @param mkdirs Directories to create
+   * @param srcdirs Source paths for rename operation
+   * @param dstdirs Destination paths for rename operation
+   * @param yesdirs Files that must exist post-rename (e.g. dstdirs children)
+   * @param nodirs Files that must not exist post-rename (e.g. srcdirs children)
+   * @throws Exception
+   */
+  private void doTestRenameSequence(Path[] mkdirs, Path[] srcdirs,
+      Path[] dstdirs, Path[] yesdirs, Path[] nodirs) throws Exception {
+    S3AFileSystem fs = getFileSystem();
+    Assume.assumeTrue(fs.hasMetadataStore());
+
+    if (mkdirs != null) {
+      for (Path mkdir : mkdirs) {
+        assertTrue(fs.mkdirs(mkdir));
+      }
+      Thread.sleep(InconsistentAmazonS3Client.DELAY_KEY_MILLIS);
+    }
+
+    assertTrue("srcdirs and dstdirs must have equal length",
+        srcdirs.length == dstdirs.length);
+    for (int i = 0; i < srcdirs.length; i++) {
+      assertTrue("Rename returned false: " + srcdirs[i] + " -> " + dstdirs[i],
+          fs.rename(srcdirs[i], dstdirs[i]));
+    }
+
+    for (Path yesdir : yesdirs) {
+      assertTrue("Path was supposed to exist: " + yesdir, fs.exists(yesdir));
+    }
+    for (Path nodir : nodirs) {
+      assertFalse("Path is not supposed to exist: " + nodir, fs.exists(nodir));
+    }
+  }
+
+  /**
+   * Tests that after renaming a directory, the original directory and its
+   * contents are indeed missing and the corresponding new paths are visible.
+   * @throws Exception
+   */
+  @Test
+  public void testConsistentListAfterRename() throws Exception {
+    Path[] mkdirs = {
+      path("d1/f"),
+      path("d1/f" + InconsistentAmazonS3Client.DELAY_KEY_SUBSTRING)
+    };
+    Path[] srcdirs = {path("d1")};
+    Path[] dstdirs = {path("d2")};
+    Path[] yesdirs = {path("d2"), path("d2/f"),
+        path("d2/f" + InconsistentAmazonS3Client.DELAY_KEY_SUBSTRING)};
+    Path[] nodirs = {path("d1"), path("d1/f"),
+        path("d1/f" + InconsistentAmazonS3Client.DELAY_KEY_SUBSTRING)};
+    doTestRenameSequence(mkdirs, srcdirs, dstdirs, yesdirs, nodirs);
+    getFileSystem().delete(path("d1"), true);
+    getFileSystem().delete(path("d2"), true);
+  }
+
+  /**
+   * Tests a circular sequence of renames to verify that overwriting recently
+   * deleted files and reading recently created files from rename operations
+   * works as expected.
+   * @throws Exception
+   */
+  @Test
+  public void testRollingRenames() throws Exception {
+    Path[] dir0 = {path("rolling/1")};
+    Path[] dir1 = {path("rolling/2")};
+    Path[] dir2 = {path("rolling/3")};
+    // These sets have to be in reverse order compared to the movement
+    Path[] setA = {dir1[0], dir0[0]};
+    Path[] setB = {dir2[0], dir1[0]};
+    Path[] setC = {dir0[0], dir2[0]};
+
+    for(int i = 0; i < 2; i++) {
+      Path[] firstSet = i == 0 ? setA : null;
+      doTestRenameSequence(firstSet, setA, setB, setB, dir0);
+      doTestRenameSequence(null, setB, setC, setC, dir1);
+      doTestRenameSequence(null, setC, setA, setA, dir2);
+    }
+
+    S3AFileSystem fs = getFileSystem();
+    assertFalse("Renaming deleted file should have failed",
+        fs.rename(dir2[0], dir1[0]));
+    assertTrue("Renaming over existing file should have succeeded",
+        fs.rename(dir1[0], dir0[0]));
+  }
+
+  /**
+   * Tests that deleted files immediately stop manifesting in list operations
+   * even when the effect in S3 is delayed.
+   * @throws Exception
+   */
+  @Test
+  public void testConsistentListAfterDelete() throws Exception {
+    S3AFileSystem fs = getFileSystem();
+    // test will fail if NullMetadataStore (the default) is configured: skip it.
+    Assume.assumeTrue(fs.hasMetadataStore());
+
+    // Any S3 keys that contain DELAY_KEY_SUBSTRING will be delayed
+    // in listObjects() results via InconsistentAmazonS3Client
+    Path inconsistentPath =
+        path("a/b/dir3-" + InconsistentAmazonS3Client.DELAY_KEY_SUBSTRING);
+
+    Path[] testDirs = {path("a/b/dir1"),
+        path("a/b/dir2"),
+        inconsistentPath};
+
+    for (Path path : testDirs) {
+      assertTrue(fs.mkdirs(path));
+    }
+    Thread.sleep(2 * InconsistentAmazonS3Client.DELAY_KEY_MILLIS);
+    for (Path path : testDirs) {
+      assertTrue(fs.delete(path, false));
+    }
+
+    FileStatus[] paths = fs.listStatus(path("a/b/"));
+    List<Path> list = new ArrayList<>();
+    for (FileStatus fileState : paths) {
+      list.add(fileState.getPath());
+    }
+    assertFalse(list.contains(path("a/b/dir1")));
+    assertFalse(list.contains(path("a/b/dir2")));
+    // This should fail without S3Guard, and succeed with it.
+    assertFalse(list.contains(inconsistentPath));
+  }
+
+  /**
+   * Tests that rename immediately after files in the source directory are
+   * deleted results in exactly the correct set of destination files and none
+   * of the source files.
+   * @throws Exception
+   */
   @Test
-  public void testConsistentListStatus() throws Exception {
+  public void testConsistentRenameAfterDelete() throws Exception {
+    S3AFileSystem fs = getFileSystem();
+    // test will fail if NullMetadataStore (the default) is configured: skip it.
+    Assume.assumeTrue(fs.hasMetadataStore());
+
+    // Any S3 keys that contain DELAY_KEY_SUBSTRING will be delayed
+    // in listObjects() results via InconsistentAmazonS3Client
+    Path inconsistentPath =
+        path("a/b/dir3-" + InconsistentAmazonS3Client.DELAY_KEY_SUBSTRING);
+
+    Path[] testDirs = {path("a/b/dir1"),
+        path("a/b/dir2"),
+        inconsistentPath};
+
+    for (Path path : testDirs) {
+      assertTrue(fs.mkdirs(path));
+    }
+    Thread.sleep(2 * InconsistentAmazonS3Client.DELAY_KEY_MILLIS);
+    assertTrue(fs.delete(testDirs[1], false));
+    assertTrue(fs.delete(testDirs[2], false));
+
+    fs.rename(path("a"), path("a3"));
+    FileStatus[] paths = fs.listStatus(path("a3/b"));
+    List<Path> list = new ArrayList<>();
+    for (FileStatus fileState : paths) {
+      list.add(fileState.getPath());
+    }
+    assertTrue(list.contains(path("a3/b/dir1")));
+    assertFalse(list.contains(path("a3/b/dir2")));
+    // This should fail without S3Guard, and succeed with it.
+    assertFalse(list.contains(
+        path("a3/b/dir3-" + InconsistentAmazonS3Client.DELAY_KEY_SUBSTRING)));
+
+    try {
+      RemoteIterator<LocatedFileStatus> old = fs.listFilesAndEmptyDirectories(
+          path("a"), true);
+      fail("Recently renamed dir should not be visible");
+    } catch(FileNotFoundException e) {
+      // expected
+    }
+  }
+
+  @Test
+  public void testConsistentListStatusAfterPut() throws Exception {
 
     S3AFileSystem fs = getFileSystem();
 
@@ -90,23 +267,25 @@ public class ITestS3GuardListConsistency extends AbstractS3ATestBase {
   }
 
   /**
-   * Similar to {@link #testConsistentListStatus()}, this tests that the FS
-   * listLocatedStatus() call will return consistent list.
+   * Similar to {@link #testConsistentListStatusAfterPut()}, this tests that the
+   * FS listLocatedStatus() call will return a consistent list.
    */
   @Test
-  public void testConsistentListLocatedStatus() throws Exception {
+  public void testConsistentListLocatedStatusAfterPut() throws Exception {
     final S3AFileSystem fs = getFileSystem();
     // This test will fail if NullMetadataStore (the default) is configured:
     // skip it.
     Assume.assumeTrue(fs.hasMetadataStore());
-    fs.mkdirs(path("doTestConsistentListLocatedStatus"));
+    String rootDir = "doTestConsistentListLocatedStatusAfterPut";
+    fs.mkdirs(path(rootDir));
 
     final int[] numOfPaths = {0, 1, 10};
     for (int normalPathNum : numOfPaths) {
       for (int delayedPathNum : numOfPaths) {
         LOG.info("Testing with normalPathNum={}, delayedPathNum={}",
             normalPathNum, delayedPathNum);
-        doTestConsistentListLocatedStatus(fs, normalPathNum, delayedPathNum);
+        doTestConsistentListLocatedStatusAfterPut(fs, rootDir, normalPathNum,
+            delayedPathNum);
       }
     }
   }
@@ -118,18 +297,18 @@ public class ITestS3GuardListConsistency extends AbstractS3ATestBase {
    * @param delayedPathNum number paths listed with delaying
    * @throws Exception
    */
-  private void doTestConsistentListLocatedStatus(S3AFileSystem fs,
-      int normalPathNum, int delayedPathNum) throws Exception {
+  private void doTestConsistentListLocatedStatusAfterPut(S3AFileSystem fs,
+      String rootDir, int normalPathNum, int delayedPathNum) throws Exception {
     final List<Path> testDirs = new ArrayList<>(normalPathNum + delayedPathNum);
     int index = 0;
     for (; index < normalPathNum; index++) {
-      testDirs.add(path("doTestConsistentListLocatedStatus/dir-" + index));
+      testDirs.add(path(rootDir + "/dir-" +
+          index));
     }
     for (; index < normalPathNum + delayedPathNum; index++) {
       // Any S3 keys that contain DELAY_KEY_SUBSTRING will be delayed
       // in listObjects() results via InconsistentS3Client
-      testDirs.add(path("doTestConsistentListLocatedStatus/dir-" + index
-          + DELAY_KEY_SUBSTRING));
+      testDirs.add(path(rootDir + "/dir-" + index + DELAY_KEY_SUBSTRING));
     }
 
     for (Path path : testDirs) {
@@ -141,7 +320,7 @@ public class ITestS3GuardListConsistency extends AbstractS3ATestBase {
 
     // this should return the union data from S3 and MetadataStore
     final RemoteIterator<LocatedFileStatus> statusIterator =
-        fs.listLocatedStatus(path("doTestConsistentListLocatedStatus/"));
+        fs.listLocatedStatus(path(rootDir + "/"));
     List<Path> list = new ArrayList<>();
     for (; statusIterator.hasNext();) {
       list.add(statusIterator.next().getPath());
@@ -223,10 +402,13 @@ public class ITestS3GuardListConsistency extends AbstractS3ATestBase {
       fileNames.add("file-" + index + "-" + DELAY_KEY_SUBSTRING);
     }
 
+    int filesAndEmptyDirectories = 0;
+
     // create files under each test directory
     for (Path dir : testDirs) {
       for (String fileName : fileNames) {
         writeTextFile(fs, new Path(dir, fileName), "I, " + fileName, false);
+        filesAndEmptyDirectories++;
       }
     }
 
@@ -251,7 +433,7 @@ public class ITestS3GuardListConsistency extends AbstractS3ATestBase {
       verifyFileIsListed(listedFiles, baseTestDir, fileNames);
     } else {
       assertEquals("Unexpected number of files returned by listFiles() call",
-          testDirs.size() * (normalFileNum + delayedFileNum),
+          filesAndEmptyDirectories,
           listedFiles.size());
       for (Path dir : testDirs) {
         verifyFileIsListed(listedFiles, dir, fileNames);
@@ -363,6 +545,11 @@ public class ITestS3GuardListConsistency extends AbstractS3ATestBase {
     assertEquals("Unexpected number of results from metadata store. "
             + "Should have /OnS3 and /OnS3AndMS: " + mdResults,
         2, mdResults.numEntries());
+
+    // If we don't clean this up, the next run will fail: the metadata store
+    // will still record /OnS3 as deleted even after it's written to noS3Guard.
+    getFileSystem().getMetadataStore().forgetMetadata(
+        new Path(directory, "OnS3"));
   }
 
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org