Posted to common-commits@hadoop.apache.org by st...@apache.org on 2019/06/16 16:06:38 UTC

[hadoop] branch trunk updated: HADOOP-16279. S3Guard: Implement time-based (TTL) expiry for entries (and tombstones).

This is an automated email from the ASF dual-hosted git repository.

stevel pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new f9cc9e1  HADOOP-16279. S3Guard: Implement time-based (TTL) expiry for entries (and tombstones).
f9cc9e1 is described below

commit f9cc9e162175444efe9d5b07ecb9a795f750ca3c
Author: Gabor Bota <ga...@cloudera.com>
AuthorDate: Sun Jun 16 17:05:01 2019 +0100

    HADOOP-16279. S3Guard: Implement time-based (TTL) expiry for entries (and tombstones).
    
    Contributed by Gabor Bota.
    
    Change-Id: I73a2d2861901dedfe7a0e783b310fbb95e7c1af9
---
 .../src/main/resources/core-default.xml            |   8 +-
 .../java/org/apache/hadoop/fs/s3a/Constants.java   |  12 +-
 .../java/org/apache/hadoop/fs/s3a/Listing.java     |   2 +-
 .../org/apache/hadoop/fs/s3a/S3AFileSystem.java    |  39 ++-
 .../fs/s3a/s3guard/DynamoDBMetadataStore.java      |  99 ++++--
 .../hadoop/fs/s3a/s3guard/ITtlTimeProvider.java    |  34 ++
 .../hadoop/fs/s3a/s3guard/LocalMetadataStore.java  | 111 ++++---
 .../hadoop/fs/s3a/s3guard/MetadataStore.java       |  87 +++--
 .../hadoop/fs/s3a/s3guard/NullMetadataStore.java   |  13 +-
 .../org/apache/hadoop/fs/s3a/s3guard/S3Guard.java  | 137 ++++++--
 .../apache/hadoop/fs/s3a/s3guard/S3GuardTool.java  |   9 +-
 .../src/site/markdown/tools/hadoop-aws/s3guard.md  |   4 +-
 .../fs/s3a/ITestS3GuardOutOfBandOperations.java    | 349 ++++++++++++++++++++-
 .../org/apache/hadoop/fs/s3a/ITestS3GuardTtl.java  | 194 +++++++++++-
 .../s3a/s3guard/AbstractS3GuardToolTestBase.java   |   2 +-
 .../fs/s3a/s3guard/ITestDynamoDBMetadataStore.java |  10 +-
 .../s3guard/ITestDynamoDBMetadataStoreScale.java   |   5 +-
 .../fs/s3a/s3guard/MetadataStoreTestBase.java      | 127 +++++++-
 .../fs/s3a/s3guard/TestLocalMetadataStore.java     |   7 +-
 .../fs/s3a/s3guard/TestNullMetadataStore.java      |   5 +
 .../apache/hadoop/fs/s3a/s3guard/TestS3Guard.java  | 195 +++++++++++-
 .../scale/AbstractITestS3AMetadataStoreScale.java  |  14 +-
 22 files changed, 1287 insertions(+), 176 deletions(-)
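
A note on the renamed option: it is resolved with Hadoop's
Configuration.getTimeDuration, so duration-suffixed values such as "15m"
are converted to milliseconds. A minimal sketch of the lookup, assuming an
initialized Configuration (this mirrors the S3AFileSystem change below):

    long ttlMillis = conf.getTimeDuration(
        Constants.METADATASTORE_METADATA_TTL,          // renamed key
        Constants.DEFAULT_METADATASTORE_METADATA_TTL,  // 15 minutes in ms
        TimeUnit.MILLISECONDS);                        // "15m" -> 900000
    ITtlTimeProvider ttlTimeProvider = new S3Guard.TtlTimeProvider(ttlMillis);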

diff --git a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
index b5056d1..7ffc2ad 100644
--- a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
+++ b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
@@ -1502,12 +1502,10 @@
 </property>
 
 <property>
-    <name>fs.s3a.metadatastore.authoritative.dir.ttl</name>
-    <value>3600000</value>
+    <name>fs.s3a.metadatastore.metadata.ttl</name>
+    <value>15m</value>
     <description>
-        This value sets how long a directory listing in the MS is considered as
-        authoritative. The value is in milliseconds.
-        MetadataStore should be authoritative to use this configuration knob.
+        This value sets how long an entry in a MetadataStore is valid.
     </description>
 </property>
 
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
index a8dc161..7334506 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
@@ -353,10 +353,14 @@ public final class Constants {
   /**
    * How long a directory listing in the MS is considered as authoritative.
    */
-  public static final String METADATASTORE_AUTHORITATIVE_DIR_TTL =
-      "fs.s3a.metadatastore.authoritative.dir.ttl";
-  public static final long DEFAULT_METADATASTORE_AUTHORITATIVE_DIR_TTL =
-      TimeUnit.MINUTES.toMillis(60);
+  public static final String METADATASTORE_METADATA_TTL =
+      "fs.s3a.metadatastore.metadata.ttl";
+
+  /**
+   * Default TTL in milliseconds: 15 minutes.
+   */
+  public static final long DEFAULT_METADATASTORE_METADATA_TTL =
+      TimeUnit.MINUTES.toMillis(15);
 
   /** read ahead buffer size to prevent connection re-establishments. */
   public static final String READAHEAD_RANGE = "fs.s3a.readahead.range";
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Listing.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Listing.java
index 0f8c52b..b62c456 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Listing.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Listing.java
@@ -464,7 +464,7 @@ public class Listing {
         if (acceptor.accept(keyPath, summary) && filter.accept(keyPath)) {
           S3AFileStatus status = createFileStatus(keyPath, summary,
               owner.getDefaultBlockSize(keyPath), owner.getUsername(),
-              null, null);
+              summary.getETag(), null);
           LOG.debug("Adding: {}", status);
           stats.add(status);
           added++;
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
index e6850e9..4bd58d5 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
@@ -126,6 +126,7 @@ import org.apache.hadoop.fs.s3a.s3guard.MetadataStoreListFilesIterator;
 import org.apache.hadoop.fs.s3a.s3guard.MetadataStore;
 import org.apache.hadoop.fs.s3a.s3guard.PathMetadata;
 import org.apache.hadoop.fs.s3a.s3guard.S3Guard;
+import org.apache.hadoop.fs.s3a.s3guard.ITtlTimeProvider;
 import org.apache.hadoop.fs.s3native.S3xLoginHelper;
 import org.apache.hadoop.io.retry.RetryPolicies;
 import org.apache.hadoop.fs.store.EtagChecksum;
@@ -244,7 +245,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
 
   private AWSCredentialProviderList credentials;
 
-  private S3Guard.ITtlTimeProvider ttlTimeProvider;
+  private ITtlTimeProvider ttlTimeProvider;
 
   /** Add any deprecated keys. */
   @SuppressWarnings("deprecation")
@@ -388,9 +389,11 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
             getMetadataStore(), allowAuthoritative);
       }
       initMultipartUploads(conf);
-      long authDirTtl = conf.getLong(METADATASTORE_AUTHORITATIVE_DIR_TTL,
-          DEFAULT_METADATASTORE_AUTHORITATIVE_DIR_TTL);
-      ttlTimeProvider = new S3Guard.TtlTimeProvider(authDirTtl);
+      if (hasMetadataStore()) {
+        long authDirTtl = conf.getTimeDuration(METADATASTORE_METADATA_TTL,
+            DEFAULT_METADATASTORE_METADATA_TTL, TimeUnit.MILLISECONDS);
+        ttlTimeProvider = new S3Guard.TtlTimeProvider(authDirTtl);
+      }
     } catch (AmazonClientException e) {
       throw translateException("initializing ", new Path(name), e);
     }
@@ -1341,7 +1344,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
       }
     }
 
-    metadataStore.move(srcPaths, dstMetas);
+    metadataStore.move(srcPaths, dstMetas, ttlTimeProvider);
 
     if (!src.getParent().equals(dst.getParent())) {
       LOG.debug("source & dest parents are different; fix up dir markers");
@@ -1722,7 +1725,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
       instrumentation.directoryDeleted();
     }
     deleteObject(key);
-    metadataStore.delete(f);
+    metadataStore.delete(f, ttlTimeProvider);
   }
 
   /**
@@ -2143,7 +2146,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
           }
         }
       }
-      metadataStore.deleteSubtree(f);
+      metadataStore.deleteSubtree(f, ttlTimeProvider);
     } else {
       LOG.debug("delete: Path is a file");
       deleteObjectAtPath(f, key, true);
@@ -2466,7 +2469,10 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
     LOG.debug("Getting path status for {}  ({})", path, key);
 
     // Check MetadataStore, if any.
-    PathMetadata pm = metadataStore.get(path, needEmptyDirectoryFlag);
+    PathMetadata pm = null;
+    if (hasMetadataStore()) {
+      pm = S3Guard.getWithTtl(metadataStore, path, ttlTimeProvider);
+    }
     Set<Path> tombstones = Collections.emptySet();
     if (pm != null) {
       if (pm.isDeleted()) {
@@ -2501,7 +2507,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
             LOG.debug("S3Guard metadata for {} is outdated, updating it",
                 path);
             return S3Guard.putAndReturn(metadataStore, s3AFileStatus,
-                instrumentation);
+                instrumentation, ttlTimeProvider);
           }
         }
       }
@@ -2534,12 +2540,14 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
             null, null);
       }
       // entry was found, save in S3Guard
-      return S3Guard.putAndReturn(metadataStore, s3FileStatus, instrumentation);
+      return S3Guard.putAndReturn(metadataStore, s3FileStatus,
+          instrumentation, ttlTimeProvider);
     } else {
       // there was no entry in S3Guard
       // retrieve the data and update the metadata store in the process.
       return S3Guard.putAndReturn(metadataStore,
-          s3GetFileStatus(path, key, tombstones), instrumentation);
+          s3GetFileStatus(path, key, tombstones), instrumentation,
+          ttlTimeProvider);
     }
   }
 
@@ -3191,11 +3199,12 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
     // See note about failure semantics in S3Guard documentation
     try {
       if (hasMetadataStore()) {
-        S3Guard.addAncestors(metadataStore, p, username);
+        S3Guard.addAncestors(metadataStore, p, username, ttlTimeProvider);
         S3AFileStatus status = createUploadFileStatus(p,
             S3AUtils.objectRepresentsDirectory(key, length), length,
             getDefaultBlockSize(p), username, eTag, versionId);
-        S3Guard.putAndReturn(metadataStore, status, instrumentation);
+        S3Guard.putAndReturn(metadataStore, status, instrumentation,
+            ttlTimeProvider);
       }
     } catch (IOException e) {
       if (failOnMetadataWriteError) {
@@ -3860,12 +3869,12 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
   }
 
   @VisibleForTesting
-  protected S3Guard.ITtlTimeProvider getTtlTimeProvider() {
+  public ITtlTimeProvider getTtlTimeProvider() {
     return ttlTimeProvider;
   }
 
   @VisibleForTesting
-  protected void setTtlTimeProvider(S3Guard.ITtlTimeProvider ttlTimeProvider) {
+  protected void setTtlTimeProvider(ITtlTimeProvider ttlTimeProvider) {
     this.ttlTimeProvider = ttlTimeProvider;
   }
 
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStore.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStore.java
index fa1a203..f668c6a 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStore.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStore.java
@@ -189,8 +189,10 @@ import static org.apache.hadoop.fs.s3a.s3guard.S3Guard.*;
  * directory helps prevent unnecessary queries during traversal of an entire
  * sub-tree.
  *
- * Some mutating operations, notably {@link #deleteSubtree(Path)} and
- * {@link #move(Collection, Collection)}, are less efficient with this schema.
+ * Some mutating operations, notably
+ * {@link MetadataStore#deleteSubtree(Path, ITtlTimeProvider)} and
+ * {@link MetadataStore#move(Collection, Collection, ITtlTimeProvider)},
+ * are less efficient with this schema.
  * They require mutating multiple items in the DynamoDB table.
  *
  * By default, DynamoDB access is performed within the same AWS region as
@@ -471,14 +473,15 @@ public class DynamoDBMetadataStore implements MetadataStore,
 
   @Override
   @Retries.RetryTranslated
-  public void delete(Path path) throws IOException {
-    innerDelete(path, true);
+  public void delete(Path path, ITtlTimeProvider ttlTimeProvider)
+      throws IOException {
+    innerDelete(path, true, ttlTimeProvider);
   }
 
   @Override
   @Retries.RetryTranslated
   public void forgetMetadata(Path path) throws IOException {
-    innerDelete(path, false);
+    innerDelete(path, false, null);
   }
 
   /**
@@ -487,10 +490,13 @@ public class DynamoDBMetadataStore implements MetadataStore,
    * There is no check as to whether the entry exists in the table first.
    * @param path path to delete
    * @param tombstone flag to create a tombstone marker
+   * @param ttlTimeProvider The time provider to set last_updated. Must not
+   *                        be null if tombstone is true.
    * @throws IOException I/O error.
    */
   @Retries.RetryTranslated
-  private void innerDelete(final Path path, boolean tombstone)
+  private void innerDelete(final Path path, boolean tombstone,
+      ITtlTimeProvider ttlTimeProvider)
       throws IOException {
     checkPath(path);
     LOG.debug("Deleting from table {} in region {}: {}",
@@ -505,8 +511,13 @@ public class DynamoDBMetadataStore implements MetadataStore,
     // on that of S3A itself
     boolean idempotent = S3AFileSystem.DELETE_CONSIDERED_IDEMPOTENT;
     if (tombstone) {
+      Preconditions.checkArgument(ttlTimeProvider != null, "ttlTimeProvider "
+          + "must not be null");
+      final PathMetadata pmTombstone = PathMetadata.tombstone(path);
+      // update the last updated field of record when putting a tombstone
+      pmTombstone.setLastUpdated(ttlTimeProvider.getNow());
       Item item = PathMetadataDynamoDBTranslation.pathMetadataToItem(
-          new DDBPathMetadata(PathMetadata.tombstone(path)));
+          new DDBPathMetadata(pmTombstone));
       writeOp.retry(
           "Put tombstone",
           path.toString(),
@@ -524,7 +535,8 @@ public class DynamoDBMetadataStore implements MetadataStore,
 
   @Override
   @Retries.RetryTranslated
-  public void deleteSubtree(Path path) throws IOException {
+  public void deleteSubtree(Path path, ITtlTimeProvider ttlTimeProvider)
+      throws IOException {
     checkPath(path);
     LOG.debug("Deleting subtree from table {} in region {}: {}",
         tableName, region, path);
@@ -537,7 +549,7 @@ public class DynamoDBMetadataStore implements MetadataStore,
 
     for (DescendantsIterator desc = new DescendantsIterator(this, meta);
          desc.hasNext();) {
-      innerDelete(desc.next().getPath(), true);
+      innerDelete(desc.next().getPath(), true, ttlTimeProvider);
     }
   }
 
@@ -731,7 +743,8 @@ public class DynamoDBMetadataStore implements MetadataStore,
   @Override
   @Retries.RetryTranslated
   public void move(Collection<Path> pathsToDelete,
-      Collection<PathMetadata> pathsToCreate) throws IOException {
+      Collection<PathMetadata> pathsToCreate, ITtlTimeProvider ttlTimeProvider)
+      throws IOException {
     if (pathsToDelete == null && pathsToCreate == null) {
       return;
     }
@@ -754,7 +767,11 @@ public class DynamoDBMetadataStore implements MetadataStore,
     }
     if (pathsToDelete != null) {
       for (Path meta : pathsToDelete) {
-        newItems.add(new DDBPathMetadata(PathMetadata.tombstone(meta)));
+        Preconditions.checkArgument(ttlTimeProvider != null, "ttlTimeProvider"
+            + " must not be null");
+        final PathMetadata pmTombstone = PathMetadata.tombstone(meta);
+        pmTombstone.setLastUpdated(ttlTimeProvider.getNow());
+        newItems.add(new DDBPathMetadata(pmTombstone));
       }
     }
 
@@ -1024,14 +1041,37 @@ public class DynamoDBMetadataStore implements MetadataStore,
   }
 
   @Retries.RetryTranslated
-  private ItemCollection<ScanOutcome> expiredFiles(long modTime,
-      String keyPrefix) throws IOException {
-    String filterExpression =
-        "mod_time < :mod_time and begins_with(parent, :parent)";
-    String projectionExpression = "parent,child";
-    ValueMap map = new ValueMap()
-        .withLong(":mod_time", modTime)
-        .withString(":parent", keyPrefix);
+  private ItemCollection<ScanOutcome> expiredFiles(PruneMode pruneMode,
+      long cutoff, String keyPrefix) throws IOException {
+
+    String filterExpression;
+    String projectionExpression;
+    ValueMap map;
+
+    switch (pruneMode) {
+    case ALL_BY_MODTIME:
+      filterExpression =
+          "mod_time < :mod_time and begins_with(parent, :parent)";
+      projectionExpression = "parent,child";
+      map = new ValueMap()
+          .withLong(":mod_time", cutoff)
+          .withString(":parent", keyPrefix);
+      break;
+    case TOMBSTONES_BY_LASTUPDATED:
+      filterExpression =
+          "last_updated < :last_updated and begins_with(parent, :parent) "
+              + "and is_deleted = :is_deleted";
+      projectionExpression = "parent,child";
+      map = new ValueMap()
+          .withLong(":last_updated", cutoff)
+          .withString(":parent", keyPrefix)
+          .withBoolean(":is_deleted", true);
+      break;
+    default:
+      throw new UnsupportedOperationException("Unsupported prune mode: "
+          + pruneMode);
+    }
+
     return readOp.retry(
         "scan",
         keyPrefix,
@@ -1041,20 +1081,31 @@ public class DynamoDBMetadataStore implements MetadataStore,
 
   @Override
   @Retries.RetryTranslated
-  public void prune(long modTime) throws IOException {
-    prune(modTime, "/");
+  public void prune(PruneMode pruneMode, long cutoff) throws IOException {
+    prune(pruneMode, cutoff, "/");
   }
 
   /**
    * Prune files, in batches. There's a sleep between each batch.
-   * @param modTime Oldest modification time to allow
+   *
+   * @param pruneMode The mode of operation for the prune. For details see
+   *                  {@link MetadataStore#prune(PruneMode, long)}
+   * @param cutoff Oldest modification time to allow
    * @param keyPrefix The prefix for the keys that should be removed
    * @throws IOException Any IO/DDB failure.
    * @throws InterruptedIOException if the prune was interrupted
    */
   @Override
   @Retries.RetryTranslated
-  public void prune(long modTime, String keyPrefix) throws IOException {
+  public void prune(PruneMode pruneMode, long cutoff, String keyPrefix)
+      throws IOException {
+    final ItemCollection<ScanOutcome> items =
+        expiredFiles(pruneMode, cutoff, keyPrefix);
+    innerPrune(items);
+  }
+
+  private void innerPrune(ItemCollection<ScanOutcome> items)
+      throws IOException {
     int itemCount = 0;
     try {
       Collection<Path> deletionBatch =
@@ -1064,7 +1115,7 @@ public class DynamoDBMetadataStore implements MetadataStore,
           S3GUARD_DDB_BACKGROUND_SLEEP_MSEC_DEFAULT,
           TimeUnit.MILLISECONDS);
       Set<Path> parentPathSet = new HashSet<>();
-      for (Item item : expiredFiles(modTime, keyPrefix)) {
+      for (Item item : items) {
         DDBPathMetadata md = PathMetadataDynamoDBTranslation
             .itemToPathMetadata(item, username);
         Path path = md.getFileStatus().getPath();
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/ITtlTimeProvider.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/ITtlTimeProvider.java
new file mode 100644
index 0000000..daee621
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/ITtlTimeProvider.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.s3guard;
+
+/**
+ * This interface is defined for handling TTL expiry of metadata in S3Guard.
+ *
+ * TTL can be tested by implementing this interface and setting it as
+ * {@code S3Guard.ttlTimeProvider}. By doing this, getNow() can return any
+ * preferred value, so flaky tests can be avoided. By default getNow()
+ * returns the current time in milliseconds since the epoch.
+ *
+ * Time is measured in milliseconds.
+ */
+public interface ITtlTimeProvider {
+  long getNow();
+  long getMetadataTtl();
+}
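
As the javadoc above notes, tests can supply their own clock. A minimal
sketch of a controllable provider (the class name is hypothetical; the
tests further below use an equivalent anonymous implementation):

    /** Hypothetical fixed-clock provider for deterministic TTL tests. */
    class FakeTtlTimeProvider implements ITtlTimeProvider {
      private long now;
      private final long ttlMillis;

      FakeTtlTimeProvider(long startMillis, long ttlMillis) {
        this.now = startMillis;
        this.ttlMillis = ttlMillis;
      }

      @Override public long getNow() { return now; }
      @Override public long getMetadataTtl() { return ttlMillis; }

      /** Advance the clock, e.g. past the TTL to force expiry. */
      void advance(long millis) { now += millis; }
    }
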
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/LocalMetadataStore.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/LocalMetadataStore.java
index 9276388..6c13cd1 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/LocalMetadataStore.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/LocalMetadataStore.java
@@ -112,32 +112,34 @@ public class LocalMetadataStore implements MetadataStore {
   }
 
   @Override
-  public void delete(Path p) throws IOException {
-    doDelete(p, false, true);
+  public void delete(Path p, ITtlTimeProvider ttlTimeProvider)
+      throws IOException {
+    doDelete(p, false, true, ttlTimeProvider);
   }
 
   @Override
   public void forgetMetadata(Path p) throws IOException {
-    doDelete(p, false, false);
+    doDelete(p, false, false, null);
   }
 
   @Override
-  public void deleteSubtree(Path path) throws IOException {
-    doDelete(path, true, true);
+  public void deleteSubtree(Path path, ITtlTimeProvider ttlTimeProvider)
+      throws IOException {
+    doDelete(path, true, true, ttlTimeProvider);
   }
 
-  private synchronized void doDelete(Path p, boolean recursive, boolean
-      tombstone) {
+  private synchronized void doDelete(Path p, boolean recursive,
+      boolean tombstone, ITtlTimeProvider ttlTimeProvider) {
 
     Path path = standardize(p);
 
     // Delete entry from file cache, then from cached parent directory, if any
 
-    deleteCacheEntries(path, tombstone);
+    deleteCacheEntries(path, tombstone, ttlTimeProvider);
 
     if (recursive) {
       // Remove all entries that have this dir as path prefix.
-      deleteEntryByAncestor(path, localCache, tombstone);
+      deleteEntryByAncestor(path, localCache, tombstone, ttlTimeProvider);
     }
   }
 
@@ -191,7 +193,8 @@ public class LocalMetadataStore implements MetadataStore {
 
   @Override
   public void move(Collection<Path> pathsToDelete,
-      Collection<PathMetadata> pathsToCreate) throws IOException {
+      Collection<PathMetadata> pathsToCreate,
+      ITtlTimeProvider ttlTimeProvider) throws IOException {
     LOG.info("Move {} to {}", pathsToDelete, pathsToCreate);
 
     Preconditions.checkNotNull(pathsToDelete, "pathsToDelete is null");
@@ -205,7 +208,7 @@ public class LocalMetadataStore implements MetadataStore {
       // 1. Delete pathsToDelete
       for (Path meta : pathsToDelete) {
         LOG.debug("move: deleting metadata {}", meta);
-        delete(meta);
+        delete(meta, ttlTimeProvider);
       }
 
       // 2. Create new destination path metadata
@@ -332,18 +335,19 @@ public class LocalMetadataStore implements MetadataStore {
   }
 
   @Override
-  public void prune(long modTime) throws IOException{
-    prune(modTime, "");
+  public void prune(PruneMode pruneMode, long cutoff) throws IOException{
+    prune(pruneMode, cutoff, "");
   }
 
   @Override
-  public synchronized void prune(long modTime, String keyPrefix) {
+  public synchronized void prune(PruneMode pruneMode, long cutoff,
+      String keyPrefix) {
     // prune files
     // filter path_metadata (files), filter expired, remove expired
     localCache.asMap().entrySet().stream()
         .filter(entry -> entry.getValue().hasPathMeta())
-        .filter(entry -> expired(
-            entry.getValue().getFileMeta().getFileStatus(), modTime, keyPrefix))
+        .filter(entry -> expired(pruneMode,
+            entry.getValue().getFileMeta(), cutoff, keyPrefix))
         .forEach(entry -> localCache.invalidate(entry.getKey()));
 
 
@@ -358,28 +362,37 @@ public class LocalMetadataStore implements MetadataStore {
           Collection<PathMetadata> newChildren = new LinkedList<>();
 
           for (PathMetadata child : oldChildren) {
-            FileStatus status = child.getFileStatus();
-            if (!expired(status, modTime, keyPrefix)) {
+            if (!expired(pruneMode, child, cutoff, keyPrefix)) {
               newChildren.add(child);
             }
           }
-          if (newChildren.size() != oldChildren.size()) {
-            DirListingMetadata dlm =
-                new DirListingMetadata(path, newChildren, false);
-            localCache.put(path, new LocalMetadataEntry(dlm));
-            if (!path.isRoot()) {
-              DirListingMetadata parent = getDirListingMeta(path.getParent());
-              if (parent != null) {
-                parent.setAuthoritative(false);
-              }
-            }
-          }
+          removeAuthoritativeFromParent(path, oldChildren, newChildren);
         });
   }
 
-  private boolean expired(FileStatus status, long expiry, String keyPrefix) {
+  private void removeAuthoritativeFromParent(Path path,
+      Collection<PathMetadata> oldChildren,
+      Collection<PathMetadata> newChildren) {
+    if (newChildren.size() != oldChildren.size()) {
+      DirListingMetadata dlm =
+          new DirListingMetadata(path, newChildren, false);
+      localCache.put(path, new LocalMetadataEntry(dlm));
+      if (!path.isRoot()) {
+        DirListingMetadata parent = getDirListingMeta(path.getParent());
+        if (parent != null) {
+          parent.setAuthoritative(false);
+        }
+      }
+    }
+  }
+
+  private boolean expired(PruneMode pruneMode, PathMetadata metadata,
+      long cutoff, String keyPrefix) {
+    final S3AFileStatus status = metadata.getFileStatus();
+    final URI statusUri = status.getPath().toUri();
+
     // remove the protocol from path string to be able to compare
-    String bucket = status.getPath().toUri().getHost();
+    String bucket = statusUri.getHost();
     String statusTranslatedPath = "";
     if(bucket != null && !bucket.isEmpty()){
       // if there's a bucket, (well defined host in Uri) the pathToParentKey
@@ -389,18 +402,33 @@ public class LocalMetadataStore implements MetadataStore {
     } else {
       // if there's no bucket in the path the pathToParentKey will fail, so
       // this is the fallback to get the path from status
-      statusTranslatedPath = status.getPath().toUri().getPath();
+      statusTranslatedPath = statusUri.getPath();
+    }
+
+    boolean expired;
+    switch (pruneMode) {
+    case ALL_BY_MODTIME:
+      // Note: S3 doesn't track modification time on directories, so for
+      // consistency with the DynamoDB implementation we ignore that here
+      expired = status.getModificationTime() < cutoff && !status.isDirectory()
+          && statusTranslatedPath.startsWith(keyPrefix);
+      break;
+    case TOMBSTONES_BY_LASTUPDATED:
+      expired = metadata.getLastUpdated() < cutoff && metadata.isDeleted()
+          && statusTranslatedPath.startsWith(keyPrefix);
+      break;
+    default:
+      throw new UnsupportedOperationException("Unsupported prune mode: "
+          + pruneMode);
     }
 
-    // Note: S3 doesn't track modification time on directories, so for
-    // consistency with the DynamoDB implementation we ignore that here
-    return status.getModificationTime() < expiry && !status.isDirectory()
-      && statusTranslatedPath.startsWith(keyPrefix);
+    return expired;
   }
 
   @VisibleForTesting
   static void deleteEntryByAncestor(Path ancestor,
-      Cache<Path, LocalMetadataEntry> cache, boolean tombstone) {
+      Cache<Path, LocalMetadataEntry> cache, boolean tombstone,
+      ITtlTimeProvider ttlTimeProvider) {
 
     cache.asMap().entrySet().stream()
         .filter(entry -> isAncestorOf(ancestor, entry.getKey()))
@@ -410,7 +438,9 @@ public class LocalMetadataStore implements MetadataStore {
           if(meta.hasDirMeta()){
             cache.invalidate(path);
           } else if(tombstone && meta.hasPathMeta()){
-            meta.setPathMetadata(PathMetadata.tombstone(path));
+            final PathMetadata pmTombstone = PathMetadata.tombstone(path);
+            pmTombstone.setLastUpdated(ttlTimeProvider.getNow());
+            meta.setPathMetadata(pmTombstone);
           } else {
             cache.invalidate(path);
           }
@@ -434,7 +464,8 @@ public class LocalMetadataStore implements MetadataStore {
    * Update fileCache and dirCache to reflect deletion of file 'f'.  Call with
    * lock held.
    */
-  private void deleteCacheEntries(Path path, boolean tombstone) {
+  private void deleteCacheEntries(Path path, boolean tombstone,
+      ITtlTimeProvider ttlTimeProvider) {
     LocalMetadataEntry entry = localCache.getIfPresent(path);
     // If there's no entry, delete should silently succeed
     // (based on MetadataStoreTestBase#testDeleteNonExisting)
@@ -448,6 +479,7 @@ public class LocalMetadataStore implements MetadataStore {
     if(entry.hasPathMeta()){
       if (tombstone) {
         PathMetadata pmd = PathMetadata.tombstone(path);
+        pmd.setLastUpdated(ttlTimeProvider.getNow());
         entry.setPathMetadata(pmd);
       } else {
         entry.setPathMetadata(null);
@@ -474,6 +506,7 @@ public class LocalMetadataStore implements MetadataStore {
         LOG.debug("removing parent's entry for {} ", path);
         if (tombstone) {
           dir.markDeleted(path);
+          dir.setLastUpdated(ttlTimeProvider.getNow());
         } else {
           dir.remove(path);
         }
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/MetadataStore.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/MetadataStore.java
index 746fd82..7875d43 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/MetadataStore.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/MetadataStore.java
@@ -63,16 +63,23 @@ public interface MetadataStore extends Closeable {
    * Deletes exactly one path, leaving a tombstone to prevent lingering,
    * inconsistent copies of it from being listed.
    *
+   * Deleting an entry with a tombstone needs a
+   * {@link org.apache.hadoop.fs.s3a.s3guard.S3Guard.TtlTimeProvider} because
+   * the lastUpdated field of the record has to be updated to <pre>now</pre>.
+   *
    * @param path the path to delete
+   * @param ttlTimeProvider the time provider to set last_updated. Must not
+   *                        be null.
    * @throws IOException if there is an error
    */
-  void delete(Path path) throws IOException;
+  void delete(Path path, ITtlTimeProvider ttlTimeProvider)
+      throws IOException;
 
   /**
    * Removes the record of exactly one path.  Does not leave a tombstone (see
-   * {@link MetadataStore#delete(Path)}. It is currently intended for testing
-   * only, and a need to use it as part of normal FileSystem usage is not
-   * anticipated.
+   * {@link MetadataStore#delete(Path, ITtlTimeProvider)}. It is currently
+   * intended for testing only, and a need to use it as part of normal
+   * FileSystem usage is not anticipated.
    *
    * @param path the path to delete
    * @throws IOException if there is an error
@@ -88,10 +95,17 @@ public interface MetadataStore extends Closeable {
    * implementations must also update any stored {@code DirListingMetadata}
    * objects which track the parent of this file.
    *
+   * Deleting a subtree with a tombstone needs a
+   * {@link org.apache.hadoop.fs.s3a.s3guard.S3Guard.TtlTimeProvider} because
+   * the lastUpdated field of all records have to be updated to <pre>now</pre>.
+   *
    * @param path the root of the sub-tree to delete
+   * @param ttlTimeProvider the time provider to set last_updated. Must not
+   *                        be null.
    * @throws IOException if there is an error
    */
-  void deleteSubtree(Path path) throws IOException;
+  void deleteSubtree(Path path, ITtlTimeProvider ttlTimeProvider)
+      throws IOException;
 
   /**
    * Gets metadata for a path.
@@ -151,10 +165,13 @@ public interface MetadataStore extends Closeable {
    * @param pathsToCreate Collection of all PathMetadata for the new paths
    *                      that were created at the destination of the rename
    *                      ().
+   * @param ttlTimeProvider the time provider to set last_updated. Must not
+   *                        be null.
    * @throws IOException if there is an error
    */
   void move(Collection<Path> pathsToDelete,
-      Collection<PathMetadata> pathsToCreate) throws IOException;
+      Collection<PathMetadata> pathsToCreate,
+      ITtlTimeProvider ttlTimeProvider) throws IOException;
 
   /**
    * Saves metadata for exactly one path.
@@ -212,29 +229,54 @@ public interface MetadataStore extends Closeable {
   void destroy() throws IOException;
 
   /**
-   * Clear any metadata older than a specified time from the repository.
-   * Implementations MUST clear file metadata, and MAY clear directory metadata
-   * (s3a itself does not track modification time for directories).
-   * Implementations may also choose to throw UnsupportedOperationException
-   * istead. Note that modification times should be in UTC, as returned by
-   * System.currentTimeMillis at the time of modification.
+   * Prune method with two modes of operation:
+   * <ul>
+   *   <li>
+   *    {@link PruneMode#ALL_BY_MODTIME}
+   *    Clear any metadata older than a specified mod_time from the store.
+   *    Note that this modification time is the S3 modification time from the
+   *    object's metadata - from the object store.
+   *    Implementations MUST clear file metadata, and MAY clear directory
+   *    metadata (s3a itself does not track modification time for directories).
+   *    Implementations may also choose to throw UnsupportedOperationException
+   *    instead. Note that modification times must be in UTC, as returned by
+   *    System.currentTimeMillis at the time of modification.
+   *   </li>
+   * </ul>
    *
-   * @param modTime Oldest modification time to allow
+   * <ul>
+   *   <li>
+   *    {@link PruneMode#TOMBSTONES_BY_LASTUPDATED}
+   *    Clear any tombstone updated earlier than a specified time from the
+   *    store. Note that this last_updated is the time when the metadata
+   *    entry was last updated and maintained by the metadata store.
+   *    Implementations MUST clear file metadata, and MAY clear directory
+   *    metadata (s3a itself does not track modification time for directories).
+   *    Implementations may also choose to throw UnsupportedOperationException
+   *    instead. Note that last_updated must be in UTC, as returned by
+   *    System.currentTimeMillis at the time of modification.
+   *   </li>
+   * </ul>
+   *
+   * @param pruneMode Mode of operation for the prune (see {@link PruneMode})
+   * @param cutoff Oldest time to allow (UTC)
    * @throws IOException if there is an error
    * @throws UnsupportedOperationException if not implemented
    */
-  void prune(long modTime) throws IOException, UnsupportedOperationException;
+  void prune(PruneMode pruneMode, long cutoff) throws IOException,
+      UnsupportedOperationException;
 
   /**
-   * Same as {@link MetadataStore#prune(long)}, but with an additional
-   * keyPrefix parameter to filter the pruned keys with a prefix.
+   * Same as {@link MetadataStore#prune(PruneMode, long)}, but with an
+   * additional keyPrefix parameter to filter the pruned keys with a prefix.
    *
-   * @param modTime Oldest modification time to allow
+   * @param pruneMode Mode of operation for the prune (see {@link PruneMode})
+   * @param cutoff Oldest time to allow (UTC)
    * @param keyPrefix The prefix for the keys that should be removed
    * @throws IOException if there is an error
    * @throws UnsupportedOperationException if not implemented
    */
-  void prune(long modTime, String keyPrefix)
+  void prune(PruneMode pruneMode, long cutoff, String keyPrefix)
       throws IOException, UnsupportedOperationException;
 
   /**
@@ -252,4 +294,13 @@ public interface MetadataStore extends Closeable {
    * @throws IOException if there is an error
    */
   void updateParameters(Map<String, String> parameters) throws IOException;
+
+  /**
+   * Modes of operation for prune.
+   * For details see {@link MetadataStore#prune(PruneMode, long)}
+   */
+  enum PruneMode {
+    ALL_BY_MODTIME,
+    TOMBSTONES_BY_LASTUPDATED
+  }
 }
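
A short usage sketch of the two prune modes defined above, assuming an
initialized MetadataStore ms (the cutoff arithmetic is illustrative):

    // Drop file entries whose S3 mod_time is older than one day (UTC).
    long cutoff = System.currentTimeMillis() - TimeUnit.DAYS.toMillis(1);
    ms.prune(MetadataStore.PruneMode.ALL_BY_MODTIME, cutoff);

    // Drop tombstones whose last_updated is older than the same cutoff.
    ms.prune(MetadataStore.PruneMode.TOMBSTONES_BY_LASTUPDATED, cutoff);
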
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/NullMetadataStore.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/NullMetadataStore.java
index 04704e7..1472ef1 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/NullMetadataStore.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/NullMetadataStore.java
@@ -47,7 +47,8 @@ public class NullMetadataStore implements MetadataStore {
   }
 
   @Override
-  public void delete(Path path) throws IOException {
+  public void delete(Path path, ITtlTimeProvider ttlTimeProvider)
+      throws IOException {
   }
 
   @Override
@@ -55,7 +56,8 @@ public class NullMetadataStore implements MetadataStore {
   }
 
   @Override
-  public void deleteSubtree(Path path) throws IOException {
+  public void deleteSubtree(Path path, ITtlTimeProvider ttlTimeProvider)
+      throws IOException {
   }
 
   @Override
@@ -76,7 +78,8 @@ public class NullMetadataStore implements MetadataStore {
 
   @Override
   public void move(Collection<Path> pathsToDelete,
-      Collection<PathMetadata> pathsToCreate) throws IOException {
+      Collection<PathMetadata> pathsToCreate,
+      ITtlTimeProvider ttlTimeProvider) throws IOException {
   }
 
   @Override
@@ -96,11 +99,11 @@ public class NullMetadataStore implements MetadataStore {
   }
 
   @Override
-  public void prune(long modTime) {
+  public void prune(PruneMode pruneMode, long cutoff) {
   }
 
   @Override
-  public void prune(long modTime, String keyPrefix) {
+  public void prune(PruneMode pruneMode, long cutoff, String keyPrefix) {
   }
 
   @Override
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3Guard.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3Guard.java
index 26c75e8..933a01c 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3Guard.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3Guard.java
@@ -25,9 +25,13 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
+import javax.annotation.Nullable;
+
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import org.slf4j.Logger;
@@ -46,6 +50,8 @@ import org.apache.hadoop.fs.s3a.S3AInstrumentation;
 import org.apache.hadoop.fs.s3a.Tristate;
 import org.apache.hadoop.util.ReflectionUtils;
 
+import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_METADATASTORE_METADATA_TTL;
+import static org.apache.hadoop.fs.s3a.Constants.METADATASTORE_METADATA_TTL;
 import static org.apache.hadoop.fs.s3a.Constants.S3_METADATA_STORE_IMPL;
 import static org.apache.hadoop.fs.s3a.Statistic.S3GUARD_METADATASTORE_PUT_PATH_LATENCY;
 import static org.apache.hadoop.fs.s3a.Statistic.S3GUARD_METADATASTORE_PUT_PATH_REQUEST;
@@ -142,15 +148,17 @@ public final class S3Guard {
    * @param ms MetadataStore to {@code put()} into.
    * @param status status to store
    * @param instrumentation instrumentation of the s3a file system
+   * @param timeProvider Time provider to use when writing entries
    * @return The same status as passed in
    * @throws IOException if metadata store update failed
    */
   @RetryTranslated
   public static S3AFileStatus putAndReturn(MetadataStore ms,
       S3AFileStatus status,
-      S3AInstrumentation instrumentation) throws IOException {
+      S3AInstrumentation instrumentation,
+      ITtlTimeProvider timeProvider) throws IOException {
     long startTimeNano = System.nanoTime();
-    ms.put(new PathMetadata(status));
+    S3Guard.putWithTtl(ms, new PathMetadata(status), timeProvider);
     instrumentation.addValueToQuantiles(S3GUARD_METADATASTORE_PUT_PATH_LATENCY,
         (System.nanoTime() - startTimeNano));
     instrumentation.incrementCounter(S3GUARD_METADATASTORE_PUT_PATH_REQUEST, 1);
@@ -196,7 +204,7 @@ public final class S3Guard {
    * @param backingStatuses Directory listing from the backing store.
    * @param dirMeta  Directory listing from MetadataStore.  May be null.
    * @param isAuthoritative State of authoritative mode
-   * @param timeProvider Time provider for testing.
+   * @param timeProvider Time provider to use when updating entries
    * @return Final result of directory listing.
    * @throws IOException if metadata store update failed
    */
@@ -242,7 +250,7 @@ public final class S3Guard {
         if (status != null
             && s.getModificationTime() > status.getModificationTime()) {
           LOG.debug("Update ms with newer metadata of: {}", status);
-          ms.put(new PathMetadata(s));
+          S3Guard.putWithTtl(ms, new PathMetadata(s), timeProvider);
         }
       }
 
@@ -357,7 +365,7 @@ public final class S3Guard {
       }
 
       // Batched put
-      ms.put(pathMetas);
+      S3Guard.putWithTtl(ms, pathMetas, timeProvider);
     } catch (IOException ioe) {
       LOG.error("MetadataStore#put() failure:", ioe);
     }
@@ -462,7 +470,8 @@ public final class S3Guard {
   }
 
   public static void addAncestors(MetadataStore metadataStore,
-      Path qualifiedPath, String username) throws IOException {
+      Path qualifiedPath, String username, ITtlTimeProvider timeProvider)
+      throws IOException {
     Collection<PathMetadata> newDirs = new ArrayList<>();
     Path parent = qualifiedPath.getParent();
     while (!parent.isRoot()) {
@@ -476,7 +485,7 @@ public final class S3Guard {
       }
       parent = parent.getParent();
     }
-    metadataStore.put(newDirs);
+    S3Guard.putWithTtl(metadataStore, newDirs, timeProvider);
   }
 
   private static void addMoveStatus(Collection<Path> srcPaths,
@@ -514,17 +523,6 @@ public final class S3Guard {
   }
 
   /**
-   * This interface is defined for testing purposes.
-   * TTL can be tested by implementing this interface and setting is as
-   * {@code S3Guard.ttlTimeProvider}. By doing this, getNow() can return any
-   * value preferred and flaky tests could be avoided.
-   */
-  public interface ITtlTimeProvider {
-    long getNow();
-    long getAuthoritativeDirTtl();
-  }
-
-  /**
    * Runtime implementation for TTL Time Provider interface.
    */
   public static class TtlTimeProvider implements ITtlTimeProvider {
@@ -534,34 +532,127 @@ public final class S3Guard {
       this.authoritativeDirTtl = authoritativeDirTtl;
     }
 
+    public TtlTimeProvider(Configuration conf) {
+      this.authoritativeDirTtl =
+          conf.getTimeDuration(METADATASTORE_METADATA_TTL,
+              DEFAULT_METADATASTORE_METADATA_TTL, TimeUnit.MILLISECONDS);
+    }
+
     @Override
     public long getNow() {
       return System.currentTimeMillis();
     }
 
-    @Override public long getAuthoritativeDirTtl() {
+    @Override public long getMetadataTtl() {
       return authoritativeDirTtl;
     }
+
+    @Override
+    public boolean equals(final Object o) {
+      if (this == o) { return true; }
+      if (o == null || getClass() != o.getClass()) { return false; }
+      final TtlTimeProvider that = (TtlTimeProvider) o;
+      return authoritativeDirTtl == that.authoritativeDirTtl;
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(authoritativeDirTtl);
+    }
+
+    @Override
+    public String toString() {
+      final StringBuilder sb = new StringBuilder(
+          "TtlTimeProvider{");
+      sb.append("authoritativeDirTtl=").append(authoritativeDirTtl);
+      sb.append(" millis}");
+      return sb.toString();
+    }
   }
 
   public static void putWithTtl(MetadataStore ms, DirListingMetadata dirMeta,
       ITtlTimeProvider timeProvider)
       throws IOException {
     dirMeta.setLastUpdated(timeProvider.getNow());
+    dirMeta.getListing()
+        .forEach(pm -> pm.setLastUpdated(timeProvider.getNow()));
     ms.put(dirMeta);
   }
 
-  public static DirListingMetadata listChildrenWithTtl(MetadataStore ms,
-      Path path, ITtlTimeProvider timeProvider)
+  public static void putWithTtl(MetadataStore ms, PathMetadata fileMeta,
+      @Nullable ITtlTimeProvider timeProvider) throws IOException {
+    if (timeProvider != null) {
+      fileMeta.setLastUpdated(timeProvider.getNow());
+    } else {
+      LOG.debug("timeProvider is null, put {} without setting last_updated",
+          fileMeta);
+    }
+    ms.put(fileMeta);
+  }
+
+  public static void putWithTtl(MetadataStore ms,
+      Collection<PathMetadata> fileMetas,
+      @Nullable ITtlTimeProvider timeProvider)
       throws IOException {
-    long ttl = timeProvider.getAuthoritativeDirTtl();
+    if (timeProvider != null) {
+      final long now = timeProvider.getNow();
+      fileMetas.forEach(fileMeta -> fileMeta.setLastUpdated(now));
+    } else {
+      LOG.debug("timeProvider is null, put {} without setting last_updated",
+          fileMetas);
+    }
+    ms.put(fileMetas);
+  }
+
+  public static PathMetadata getWithTtl(MetadataStore ms, Path path,
+      @Nullable ITtlTimeProvider timeProvider) throws IOException {
+    final PathMetadata pathMetadata = ms.get(path);
+    // if timeProvider is null let's return with what the ms has
+    if (timeProvider == null) {
+      LOG.debug("timeProvider is null, returning pathMetadata as is");
+      return pathMetadata;
+    }
+
+    long ttl = timeProvider.getMetadataTtl();
+
+    if (pathMetadata != null) {
+      // Special case: the pathmetadata's last updated is 0. This can happen
+      // eg. with an old db using this implementation
+      if (pathMetadata.getLastUpdated() == 0) {
+        LOG.debug("PathMetadata TTL for {} is 0, so it will be returned as "
+            + "not expired.");
+        return pathMetadata;
+      }
+
+      if (!pathMetadata.isExpired(ttl, timeProvider.getNow())) {
+        return pathMetadata;
+      } else {
+        LOG.debug("PathMetadata TTl for {} is expired in metadata store.",
+            path);
+        return null;
+      }
+    }
 
+    return null;
+  }
+
+  public static DirListingMetadata listChildrenWithTtl(MetadataStore ms,
+      Path path, @Nullable ITtlTimeProvider timeProvider)
+      throws IOException {
     DirListingMetadata dlm = ms.listChildren(path);
 
-    if(dlm != null && dlm.isAuthoritative()
+    if (timeProvider == null) {
+      LOG.debug("timeProvider is null, returning DirListingMetadata as is");
+      return dlm;
+    }
+
+    long ttl = timeProvider.getMetadataTtl();
+
+    if (dlm != null && dlm.isAuthoritative()
         && dlm.isExpired(ttl, timeProvider.getNow())) {
       dlm.setAuthoritative(false);
     }
     return dlm;
   }
+
 }
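
Taken together: putWithTtl stamps last_updated at write time, and
getWithTtl filters expired entries (tombstones included) at read time.
A minimal sketch, assuming an initialized MetadataStore ms, an
S3AFileStatus status and a qualified Path path:

    ITtlTimeProvider clock =
        new S3Guard.TtlTimeProvider(TimeUnit.MINUTES.toMillis(15));
    // Write: last_updated is set to clock.getNow() before the put.
    S3Guard.putWithTtl(ms, new PathMetadata(status), clock);
    // Read: returns null if the entry's TTL has expired.
    PathMetadata pm = S3Guard.getWithTtl(ms, path, clock);
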
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3GuardTool.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3GuardTool.java
index 397a9cb..dedb849 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3GuardTool.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3GuardTool.java
@@ -707,7 +707,8 @@ public abstract class S3GuardTool extends Configured implements Tool {
         }
         S3AFileStatus dir = DynamoDBMetadataStore.makeDirStatus(parent,
             f.getOwner());
-        getStore().put(new PathMetadata(dir));
+        S3Guard.putWithTtl(getStore(), new PathMetadata(dir),
+            getFilesystem().getTtlTimeProvider());
         dirCache.add(parent);
         parent = parent.getParent();
       }
@@ -741,7 +742,8 @@ public abstract class S3GuardTool extends Configured implements Tool {
               located.getVersionId());
         }
         putParentsIfNotPresent(child);
-        getStore().put(new PathMetadata(child));
+        S3Guard.putWithTtl(getStore(), new PathMetadata(child),
+            getFilesystem().getTtlTimeProvider());
         items++;
       }
       return items;
@@ -1073,7 +1075,8 @@ public abstract class S3GuardTool extends Configured implements Tool {
       }
 
       try {
-        getStore().prune(divide, keyPrefix);
+        getStore().prune(MetadataStore.PruneMode.ALL_BY_MODTIME, divide,
+            keyPrefix);
       } catch (UnsupportedOperationException e){
         errorln("Prune operation not supported in metadata store.");
       }
diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/s3guard.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/s3guard.md
index 94dc89b..337fc95 100644
--- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/s3guard.md
+++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/s3guard.md
@@ -181,8 +181,8 @@ removed on `S3AFileSystem` level.
 
 ```xml
 <property>
-    <name>fs.s3a.metadatastore.authoritative.dir.ttl</name>
-    <value>3600000</value>
+    <name>fs.s3a.metadatastore.metadata.ttl</name>
+    <value>15m</value>
 </property>
 ```
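
Since the value is parsed with Configuration.getTimeDuration, duration
suffixes (e.g. s, m, h) are accepted and a bare number is read as
milliseconds. For example, the following sketch is equivalent to setting
3600000:

    <property>
        <name>fs.s3a.metadatastore.metadata.ttl</name>
        <value>1h</value>
    </property>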
 
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardOutOfBandOperations.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardOutOfBandOperations.java
index 6dbe6f9..2af9a0a 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardOutOfBandOperations.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardOutOfBandOperations.java
@@ -24,6 +24,7 @@ import java.net.URI;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.UUID;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
 
 import org.junit.Assume;
@@ -37,20 +38,32 @@ import org.apache.hadoop.fs.s3a.s3guard.NullMetadataStore;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.s3a.impl.ChangeDetectionPolicy;
 import org.apache.hadoop.fs.s3a.impl.ChangeDetectionPolicy.Source;
 import org.apache.hadoop.fs.s3a.s3guard.DirListingMetadata;
 import org.apache.hadoop.fs.s3a.s3guard.MetadataStore;
-
+import org.apache.hadoop.fs.s3a.s3guard.PathMetadata;
+import org.apache.hadoop.fs.s3a.s3guard.ITtlTimeProvider;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.RemoteIterator;
+
+import static org.apache.hadoop.fs.contract.ContractTestUtils.touch;
+import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_METADATASTORE_METADATA_TTL;
+import static org.apache.hadoop.fs.s3a.Constants.METADATASTORE_METADATA_TTL;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
 import static org.apache.hadoop.test.LambdaTestUtils.eventually;
 import static org.junit.Assume.assumeTrue;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.readBytesToString;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.writeTextFile;
 import static org.apache.hadoop.fs.s3a.Constants.METADATASTORE_AUTHORITATIVE;
 import static org.apache.hadoop.fs.s3a.Constants.S3_METADATA_STORE_IMPL;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.metadataStorePersistsAuthoritativeBit;
 import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 /**
  *
@@ -115,7 +128,7 @@ public class ITestS3GuardOutOfBandOperations extends AbstractS3ATestBase {
    * Test array for parameterized test runs.
    * @return a list of parameter tuples.
    */
-  @Parameterized.Parameters
+  @Parameterized.Parameters(name="auth={0}")
   public static Collection<Object[]> params() {
     return Arrays.asList(new Object[][]{
         {true}, {false}
@@ -190,8 +203,11 @@ public class ITestS3GuardOutOfBandOperations extends AbstractS3ATestBase {
     URI uri = testFS.getUri();
 
     removeBaseAndBucketOverrides(uri.getHost(), config,
-        METADATASTORE_AUTHORITATIVE);
+        METADATASTORE_AUTHORITATIVE,
+        METADATASTORE_METADATA_TTL);
     config.setBoolean(METADATASTORE_AUTHORITATIVE, authoritativeMode);
+    config.setLong(METADATASTORE_METADATA_TTL,
+        DEFAULT_METADATASTORE_METADATA_TTL);
     final S3AFileSystem gFs = createFS(uri, config);
     // set back the same metadata store instance
     gFs.setMetadataStore(realMs);
@@ -272,6 +288,292 @@ public class ITestS3GuardOutOfBandOperations extends AbstractS3ATestBase {
   }
 
   /**
+   * Tests that tombstone expiry is implemented: if a file is created raw
+   * while a tombstone for the same path exists in the MS, S3Guard will
+   * check S3 for the file.
+   *
+   * Seq: create guarded; delete guarded; create raw (same path); read guarded.
+   * This will fail if no tombstone expiry is set.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testTombstoneExpiryGuardedDeleteRawCreate() throws Exception {
+    boolean allowAuthoritative = authoritative;
+    Path testFilePath = path("TEGDRC-" + UUID.randomUUID() + "/file");
+    LOG.info("Allow authoritative param: {}",  allowAuthoritative);
+    String originalText = "some test";
+    String newText = "the new originalText for test";
+
+    final ITtlTimeProvider originalTimeProvider =
+        guardedFs.getTtlTimeProvider();
+    try {
+      final AtomicLong now = new AtomicLong(1);
+      final AtomicLong metadataTtl = new AtomicLong(1);
+
+      // SET TTL TIME PROVIDER FOR TESTING
+      ITtlTimeProvider testTimeProvider =
+          new ITtlTimeProvider() {
+            @Override public long getNow() {
+              return now.get();
+            }
+
+            @Override public long getMetadataTtl() {
+              return metadataTtl.get();
+            }
+          };
+      guardedFs.setTtlTimeProvider(testTimeProvider);
+
+      // CREATE GUARDED
+      createAndAwaitFs(guardedFs, testFilePath, originalText);
+
+      // DELETE GUARDED
+      deleteGuardedTombstoned(guardedFs, testFilePath, now);
+
+      // CREATE RAW
+      createAndAwaitFs(rawFS, testFilePath, newText);
+
+      // CHECK LISTING - THE FILE SHOULD NOT BE THERE, EVEN IF IT'S CREATED RAW
+      checkListingDoesNotContainPath(guardedFs, testFilePath);
+
+      // CHANGE TTL SO ENTRY (& TOMBSTONE METADATA) WILL EXPIRE
+      long willExpire = now.get() + metadataTtl.get() + 1L;
+      now.set(willExpire);
+      LOG.info("willExpire: {}, ttlNow: {}; ttlTTL: {}", willExpire,
+          testTimeProvider.getNow(), testTimeProvider.getMetadataTtl());
+
+      // READ GUARDED
+      String newRead = readBytesToString(guardedFs, testFilePath,
+          newText.length());
+
+      // CHECK LISTING - THE FILE SHOULD BE THERE, TOMBSTONE EXPIRED
+      checkListingContainsPath(guardedFs, testFilePath);
+
+      // assert that the text read back is the new one, which was created raw
+      LOG.info("Old: {}, New: {}, Read: {}", originalText, newText, newRead);
+      assertEquals("The text read back should be the newly written one.",
+          newText, newRead);
+    } finally {
+      guardedFs.delete(testFilePath, true);
+      guardedFs.setTtlTimeProvider(originalTimeProvider);
+    }
+  }
+
+  private void createAndAwaitFs(S3AFileSystem fs, Path testFilePath,
+      String text) throws Exception {
+    writeTextFile(fs, testFilePath, text, true);
+    final FileStatus newStatus = awaitFileStatus(fs, testFilePath);
+    assertNotNull("Newly created file status should not be null.", newStatus);
+  }
+
+  private void deleteGuardedTombstoned(S3AFileSystem guarded,
+      Path testFilePath, AtomicLong now) throws Exception {
+    guarded.delete(testFilePath, true);
+
+    final PathMetadata metadata =
+        guarded.getMetadataStore().get(testFilePath);
+    assertNotNull("Created file metadata should not be null in ms",
+        metadata);
+    assertEquals("Created file metadata last_updated should equal with "
+            + "mocked now", now.get(), metadata.getLastUpdated());
+
+    intercept(FileNotFoundException.class, testFilePath.toString(),
+        "This file should throw FNFE when reading through "
+            + "the guarded fs, and the metadatastore tombstoned the file.",
+        () -> guarded.getFileStatus(testFilePath));
+  }
+
+  /**
+   * createNonRecursive must fail if the parent directory has been deleted,
+   * and succeed if the tombstone has expired and the directory has been
+   * created out of band.
+   */
+  @Test
+  public void testCreateNonRecursiveFailsIfParentDeleted() throws Exception {
+    LOG.info("Authoritative mode: {}", authoritative);
+
+    String dirToDelete = methodName + UUID.randomUUID().toString();
+    String fileToTry = dirToDelete + "/theFileToTry";
+
+    final Path dirPath = path(dirToDelete);
+    final Path filePath = path(fileToTry);
+
+    // Set up a mock time provider so the test controls TTL expiry.
+    ITtlTimeProvider mockTimeProvider = mock(ITtlTimeProvider.class);
+    ITtlTimeProvider originalTimeProvider = guardedFs.getTtlTimeProvider();
+
+    try {
+      guardedFs.setTtlTimeProvider(mockTimeProvider);
+      when(mockTimeProvider.getNow()).thenReturn(100L);
+      when(mockTimeProvider.getMetadataTtl()).thenReturn(5L);
+
+      // CREATE DIRECTORY
+      guardedFs.mkdirs(dirPath);
+
+      // DELETE DIRECTORY
+      guardedFs.delete(dirPath, true);
+
+      // WRITE TO DELETED DIRECTORY - FAIL
+      intercept(FileNotFoundException.class,
+          dirToDelete,
+          "createNonRecursive must fail if the parent directory has been deleted.",
+          () -> createNonRecursive(guardedFs, filePath));
+
+      // CREATE THE DIRECTORY RAW
+      rawFS.mkdirs(dirPath);
+      awaitFileStatus(rawFS, dirPath);
+
+      // SET TIME SO METADATA EXPIRES
+      when(mockTimeProvider.getNow()).thenReturn(110L);
+
+      // WRITE TO DELETED DIRECTORY - SUCCESS
+      createNonRecursive(guardedFs, filePath);
+
+    } finally {
+      guardedFs.delete(filePath, true);
+      guardedFs.delete(dirPath, true);
+      guardedFs.setTtlTimeProvider(originalTimeProvider);
+    }
+  }
+
+  /**
+   * When lastUpdated = 0 the entry should not expire. This is a special
+   * case, e.g. for old metadata entries.
+   */
+  @Test
+  public void testLastUpdatedZeroWontExpire() throws Exception {
+    LOG.info("Authoritative mode: {}", authoritative);
+
+    String testFile = methodName + UUID.randomUUID().toString() +
+        "/theFileToTry";
+
+    long ttl = 10L;
+    final Path filePath = path(testFile);
+
+    ITtlTimeProvider mockTimeProvider = mock(ITtlTimeProvider.class);
+    ITtlTimeProvider originalTimeProvider = guardedFs.getTtlTimeProvider();
+
+    try {
+      guardedFs.setTtlTimeProvider(mockTimeProvider);
+      when(mockTimeProvider.getMetadataTtl()).thenReturn(ttl);
+
+      // create a file while the NOW is 0, so it will set 0 as the last_updated
+      when(mockTimeProvider.getNow()).thenReturn(0L);
+      touch(guardedFs, filePath);
+      deleteFile(guardedFs, filePath);
+
+      final PathMetadata pathMetadata =
+          guardedFs.getMetadataStore().get(filePath);
+      assertNotNull("pathMetadata should not be null after deleting with "
+          + "tombstones", pathMetadata);
+      assertEquals("pathMetadata lastUpdated field should be 0", 0,
+          pathMetadata.getLastUpdated());
+
+      // set the time, so the metadata would expire
+      when(mockTimeProvider.getNow()).thenReturn(2 * ttl);
+      intercept(FileNotFoundException.class, filePath.toString(),
+          "This file should throw FNFE when reading through "
+              + "the guarded fs, and the metadatastore tombstoned the file. "
+              + "The tombstone won't expire if lastUpdated is set to 0.",
+          () -> guardedFs.getFileStatus(filePath));
+
+    } finally {
+      guardedFs.delete(filePath, true);
+      guardedFs.setTtlTimeProvider(originalTimeProvider);
+    }
+  }
+
+  /**
+   * 1. File is deleted in the guarded fs.
+   * 2. File is replaced in the raw fs.
+   * 3. File is deleted in the guarded FS after the expiry time.
+   * 4. File MUST NOT exist in raw FS.
+   */
+  @Test
+  public void deleteAfterTombstoneExpiryOobCreate() throws Exception {
+    LOG.info("Authoritative mode: {}", authoritative);
+
+    String testFile = methodName + UUID.randomUUID().toString() +
+        "/theFileToTry";
+
+    long ttl = 10L;
+    final Path filePath = path(testFile);
+
+    ITtlTimeProvider mockTimeProvider = mock(ITtlTimeProvider.class);
+    ITtlTimeProvider originalTimeProvider = guardedFs.getTtlTimeProvider();
+
+    try {
+      guardedFs.setTtlTimeProvider(mockTimeProvider);
+      when(mockTimeProvider.getMetadataTtl()).thenReturn(ttl);
+
+      // CREATE AND DELETE WITH GUARDED FS
+      when(mockTimeProvider.getNow()).thenReturn(100L);
+      touch(guardedFs, filePath);
+      deleteFile(guardedFs, filePath);
+
+      final PathMetadata pathMetadata =
+          guardedFs.getMetadataStore().get(filePath);
+      assertNotNull("pathMetadata should not be null after deleting with "
+          + "tombstones", pathMetadata);
+
+      // REPLACE WITH RAW FS
+      touch(rawFS, filePath);
+      awaitFileStatus(rawFS, filePath);
+
+      // SET EXPIRY TIME, SO THE TOMBSTONE IS EXPIRED
+      when(mockTimeProvider.getNow()).thenReturn(100L + 2 * ttl);
+
+      // DELETE IN GUARDED FS
+      guardedFs.delete(filePath, true);
+
+      // FILE MUST NOT EXIST IN RAW
+      intercept(FileNotFoundException.class, filePath.toString(),
+          "This file should throw FNFE when reading through "
+              + "the raw fs, and the guarded fs deleted the file.",
+          () -> rawFS.getFileStatus(filePath));
+
+    } finally {
+      guardedFs.delete(filePath, true);
+      guardedFs.setTtlTimeProvider(originalTimeProvider);
+    }
+  }
+
+  private void checkListingDoesNotContainPath(S3AFileSystem fs, Path filePath)
+      throws IOException {
+    final RemoteIterator<LocatedFileStatus> listIter =
+        fs.listFiles(filePath.getParent(), false);
+    while (listIter.hasNext()) {
+      final LocatedFileStatus lfs = listIter.next();
+      assertNotEquals("The tombstone has not been expired, so must not be"
+          + " listed.", filePath, lfs.getPath());
+    }
+    LOG.info("{}; file omitted from listFiles listing as expected.", filePath);
+
+    final FileStatus[] fileStatuses = fs.listStatus(filePath.getParent());
+    for (FileStatus fileStatus : fileStatuses) {
+      assertNotEquals("The tombstone has not been expired, so must not be"
+          + " listed.", filePath, fileStatus.getPath());
+    }
+    LOG.info("{}; file omitted from listStatus as expected.", filePath);
+  }
+
+  private void checkListingContainsPath(S3AFileSystem fs, Path filePath)
+      throws IOException {
+    boolean listFilesHasIt = false;
+    final RemoteIterator<LocatedFileStatus> listIter =
+        fs.listFiles(filePath.getParent(), false);
+    while (listIter.hasNext()) {
+      final LocatedFileStatus lfs = listIter.next();
+      if (filePath.equals(lfs.getPath())) {
+        listFilesHasIt = true;
+      }
+    }
+    assertTrue("The file should be listed in fs.listFiles: " + filePath,
+        listFilesHasIt);
+
+    boolean listStatusHasIt = false;
+    final FileStatus[] fileStatuses = fs.listStatus(filePath.getParent());
+    for (FileStatus fileStatus : fileStatuses) {
+      if (filePath.equals(fileStatus.getPath())) {
+        listStatusHasIt = true;
+      }
+    }
+    assertTrue("The file should be listed in fs.listStatus: " + filePath,
+        listStatusHasIt);
+  }
+
+  /**
    * Perform an out-of-band delete.
    * @param testFilePath filename
    * @param allowAuthoritative  is the store authoritative
@@ -384,12 +686,18 @@ public class ITestS3GuardOutOfBandOperations extends AbstractS3ATestBase {
       // Create initial statusIterator with guarded ms
       writeTextFile(guardedFs, testFilePath, firstText, true);
       // and cache the value for later
-      final FileStatus origStatus = awaitFileStatus(rawFS, testFilePath);
+      final S3AFileStatus origStatus = awaitFileStatus(rawFS, testFilePath);
+      assertNotNull("No etag in raw status " + origStatus,
+          origStatus.getETag());
 
       // Do a listing to cache the lists. Should be authoritative if it's set.
-      final FileStatus[] origList = guardedFs.listStatus(testDirPath);
+      final S3AFileStatus[] origList = (S3AFileStatus[]) guardedFs.listStatus(
+          testDirPath);
       assertArraySize("Added one file to the new dir, so the number of "
               + "files in the dir should be one.", 1, origList);
+      S3AFileStatus origGuardedFileStatus = origList[0];
+      assertNotNull("No etag in origGuardedFileStatus" + origGuardedFileStatus,
+          origGuardedFileStatus.getETag());
       final DirListingMetadata dirListingMetadata =
           realMs.listChildren(guardedFs.qualify(testDirPath));
       assertListingAuthority(allowAuthoritative, dirListingMetadata);
@@ -406,7 +714,8 @@ public class ITestS3GuardOutOfBandOperations extends AbstractS3ATestBase {
       final FileStatus rawFileStatus = awaitFileStatus(rawFS, testFilePath);
 
       // check listing in guarded store.
-      final FileStatus[] modList = guardedFs.listStatus(testDirPath);
+      final S3AFileStatus[] modList = (S3AFileStatus[]) guardedFs.listStatus(
+          testDirPath);
       assertArraySize("Added one file to the new dir then modified it, "
           + "so the number of files in the dir should be one.", 1,
           modList);
@@ -479,6 +788,24 @@ public class ITestS3GuardOutOfBandOperations extends AbstractS3ATestBase {
             expectedLength, guardedLength);
       }
     }
+    // check etag. This relies on first and second text being different.
+    final S3AFileStatus rawS3AFileStatus = (S3AFileStatus) rawFileStatus;
+    final S3AFileStatus guardedS3AFileStatus = (S3AFileStatus)
+        guardedFileStatus;
+    final S3AFileStatus origS3AFileStatus = (S3AFileStatus) origStatus;
+    assertNotEquals(
+        "raw status still no to date with changes" + stats,
+        origS3AFileStatus.getETag(), rawS3AFileStatus.getETag());
+    if (allowAuthoritative) {
+      // expect the etag to be out of sync
+      assertNotEquals(
+          "etag in authoritative table with " + stats,
+          rawS3AFileStatus.getETag(), guardedS3AFileStatus.getETag());
+    } else {
+      assertEquals(
+          "etag in non-authoritative table with " + stats,
+          rawS3AFileStatus.getETag(), guardedS3AFileStatus.getETag());
+    }
     // Next: modification time.
     long rawModTime = rawFileStatus.getModificationTime();
     long guardedModTime = guardedFileStatus.getModificationTime();
@@ -631,12 +958,18 @@ public class ITestS3GuardOutOfBandOperations extends AbstractS3ATestBase {
    * @return the file status.
    * @throws Exception failure
    */
-  private FileStatus awaitFileStatus(S3AFileSystem fs,
+  private S3AFileStatus awaitFileStatus(S3AFileSystem fs,
       final Path testFilePath)
       throws Exception {
-    return eventually(
+    return (S3AFileStatus) eventually(
         STABILIZATION_TIME, PROBE_INTERVAL_MILLIS,
         () -> fs.getFileStatus(testFilePath));
   }
 
+  private FSDataOutputStream createNonRecursive(FileSystem fs, Path path)
+      throws Exception {
+    return fs
+        .createNonRecursive(path, false, 4096, (short) 3, 4096, null);
+  }
+
 }
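
The tests above swap in a controllable clock through the ITtlTimeProvider
interface added by this patch. Below is a minimal fixed-clock sketch,
assuming only the two methods the tests actually call (getNow() and
getMetadataTtl()); the FixedTtlTimeProvider name and the advance() helper
are illustrative, not part of the patch:

    import org.apache.hadoop.fs.s3a.s3guard.ITtlTimeProvider;

    /** Test-only clock: a controllable "now" plus a fixed TTL, in millis. */
    class FixedTtlTimeProvider implements ITtlTimeProvider {
      private long now;
      private final long ttl;

      FixedTtlTimeProvider(long now, long ttl) {
        this.now = now;
        this.ttl = ttl;
      }

      /** Move the mock clock forward, e.g. past the TTL to force expiry. */
      void advance(long millis) {
        now += millis;
      }

      @Override public long getNow() {
        return now;
      }

      @Override public long getMetadataTtl() {
        return ttl;
      }
    }

Wired up as in testTombstoneExpiryGuardedDeleteRawCreate, a call such as
guardedFs.setTtlTimeProvider(new FixedTtlTimeProvider(100L, 5L)) would
replace the AtomicLong-backed anonymous class used there.
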
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardTtl.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardTtl.java
index d24009c..9622322 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardTtl.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardTtl.java
@@ -18,13 +18,22 @@
 
 package org.apache.hadoop.fs.s3a;
 
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.UUID;
+
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.s3a.s3guard.DirListingMetadata;
+import org.apache.hadoop.fs.s3a.s3guard.ITtlTimeProvider;
 import org.apache.hadoop.fs.s3a.s3guard.MetadataStore;
 import org.apache.hadoop.fs.s3a.s3guard.S3Guard;
+
 import org.junit.Assume;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
 import static org.apache.hadoop.fs.contract.ContractTestUtils.touch;
 import static org.apache.hadoop.fs.s3a.Constants.METADATASTORE_AUTHORITATIVE;
@@ -36,8 +45,37 @@ import static org.mockito.Mockito.when;
 /**
  * These tests are testing the S3Guard TTL (time to live) features.
  */
+@RunWith(Parameterized.class)
 public class ITestS3GuardTtl extends AbstractS3ATestBase {
 
+  private final boolean authoritative;
+
+  /**
+   * Test array for parameterized test runs.
+   * @return a list of parameter tuples.
+   */
+  @Parameterized.Parameters
+  public static Collection<Object[]> params() {
+    return Arrays.asList(new Object[][]{
+        {true}, {false}
+    });
+  }
+
+  /**
+   * By changing the method name, the thread name is changed and
+   * so you can see in the logs which mode is being tested.
+   * @return a string to use for the thread name.
+   */
+  @Override
+  protected String getMethodName() {
+    return super.getMethodName() +
+        (authoritative ? "-auth" : "-nonauth");
+  }
+
+  public ITestS3GuardTtl(boolean authoritative) {
+    this.authoritative = authoritative;
+  }
+
   /**
    * Patch the configuration - this test needs disabled filesystem caching.
   * These tests modify the fs instance, which would otherwise cause flaky
   * tests.
@@ -47,11 +85,15 @@ public class ITestS3GuardTtl extends AbstractS3ATestBase {
   protected Configuration createConfiguration() {
     Configuration configuration = super.createConfiguration();
     S3ATestUtils.disableFilesystemCaching(configuration);
-    return S3ATestUtils.prepareTestConfiguration(configuration);
+    configuration =
+        S3ATestUtils.prepareTestConfiguration(configuration);
+    configuration.setBoolean(METADATASTORE_AUTHORITATIVE, authoritative);
+    return configuration;
   }
 
   @Test
   public void testDirectoryListingAuthoritativeTtl() throws Exception {
+    LOG.info("Authoritative mode: {}", authoritative);
 
     final S3AFileSystem fs = getFileSystem();
     Assume.assumeTrue(fs.hasMetadataStore());
@@ -64,12 +106,12 @@ public class ITestS3GuardTtl extends AbstractS3ATestBase {
     Assume.assumeTrue("MetadataStore should be authoritative for this test",
         isMetadataStoreAuthoritative(getFileSystem().getConf()));
 
-    S3Guard.ITtlTimeProvider mockTimeProvider =
-        mock(S3Guard.ITtlTimeProvider.class);
-    S3Guard.ITtlTimeProvider restoreTimeProvider = fs.getTtlTimeProvider();
+    ITtlTimeProvider mockTimeProvider =
+        mock(ITtlTimeProvider.class);
+    ITtlTimeProvider restoreTimeProvider = fs.getTtlTimeProvider();
     fs.setTtlTimeProvider(mockTimeProvider);
     when(mockTimeProvider.getNow()).thenReturn(100L);
-    when(mockTimeProvider.getAuthoritativeDirTtl()).thenReturn(1L);
+    when(mockTimeProvider.getMetadataTtl()).thenReturn(1L);
 
     Path dir = path("ttl/");
     Path file = path("ttl/afile");
@@ -102,4 +144,146 @@ public class ITestS3GuardTtl extends AbstractS3ATestBase {
       fs.setTtlTimeProvider(restoreTimeProvider);
     }
   }
+
+  @Test
+  public void testFileMetadataExpiresTtl() throws Exception {
+    LOG.info("Authoritative mode: {}", authoritative);
+
+    Path fileExpire1 = path("expirettl-" + UUID.randomUUID());
+    Path fileExpire2 = path("expirettl-" + UUID.randomUUID());
+    Path fileRetain = path("expirettl-" + UUID.randomUUID());
+
+    final S3AFileSystem fs = getFileSystem();
+    Assume.assumeTrue(fs.hasMetadataStore());
+    final MetadataStore ms = fs.getMetadataStore();
+
+    ITtlTimeProvider mockTimeProvider = mock(ITtlTimeProvider.class);
+    ITtlTimeProvider originalTimeProvider = fs.getTtlTimeProvider();
+
+    try {
+      fs.setTtlTimeProvider(mockTimeProvider);
+      when(mockTimeProvider.getMetadataTtl()).thenReturn(5L);
+
+      // set the time, so fileExpire1 will expire
+      when(mockTimeProvider.getNow()).thenReturn(100L);
+      touch(fs, fileExpire1);
+      // set the time, so fileExpire2 will expire
+      when(mockTimeProvider.getNow()).thenReturn(101L);
+      touch(fs, fileExpire2);
+      // set the time, so fileRetain won't expire
+      when(mockTimeProvider.getNow()).thenReturn(109L);
+      touch(fs, fileRetain);
+      final FileStatus origFileRetainStatus = fs.getFileStatus(fileRetain);
+      // change time, so the metadata of the first two files expires
+      when(mockTimeProvider.getNow()).thenReturn(110L);
+
+      // metadata is expired so this should refresh the metadata with
+      // last_updated to the getNow()
+      final FileStatus fileExpire1Status = fs.getFileStatus(fileExpire1);
+      assertNotNull(fileExpire1Status);
+      assertEquals(110L, ms.get(fileExpire1).getLastUpdated());
+
+      // metadata is expired so this should refresh the metadata with
+      // last_updated to the getNow()
+      final FileStatus fileExpire2Status = fs.getFileStatus(fileExpire2);
+      assertNotNull(fileExpire2Status);
+      assertEquals(110L, ms.get(fileExpire2).getLastUpdated());
+
+      final FileStatus fileRetainStatus = fs.getFileStatus(fileRetain);
+      assertEquals("Modification time of these files should be equal.",
+          origFileRetainStatus.getModificationTime(),
+          fileRetainStatus.getModificationTime());
+      assertNotNull(fileRetainStatus);
+      assertEquals(109L, ms.get(fileRetain).getLastUpdated());
+    } finally {
+      fs.delete(fileExpire1, true);
+      fs.delete(fileExpire2, true);
+      fs.delete(fileRetain, true);
+      fs.setTtlTimeProvider(originalTimeProvider);
+    }
+  }
+
+  /**
+   * create(tombstone file) must succeed irrespective of overwrite flag.
+   */
+  @Test
+  public void testCreateOnTombstonedFileSucceeds() throws Exception {
+    LOG.info("Authoritative mode: {}", authoritative);
+    final S3AFileSystem fs = getFileSystem();
+
+    String fileToTry = methodName + UUID.randomUUID().toString();
+
+    final Path filePath = path(fileToTry);
+
+    // Set up a mock time provider so the test controls TTL expiry.
+    ITtlTimeProvider mockTimeProvider = mock(ITtlTimeProvider.class);
+    ITtlTimeProvider originalTimeProvider = fs.getTtlTimeProvider();
+
+    try {
+      fs.setTtlTimeProvider(mockTimeProvider);
+      when(mockTimeProvider.getNow()).thenReturn(100L);
+      when(mockTimeProvider.getMetadataTtl()).thenReturn(5L);
+
+      // CREATE A FILE
+      touch(fs, filePath);
+
+      // DELETE THE FILE - TOMBSTONE
+      fs.delete(filePath, true);
+
+      // CREATE THE SAME FILE WITHOUT ERROR DESPITE THE TOMBSTONE
+      touch(fs, filePath);
+
+    } finally {
+      fs.delete(filePath, true);
+      fs.setTtlTimeProvider(originalTimeProvider);
+    }
+  }
+
+  /**
+   * create("parent has tombstone") must always succeed (We dont check the
+   * parent), but after the file has been written, all entries up the tree
+   * must be valid. That is: the putAncestor code will correct everything
+   */
+  @Test
+  public void testCreateParentHasTombstone() throws Exception {
+    LOG.info("Authoritative mode: {}", authoritative);
+    final S3AFileSystem fs = getFileSystem();
+
+    String dirToDelete = methodName + UUID.randomUUID().toString();
+    String fileToTry = dirToDelete + "/theFileToTry";
+
+    final Path dirPath = path(dirToDelete);
+    final Path filePath = path(fileToTry);
+
+    // Set up a mock time provider so the test controls TTL expiry.
+    ITtlTimeProvider mockTimeProvider = mock(ITtlTimeProvider.class);
+    ITtlTimeProvider originalTimeProvider = fs.getTtlTimeProvider();
+
+    try {
+      fs.setTtlTimeProvider(mockTimeProvider);
+      when(mockTimeProvider.getNow()).thenReturn(100L);
+      when(mockTimeProvider.getMetadataTtl()).thenReturn(5L);
+
+      // CREATE DIRECTORY
+      fs.mkdirs(dirPath);
+
+      // DELETE DIRECTORY
+      fs.delete(dirPath, true);
+
+      // WRITE TO DELETED DIRECTORY - SUCCESS
+      touch(fs, filePath);
+
+      // SET TIME SO METADATA EXPIRES
+      when(mockTimeProvider.getNow()).thenReturn(110L);
+
+      // WRITE TO DELETED DIRECTORY - SUCCESS
+      touch(fs, filePath);
+
+    } finally {
+      fs.delete(filePath, true);
+      fs.delete(dirPath, true);
+      fs.setTtlTimeProvider(originalTimeProvider);
+    }
+  }
+
 }
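
The ITestS3GuardTtl cases all drive expiry through mocked providers; in
production the TTL comes from configuration. A hedged sketch of that path,
built only from calls exercised by testTTLConstruction later in this patch
(the local variable names are illustrative):

    Configuration conf = new Configuration();
    // either a raw millisecond long or a unit-suffixed string is accepted
    conf.set(METADATASTORE_METADATA_TTL, "15m");
    long ttlMillis = conf.getTimeDuration(METADATASTORE_METADATA_TTL,
        DEFAULT_METADATASTORE_METADATA_TTL, TimeUnit.MILLISECONDS);
    // 15m == 900 000 ms, the default
    ITtlTimeProvider fromMillis = new S3Guard.TtlTimeProvider(ttlMillis);
    ITtlTimeProvider fromConf = new S3Guard.TtlTimeProvider(conf);
    // testTTLConstruction asserts these two constructions are equal
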
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/AbstractS3GuardToolTestBase.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/AbstractS3GuardToolTestBase.java
index 9241686..f616190 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/AbstractS3GuardToolTestBase.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/AbstractS3GuardToolTestBase.java
@@ -280,7 +280,7 @@ public abstract class AbstractS3GuardToolTestBase extends AbstractS3ATestBase {
           "This child should have been kept (prefix restriction).", 1);
     } finally {
       getFileSystem().delete(parent, true);
-      ms.prune(Long.MAX_VALUE);
+      ms.prune(MetadataStore.PruneMode.ALL_BY_MODTIME, Long.MAX_VALUE);
     }
   }
 
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestDynamoDBMetadataStore.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestDynamoDBMetadataStore.java
index 149d1f3..5241dd4 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestDynamoDBMetadataStore.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestDynamoDBMetadataStore.java
@@ -230,6 +230,12 @@ public class ITestDynamoDBMetadataStore extends MetadataStoreTestBase {
     IOUtils.cleanupWithLogger(LOG, fileSystem);
   }
 
+  @Override protected String getPathStringForPrune(String path)
+      throws Exception {
+    String b = getTestBucketName(getContract().getFileSystem().getConf());
+    return "/" + b + "/dir2";
+  }
+
   /**
    * Each contract has its own S3AFileSystem and DynamoDBMetadataStore objects.
    */
@@ -437,7 +443,7 @@ public class ITestDynamoDBMetadataStore extends MetadataStoreTestBase {
     }
 
     // move the old paths to new paths and verify
-    ms.move(pathsToDelete, newMetas);
+    ms.move(pathsToDelete, newMetas, getTtlTimeProvider());
     assertEquals(0, ms.listChildren(oldDir).withoutTombstones().numEntries());
     if (newMetas != null) {
       assertTrue(CollectionUtils
@@ -650,7 +656,7 @@ public class ITestDynamoDBMetadataStore extends MetadataStoreTestBase {
             1024, false))
     );
 
-    ddbms.move(fullSourcePaths, pathsToCreate);
+    ddbms.move(fullSourcePaths, pathsToCreate, getTtlTimeProvider());
 
     // assert that all the ancestors should have been populated automatically
     assertCached(testRoot + "/c");
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestDynamoDBMetadataStoreScale.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestDynamoDBMetadataStoreScale.java
index 301ba16..95c607a 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestDynamoDBMetadataStoreScale.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestDynamoDBMetadataStoreScale.java
@@ -243,7 +243,8 @@ public class ITestDynamoDBMetadataStoreScale
 
               if (pruneItems == BATCH_SIZE) {
                 describe("pruning files");
-                ddbms.prune(Long.MAX_VALUE /* all files */);
+                ddbms.prune(MetadataStore.PruneMode.ALL_BY_MODTIME,
+                    Long.MAX_VALUE /* all files */);
                 pruneItems = 0;
               }
               if (tracker.probe()) {
@@ -305,7 +306,7 @@ public class ITestDynamoDBMetadataStoreScale
   private void retryingDelete(final Path path) {
     try {
       ddbms.getInvoker().retry("Delete ", path.toString(), true,
-          () -> ddbms.delete(path));
+          () -> ddbms.delete(path, new S3Guard.TtlTimeProvider(getConf())));
     } catch (IOException e) {
       LOG.warn("Failed to delete {}: ", path, e);
     }
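
The hunks above show the widened prune() signature: callers now pass a
MetadataStore.PruneMode to say which entries the cutoff applies to. A short
summary of the two modes as they are used in this patch (cutoffMillis is an
illustrative name):

    // prune everything whose modification time is older than the cutoff
    // (the pre-existing behaviour, now named explicitly)
    ms.prune(MetadataStore.PruneMode.ALL_BY_MODTIME, cutoffMillis);

    // prune only tombstones whose last_updated is older than the cutoff,
    // leaving live entries untouched
    ms.prune(MetadataStore.PruneMode.TOMBSTONES_BY_LASTUPDATED, cutoffMillis);
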
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/MetadataStoreTestBase.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/MetadataStoreTestBase.java
index 55f4707..754da0d 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/MetadataStoreTestBase.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/MetadataStoreTestBase.java
@@ -23,6 +23,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Set;
 
 import com.google.common.collect.Sets;
@@ -68,6 +69,7 @@ public abstract class MetadataStoreTestBase extends HadoopTestBase {
   static final FsPermission PERMISSION = null;
   static final String GROUP = null;
   private final long accessTime = 0;
+  private static ITtlTimeProvider ttlTimeProvider;
 
   /**
    * Each test should override this.  Will use a new Configuration instance.
@@ -123,6 +125,8 @@ public abstract class MetadataStoreTestBase extends HadoopTestBase {
     assertNotNull("null MetadataStore", ms);
     assertNotNull("null FileSystem", contract.getFileSystem());
     ms.initialize(contract.getFileSystem());
+    ttlTimeProvider =
+        new S3Guard.TtlTimeProvider(contract.getFileSystem().getConf());
   }
 
   @After
@@ -310,7 +314,7 @@ public abstract class MetadataStoreTestBase extends HadoopTestBase {
   public void testDelete() throws Exception {
     setUpDeleteTest();
 
-    ms.delete(strToPath("/ADirectory1/db1/file2"));
+    ms.delete(strToPath("/ADirectory1/db1/file2"), ttlTimeProvider);
 
     /* Ensure delete happened. */
     assertDirectorySize("/ADirectory1/db1", 1);
@@ -338,7 +342,7 @@ public abstract class MetadataStoreTestBase extends HadoopTestBase {
     if (!allowMissing()) {
       assertCached(p + "/ADirectory1/db1");
     }
-    ms.deleteSubtree(strToPath(p + "/ADirectory1/db1/"));
+    ms.deleteSubtree(strToPath(p + "/ADirectory1/db1/"), ttlTimeProvider);
 
     assertEmptyDirectory(p + "/ADirectory1");
     assertDeleted(p + "/ADirectory1/db1");
@@ -358,7 +362,7 @@ public abstract class MetadataStoreTestBase extends HadoopTestBase {
   public void testDeleteRecursiveRoot() throws Exception {
     setUpDeleteTest();
 
-    ms.deleteSubtree(strToPath("/"));
+    ms.deleteSubtree(strToPath("/"), ttlTimeProvider);
     assertDeleted("/ADirectory1");
     assertDeleted("/ADirectory2");
     assertDeleted("/ADirectory2/db1");
@@ -369,10 +373,10 @@ public abstract class MetadataStoreTestBase extends HadoopTestBase {
   @Test
   public void testDeleteNonExisting() throws Exception {
     // Path doesn't exist, but should silently succeed
-    ms.delete(strToPath("/bobs/your/uncle"));
+    ms.delete(strToPath("/bobs/your/uncle"), ttlTimeProvider);
 
     // Ditto.
-    ms.deleteSubtree(strToPath("/internets"));
+    ms.deleteSubtree(strToPath("/internets"), ttlTimeProvider);
   }
 
 
@@ -408,7 +412,7 @@ public abstract class MetadataStoreTestBase extends HadoopTestBase {
     }
 
     if (!(ms instanceof NullMetadataStore)) {
-      ms.delete(strToPath(filePath));
+      ms.delete(strToPath(filePath), ttlTimeProvider);
       meta = ms.get(strToPath(filePath));
       assertTrue("Tombstone not left for deleted file", meta.isDeleted());
     }
@@ -586,7 +590,7 @@ public abstract class MetadataStoreTestBase extends HadoopTestBase {
     destMetas.add(new PathMetadata(makeDirStatus("/b1")));
     destMetas.add(new PathMetadata(makeFileStatus("/b1/file1", 100)));
     destMetas.add(new PathMetadata(makeFileStatus("/b1/file2", 100)));
-    ms.move(srcPaths, destMetas);
+    ms.move(srcPaths, destMetas, ttlTimeProvider);
 
     // Assert src is no longer there
     dirMeta = ms.listChildren(strToPath("/a1"));
@@ -636,11 +640,11 @@ public abstract class MetadataStoreTestBase extends HadoopTestBase {
 
     // Make sure delete is correct as well
     if (!allowMissing()) {
-      ms.delete(new Path(p2));
+      ms.delete(new Path(p2), ttlTimeProvider);
       meta = ms.get(new Path(p1));
       assertNotNull("Path should not have been deleted", meta);
     }
-    ms.delete(new Path(p1));
+    ms.delete(new Path(p1), ttlTimeProvider);
   }
 
   @Test
@@ -668,7 +672,7 @@ public abstract class MetadataStoreTestBase extends HadoopTestBase {
       assertListingsEqual(ls.getListing(), "/pruneFiles/new",
           "/pruneFiles/old");
     }
-    ms.prune(cutoff);
+    ms.prune(MetadataStore.PruneMode.ALL_BY_MODTIME, cutoff);
     ls = ms.listChildren(strToPath("/pruneFiles"));
     if (allowMissing()) {
       assertDeleted("/pruneFiles/old");
@@ -698,7 +702,7 @@ public abstract class MetadataStoreTestBase extends HadoopTestBase {
     Thread.sleep(1);
     long cutoff = getTime();
 
-    ms.prune(cutoff);
+    ms.prune(MetadataStore.PruneMode.ALL_BY_MODTIME, cutoff);
 
     assertDeleted("/pruneDirs/dir/file");
   }
@@ -728,7 +732,7 @@ public abstract class MetadataStoreTestBase extends HadoopTestBase {
       ms.put(parentDirMd);
     }
 
-    ms.prune(time);
+    ms.prune(MetadataStore.PruneMode.ALL_BY_MODTIME, time);
     DirListingMetadata listing;
     for (String directory : directories) {
       Path path = strToPath(directory);
@@ -765,7 +769,7 @@ public abstract class MetadataStoreTestBase extends HadoopTestBase {
       ms.put(parentDirMd);
 
       // prune the ms
-      ms.prune(time);
+      ms.prune(MetadataStore.PruneMode.ALL_BY_MODTIME, time);
 
       // get the directory listings
       DirListingMetadata rootDirMd = ms.listChildren(strToPath(rootDir));
@@ -823,6 +827,89 @@ public abstract class MetadataStoreTestBase extends HadoopTestBase {
     }
   }
 
+  @Test
+  public void testPruneExpiredTombstones() throws Exception {
+    List<String> keepFilenames = new ArrayList<>(
+        Arrays.asList("/dir1/fileK1", "/dir1/fileK2", "/dir1/fileK3"));
+    List<String> removeFilenames = new ArrayList<>(
+        Arrays.asList("/dir1/fileR1", "/dir1/fileR2", "/dir1/fileR3"));
+
+    long cutoff = 9001;
+
+    for (String fN : keepFilenames) {
+      final PathMetadata pathMetadata = new PathMetadata(makeFileStatus(fN, 1));
+      pathMetadata.setLastUpdated(9002L);
+      ms.put(pathMetadata);
+    }
+
+    for (String fN : removeFilenames) {
+      final PathMetadata pathMetadata = new PathMetadata(makeFileStatus(fN, 1));
+      pathMetadata.setLastUpdated(9000L);
+      // tombstones are the deleted files!
+      pathMetadata.setIsDeleted(true);
+      ms.put(pathMetadata);
+    }
+
+    ms.prune(MetadataStore.PruneMode.TOMBSTONES_BY_LASTUPDATED, cutoff);
+
+    if (!allowMissing()) {
+      for (String fN : keepFilenames) {
+        final PathMetadata pathMetadata = ms.get(strToPath(fN));
+        assertNotNull("Kept files should be in the metastore after prune",
+            pathMetadata);
+      }
+    }
+
+    for (String fN : removeFilenames) {
+      final PathMetadata pathMetadata = ms.get(strToPath(fN));
+      assertNull("Expired tombstones should be removed from metastore after "
+          + "the prune.", pathMetadata);
+    }
+  }
+
+  @Test
+  public void testPruneExpiredTombstonesSpecifiedPath() throws Exception {
+    List<String> keepFilenames = new ArrayList<>(
+        Arrays.asList("/dir1/fileK1", "/dir1/fileK2", "/dir1/fileK3"));
+    List<String> removeFilenames = new ArrayList<>(
+        Arrays.asList("/dir2/fileR1", "/dir2/fileR2", "/dir2/fileR3"));
+
+    long cutoff = 9001;
+
+    // Only the expired tombstones under the specified prune path are deleted.
+    for (String fN : keepFilenames) {
+      final PathMetadata pathMetadata = new PathMetadata(makeFileStatus(fN, 1));
+      pathMetadata.setLastUpdated(9002L);
+      ms.put(pathMetadata);
+    }
+
+    for (String fN : removeFilenames) {
+      final PathMetadata pathMetadata = new PathMetadata(makeFileStatus(fN, 1));
+      pathMetadata.setLastUpdated(9000L);
+      // tombstones are the deleted files!
+      pathMetadata.setIsDeleted(true);
+      ms.put(pathMetadata);
+    }
+
+    final String prunePath = getPathStringForPrune("/dir2");
+    ms.prune(MetadataStore.PruneMode.TOMBSTONES_BY_LASTUPDATED, cutoff,
+        prunePath);
+
+    if (!allowMissing()) {
+      for (String fN : keepFilenames) {
+        final PathMetadata pathMetadata = ms.get(strToPath(fN));
+        assertNotNull("Kept files should be in the metastore after prune",
+            pathMetadata);
+      }
+    }
+
+    for (String fN : removeFilenames) {
+      final PathMetadata pathMetadata = ms.get(strToPath(fN));
+      assertNull("Expired tombstones should be removed from metastore after "
+          + "the prune.", pathMetadata);
+    }
+  }
+
   /*
    * Helper functions.
    */
@@ -837,6 +924,16 @@ public abstract class MetadataStoreTestBase extends HadoopTestBase {
     return paths;
   }
 
+
+  /**
+   * The prune operation needs the path with the bucket name as a string in
+   * {@link DynamoDBMetadataStore}, but not for {@link LocalMetadataStore}.
+   * This is an implementation detail of the MetadataStore, so it should be
+   * implemented in the subclasses.
+   */
+  protected abstract String getPathStringForPrune(String path)
+      throws Exception;
+
   private void commonTestPutListStatus(final String parent) throws IOException {
     putListStatusFiles(parent, true, buildPathStrings(parent, "file1", "file2",
         "file3"));
@@ -1012,4 +1109,8 @@ public abstract class MetadataStoreTestBase extends HadoopTestBase {
     return System.currentTimeMillis();
   }
 
+  protected static ITtlTimeProvider getTtlTimeProvider() {
+    return ttlTimeProvider;
+  }
+
 }
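
testPruneExpiredTombstonesSpecifiedPath above also exercises a path-scoped
prune overload. A hedged usage sketch; note the keyspace caveat from the
new getPathStringForPrune() javadoc, since DynamoDB keys carry the bucket
name while the local store's do not:

    // DynamoDBMetadataStore expects "/<bucket>/dir2";
    // LocalMetadataStore just takes "/dir2".
    String prunePath = getPathStringForPrune("/dir2");
    ms.prune(MetadataStore.PruneMode.TOMBSTONES_BY_LASTUPDATED, cutoff,
        prunePath);
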
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/TestLocalMetadataStore.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/TestLocalMetadataStore.java
index d0156f1..ee7b584 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/TestLocalMetadataStore.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/TestLocalMetadataStore.java
@@ -75,6 +75,11 @@ public class TestLocalMetadataStore extends MetadataStoreTestBase {
     return new LocalMSContract(conf);
   }
 
+  @Override protected String getPathStringForPrune(String path)
+      throws Exception {
+    return path;
+  }
+
   @Test
   public void testClearByAncestor() throws Exception {
     Cache<Path, LocalMetadataEntry> cache = CacheBuilder.newBuilder().build();
@@ -184,7 +189,7 @@ public class TestLocalMetadataStore extends MetadataStoreTestBase {
       String prefixStr, String pathStr, int leftoverSize) throws IOException {
     populateMap(cache, prefixStr);
     LocalMetadataStore.deleteEntryByAncestor(new Path(prefixStr + pathStr),
-        cache, true);
+        cache, true, getTtlTimeProvider());
     assertEquals(String.format("Cache should have %d entries", leftoverSize),
         leftoverSize, sizeOfMap(cache));
     cache.invalidateAll();
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/TestNullMetadataStore.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/TestNullMetadataStore.java
index c0541ea..2e0bc4b 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/TestNullMetadataStore.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/TestNullMetadataStore.java
@@ -46,6 +46,11 @@ public class TestNullMetadataStore extends MetadataStoreTestBase {
     return true;
   }
 
+  @Override protected String getPathStringForPrune(String path)
+      throws Exception {
+    return path;
+  }
+
   @Override
   public AbstractMSContract createContract() {
     return new NullMSContract();
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/TestS3Guard.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/TestS3Guard.java
index b246da2..bdb256c 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/TestS3Guard.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/TestS3Guard.java
@@ -18,18 +18,28 @@
 
 package org.apache.hadoop.fs.s3a.s3guard;
 
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 
 import org.junit.Assert;
 import org.junit.Test;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.s3a.S3AFileStatus;
 import org.apache.hadoop.fs.s3a.Tristate;
 
-import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_METADATASTORE_AUTHORITATIVE_DIR_TTL;
+import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_METADATASTORE_METADATA_TTL;
+import static org.apache.hadoop.fs.s3a.Constants.METADATASTORE_METADATA_TTL;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 /**
  * Tests for the {@link S3Guard} utility class.
@@ -58,8 +68,8 @@ public class TestS3Guard extends Assert {
         makeFileStatus("s3a://bucket/dir/s3-file4", false)
     );
 
-    S3Guard.ITtlTimeProvider timeProvider = new S3Guard.TtlTimeProvider(
-        DEFAULT_METADATASTORE_AUTHORITATIVE_DIR_TTL);
+    ITtlTimeProvider timeProvider = new S3Guard.TtlTimeProvider(
+        DEFAULT_METADATASTORE_METADATA_TTL);
     FileStatus[] result = S3Guard.dirListingUnion(ms, dirPath, s3Listing,
         dirMeta, false, timeProvider);
 
@@ -70,6 +80,185 @@ public class TestS3Guard extends Assert {
     assertContainsPath(result, "s3a://bucket/dir/s3-file4");
   }
 
+  @Test
+  public void testPutWithTtlDirListingMeta() throws Exception {
+    // arrange
+    DirListingMetadata dlm = new DirListingMetadata(new Path("/"), null,
+        false);
+    MetadataStore ms = spy(MetadataStore.class);
+    ITtlTimeProvider timeProvider =
+        mock(ITtlTimeProvider.class);
+    when(timeProvider.getNow()).thenReturn(100L);
+
+    // act
+    S3Guard.putWithTtl(ms, dlm, timeProvider);
+
+    // assert
+    assertEquals("last update in " + dlm, 100L, dlm.getLastUpdated());
+    verify(timeProvider, times(1)).getNow();
+    verify(ms, times(1)).put(dlm);
+  }
+
+  @Test
+  public void testPutWithTtlFileMeta() throws Exception {
+    // arrange
+    S3AFileStatus fileStatus = mock(S3AFileStatus.class);
+    when(fileStatus.getPath()).thenReturn(new Path("/"));
+    PathMetadata pm = new PathMetadata(fileStatus);
+    MetadataStore ms = spy(MetadataStore.class);
+    ITtlTimeProvider timeProvider =
+        mock(ITtlTimeProvider.class);
+    when(timeProvider.getNow()).thenReturn(100L);
+
+    // act
+    S3Guard.putWithTtl(ms, pm, timeProvider);
+
+    // assert
+    assertEquals("last update in " + pm, 100L, pm.getLastUpdated());
+    verify(timeProvider, times(1)).getNow();
+    verify(ms, times(1)).put(pm);
+  }
+
+  @Test
+  public void testPutWithTtlCollection() throws Exception {
+    // arrange
+    S3AFileStatus fileStatus = mock(S3AFileStatus.class);
+    when(fileStatus.getPath()).thenReturn(new Path("/"));
+    Collection<PathMetadata> pmCollection = new ArrayList<>();
+    for (int i = 0; i < 10; i++) {
+      pmCollection.add(new PathMetadata(fileStatus));
+    }
+    MetadataStore ms = spy(MetadataStore.class);
+    ITtlTimeProvider timeProvider =
+        mock(ITtlTimeProvider.class);
+    when(timeProvider.getNow()).thenReturn(100L);
+
+    // act
+    S3Guard.putWithTtl(ms, pmCollection, timeProvider);
+
+    // assert
+    pmCollection.forEach(
+        pm -> assertEquals(100L, pm.getLastUpdated())
+    );
+    verify(timeProvider, times(1)).getNow();
+    verify(ms, times(1)).put(pmCollection);
+  }
+
+  @Test
+  public void testGetWithTtlExpired() throws Exception {
+    // arrange
+    S3AFileStatus fileStatus = mock(S3AFileStatus.class);
+    Path path = new Path("/file");
+    when(fileStatus.getPath()).thenReturn(path);
+    PathMetadata pm = new PathMetadata(fileStatus);
+    pm.setLastUpdated(100L);
+
+    MetadataStore ms = mock(MetadataStore.class);
+    when(ms.get(path)).thenReturn(pm);
+
+    ITtlTimeProvider timeProvider =
+        mock(ITtlTimeProvider.class);
+    when(timeProvider.getNow()).thenReturn(101L);
+    when(timeProvider.getMetadataTtl()).thenReturn(1L);
+
+    // act
+    final PathMetadata pmExpired = S3Guard.getWithTtl(ms, path, timeProvider);
+
+    // assert
+    assertNull(pmExpired);
+  }
+
+  @Test
+  public void testGetWithTtlNotExpired() throws Exception {
+    // arrange
+    S3AFileStatus fileStatus = mock(S3AFileStatus.class);
+    Path path = new Path("/file");
+    when(fileStatus.getPath()).thenReturn(path);
+    PathMetadata pm = new PathMetadata(fileStatus);
+    pm.setLastUpdated(100L);
+
+    MetadataStore ms = mock(MetadataStore.class);
+    when(ms.get(path)).thenReturn(pm);
+
+    ITtlTimeProvider timeProvider =
+        mock(ITtlTimeProvider.class);
+    when(timeProvider.getNow()).thenReturn(101L);
+    when(timeProvider.getMetadataTtl()).thenReturn(2L);
+
+    // act
+    final PathMetadata pmNotExpired =
+        S3Guard.getWithTtl(ms, path, timeProvider);
+
+    // assert
+    assertNotNull(pmNotExpired);
+  }
+
+  @Test
+  public void testGetWithZeroLastUpdatedNotExpired() throws Exception {
+    // arrange
+    S3AFileStatus fileStatus = mock(S3AFileStatus.class);
+    Path path = new Path("/file");
+    when(fileStatus.getPath()).thenReturn(path);
+    PathMetadata pm = new PathMetadata(fileStatus);
+    // we set 0 this time as the last updated: can happen e.g. when we use an
+    // old dynamo table
+    pm.setLastUpdated(0L);
+
+    MetadataStore ms = mock(MetadataStore.class);
+    when(ms.get(path)).thenReturn(pm);
+
+    ITtlTimeProvider timeProvider =
+        mock(ITtlTimeProvider.class);
+    when(timeProvider.getNow()).thenReturn(101L);
+    when(timeProvider.getMetadataTtl()).thenReturn(2L);
+
+    // act
+    final PathMetadata pmNotExpired =
+        S3Guard.getWithTtl(ms, path, timeProvider);
+
+    // assert
+    assertNotNull(pmNotExpired);
+  }
+
+  /**
+   * Makes sure that all uses of TTL timeouts use a consistent time unit.
+   * @throws Throwable failure
+   */
+  @Test
+  public void testTTLConstruction() throws Throwable {
+    // first one
+    ITtlTimeProvider timeProviderExplicit = new S3Guard.TtlTimeProvider(
+        DEFAULT_METADATASTORE_METADATA_TTL);
+
+    // mirror the FS construction,
+    // from a config guaranteed to be empty (i.e. the code defval)
+    Configuration conf = new Configuration(false);
+    long millitime = conf.getTimeDuration(METADATASTORE_METADATA_TTL,
+        DEFAULT_METADATASTORE_METADATA_TTL, TimeUnit.MILLISECONDS);
+    assertEquals(15 * 60_000, millitime);
+    S3Guard.TtlTimeProvider fsConstruction = new S3Guard.TtlTimeProvider(
+        millitime);
+    assertEquals("explicit vs fs construction", timeProviderExplicit,
+        fsConstruction);
+    assertEquals("first and second constructor", timeProviderExplicit,
+        new S3Guard.TtlTimeProvider(conf));
+    // set the conf to a time without unit
+    conf.setLong(METADATASTORE_METADATA_TTL,
+        DEFAULT_METADATASTORE_METADATA_TTL);
+    assertEquals("first and second time set through long", timeProviderExplicit,
+        new S3Guard.TtlTimeProvider(conf));
+    double timeInSeconds = DEFAULT_METADATASTORE_METADATA_TTL / 1000;
+    double timeInMinutes = timeInSeconds / 60;
+    String timeStr = String.format("%dm", (int) timeInMinutes);
+    assertEquals(":wrong time in minutes from " + timeInMinutes,
+        "15m", timeStr);
+    conf.set(METADATASTORE_METADATA_TTL, timeStr);
+    assertEquals("Time in millis as string from "
+            + conf.get(METADATASTORE_METADATA_TTL),
+        timeProviderExplicit,
+        new S3Guard.TtlTimeProvider(conf));
+  }
+
   void assertContainsPath(FileStatus[] statuses, String pathStr) {
     assertTrue("listing doesn't contain " + pathStr,
         containsPath(statuses, pathStr));
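
Taken together, testGetWithTtlExpired, testGetWithTtlNotExpired and
testGetWithZeroLastUpdatedNotExpired pin down the expiry predicate. A
hedged reconstruction consistent with those fixtures; the real check lives
inside S3Guard.getWithTtl(), and the behaviour at exactly
now - lastUpdated == ttl is inferred from the ttl=1 case:

    // now=101, lastUpdated=100, ttl=1  -> expired (getWithTtl returns null)
    // now=101, lastUpdated=100, ttl=2  -> still valid
    // lastUpdated=0                    -> never expires (legacy entries)
    static boolean isExpired(PathMetadata pm, ITtlTimeProvider tp) {
      long lastUpdated = pm.getLastUpdated();
      return lastUpdated != 0
          && tp.getNow() - lastUpdated >= tp.getMetadataTtl();
    }
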
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractITestS3AMetadataStoreScale.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractITestS3AMetadataStoreScale.java
index b843392..1bffc3b 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractITestS3AMetadataStoreScale.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractITestS3AMetadataStoreScale.java
@@ -18,11 +18,15 @@
 
 package org.apache.hadoop.fs.s3a.scale;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.s3a.S3AFileStatus;
+import org.apache.hadoop.fs.s3a.s3guard.ITtlTimeProvider;
 import org.apache.hadoop.fs.s3a.s3guard.MetadataStore;
 import org.apache.hadoop.fs.s3a.s3guard.PathMetadata;
+import org.apache.hadoop.fs.s3a.s3guard.S3Guard;
 
+import org.junit.Before;
 import org.junit.FixMethodOrder;
 import org.junit.Test;
 import org.junit.runners.MethodSorters;
@@ -54,6 +58,12 @@ public abstract class AbstractITestS3AMetadataStoreScale extends
   static final long ACCESS_TIME = System.currentTimeMillis();
 
   static final Path BUCKET_ROOT = new Path("s3a://fake-bucket/");
+  private ITtlTimeProvider ttlTimeProvider;
+
+  @Before
+  public void initialize() {
+    ttlTimeProvider = new S3Guard.TtlTimeProvider(new Configuration());
+  }
 
   /**
   * Subclasses should override this to provide the MetadataStore they wish
@@ -129,7 +139,7 @@ public abstract class AbstractITestS3AMetadataStoreScale extends
             toDelete = movedPaths;
             toCreate = origMetas;
           }
-          ms.move(toDelete, toCreate);
+          ms.move(toDelete, toCreate, ttlTimeProvider);
         }
         moveTimer.end();
         printTiming(LOG, "move", moveTimer, operations);
@@ -194,7 +204,7 @@ public abstract class AbstractITestS3AMetadataStoreScale extends
       throws IOException {
     describe("Recursive deletion");
     NanoTimer deleteTimer = new NanoTimer();
-    ms.deleteSubtree(BUCKET_ROOT);
+    ms.deleteSubtree(BUCKET_ROOT, ttlTimeProvider);
     deleteTimer.end();
     printTiming(LOG, "delete", deleteTimer, count);
   }

