You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by pa...@apache.org on 2021/08/26 18:01:50 UTC

[cassandra] branch trunk updated: Add TTL support to nodetool snapshots

This is an automated email from the ASF dual-hosted git repository.

paulo pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new ad24942  Add TTL support to nodetool snapshots
ad24942 is described below

commit ad249424814836bd00f47931258ad58bfefb24fd
Author: Abi Palagashvili <05...@gmail.com>
AuthorDate: Tue Jun 8 00:23:39 2021 +0300

    Add TTL support to nodetool snapshots
    
    Patch by Abi Palagashvili; Reviewed by Paulo Motta and Stefan Miklosovic for CASSANDRA-16789
    
    Co-authored-by: Paulo Motta <pa...@gmail.com>
    Co-authored-by: Stefan Miklosovic <sm...@apache.org>
    
    Closes #1046
---
 CHANGES.txt                                        |   1 +
 build.xml                                          |   2 +
 .../config/CassandraRelevantProperties.java        |  10 +
 src/java/org/apache/cassandra/config/Duration.java | 276 +++++++++++++++++++++
 .../org/apache/cassandra/db/ColumnFamilyStore.java | 105 +++++---
 src/java/org/apache/cassandra/db/Directories.java  | 188 ++++++++------
 src/java/org/apache/cassandra/db/Keyspace.java     |  17 +-
 .../cassandra/db/SnapshotDetailsTabularData.java   |  31 ++-
 .../org/apache/cassandra/db/SystemKeyspace.java    |   2 +-
 .../apache/cassandra/service/StorageService.java   |  57 +++--
 .../cassandra/service/StorageServiceMBean.java     |  11 +-
 .../service/snapshot/SnapshotManager.java          | 161 ++++++++++++
 .../service/snapshot/SnapshotManifest.java         | 109 ++++++++
 .../cassandra/service/snapshot/TableSnapshot.java  | 166 +++++++++++++
 src/java/org/apache/cassandra/tools/NodeProbe.java |  10 +-
 .../cassandra/tools/nodetool/ListSnapshots.java    |  12 +-
 .../apache/cassandra/tools/nodetool/Snapshot.java  |   8 +
 .../cassandra/distributed/impl/Instance.java       |   4 +-
 .../distributed/test/PreviewRepairTest.java        |   4 +-
 .../distributed/test/RepairDigestTrackingTest.java |   5 +-
 .../distributed/test/SnapshotsTTLTest.java         | 189 ++++++++++++++
 .../org/apache/cassandra/config/DurationTest.java  |  60 +++++
 .../apache/cassandra/db/ColumnFamilyStoreTest.java |  83 +++++--
 .../org/apache/cassandra/db/DirectoriesTest.java   | 214 +++++++++++++---
 .../unit/org/apache/cassandra/db/KeyspaceTest.java |  14 ++
 .../apache/cassandra/db/SystemKeyspaceTest.java    |   2 +-
 .../service/snapshot/SnapshotManagerTest.java      | 181 ++++++++++++++
 .../service/snapshot/SnapshotManifestTest.java     | 117 +++++++++
 .../service/snapshot/TableSnapshotTest.java        | 242 ++++++++++++++++++
 29 files changed, 2068 insertions(+), 213 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index be3ea40..9ce0c20 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.1
+ * Add TTL support to nodetool snapshots (CASSANDRA-16789)
  * Allow CommitLogSegmentReader to optionally skip sync marker CRC checks (CASSANDRA-16842)
  * allow blocking IPs from updating metrics about traffic (CASSANDRA-16859)
  * Request-Based Native Transport Rate-Limiting (CASSANDRA-16663)
diff --git a/build.xml b/build.xml
index a7ab23b..6d028a5 100644
--- a/build.xml
+++ b/build.xml
@@ -519,6 +519,7 @@
           <dependency groupId="com.fasterxml.jackson.core" artifactId="jackson-core" version="2.9.10"/>
           <dependency groupId="com.fasterxml.jackson.core" artifactId="jackson-databind" version="2.9.10.8"/>
           <dependency groupId="com.fasterxml.jackson.core" artifactId="jackson-annotations" version="2.9.10"/>
+          <dependency groupId="com.fasterxml.jackson.datatype" artifactId="jackson-datatype-jsr310" version="2.9.10"/>
           <dependency groupId="com.googlecode.json-simple" artifactId="json-simple" version="1.1"/>
           <dependency groupId="com.boundary" artifactId="high-scale-lib" version="1.0.6"/>
           <dependency groupId="com.github.jbellis" artifactId="jamm" version="${jamm.version}"/>
@@ -759,6 +760,7 @@
         <dependency groupId="com.fasterxml.jackson.core" artifactId="jackson-core"/>
         <dependency groupId="com.fasterxml.jackson.core" artifactId="jackson-databind"/>
         <dependency groupId="com.fasterxml.jackson.core" artifactId="jackson-annotations"/>
+        <dependency groupId="com.fasterxml.jackson.datatype" artifactId="jackson-datatype-jsr310"/>
         <dependency groupId="com.googlecode.json-simple" artifactId="json-simple"/>
         <dependency groupId="com.boundary" artifactId="high-scale-lib"/>
         <dependency groupId="org.yaml" artifactId="snakeyaml"/>
diff --git a/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java b/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java
index 7fbbdbb..dc1671d 100644
--- a/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java
+++ b/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java
@@ -188,9 +188,19 @@ public enum CassandraRelevantProperties
     /** This property indicates whether disable_mbean_registration is true */
     IS_DISABLED_MBEAN_REGISTRATION("org.apache.cassandra.disable_mbean_registration"),
 
+    /** snapshots ttl cleanup period in seconds */
+    SNAPSHOT_CLEANUP_PERIOD_SECONDS("cassandra.snapshot.ttl_cleanup_period_seconds", "60"),
+
+    /** snapshots ttl cleanup initial delay in seconds */
+    SNAPSHOT_CLEANUP_INITIAL_DELAY_SECONDS("cassandra.snapshot.ttl_cleanup_initial_delay_seconds", "5"),
+
+    /** minimum allowed TTL for snapshots */
+    SNAPSHOT_MIN_ALLOWED_TTL_SECONDS("cassandra.snapshot.min_allowed_ttl_seconds", "60"),
+
     /** what class to use for mbean registeration */
     MBEAN_REGISTRATION_CLASS("org.apache.cassandra.mbean_registration_class");
 
+
     CassandraRelevantProperties(String key, String defaultVal)
     {
         this.key = key;
diff --git a/src/java/org/apache/cassandra/config/Duration.java b/src/java/org/apache/cassandra/config/Duration.java
new file mode 100644
index 0000000..89de354
--- /dev/null
+++ b/src/java/org/apache/cassandra/config/Duration.java
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.config;
+
+import java.util.Arrays;
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+import com.google.common.primitives.Ints;
+
+/**
+ * Represents a positive time duration.
+ */
+public final class Duration
+{
+    /**
+     * The Regexp used to parse the duration provided as String.
+     */
+    private static final Pattern TIME_UNITS_PATTERN = Pattern.compile(("^(\\d+)([a-zA-Z]{1,2}|µs|µS)$"));
+    private static final Pattern DOUBLE_TIME_UNITS_PATTERN = Pattern.compile(("^(\\d+\\.\\d+)([a-zA-Z]{1,2}|µs|µS)$"));
+    
+    private final long quantity;
+
+    private final TimeUnit unit;
+
+
+    public Duration(String value)
+    {
+        if (value == null || value.equals("null"))
+        {
+            quantity = 0;
+            unit = TimeUnit.MILLISECONDS;
+            return;
+        }
+
+        //parse the string field value
+        Matcher matcher = TIME_UNITS_PATTERN.matcher(value);
+        Matcher matcherDouble = DOUBLE_TIME_UNITS_PATTERN.matcher(value);
+
+        if(matcher.find())
+        {
+            quantity = Long.parseLong(matcher.group(1));
+            unit = fromSymbol(matcher.group(2));
+        }
+        else if(matcherDouble.find())
+        {
+            quantity =(long) Double.parseDouble(matcherDouble.group(1));
+            unit = fromSymbol(matcherDouble.group(2));
+        }
+        else {
+            throw new IllegalArgumentException("Invalid duration: " + value);
+        }
+    }
+
+    private Duration(long quantity, TimeUnit unit)
+    {
+        if (quantity < 0)
+            throw new IllegalArgumentException("Duration must be positive");
+
+        this.quantity = quantity;
+        this.unit = unit;
+    }
+
+    private Duration(double quantity, TimeUnit unit)
+    {
+        if (quantity < 0)
+            throw new IllegalArgumentException("Duration must be positive");
+
+        this.quantity = (long) quantity;
+        this.unit = unit;
+    }
+
+    /**
+     * Creates a {@code Duration} of the specified amount of milliseconds.
+     *
+     * @param milliseconds the amount of milliseconds
+     * @return a duration
+     */
+    public static Duration inMilliseconds(long milliseconds)
+    {
+        return new Duration(milliseconds, TimeUnit.MILLISECONDS);
+    }
+
+    public static Duration inDoubleMilliseconds(double milliseconds)
+    {
+        return new Duration(milliseconds, TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * Creates a {@code Duration} of the specified amount of seconds.
+     *
+     * @param seconds the amount of seconds
+     * @return a duration
+     */
+    public static Duration inSeconds(long seconds)
+    {
+        return new Duration(seconds, TimeUnit.SECONDS);
+    }
+
+    /**
+     * Creates a {@code Duration} of the specified amount of minutes.
+     *
+     * @param minutes the amount of minutes
+     * @return a duration
+     */
+    public static Duration inMinutes(long minutes)
+    {
+        return new Duration(minutes, TimeUnit.MINUTES);
+    }
+
+    /**
+     * Returns the time unit associated to the specified symbol
+     *
+     * @param symbol the time unit symbol
+     * @return the time unit associated to the specified symbol
+     */
+    private TimeUnit fromSymbol(String symbol)
+    {
+        switch (symbol.toLowerCase())
+        {
+            case "d": return TimeUnit.DAYS;
+            case "h": return TimeUnit.HOURS;
+            case "m": return TimeUnit.MINUTES;
+            case "s": return TimeUnit.SECONDS;
+            case "ms": return TimeUnit.MILLISECONDS;
+            case "us":
+            case "µs": return TimeUnit.MICROSECONDS;
+            case "ns": return TimeUnit.NANOSECONDS;
+        }
+        throw new IllegalArgumentException(String.format("Unsupported time unit: %s. Supported units are: %s",
+                                                         symbol, Arrays.stream(TimeUnit.values())
+                                                                       .map(Duration::getSymbol)
+                                                                       .collect(Collectors.joining(", "))));
+    }
+
+    /**
+     * Returns this duration in the specified time unit
+     *
+     * @param targetUnit the time unit
+     * @return this duration in the specified time unit
+     */
+    public long to(TimeUnit targetUnit)
+    {
+        return targetUnit.convert(quantity, unit);
+    }
+
+    /**
+     * Returns this duration in number of minutes
+     *
+     * @return this duration in number of minutes
+     */
+    public long toMinutes()
+    {
+        return unit.toMinutes(quantity);
+    }
+
+    /**
+     * Returns this duration in number of minutes as an {@code int}
+     *
+     * @return this duration in number of minutes or {@code Integer.MAX_VALUE} if the number of minutes is too large.
+     */
+    public int toMinutesAsInt()
+    {
+        return Ints.saturatedCast(toMinutes());
+    }
+
+    /**
+     * Returns this duration in number of seconds
+     *
+     * @return this duration in number of seconds
+     */
+    public long toSeconds()
+    {
+        return unit.toSeconds(quantity);
+    }
+
+    /**
+     * Returns this duration in number of seconds as an {@code int}
+     *
+     * @return this duration in number of seconds or {@code Integer.MAX_VALUE} if the number of seconds is too large.
+     */
+    public int toSecondsAsInt()
+    {
+        return Ints.saturatedCast(toSeconds());
+    }
+
+    /**
+     * Returns this duration in number of milliseconds
+     *
+     * @return this duration in number of milliseconds
+     */
+    public long toMilliseconds()
+    {
+        return unit.toMillis(quantity);
+    }
+
+    /**
+     * Returns this duration in number of milliseconds as an {@code int}
+     *
+     * @return this duration in number of milliseconds or {@code Integer.MAX_VALUE} if the number of milliseconds is too large.
+     */
+    public int toMillisecondsAsInt()
+    {
+        return Ints.saturatedCast(toMilliseconds());
+    }
+
+    @Override
+    public int hashCode()
+    {
+        // Milliseconds seems to be a reasonable tradeoff
+        return Objects.hash(unit.toMillis(quantity));
+    }
+
+    @Override
+    public boolean equals(Object obj)
+    {
+        if (this == obj)
+            return true;
+
+        if (!(obj instanceof Duration))
+            return false;
+
+        Duration other = (Duration) obj;
+        if (unit == other.unit)
+            return quantity == other.quantity;
+
+        // Due to overflows we can only guarantee that the 2 durations are equal if we get the same results
+        // doing the convertion in both directions.
+        return unit.convert(other.quantity, other.unit) == quantity && other.unit.convert(quantity, unit) == other.quantity;
+    }
+
+    @Override
+    public String toString()
+    {
+        return quantity + getSymbol(unit);
+    }
+
+    /**
+     * Returns the symbol associated to the specified unit
+     *
+     * @param unit the time unit
+     * @return the time unit symbol
+     */
+    private static String getSymbol(TimeUnit unit)
+    {
+        switch (unit)
+        {
+            case DAYS: return "d";
+            case HOURS: return "h";
+            case MINUTES: return "m";
+            case SECONDS: return "s";
+            case MILLISECONDS: return "ms";
+            case MICROSECONDS: return "us";
+            case NANOSECONDS: return "ns";
+        }
+        throw new AssertionError();
+    }
+}
\ No newline at end of file
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index df9f763..c9f9964 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -32,6 +32,7 @@ import java.util.concurrent.atomic.AtomicReference;
 import java.util.regex.Pattern;
 import javax.management.*;
 import javax.management.openmbean.*;
+import java.util.stream.Collectors;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.*;
@@ -41,6 +42,7 @@ import com.google.common.util.concurrent.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.config.Duration;
 import org.apache.cassandra.cache.*;
 import org.apache.cassandra.concurrent.*;
 import org.apache.cassandra.config.*;
@@ -83,13 +85,13 @@ import org.apache.cassandra.schema.CompactionParams.TombstoneOption;
 import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.cassandra.service.CacheService;
 import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.service.snapshot.SnapshotManifest;
+import org.apache.cassandra.service.snapshot.TableSnapshot;
 import org.apache.cassandra.streaming.TableStreamManager;
 import org.apache.cassandra.utils.*;
 import org.apache.cassandra.utils.concurrent.OpOrder;
 import org.apache.cassandra.utils.concurrent.Refs;
 import org.apache.cassandra.utils.memory.MemtableAllocator;
-import org.json.simple.JSONArray;
-import org.json.simple.JSONObject;
 
 import static java.util.concurrent.TimeUnit.NANOSECONDS;
 
@@ -1824,21 +1826,25 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         return metadata().comparator;
     }
 
-    public void snapshotWithoutFlush(String snapshotName)
+    public TableSnapshot snapshotWithoutFlush(String snapshotName)
     {
-        snapshotWithoutFlush(snapshotName, null, false, null);
+        return snapshotWithoutFlush(snapshotName, null, false, null, null);
     }
 
     /**
      * @param ephemeral If this flag is set to true, the snapshot will be cleaned during next startup
      */
-    public Set<SSTableReader> snapshotWithoutFlush(String snapshotName, Predicate<SSTableReader> predicate, boolean ephemeral, RateLimiter rateLimiter)
+    public TableSnapshot snapshotWithoutFlush(String snapshotName, Predicate<SSTableReader> predicate, boolean ephemeral, Duration ttl, RateLimiter rateLimiter)
     {
+        if (ephemeral && ttl != null)
+        {
+            throw new IllegalStateException(String.format("can not take ephemeral snapshot (%s) while ttl is specified too", snapshotName));
+        }
+
         if (rateLimiter == null)
             rateLimiter = DatabaseDescriptor.getSnapshotRateLimiter();
 
-        Set<SSTableReader> snapshottedSSTables = new HashSet<>();
-        final JSONArray filesJSONArr = new JSONArray();
+        Set<SSTableReader> snapshottedSSTables = new LinkedHashSet<>();
         for (ColumnFamilyStore cfs : concatWithIndexes())
         {
             try (RefViewFragment currentView = cfs.selectAndReference(View.select(SSTableSet.CANONICAL, (x) -> predicate == null || predicate.apply(x))))
@@ -1848,7 +1854,6 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
                     File snapshotDirectory = Directories.getSnapshotDirectory(ssTable.descriptor, snapshotName);
                     rateLimiter.acquire(SSTableReader.componentsFor(ssTable.descriptor).size());
                     ssTable.createLinks(snapshotDirectory.getPath()); // hard links
-                    filesJSONArr.add(ssTable.descriptor.relativeFilenameFor(Component.DATA));
 
                     if (logger.isTraceEnabled())
                         logger.trace("Snapshot for {} keyspace data file {} created in {}", keyspace, ssTable.getFilename(), snapshotDirectory);
@@ -1857,30 +1862,53 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
             }
         }
 
-        writeSnapshotManifest(filesJSONArr, snapshotName);
+        return createSnapshot(snapshotName, ephemeral, ttl, snapshottedSSTables);
+    }
+
+    protected TableSnapshot createSnapshot(String tag, boolean ephemeral, Duration ttl, Set<SSTableReader> sstables) {
+        Set<File> snapshotDirs = sstables.stream()
+                                         .map(s -> Directories.getSnapshotDirectory(s.descriptor, tag).getAbsoluteFile())
+                                         .filter(dir -> !Directories.isSecondaryIndexFolder(dir)) // Remove secondary index subdirectory
+                                         .collect(Collectors.toCollection(HashSet::new));
+
+        // Create and write snapshot manifest
+        SnapshotManifest manifest = new SnapshotManifest(mapToDataFilenames(sstables), ttl);
+        File manifestFile = getDirectories().getSnapshotManifestFile(tag);
+        writeSnapshotManifest(manifest, manifestFile);
+        snapshotDirs.add(manifestFile.getParentFile().getAbsoluteFile()); // manifest may create empty snapshot dir
+
+        // Write snapshot schema
         if (!SchemaConstants.isLocalSystemKeyspace(metadata.keyspace) && !SchemaConstants.isReplicatedSystemKeyspace(metadata.keyspace))
-            writeSnapshotSchema(snapshotName);
+        {
+            File schemaFile = getDirectories().getSnapshotSchemaFile(tag);
+            writeSnapshotSchema(schemaFile);
+            snapshotDirs.add(schemaFile.getParentFile().getAbsoluteFile()); // schema may create empty snapshot dir
+        }
 
+        // Maybe create ephemeral marker
         if (ephemeral)
-            createEphemeralSnapshotMarkerFile(snapshotName);
-        return snapshottedSSTables;
+        {
+            File ephemeralSnapshotMarker = getDirectories().getNewEphemeralSnapshotMarkerFile(tag);
+            createEphemeralSnapshotMarkerFile(tag, ephemeralSnapshotMarker);
+            snapshotDirs.add(ephemeralSnapshotMarker.getParentFile().getAbsoluteFile()); // marker may create empty snapshot dir
+        }
+
+        TableSnapshot snapshot = new TableSnapshot(metadata.keyspace, metadata.name, tag, manifest.createdAt,
+                                                   manifest.expiresAt, snapshotDirs, directories::getTrueAllocatedSizeIn);
+
+        StorageService.instance.addSnapshot(snapshot);
+        return snapshot;
     }
 
-    private void writeSnapshotManifest(final JSONArray filesJSONArr, final String snapshotName)
+    private SnapshotManifest writeSnapshotManifest(SnapshotManifest manifest, File manifestFile)
     {
-        final File manifestFile = getDirectories().getSnapshotManifestFile(snapshotName);
-
         try
         {
             if (!manifestFile.getParentFile().exists())
                 manifestFile.getParentFile().mkdirs();
 
-            try (PrintStream out = new PrintStream(manifestFile))
-            {
-                final JSONObject manifestJSON = new JSONObject();
-                manifestJSON.put("files", filesJSONArr);
-                out.println(manifestJSON.toJSONString());
-            }
+            manifest.serializeToJsonFile(manifestFile);
+            return manifest;
         }
         catch (IOException e)
         {
@@ -1888,10 +1916,13 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         }
     }
 
-    private void writeSnapshotSchema(final String snapshotName)
+    private List<String> mapToDataFilenames(Collection<SSTableReader> sstables)
     {
-        final File schemaFile = getDirectories().getSnapshotSchemaFile(snapshotName);
+        return sstables.stream().map(s -> s.descriptor.relativeFilenameFor(Component.DATA)).collect(Collectors.toList());
+    }
 
+    private void writeSnapshotSchema(File schemaFile)
+    {
         try
         {
             if (!schemaFile.getParentFile().exists())
@@ -1910,10 +1941,8 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         }
     }
 
-    private void createEphemeralSnapshotMarkerFile(final String snapshot)
+    private void createEphemeralSnapshotMarkerFile(final String snapshot, File ephemeralSnapshotMarker)
     {
-        final File ephemeralSnapshotMarker = getDirectories().getNewEphemeralSnapshotMarkerFile(snapshot);
-
         try
         {
             if (!ephemeralSnapshotMarker.getParentFile().exists())
@@ -1988,9 +2017,9 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
      *
      * @param snapshotName the name of the associated with the snapshot
      */
-    public Set<SSTableReader> snapshot(String snapshotName)
+    public TableSnapshot snapshot(String snapshotName)
     {
-        return snapshot(snapshotName, false, null);
+        return snapshot(snapshotName, false, null, null);
     }
 
     /**
@@ -2000,9 +2029,9 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
      * @param skipFlush Skip blocking flush of memtable
      * @param rateLimiter Rate limiter for hardlinks-per-second
      */
-    public Set<SSTableReader> snapshot(String snapshotName, boolean skipFlush, RateLimiter rateLimiter)
+    public TableSnapshot snapshot(String snapshotName, boolean skipFlush, Duration ttl, RateLimiter rateLimiter)
     {
-        return snapshot(snapshotName, null, false, skipFlush, rateLimiter);
+        return snapshot(snapshotName, null, false, skipFlush, ttl, rateLimiter);
     }
 
 
@@ -2010,9 +2039,9 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
      * @param ephemeral If this flag is set to true, the snapshot will be cleaned up during next startup
      * @param skipFlush Skip blocking flush of memtable
      */
-    public Set<SSTableReader> snapshot(String snapshotName, Predicate<SSTableReader> predicate, boolean ephemeral, boolean skipFlush)
+    public TableSnapshot snapshot(String snapshotName, Predicate<SSTableReader> predicate, boolean ephemeral, boolean skipFlush)
     {
-        return snapshot(snapshotName, predicate, ephemeral, skipFlush, null);
+        return snapshot(snapshotName, predicate, ephemeral, skipFlush, null, null);
     }
 
     /**
@@ -2020,13 +2049,13 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
      * @param skipFlush Skip blocking flush of memtable
      * @param rateLimiter Rate limiter for hardlinks-per-second
      */
-    public Set<SSTableReader> snapshot(String snapshotName, Predicate<SSTableReader> predicate, boolean ephemeral, boolean skipFlush, RateLimiter rateLimiter)
+    public TableSnapshot snapshot(String snapshotName, Predicate<SSTableReader> predicate, boolean ephemeral, boolean skipFlush, Duration ttl, RateLimiter rateLimiter)
     {
         if (!skipFlush)
         {
             forceBlockingFlush();
         }
-        return snapshotWithoutFlush(snapshotName, predicate, ephemeral, rateLimiter);
+        return snapshotWithoutFlush(snapshotName, predicate, ephemeral, ttl, rateLimiter);
     }
 
     public boolean snapshotExists(String snapshotName)
@@ -2034,10 +2063,6 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         return getDirectories().snapshotExists(snapshotName);
     }
 
-    public long getSnapshotCreationTime(String snapshotName)
-    {
-        return getDirectories().snapshotCreationTime(snapshotName);
-    }
 
     /**
      * Clear all the snapshots for a given column family.
@@ -2057,9 +2082,9 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
      * @return  Return a map of all snapshots to space being used
      * The pair for a snapshot has true size and size on disk.
      */
-    public Map<String, Directories.SnapshotSizeDetails> getSnapshotDetails()
+    public Map<String, TableSnapshot> listSnapshots()
     {
-        return getDirectories().getSnapshotDetails();
+        return getDirectories().listSnapshots();
     }
 
     /**
diff --git a/src/java/org/apache/cassandra/db/Directories.java b/src/java/org/apache/cassandra/db/Directories.java
index cf4238c..311cfcb 100644
--- a/src/java/org/apache/cassandra/db/Directories.java
+++ b/src/java/org/apache/cassandra/db/Directories.java
@@ -19,9 +19,13 @@ package org.apache.cassandra.db;
 
 import java.io.*;
 import java.nio.file.*;
+import java.time.Instant;
 import java.util.*;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.function.BiPredicate;
+import java.util.stream.Collectors;
+
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Maps;
@@ -41,6 +45,8 @@ import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.io.sstable.*;
 import org.apache.cassandra.schema.SchemaConstants;
 import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.service.snapshot.SnapshotManifest;
+import org.apache.cassandra.service.snapshot.TableSnapshot;
 import org.apache.cassandra.utils.DirectorySizeCalculator;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.JVMStabilityInspector;
@@ -544,6 +550,11 @@ public class Directories
     public File getSnapshotManifestFile(String snapshotName)
     {
         File snapshotDir = getSnapshotDirectory(getDirectoryForNewSSTables(), snapshotName);
+        return getSnapshotManifestFile(snapshotDir);
+    }
+
+    protected static File getSnapshotManifestFile(File snapshotDir)
+    {
         return new File(snapshotDir, "manifest.json");
     }
 
@@ -937,33 +948,62 @@ public class Directories
         }
     }
 
-    /**
-     *
-     * @return  Return a map of all snapshots to space being used
-     * The pair for a snapshot has size on disk and true size.
-     */
-    public Map<String, SnapshotSizeDetails> getSnapshotDetails()
+    public Map<String, TableSnapshot> listSnapshots()
     {
-        List<File> snapshots = listSnapshots();
-        final Map<String, SnapshotSizeDetails> snapshotSpaceMap = Maps.newHashMapWithExpectedSize(snapshots.size());
-        for (File snapshot : snapshots)
-        {
-            final long sizeOnDisk = FileUtils.folderSize(snapshot);
-            final long trueSize = getTrueAllocatedSizeIn(snapshot);
-            SnapshotSizeDetails spaceUsed = snapshotSpaceMap.get(snapshot.getName());
-            if (spaceUsed == null)
-                spaceUsed =  new SnapshotSizeDetails(sizeOnDisk,trueSize);
-            else
-                spaceUsed = new SnapshotSizeDetails(spaceUsed.sizeOnDiskBytes + sizeOnDisk, spaceUsed.dataSizeBytes + trueSize);
-            snapshotSpaceMap.put(snapshot.getName(), spaceUsed);
+        Map<String, Set<File>> snapshotDirsByTag = listSnapshotDirsByTag();
+
+        Map<String, TableSnapshot> snapshots = Maps.newHashMapWithExpectedSize(snapshotDirsByTag.size());
+
+        for (Map.Entry<String, Set<File>> entry : snapshotDirsByTag.entrySet())
+        {
+            String tag = entry.getKey();
+            Set<File> snapshotDirs = entry.getValue();
+            SnapshotManifest manifest = maybeLoadManifest(metadata.keyspace, metadata.name, tag, snapshotDirs);
+            snapshots.put(tag, buildSnapshot(tag, manifest, snapshotDirs));
+        }
+
+        return snapshots;
+    }
+
+    protected TableSnapshot buildSnapshot(String tag, SnapshotManifest manifest, Set<File> snapshotDirs) {
+        Instant createdAt = manifest == null ? null : manifest.createdAt;
+        Instant expiresAt = manifest == null ? null : manifest.expiresAt;
+        return new TableSnapshot(metadata.keyspace, metadata.name, tag, createdAt, expiresAt, snapshotDirs,
+                                 this::getTrueAllocatedSizeIn);
+    }
+
+    @VisibleForTesting
+    protected static SnapshotManifest maybeLoadManifest(String keyspace, String table, String tag, Set<File> snapshotDirs)
+    {
+        List<File> manifests = snapshotDirs.stream().map(d -> new File(d, "manifest.json"))
+                                           .filter(d -> d.exists()).collect(Collectors.toList());
+
+        if (manifests.isEmpty())
+        {
+            logger.warn("No manifest found for snapshot {} of table {}.{}.", tag, keyspace, table);
+            return null;
+        }
+
+        if (manifests.size() > 1) {
+            logger.warn("Found multiple manifests for snapshot {} of table {}.{}", tag, keyspace, table);
+        }
+
+        try
+        {
+            return SnapshotManifest.deserializeFromJsonFile(manifests.get(0));
+        }
+        catch (IOException e)
+        {
+            logger.warn("Cannot read manifest file {} of snapshot {}.", manifests, tag, e);
         }
-        return snapshotSpaceMap;
+
+        return null;
     }
 
     public List<String> listEphemeralSnapshots()
     {
         final List<String> ephemeralSnapshots = new LinkedList<>();
-        for (File snapshot : listSnapshots())
+        for (File snapshot : listAllSnapshots())
         {
             if (getEphemeralSnapshotMarkerFile(snapshot).exists())
                 ephemeralSnapshots.add(snapshot.getName());
@@ -971,7 +1011,7 @@ public class Directories
         return ephemeralSnapshots;
     }
 
-    private List<File> listSnapshots()
+    private List<File> listAllSnapshots()
     {
         final List<File> snapshots = new LinkedList<>();
         for (final File dir : dataPaths)
@@ -996,6 +1036,32 @@ public class Directories
         return snapshots;
     }
 
+    @VisibleForTesting
+    protected Map<String, Set<File>> listSnapshotDirsByTag()
+    {
+        Map<String, Set<File>> snapshotDirsByTag = new HashMap<>();
+        for (final File dir : dataPaths)
+        {
+            File snapshotDir = isSecondaryIndexFolder(dir)
+                               ? new File(dir.getParent(), SNAPSHOT_SUBDIR)
+                               : new File(dir, SNAPSHOT_SUBDIR);
+            if (snapshotDir.exists() && snapshotDir.isDirectory())
+            {
+                final File[] snapshotDirs  = snapshotDir.listFiles();
+                if (snapshotDirs != null)
+                {
+                    for (final File snapshot : snapshotDirs)
+                    {
+                        if (snapshot.isDirectory()) {
+                            snapshotDirsByTag.computeIfAbsent(snapshot.getName(), k -> new LinkedHashSet<>()).add(snapshot.getAbsoluteFile());
+                        }
+                    }
+                }
+            }
+        }
+        return snapshotDirsByTag;
+    }
+
     public boolean snapshotExists(String snapshotName)
     {
         for (File dir : dataPaths)
@@ -1015,41 +1081,34 @@ public class Directories
         return false;
     }
 
-    public static void clearSnapshot(String snapshotName, List<File> snapshotDirectories, RateLimiter snapshotRateLimiter)
+    public static void clearSnapshot(String snapshotName, List<File> tableDirectories, RateLimiter snapshotRateLimiter)
     {
         // If snapshotName is empty or null, we will delete the entire snapshot directory
         String tag = snapshotName == null ? "" : snapshotName;
-        for (File dir : snapshotDirectories)
+        for (File tableDir : tableDirectories)
         {
-            File snapshotDir = new File(dir, join(SNAPSHOT_SUBDIR, tag));
-            if (snapshotDir.exists())
-            {
-                logger.trace("Removing snapshot directory {}", snapshotDir);
-                try
-                {
-                    FileUtils.deleteRecursiveWithThrottle(snapshotDir, snapshotRateLimiter);
-                }
-                catch (FSWriteError e)
-                {
-                    if (FBUtilities.isWindows)
-                        SnapshotDeletingTask.addFailedSnapshot(snapshotDir);
-                    else
-                        throw e;
-                }
-            }
+            File snapshotDir = new File(tableDir, join(SNAPSHOT_SUBDIR, tag));
+            removeSnapshotDirectory(snapshotRateLimiter, snapshotDir);
         }
     }
 
-    // The snapshot must exist
-    public long snapshotCreationTime(String snapshotName)
+    public static void removeSnapshotDirectory(RateLimiter snapshotRateLimiter, File snapshotDir)
     {
-        for (File dir : dataPaths)
+        if (snapshotDir.exists())
         {
-            File snapshotDir = getSnapshotDirectory(dir, snapshotName);
-            if (snapshotDir.exists())
-                return snapshotDir.lastModified();
+            logger.trace("Removing snapshot directory {}", snapshotDir);
+            try
+            {
+                FileUtils.deleteRecursiveWithThrottle(snapshotDir, snapshotRateLimiter);
+            }
+            catch (FSWriteError e)
+            {
+                if (FBUtilities.isWindows)
+                    SnapshotDeletingTask.addFailedSnapshot(snapshotDir);
+                else
+                    throw e;
+            }
         }
-        throw new RuntimeException("Snapshot " + snapshotName + " doesn't exist");
     }
 
     /**
@@ -1081,19 +1140,19 @@ public class Directories
         return totalAllocatedSize;
     }
 
-    public long getTrueAllocatedSizeIn(File input)
+    public long getTrueAllocatedSizeIn(File snapshotDir)
     {
-        if (!input.isDirectory())
+        if (!snapshotDir.isDirectory())
             return 0;
 
-        SSTableSizeSummer visitor = new SSTableSizeSummer(input, sstableLister(Directories.OnTxnErr.THROW).listFiles());
+        SSTableSizeSummer visitor = new SSTableSizeSummer(snapshotDir, sstableLister(OnTxnErr.THROW).listFiles());
         try
         {
-            Files.walkFileTree(input.toPath(), visitor);
+            Files.walkFileTree(snapshotDir.toPath(), visitor);
         }
         catch (IOException e)
         {
-            logger.error("Could not calculate the size of {}. {}", input, e.getMessage());
+            logger.error("Could not calculate the size of {}. {}", snapshotDir, e.getMessage());
         }
 
         return visitor.getAllocatedSize();
@@ -1175,31 +1234,4 @@ public class Directories
         }
     }
 
-    public static class SnapshotSizeDetails
-    {
-        final long sizeOnDiskBytes;
-        final long dataSizeBytes;
-
-        private SnapshotSizeDetails(long sizeOnDiskBytes, long dataSizeBytes)
-        {
-            this.sizeOnDiskBytes = sizeOnDiskBytes;
-            this.dataSizeBytes = dataSizeBytes;
-        }
-
-        @Override
-        public final int hashCode()
-        {
-            int hashCode = (int) sizeOnDiskBytes ^ (int) (sizeOnDiskBytes >>> 32);
-            return 31 * (hashCode ^ (int) ((int) dataSizeBytes ^ (dataSizeBytes >>> 32)));
-        }
-
-        @Override
-        public final boolean equals(Object o)
-        {
-            if(!(o instanceof SnapshotSizeDetails))
-                return false;
-            SnapshotSizeDetails that = (SnapshotSizeDetails)o;
-            return sizeOnDiskBytes == that.sizeOnDiskBytes && dataSizeBytes == that.dataSizeBytes;
-        }
-    }
 }
diff --git a/src/java/org/apache/cassandra/db/Keyspace.java b/src/java/org/apache/cassandra/db/Keyspace.java
index ead01fb..66eb1e0 100644
--- a/src/java/org/apache/cassandra/db/Keyspace.java
+++ b/src/java/org/apache/cassandra/db/Keyspace.java
@@ -41,6 +41,7 @@ import com.google.common.util.concurrent.RateLimiter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.config.Duration;
 import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.compaction.CompactionManager;
@@ -64,6 +65,7 @@ import org.apache.cassandra.schema.SchemaProvider;
 import org.apache.cassandra.schema.TableId;
 import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.schema.TableMetadataRef;
+import org.apache.cassandra.service.snapshot.TableSnapshot;
 import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
@@ -246,7 +248,7 @@ public class Keyspace
      * @param rateLimiter Rate limiter for hardlinks-per-second
      * @throws IOException if the column family doesn't exist
      */
-    public void snapshot(String snapshotName, String columnFamilyName, boolean skipFlush, RateLimiter rateLimiter) throws IOException
+    public void snapshot(String snapshotName, String columnFamilyName, boolean skipFlush, Duration ttl, RateLimiter rateLimiter) throws IOException
     {
         assert snapshotName != null;
         boolean tookSnapShot = false;
@@ -255,7 +257,7 @@ public class Keyspace
             if (columnFamilyName == null || cfStore.name.equals(columnFamilyName))
             {
                 tookSnapShot = true;
-                cfStore.snapshot(snapshotName, skipFlush, rateLimiter);
+                cfStore.snapshot(snapshotName, skipFlush, ttl, rateLimiter);
             }
         }
 
@@ -273,7 +275,7 @@ public class Keyspace
      */
     public void snapshot(String snapshotName, String columnFamilyName) throws IOException
     {
-        snapshot(snapshotName, columnFamilyName, false, null);
+        snapshot(snapshotName, columnFamilyName, false, null, null);
     }
 
     /**
@@ -322,8 +324,8 @@ public class Keyspace
     {
         RateLimiter clearSnapshotRateLimiter = DatabaseDescriptor.getSnapshotRateLimiter();
 
-        List<File> snapshotDirs = Directories.getKSChildDirectories(keyspace);
-        Directories.clearSnapshot(snapshotName, snapshotDirs, clearSnapshotRateLimiter);
+        List<File> tableDirectories = Directories.getKSChildDirectories(keyspace);
+        Directories.clearSnapshot(snapshotName, tableDirectories, clearSnapshotRateLimiter);
     }
 
     /**
@@ -337,6 +339,11 @@ public class Keyspace
         return list;
     }
 
+    public Stream<TableSnapshot> getAllSnapshots()
+    {
+        return getColumnFamilyStores().stream().flatMap(cfs -> cfs.listSnapshots().values().stream());
+    }
+
     private Keyspace(String keyspaceName, SchemaProvider schema, boolean loadSSTables)
     {
         this.schema = schema;
diff --git a/src/java/org/apache/cassandra/db/SnapshotDetailsTabularData.java b/src/java/org/apache/cassandra/db/SnapshotDetailsTabularData.java
index 5ef729a..73a4876 100644
--- a/src/java/org/apache/cassandra/db/SnapshotDetailsTabularData.java
+++ b/src/java/org/apache/cassandra/db/SnapshotDetailsTabularData.java
@@ -17,15 +17,11 @@
  */
 package org.apache.cassandra.db;
 
-import java.util.Map;
 import javax.management.openmbean.*;
 
 import com.google.common.base.Throwables;
 import org.apache.cassandra.io.util.FileUtils;
-import org.apache.cassandra.utils.Pair;
-
-
-
+import org.apache.cassandra.service.snapshot.TableSnapshot;
 
 public class SnapshotDetailsTabularData
 {
@@ -34,13 +30,17 @@ public class SnapshotDetailsTabularData
             "Keyspace name",
             "Column family name",
             "True size",
-            "Size on disk"};
+            "Size on disk",
+            "Creation time",
+            "Expiration time",};
 
     private static final String[] ITEM_DESCS = new String[]{"snapshot_name",
             "keyspace_name",
             "columnfamily_name",
             "TrueDiskSpaceUsed",
-            "TotalDiskSpaceUsed"};
+            "TotalDiskSpaceUsed",
+            "created_at",
+            "expires_at",};
 
     private static final String TYPE_NAME = "SnapshotDetails";
 
@@ -56,7 +56,7 @@ public class SnapshotDetailsTabularData
     {
         try
         {
-            ITEM_TYPES = new OpenType[]{ SimpleType.STRING, SimpleType.STRING, SimpleType.STRING, SimpleType.STRING, SimpleType.STRING };
+            ITEM_TYPES = new OpenType[]{ SimpleType.STRING, SimpleType.STRING, SimpleType.STRING, SimpleType.STRING, SimpleType.STRING, SimpleType.STRING, SimpleType.STRING };
 
             COMPOSITE_TYPE = new CompositeType(TYPE_NAME, ROW_DESC, ITEM_NAMES, ITEM_DESCS, ITEM_TYPES);
 
@@ -69,18 +69,25 @@ public class SnapshotDetailsTabularData
     }
 
 
-    public static void from(final String snapshot, final String ks, final String cf, Map.Entry<String, Directories.SnapshotSizeDetails> snapshotDetail, TabularDataSupport result)
+    public static void from(TableSnapshot details, TabularDataSupport result)
     {
         try
         {
-            final String totalSize = FileUtils.stringifyFileSize(snapshotDetail.getValue().sizeOnDiskBytes);
-            final String liveSize =  FileUtils.stringifyFileSize(snapshotDetail.getValue().dataSizeBytes);
+            final String totalSize = FileUtils.stringifyFileSize(details.computeSizeOnDiskBytes());
+            final String liveSize =  FileUtils.stringifyFileSize(details.computeTrueSizeBytes());
+            String createdAt = safeToString(details.getCreatedAt());
+            String expiresAt = safeToString(details.getExpiresAt());
             result.put(new CompositeDataSupport(COMPOSITE_TYPE, ITEM_NAMES,
-                    new Object[]{ snapshot, ks, cf, liveSize, totalSize }));
+                    new Object[]{ details.getTag(), details.getKeyspace(), details.getTable(), liveSize, totalSize, createdAt, expiresAt }));
         }
         catch (OpenDataException e)
         {
             throw new RuntimeException(e);
         }
     }
+
+    private static String safeToString(Object object)
+    {
+        return object == null ? null : object.toString();
+    }
 }
diff --git a/src/java/org/apache/cassandra/db/SystemKeyspace.java b/src/java/org/apache/cassandra/db/SystemKeyspace.java
index 4d2a567..3067142 100644
--- a/src/java/org/apache/cassandra/db/SystemKeyspace.java
+++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java
@@ -1474,7 +1474,7 @@ public final class SystemKeyspace
                                                                              previous,
                                                                              next));
             for (String keyspace : SchemaConstants.LOCAL_SYSTEM_KEYSPACE_NAMES)
-                Keyspace.open(keyspace).snapshot(snapshotName, null, false, null);
+                Keyspace.open(keyspace).snapshot(snapshotName, null, false, null, null);
         }
     }
 
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 66e1713..c226d37 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -47,7 +47,7 @@ import com.google.common.base.Predicates;
 import com.google.common.collect.*;
 import com.google.common.util.concurrent.*;
 
-import org.apache.cassandra.audit.AuditLogOptionsCompositeData;
+import org.apache.cassandra.config.CassandraRelevantProperties;
 import org.apache.cassandra.dht.RangeStreamer.FetchReplica;
 import org.apache.cassandra.fql.FullQueryLogger;
 import org.apache.cassandra.fql.FullQueryLoggerOptions;
@@ -69,6 +69,7 @@ import org.apache.cassandra.concurrent.ScheduledExecutors;
 import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.config.Config;
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.Duration;
 import org.apache.cassandra.cql3.QueryProcessor;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.commitlog.CommitLog;
@@ -101,6 +102,8 @@ import org.apache.cassandra.schema.SchemaConstants;
 import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.schema.TableMetadataRef;
 import org.apache.cassandra.schema.ViewMetadata;
+import org.apache.cassandra.service.snapshot.SnapshotManager;
+import org.apache.cassandra.service.snapshot.TableSnapshot;
 import org.apache.cassandra.streaming.*;
 import org.apache.cassandra.tracing.TraceKeyspace;
 import org.apache.cassandra.transport.ClientResourceLimits;
@@ -187,6 +190,8 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
     private final List<Runnable> preShutdownHooks = new ArrayList<>();
     private final List<Runnable> postShutdownHooks = new ArrayList<>();
 
+    private final SnapshotManager snapshotManager = new SnapshotManager();
+
     public static final StorageService instance = new StorageService();
 
     @Deprecated
@@ -981,6 +986,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
             LoadBroadcaster.instance.startBroadcasting();
             HintsService.instance.startDispatch();
             BatchlogManager.instance.start();
+            snapshotManager.start();
         }
     }
 
@@ -3697,15 +3703,22 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
     @Override
     public void takeSnapshot(String tag, Map<String, String> options, String... entities) throws IOException
     {
-        boolean skipFlush = Boolean.parseBoolean(options.getOrDefault("skipFlush", "false"));
+        Duration ttl = options.containsKey("ttl") ? new Duration(options.get("ttl")) : null;
+        if (ttl != null)
+        {
+            int minAllowedTtlSecs = CassandraRelevantProperties.SNAPSHOT_MIN_ALLOWED_TTL_SECONDS.getInt();
+            if (ttl.toSeconds() < minAllowedTtlSecs)
+                throw new IllegalArgumentException(String.format("ttl for snapshot must be at least %d seconds", minAllowedTtlSecs));
+        }
 
+        boolean skipFlush = Boolean.parseBoolean(options.getOrDefault("skipFlush", "false"));
         if (entities != null && entities.length > 0 && entities[0].contains("."))
         {
-            takeMultipleTableSnapshot(tag, skipFlush, entities);
+            takeMultipleTableSnapshot(tag, skipFlush, ttl, entities);
         }
         else
         {
-            takeSnapshot(tag, skipFlush, entities);
+            takeSnapshot(tag, skipFlush, ttl, entities);
         }
     }
 
@@ -3723,7 +3736,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
     public void takeTableSnapshot(String keyspaceName, String tableName, String tag)
             throws IOException
     {
-        takeMultipleTableSnapshot(tag, false, keyspaceName + "." + tableName);
+        takeMultipleTableSnapshot(tag, false, null, keyspaceName + "." + tableName);
     }
 
     public void forceKeyspaceCompactionForTokenRange(String keyspaceName, String startToken, String endToken, String... tableNames) throws IOException, ExecutionException, InterruptedException
@@ -3744,7 +3757,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
      */
     public void takeSnapshot(String tag, String... keyspaceNames) throws IOException
     {
-        takeSnapshot(tag, false, keyspaceNames);
+        takeSnapshot(tag, false, null, keyspaceNames);
     }
 
     /**
@@ -3758,7 +3771,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
     public void takeMultipleTableSnapshot(String tag, String... tableList)
             throws IOException
     {
-        takeMultipleTableSnapshot(tag, false, tableList);
+        takeMultipleTableSnapshot(tag, false, null, tableList);
     }
 
     /**
@@ -3768,7 +3781,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
      * @param skipFlush Skip blocking flush of memtable
      * @param keyspaceNames the names of the keyspaces to snapshot; empty means "all."
      */
-    private void takeSnapshot(String tag, boolean skipFlush, String... keyspaceNames) throws IOException
+    private void takeSnapshot(String tag, boolean skipFlush, Duration ttl, String... keyspaceNames) throws IOException
     {
         if (operationMode == Mode.JOINING)
             throw new IOException("Cannot snapshot until bootstrap completes");
@@ -3797,7 +3810,9 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         RateLimiter snapshotRateLimiter = DatabaseDescriptor.getSnapshotRateLimiter();
 
         for (Keyspace keyspace : keyspaces)
-            keyspace.snapshot(tag, null, skipFlush, snapshotRateLimiter);
+        {
+            keyspace.snapshot(tag, null, skipFlush, ttl, snapshotRateLimiter);
+        }
     }
 
     /**
@@ -3811,7 +3826,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
      * @param tableList
      *            list of tables from different keyspace in the form of ks1.cf1 ks2.cf2
      */
-    private void takeMultipleTableSnapshot(String tag, boolean skipFlush, String... tableList)
+    private void takeMultipleTableSnapshot(String tag, boolean skipFlush, Duration ttl, String... tableList)
             throws IOException
     {
         Map<Keyspace, List<String>> keyspaceColumnfamily = new HashMap<Keyspace, List<String>>();
@@ -3862,7 +3877,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         for (Entry<Keyspace, List<String>> entry : keyspaceColumnfamily.entrySet())
         {
             for (String table : entry.getValue())
-                entry.getKey().snapshot(tag, table, skipFlush, snapshotRateLimiter);
+                entry.getKey().snapshot(tag, table, skipFlush, ttl, snapshotRateLimiter);
         }
 
     }
@@ -3910,29 +3925,35 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
             logger.debug("Cleared out snapshot directories");
     }
 
-    public Map<String, TabularData> getSnapshotDetails()
+    public Map<String, TabularData> getSnapshotDetails(Map<String, String> options)
     {
         Map<String, TabularData> snapshotMap = new HashMap<>();
         for (Keyspace keyspace : Keyspace.all())
         {
             for (ColumnFamilyStore cfStore : keyspace.getColumnFamilyStores())
             {
-                for (Map.Entry<String, Directories.SnapshotSizeDetails> snapshotDetail : cfStore.getSnapshotDetails().entrySet())
+                for (Map.Entry<String, TableSnapshot> snapshotDetail : TableSnapshot.filter(cfStore.listSnapshots(), options).entrySet())
                 {
-                    TabularDataSupport data = (TabularDataSupport)snapshotMap.get(snapshotDetail.getKey());
+                    TabularDataSupport data = (TabularDataSupport) snapshotMap.get(snapshotDetail.getKey());
                     if (data == null)
                     {
                         data = new TabularDataSupport(SnapshotDetailsTabularData.TABULAR_TYPE);
                         snapshotMap.put(snapshotDetail.getKey(), data);
                     }
 
-                    SnapshotDetailsTabularData.from(snapshotDetail.getKey(), keyspace.getName(), cfStore.getTableName(), snapshotDetail, data);
+                    SnapshotDetailsTabularData.from(snapshotDetail.getValue(), data);
                 }
             }
         }
         return snapshotMap;
     }
 
+    @Deprecated
+    public Map<String, TabularData> getSnapshotDetails()
+    {
+        return getSnapshotDetails(ImmutableMap.of());
+    }
+
     public long trueSnapshotsSize()
     {
         long total = 0;
@@ -4892,6 +4913,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
                 logger.error("Batchlog manager timed out shutting down", t);
             }
 
+            snapshotManager.stop();
             HintsService.instance.pauseDispatch();
 
             if (daemon != null)
@@ -4984,6 +5006,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
             }
             FBUtilities.waitOnFutures(flushes);
 
+            SnapshotManager.shutdownAndWait(1L, MINUTES);
             HintsService.instance.shutdownBlocking();
 
             // Interrupt ongoing compactions and shutdown CM to prevent further compactions.
@@ -6032,4 +6055,8 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
     {
         return DatabaseDescriptor.getCompactionTombstoneWarningThreshold();
     }
+
+    public void addSnapshot(TableSnapshot snapshot) {
+        snapshotManager.addSnapshot(snapshot);
+    }
 }
diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
index e53eaf9..3154c82 100644
--- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
@@ -270,12 +270,21 @@ public interface StorageServiceMBean extends NotificationEmitter
     public void clearSnapshot(String tag, String... keyspaceNames) throws IOException;
 
     /**
-     *  Get the details of all the snapshot
+     * Get the details of all the snapshot
      * @return A map of snapshotName to all its details in Tabular form.
      */
+    @Deprecated
     public Map<String, TabularData> getSnapshotDetails();
 
     /**
+     * Get the details of all the snapshots
+     *
+     * @param options map of options used for filtering of snapshots
+     * @return A map of snapshotName to all its details in Tabular form.
+     */
+    public Map<String, TabularData> getSnapshotDetails(Map<String, String> options);
+
+    /**
      * Get the true size taken by all snapshots across all keyspaces.
      * @return True size taken by all the snapshots.
      */
diff --git a/src/java/org/apache/cassandra/service/snapshot/SnapshotManager.java b/src/java/org/apache/cassandra/service/snapshot/SnapshotManager.java
new file mode 100644
index 0000000..dd62a0e
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/snapshot/SnapshotManager.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.service.snapshot;
+
+
+import java.io.File;
+import java.time.Instant;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.PriorityQueue;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor;
+import org.apache.cassandra.config.CassandraRelevantProperties;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.Directories;
+import org.apache.cassandra.db.Keyspace;
+
+import java.util.concurrent.TimeoutException;
+import java.util.function.Supplier;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import org.apache.cassandra.utils.ExecutorUtils;
+
+public class SnapshotManager {
+
+    private static final DebuggableScheduledThreadPoolExecutor executor = new DebuggableScheduledThreadPoolExecutor("SnapshotCleanup");
+
+    private static final Logger logger = LoggerFactory.getLogger(SnapshotManager.class);
+
+    private final Supplier<Stream<TableSnapshot>> snapshotLoader;
+    private final long initialDelaySeconds;
+    private final long cleanupPeriodSeconds;
+
+    @VisibleForTesting
+    protected volatile ScheduledFuture cleanupTaskFuture;
+
+    /**
+     * Expiring ssnapshots ordered by expiration date, to allow only iterating over snapshots
+     * that need to be removed on {@link this#clearExpiredSnapshots()}
+     */
+    private final PriorityQueue<TableSnapshot> expiringSnapshots = new PriorityQueue<>(Comparator.comparing(x -> x.getExpiresAt()));
+
+    public SnapshotManager()
+    {
+        this(CassandraRelevantProperties.SNAPSHOT_CLEANUP_INITIAL_DELAY_SECONDS.getInt(),
+             CassandraRelevantProperties.SNAPSHOT_CLEANUP_PERIOD_SECONDS.getInt(),
+             () -> StreamSupport.stream(Keyspace.all().spliterator(), false)
+                                .flatMap(ks -> ks.getAllSnapshots()));
+    }
+
+    @VisibleForTesting
+    protected SnapshotManager(long initialDelaySeconds, long cleanupPeriodSeconds,
+                              Supplier<Stream<TableSnapshot>> snapshotLoader)
+    {
+        this.initialDelaySeconds = initialDelaySeconds;
+        this.cleanupPeriodSeconds = cleanupPeriodSeconds;
+        this.snapshotLoader = snapshotLoader;
+    }
+
+    public Collection<TableSnapshot> getExpiringSnapshots()
+    {
+        return expiringSnapshots;
+    }
+
+    public synchronized void start()
+    {
+        loadSnapshots();
+        resumeSnapshotCleanup();
+    }
+
+    public synchronized void stop() throws InterruptedException, TimeoutException
+    {
+        expiringSnapshots.clear();
+        if (cleanupTaskFuture != null)
+        {
+            cleanupTaskFuture.cancel(false);
+            cleanupTaskFuture = null;
+        }
+    }
+
+    public synchronized void addSnapshot(TableSnapshot snapshot)
+    {
+        // We currently only care about expiring snapshots
+        if (snapshot.isExpiring())
+        {
+            logger.debug("Adding expiring snapshot {}", snapshot);
+            expiringSnapshots.add(snapshot);
+        }
+    }
+
+    @VisibleForTesting
+    protected synchronized void loadSnapshots()
+    {
+        logger.debug("Loading snapshots");
+        snapshotLoader.get().forEach(this::addSnapshot);
+    }
+
+    // TODO: Support pausing snapshot cleanup
+    private synchronized void resumeSnapshotCleanup()
+    {
+        if (cleanupTaskFuture == null)
+        {
+            logger.info("Scheduling expired snapshot cleanup with initialDelaySeconds={} and cleanupPeriodSeconds={}");
+            cleanupTaskFuture = executor.scheduleWithFixedDelay(this::clearExpiredSnapshots, initialDelaySeconds,
+                                                                cleanupPeriodSeconds, TimeUnit.SECONDS);
+        }
+    }
+
+    @VisibleForTesting
+    protected synchronized void clearExpiredSnapshots()
+    {
+        Instant now = Instant.now();
+        while (!expiringSnapshots.isEmpty() && expiringSnapshots.peek().isExpired(now))
+        {
+            TableSnapshot expiredSnapshot = expiringSnapshots.peek();
+            logger.debug("Removing expired snapshot {}.", expiredSnapshot);
+            clearSnapshot(expiredSnapshot);
+        }
+    }
+
+    /**
+     * Deletes snapshot and remove it from manager
+     */
+    protected void clearSnapshot(TableSnapshot snapshot)
+    {
+        for (File snapshotDir : snapshot.getDirectories())
+        {
+            Directories.removeSnapshotDirectory(DatabaseDescriptor.getSnapshotRateLimiter(), snapshotDir);
+        }
+        expiringSnapshots.remove(snapshot);
+    }
+
+    @VisibleForTesting
+    public static void shutdownAndWait(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException
+    {
+        ExecutorUtils.shutdownNowAndWait(timeout, unit, executor);
+    }
+}
diff --git a/src/java/org/apache/cassandra/service/snapshot/SnapshotManifest.java b/src/java/org/apache/cassandra/service/snapshot/SnapshotManifest.java
new file mode 100644
index 0000000..8fbc619
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/snapshot/SnapshotManifest.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.snapshot;
+
+import java.io.File;
+import java.io.IOException;
+import java.time.Instant;
+import java.util.List;
+import java.util.Objects;
+
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.cassandra.config.Duration;
+import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+
+// Only serialize fields
+@JsonAutoDetect(fieldVisibility = JsonAutoDetect.Visibility.ANY,
+                getterVisibility = JsonAutoDetect.Visibility.NONE,
+                setterVisibility = JsonAutoDetect.Visibility.NONE)
+public class SnapshotManifest
+{
+    private static final ObjectMapper mapper = new ObjectMapper();
+    static {
+        mapper.registerModule(new JavaTimeModule());
+        mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+    }
+
+    @JsonProperty("files")
+    public final List<String> files;
+
+    @JsonProperty("created_at")
+    public final Instant createdAt;
+
+    @JsonProperty("expires_at")
+    public final Instant expiresAt;
+
+    /** needed for jackson serialization */
+    @SuppressWarnings("unused")
+    private SnapshotManifest() {
+        this.files = null;
+        this.createdAt = null;
+        this.expiresAt = null;
+    }
+
+    public SnapshotManifest(List<String> files, Duration ttl)
+    {
+        this.files = files;
+        this.createdAt = Instant.now();
+        this.expiresAt = ttl == null ? null : createdAt.plusMillis(ttl.toMilliseconds());
+    }
+
+    public List<String> getFiles()
+    {
+        return files;
+    }
+
+    public Instant getCreatedAt()
+    {
+        return createdAt;
+    }
+
+    public Instant getExpiresAt()
+    {
+        return expiresAt;
+    }
+
+    public void serializeToJsonFile(File outputFile) throws IOException
+    {
+        mapper.writeValue(outputFile, this);
+    }
+
+    public static SnapshotManifest deserializeFromJsonFile(File file) throws IOException
+    {
+        return mapper.readValue(file, SnapshotManifest.class);
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        SnapshotManifest manifest = (SnapshotManifest) o;
+        return Objects.equals(files, manifest.files) && Objects.equals(createdAt, manifest.createdAt) && Objects.equals(expiresAt, manifest.expiresAt);
+    }
+
+    @Override
+    public int hashCode()
+    {
+        return Objects.hash(files, createdAt, expiresAt);
+    }
+}
diff --git a/src/java/org/apache/cassandra/service/snapshot/TableSnapshot.java b/src/java/org/apache/cassandra/service/snapshot/TableSnapshot.java
new file mode 100644
index 0000000..7e852ec
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/snapshot/TableSnapshot.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.service.snapshot;
+
+import java.io.File;
+import java.time.Instant;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import org.apache.cassandra.io.util.FileUtils;
+
+public class TableSnapshot
+{
+    private final String keyspace;
+    private final String table;
+    private final String tag;
+
+    private final Instant createdAt;
+    private final Instant expiresAt;
+
+    private final Set<File> snapshotDirs;
+    private final Function<File, Long> trueDiskSizeComputer;
+
+    public TableSnapshot(String keyspace, String table, String tag, Instant createdAt,
+                         Instant expiresAt, Set<File> snapshotDirs,
+                         Function<File, Long> trueDiskSizeComputer)
+    {
+        this.keyspace = keyspace;
+        this.table = table;
+        this.tag = tag;
+        this.createdAt = createdAt;
+        this.expiresAt = expiresAt;
+        this.snapshotDirs = snapshotDirs;
+        this.trueDiskSizeComputer = trueDiskSizeComputer;
+    }
+
+    public String getKeyspace()
+    {
+        return keyspace;
+    }
+
+    public String getTable()
+    {
+        return table;
+    }
+
+    public String getTag()
+    {
+        return tag;
+    }
+
+    public Instant getCreatedAt()
+    {
+        if (createdAt == null)
+        {
+            long minCreation = snapshotDirs.stream().mapToLong(File::lastModified).min().orElse(0);
+            if (minCreation != 0)
+            {
+                return Instant.ofEpochMilli(minCreation);
+            }
+        }
+        return createdAt;
+    }
+
+    public Instant getExpiresAt()
+    {
+        return expiresAt;
+    }
+
+    public boolean isExpired(Instant now)
+    {
+        if (createdAt == null || expiresAt == null)
+        {
+            return false;
+        }
+
+        return expiresAt.compareTo(now) < 0;
+    }
+
+    public boolean exists()
+    {
+        return snapshotDirs.stream().anyMatch(File::exists);
+    }
+
+    public boolean isExpiring()
+    {
+        return expiresAt != null;
+    }
+
+    public long computeSizeOnDiskBytes()
+    {
+        return snapshotDirs.stream().mapToLong(FileUtils::folderSize).sum();
+    }
+
+    public long computeTrueSizeBytes()
+    {
+        return snapshotDirs.stream().mapToLong(trueDiskSizeComputer::apply).sum();
+    }
+
+    public Collection<File> getDirectories()
+    {
+        return snapshotDirs;
+    }
+
+    @Override
+    public String toString()
+    {
+        return "TableSnapshot{" +
+               "keyspace='" + keyspace + '\'' +
+               ", table='" + table + '\'' +
+               ", tag='" + tag + '\'' +
+               ", createdAt=" + createdAt +
+               ", expiresAt=" + expiresAt +
+               ", snapshotDirs=" + snapshotDirs +
+               '}';
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        TableSnapshot that = (TableSnapshot) o;
+        return Objects.equals(keyspace, that.keyspace) && Objects.equals(table, that.table)
+               && Objects.equals(tag, that.tag) && Objects.equals(createdAt, that.createdAt)
+               && Objects.equals(expiresAt, that.expiresAt) && Objects.equals(snapshotDirs, that.snapshotDirs);
+    }
+
+    @Override
+    public int hashCode()
+    {
+        return Objects.hash(keyspace, table, tag, createdAt, expiresAt, snapshotDirs);
+    }
+
+    public static Map<String, TableSnapshot> filter(Map<String, TableSnapshot> snapshots, Map<String, String> options)
+    {
+        if (options == null)
+            return snapshots;
+
+        boolean skipExpiring = Boolean.parseBoolean(options.getOrDefault("no_ttl", "false"));
+
+        return snapshots.entrySet()
+                        .stream()
+                        .filter(entry -> !skipExpiring || !entry.getValue().isExpiring())
+                        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+    }
+}
diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java b/src/java/org/apache/cassandra/tools/NodeProbe.java
index dd94c92..e7c595e 100644
--- a/src/java/org/apache/cassandra/tools/NodeProbe.java
+++ b/src/java/org/apache/cassandra/tools/NodeProbe.java
@@ -55,10 +55,12 @@ import javax.management.remote.JMXConnectorFactory;
 import javax.management.remote.JMXServiceURL;
 import javax.rmi.ssl.SslRMIClientSocketFactory;
 
+
 import org.apache.cassandra.audit.AuditLogManager;
 import org.apache.cassandra.audit.AuditLogManagerMBean;
 import org.apache.cassandra.audit.AuditLogOptions;
 import org.apache.cassandra.audit.AuditLogOptionsCompositeData;
+import com.google.common.collect.ImmutableMap;
 import org.apache.cassandra.batchlog.BatchlogManager;
 import org.apache.cassandra.batchlog.BatchlogManagerMBean;
 import org.apache.cassandra.db.ColumnFamilyStoreMBean;
@@ -710,9 +712,15 @@ public class NodeProbe implements AutoCloseable
         ssProxy.clearSnapshot(tag, keyspaces);
     }
 
+    public Map<String, TabularData> getSnapshotDetails(Map<String, String> options)
+    {
+        return ssProxy.getSnapshotDetails(options);
+    }
+
+    @Deprecated
     public Map<String, TabularData> getSnapshotDetails()
     {
-        return ssProxy.getSnapshotDetails();
+        return getSnapshotDetails(ImmutableMap.of());
     }
 
     public long trueSnapshotsSize()
diff --git a/src/java/org/apache/cassandra/tools/nodetool/ListSnapshots.java b/src/java/org/apache/cassandra/tools/nodetool/ListSnapshots.java
index 79494cd..b70a7a9 100644
--- a/src/java/org/apache/cassandra/tools/nodetool/ListSnapshots.java
+++ b/src/java/org/apache/cassandra/tools/nodetool/ListSnapshots.java
@@ -18,6 +18,7 @@
 package org.apache.cassandra.tools.nodetool;
 
 import java.io.PrintStream;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -25,6 +26,7 @@ import javax.management.openmbean.TabularData;
 
 import io.airlift.airline.Command;
 
+import io.airlift.airline.Option;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.tools.NodeProbe;
 import org.apache.cassandra.tools.NodeTool.NodeToolCmd;
@@ -33,6 +35,11 @@ import org.apache.cassandra.tools.nodetool.formatter.TableBuilder;
 @Command(name = "listsnapshots", description = "Lists all the snapshots along with the size on disk and true size. True size is the total size of all SSTables which are not backed up to disk. Size on disk is total size of the snapshot on disk. Total TrueDiskSpaceUsed does not make any SSTable deduplication.")
 public class ListSnapshots extends NodeToolCmd
 {
+    @Option(title = "no_ttl",
+    name = { "-nt", "--no-ttl" },
+    description = "Skip snapshots with TTL")
+    private boolean noTTL = false;
+
     @Override
     public void execute(NodeProbe probe)
     {
@@ -41,7 +48,10 @@ public class ListSnapshots extends NodeToolCmd
         {
             out.println("Snapshot Details: ");
 
-            final Map<String,TabularData> snapshotDetails = probe.getSnapshotDetails();
+            Map<String, String> options = new HashMap<>();
+            options.put("no_ttl", Boolean.toString(noTTL));
+
+            final Map<String, TabularData> snapshotDetails = probe.getSnapshotDetails(options);
             if (snapshotDetails.isEmpty())
             {
                 out.println("There are no snapshots");
diff --git a/src/java/org/apache/cassandra/tools/nodetool/Snapshot.java b/src/java/org/apache/cassandra/tools/nodetool/Snapshot.java
index 6619c53..f57708a 100644
--- a/src/java/org/apache/cassandra/tools/nodetool/Snapshot.java
+++ b/src/java/org/apache/cassandra/tools/nodetool/Snapshot.java
@@ -32,6 +32,7 @@ import java.util.Map;
 
 import org.apache.cassandra.tools.NodeProbe;
 import org.apache.cassandra.tools.NodeTool.NodeToolCmd;
+import org.apache.cassandra.config.Duration;
 
 @Command(name = "snapshot", description = "Take a snapshot of specified keyspaces or a snapshot of the specified table")
 public class Snapshot extends NodeToolCmd
@@ -51,6 +52,9 @@ public class Snapshot extends NodeToolCmd
     @Option(title = "skip-flush", name = {"-sf", "--skip-flush"}, description = "Do not flush memtables before snapshotting (snapshot will not contain unflushed data)")
     private boolean skipFlush = false;
 
+    @Option(title = "ttl", name = {"--ttl"}, description = "Specify a TTL of created snapshot")
+    private String ttl = null;
+
     @Override
     public void execute(NodeProbe probe)
     {
@@ -63,6 +67,10 @@ public class Snapshot extends NodeToolCmd
 
             Map<String, String> options = new HashMap<String,String>();
             options.put("skipFlush", Boolean.toString(skipFlush));
+            if (null != ttl) {
+                Duration d = new Duration(ttl);
+                options.put("ttl", d.toString());
+            }
 
             // Create a separate path for kclist to avoid breaking of already existing scripts
             if (null != ktList && !ktList.isEmpty())
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
index fc02e70..5d44deb 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
@@ -113,6 +113,7 @@ import org.apache.cassandra.service.PendingRangeCalculatorService;
 import org.apache.cassandra.service.QueryState;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.service.StorageServiceMBean;
+import org.apache.cassandra.service.snapshot.SnapshotManager;
 import org.apache.cassandra.streaming.StreamReceiveTask;
 import org.apache.cassandra.streaming.StreamTransferTask;
 import org.apache.cassandra.streaming.async.StreamingInboundHandler;
@@ -730,7 +731,8 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance
                                 () -> ScheduledExecutors.shutdownAndWait(1L, MINUTES),
                                 () -> SSTableReader.shutdownBlocking(1L, MINUTES),
                                 () -> shutdownAndWait(Collections.singletonList(ActiveRepairService.repairCommandExecutor())),
-                                () -> ScheduledExecutors.shutdownAndWait(1L, MINUTES)
+                                () -> ScheduledExecutors.shutdownAndWait(1L, MINUTES),
+                                () -> SnapshotManager.shutdownAndWait(1L, MINUTES)
             );
 
             error = parallelRun(error, executor,
diff --git a/test/distributed/org/apache/cassandra/distributed/test/PreviewRepairTest.java b/test/distributed/org/apache/cassandra/distributed/test/PreviewRepairTest.java
index 1ad1ba6..9fe3cdb 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/PreviewRepairTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/PreviewRepairTest.java
@@ -417,11 +417,11 @@ public class PreviewRepairTest extends TestBaseImpl
             ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(table);
             if(shouldBeEmpty)
             {
-                assertTrue(cfs.getSnapshotDetails().isEmpty());
+                assertTrue(cfs.listSnapshots().isEmpty());
             }
             else
             {
-                while (cfs.getSnapshotDetails().isEmpty())
+                while (cfs.listSnapshots().isEmpty())
                     Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
             }
         }));
diff --git a/test/distributed/org/apache/cassandra/distributed/test/RepairDigestTrackingTest.java b/test/distributed/org/apache/cassandra/distributed/test/RepairDigestTrackingTest.java
index 308702a..0badb89 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/RepairDigestTrackingTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/RepairDigestTrackingTest.java
@@ -19,8 +19,6 @@
 package org.apache.cassandra.distributed.test;
 
 import java.io.IOException;
-import java.time.LocalDate;
-import java.time.format.DateTimeFormatter;
 import java.util.EnumSet;
 import java.util.Iterator;
 import java.util.Map;
@@ -44,7 +42,6 @@ import org.apache.cassandra.io.sstable.metadata.MetadataComponent;
 import org.apache.cassandra.io.sstable.metadata.MetadataType;
 import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
 import org.apache.cassandra.service.ActiveRepairService;
-import org.apache.cassandra.service.SnapshotVerbHandler;
 import org.apache.cassandra.service.StorageProxy;
 import org.apache.cassandra.utils.DiagnosticSnapshotService;
 
@@ -447,7 +444,7 @@ public class RepairDigestTrackingTest extends TestBaseImpl
             int attempts = 100;
             ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(TABLE);
 
-            while (cfs.getSnapshotDetails().isEmpty())
+            while (cfs.listSnapshots().isEmpty())
             {
                 Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
                 if (attempts-- < 0)
diff --git a/test/distributed/org/apache/cassandra/distributed/test/SnapshotsTTLTest.java b/test/distributed/org/apache/cassandra/distributed/test/SnapshotsTTLTest.java
new file mode 100644
index 0000000..ace6c11
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/SnapshotsTTLTest.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test;
+
+import java.io.IOException;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.config.CassandraRelevantProperties;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.distributed.api.NodeToolResult;
+import org.apache.cassandra.distributed.shared.WithProperties;
+
+import static java.lang.String.format;
+import static org.apache.cassandra.distributed.shared.ClusterUtils.stopUnchecked;
+
+public class SnapshotsTTLTest extends TestBaseImpl
+{
+    public static final Integer SNAPSHOT_CLEANUP_PERIOD_SECONDS = 1;
+    public static final Integer FIVE_SECONDS = 5;
+    private static WithProperties properties = new WithProperties();
+    private static Cluster cluster;
+
+    @BeforeClass
+    public static void before() throws IOException
+    {
+        properties.set(CassandraRelevantProperties.SNAPSHOT_CLEANUP_INITIAL_DELAY_SECONDS, 0);
+        properties.set(CassandraRelevantProperties.SNAPSHOT_CLEANUP_PERIOD_SECONDS, SNAPSHOT_CLEANUP_PERIOD_SECONDS);
+        properties.set(CassandraRelevantProperties.SNAPSHOT_MIN_ALLOWED_TTL_SECONDS, FIVE_SECONDS);
+        cluster = init(Cluster.build(1).withConfig(c -> c.with(Feature.GOSSIP)).start());
+    }
+
+    @AfterClass
+    public static void after()
+    {
+        properties.close();
+        if (cluster != null)
+            cluster.close();
+    }
+
+    @Test
+    public void testSnapshotsCleanupByTTL() throws Exception
+    {
+        cluster.get(1).nodetoolResult("snapshot", "--ttl", String.format("%ds", FIVE_SECONDS),
+                                      "-t", "basic").asserts().success();
+        cluster.get(1).nodetoolResult("listsnapshots").asserts().success().stdoutContains("basic");
+
+        Thread.sleep(2 * FIVE_SECONDS * 1000L);
+        cluster.get(1).nodetoolResult("listsnapshots").asserts().success().stdoutNotContains("basic");
+    }
+
+    @Test
+    public void testSnapshotCleanupAfterRestart() throws Exception
+    {
+        int TWENTY_SECONDS = 20; // longer TTL to allow snapshot to survive node restart
+        IInvokableInstance instance = cluster.get(1);
+
+        // Create snapshot and check exists
+        instance.nodetoolResult("snapshot", "--ttl", String.format("%ds", TWENTY_SECONDS),
+                                "-t", "basic").asserts().success();
+        instance.nodetoolResult("listsnapshots").asserts().success().stdoutContains("basic");
+
+        // Restart node
+        stopUnchecked(instance);
+        instance.startup();
+
+        // Check snapshot still exists after restart
+        instance.nodetoolResult("listsnapshots").asserts().success().stdoutContains("basic");
+
+        // Sleep for 2*TTL and then check snapshot is gone
+        Thread.sleep(TWENTY_SECONDS * 1000L);
+        cluster.get(1).nodetoolResult("listsnapshots").asserts().success().stdoutNotContains("basic");
+    }
+
+    @Test
+    public void testSnapshotInvalidArgument() throws Exception
+    {
+        IInvokableInstance instance = cluster.get(1);
+
+        instance.nodetoolResult("snapshot", "--ttl", String.format("%ds", 1),
+                                "-t", "basic").asserts().failure().stdoutContains(String.format("ttl for snapshot must be at least %d seconds", FIVE_SECONDS));
+
+        instance.nodetoolResult("snapshot", "--ttl", "invalid-ttl").asserts().failure();
+    }
+
+    @Test
+    public void testListingSnapshotsWithoutTTL()
+    {
+        // take snapshot without ttl
+        cluster.get(1).nodetoolResult("snapshot", "-t", "snapshot_without_ttl").asserts().success();
+
+        // take snapshot with ttl
+        cluster.get(1).nodetoolResult("snapshot", "--ttl",
+                                      String.format("%ds", 1000),
+                                      "-t", "snapshot_with_ttl").asserts().success();
+
+        // list snaphots without TTL
+        NodeToolResult.Asserts withoutTTLResult = cluster.get(1).nodetoolResult("listsnapshots", "-nt").asserts().success();
+        withoutTTLResult.stdoutContains("snapshot_without_ttl");
+        withoutTTLResult.stdoutNotContains("snapshot_with_ttl");
+
+        // list all snapshots
+        NodeToolResult.Asserts allSnapshotsResult = cluster.get(1).nodetoolResult("listsnapshots").asserts().success();
+        allSnapshotsResult.stdoutContains("snapshot_without_ttl");
+        allSnapshotsResult.stdoutContains("snapshot_with_ttl");
+    }
+
+    @Test
+    public void testManualSnapshotCleanup() throws Exception
+    {
+        // take snapshots with ttl
+        NodeToolResult.Asserts listSnapshotsResult;
+        cluster.get(1).nodetoolResult("snapshot", "--ttl",
+                                      String.format("%ds", FIVE_SECONDS),
+                                      "-t", "first").asserts().success();
+
+        cluster.get(1).nodetoolResult("snapshot", "--ttl",
+                                      String.format("%ds", FIVE_SECONDS),
+                                      "-t", "second").asserts().success();
+
+        listSnapshotsResult = cluster.get(1).nodetoolResult("listsnapshots").asserts().success();
+        listSnapshotsResult.stdoutContains("first");
+        listSnapshotsResult.stdoutContains("second");
+
+        cluster.get(1).nodetoolResult("clearsnapshot", "-t", "first").asserts().success();
+
+        listSnapshotsResult = cluster.get(1).nodetoolResult("listsnapshots").asserts().success();
+        listSnapshotsResult.stdoutNotContains("first");
+        listSnapshotsResult.stdoutContains("second");
+
+        Thread.sleep(2 * FIVE_SECONDS * 1000L);
+
+        listSnapshotsResult = cluster.get(1).nodetoolResult("listsnapshots").asserts().success();
+        listSnapshotsResult.stdoutNotContains("first");
+        listSnapshotsResult.stdoutNotContains("second");
+    }
+
+    @Test
+    public void testSecondaryIndexCleanup() throws Exception {
+        cluster.schemaChange("CREATE KEYSPACE IF NOT EXISTS default WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 2};");
+        cluster.schemaChange("CREATE TABLE default.tbl (key int, value text, PRIMARY KEY (key))");
+        cluster.schemaChange("CREATE INDEX value_idx ON default.tbl (value)");
+
+        populate(cluster);
+
+        cluster.get(1).nodetoolResult("snapshot", "--ttl",
+                                      String.format("%ds", FIVE_SECONDS),
+                                      "-t", "first",
+                                      "-kt", "default.tbl").asserts().success();
+
+        NodeToolResult.Asserts listSnapshotsResult;
+
+        listSnapshotsResult = cluster.get(1).nodetoolResult("listsnapshots").asserts().success();
+        listSnapshotsResult.stdoutContains("first");
+
+        Thread.sleep(FIVE_SECONDS * 2 * 1000);
+
+        listSnapshotsResult = cluster.get(1).nodetoolResult("listsnapshots").asserts().success();
+        listSnapshotsResult.stdoutNotContains("first");
+    }
+
+    private void populate(Cluster cluster) {
+        for (int i = 0; i < 100; i++) {
+            cluster.coordinator(1).execute("INSERT INTO default.tbl (key, value) VALUES (?, 'txt')", ConsistencyLevel.ONE, i);
+        }
+
+    }
+}
diff --git a/test/unit/org/apache/cassandra/config/DurationTest.java b/test/unit/org/apache/cassandra/config/DurationTest.java
new file mode 100644
index 0000000..c89792f
--- /dev/null
+++ b/test/unit/org/apache/cassandra/config/DurationTest.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.config;
+
+import org.junit.Test;
+
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.junit.Assert.*;
+
+public class DurationTest
+{
+    @Test
+    public void testConversions()
+    {
+        assertEquals(10, new Duration("10s").toSeconds());
+        assertEquals(10000, new Duration("10s").toMilliseconds());
+        assertEquals(0, new Duration("10s").toMinutes());
+        assertEquals(10, new Duration("10m").toMinutes());
+        assertEquals(600000, new Duration("10m").toMilliseconds());
+        assertEquals(600, new Duration("10m").toSeconds());
+    }
+
+    @Test
+    public void testInvalidInputs()
+    {
+        assertThatThrownBy(() -> new Duration("10")).isInstanceOf(IllegalArgumentException.class)
+                                                    .hasMessageContaining("Invalid duration: 10");
+        assertThatThrownBy(() -> new Duration("-10s")).isInstanceOf(IllegalArgumentException.class)
+                                                      .hasMessageContaining("Invalid duration: -10s");
+        assertThatThrownBy(() -> new Duration("10xd")).isInstanceOf(IllegalArgumentException.class)
+                                                      .hasMessageContaining("Unsupported time unit: xd. Supported units are: ns, us, ms, s, m, h, d");
+    }
+
+    @Test
+    public void testEquals()
+    {
+        assertEquals(new Duration("10s"), new Duration("10s"));
+        assertEquals(new Duration("10s"), new Duration("10000ms"));
+        assertEquals(new Duration("10000ms"), new Duration("10s"));
+        assertEquals(Duration.inMinutes(Long.MAX_VALUE), Duration.inMinutes(Long.MAX_VALUE));
+        assertNotEquals(Duration.inMinutes(Long.MAX_VALUE), Duration.inMilliseconds(Long.MAX_VALUE));
+        assertNotEquals(new Duration("0m"), new Duration("10ms"));
+    }
+
+}
diff --git a/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java b/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
index 48ef580..583c1eb 100644
--- a/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
+++ b/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
@@ -19,7 +19,6 @@
 package org.apache.cassandra.db;
 
 import java.io.File;
-import java.io.FileReader;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.file.Path;
@@ -33,11 +32,14 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 
-import org.json.simple.JSONArray;
-import org.json.simple.JSONObject;
-import org.json.simple.parser.JSONParser;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.schema.SchemaConstants;
+import org.apache.cassandra.service.snapshot.SnapshotManifest;
+import org.apache.cassandra.service.snapshot.TableSnapshot;
 
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
 import com.google.common.collect.Iterators;
@@ -87,6 +89,7 @@ public class ColumnFamilyStoreTest
     {
         Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD1).truncateBlocking();
         Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD2).truncateBlocking();
+        Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_INDEX1).truncateBlocking();
         Keyspace.open(KEYSPACE2).getColumnFamilyStore(CF_STANDARD1).truncateBlocking();
     }
 
@@ -240,14 +243,14 @@ public class ColumnFamilyStoreTest
         cfs.snapshot("nonEphemeralSnapshot", null, false, false);
         cfs.snapshot("ephemeralSnapshot", null, true, false);
 
-        Map<String, Directories.SnapshotSizeDetails> snapshotDetails = cfs.getSnapshotDetails();
+        Map<String, TableSnapshot> snapshotDetails = cfs.listSnapshots();
         assertEquals(2, snapshotDetails.size());
         assertTrue(snapshotDetails.containsKey("ephemeralSnapshot"));
         assertTrue(snapshotDetails.containsKey("nonEphemeralSnapshot"));
 
         ColumnFamilyStore.clearEphemeralSnapshots(cfs.getDirectories());
 
-        snapshotDetails = cfs.getSnapshotDetails();
+        snapshotDetails = cfs.listSnapshots();
         assertEquals(1, snapshotDetails.size());
         assertTrue(snapshotDetails.containsKey("nonEphemeralSnapshot"));
 
@@ -446,19 +449,69 @@ public class ColumnFamilyStoreTest
         cfs.snapshotWithoutFlush(snapshotName);
 
         File snapshotManifestFile = cfs.getDirectories().getSnapshotManifestFile(snapshotName);
-        JSONParser parser = new JSONParser();
-        JSONObject manifest = (JSONObject) parser.parse(new FileReader(snapshotManifestFile));
-        JSONArray files = (JSONArray) manifest.get("files");
+        SnapshotManifest manifest = SnapshotManifest.deserializeFromJsonFile(snapshotManifestFile);
 
         // Keyspace1-Indexed1 and the corresponding index
-        assert files.size() == 2;
+        assertThat(manifest.getFiles()).hasSize(2);
 
         // Snapshot of the secondary index is stored in the subfolder with the same file name
-        String baseTableFile = (String) files.get(0);
-        String indexTableFile = (String) files.get(1);
-        assert !baseTableFile.equals(indexTableFile);
-        assert Directories.isSecondaryIndexFolder(new File(indexTableFile).getParentFile());
-        assert indexTableFile.endsWith(baseTableFile);
+        String baseTableFile = manifest.getFiles().get(0);
+        String indexTableFile = manifest.getFiles().get(1);
+        assertThat(baseTableFile).isNotEqualTo(indexTableFile);
+        assertThat(Directories.isSecondaryIndexFolder(new File(indexTableFile).getParentFile())).isTrue();
+        assertThat(indexTableFile).endsWith(baseTableFile);
+    }
+
+    private void createSnapshotAndDelete(String ks, String table, boolean writeData)
+    {
+        ColumnFamilyStore cfs = Keyspace.open(ks).getColumnFamilyStore(table);
+        if (writeData)
+        {
+            writeData(cfs);
+        }
+
+        TableSnapshot snapshot = cfs.snapshot("basic");
+
+
+        assertThat(snapshot.exists()).isTrue();
+        assertThat(cfs.listSnapshots().containsKey("basic")).isTrue();
+        assertThat(cfs.listSnapshots().get("basic")).isEqualTo(snapshot);
+
+        snapshot.getDirectories().forEach(FileUtils::deleteRecursive);
+
+        assertThat(snapshot.exists()).isFalse();
+        assertFalse(cfs.listSnapshots().containsKey("basic"));
+    }
+
+    private void writeData(ColumnFamilyStore cfs)
+    {
+        if (cfs.name.equals(CF_INDEX1))
+        {
+            new RowUpdateBuilder(cfs.metadata(), 2, "key").add("birthdate", 1L).add("notbirthdate", 2L).build().applyUnsafe();
+            cfs.forceBlockingFlush();
+        }
+        else
+        {
+            new RowUpdateBuilder(cfs.metadata(), 2, "key").clustering("name").add("val", "2").build().applyUnsafe();
+            cfs.forceBlockingFlush();
+        }
+    }
+
+    @Test
+    public void testSnapshotCreationAndDeleteEmptyTable() {
+        createSnapshotAndDelete(KEYSPACE1, CF_INDEX1, false);
+        createSnapshotAndDelete(KEYSPACE1, CF_STANDARD1, false);
+        createSnapshotAndDelete(KEYSPACE1, CF_STANDARD2, false);
+        createSnapshotAndDelete(KEYSPACE2, CF_STANDARD1, false);
+        createSnapshotAndDelete(SchemaConstants.SYSTEM_KEYSPACE_NAME, SystemKeyspace.TRANSFERRED_RANGES_V2, false);
+    }
+
+    @Test
+    public void testSnapshotCreationAndDeletePopulatedTable() {
+        createSnapshotAndDelete(KEYSPACE1, CF_INDEX1, true);
+        createSnapshotAndDelete(KEYSPACE1, CF_STANDARD1, true);
+        createSnapshotAndDelete(KEYSPACE1, CF_STANDARD2, true);
+        createSnapshotAndDelete(KEYSPACE2, CF_STANDARD1, true);
     }
 
     @Test
diff --git a/test/unit/org/apache/cassandra/db/DirectoriesTest.java b/test/unit/org/apache/cassandra/db/DirectoriesTest.java
index 9756635..fbecc69 100644
--- a/test/unit/org/apache/cassandra/db/DirectoriesTest.java
+++ b/test/unit/org/apache/cassandra/db/DirectoriesTest.java
@@ -22,6 +22,7 @@ import java.io.FileOutputStream;
 import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
+import java.time.Instant;
 import java.util.*;
 import java.util.concurrent.Callable;
 import java.util.concurrent.Executors;
@@ -35,6 +36,7 @@ import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+import org.apache.cassandra.config.Duration;
 import org.apache.cassandra.cql3.ColumnIdentifier;
 import org.apache.cassandra.schema.Indexes;
 import org.apache.cassandra.schema.SchemaConstants;
@@ -55,8 +57,11 @@ import org.apache.cassandra.io.sstable.format.SSTableFormat;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.schema.IndexMetadata;
 import org.apache.cassandra.service.DefaultFSErrorHandler;
+import org.apache.cassandra.service.snapshot.SnapshotManifest;
+import org.apache.cassandra.service.snapshot.TableSnapshot;
 import org.apache.cassandra.utils.JVMStabilityInspector;
 
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertSame;
@@ -65,11 +70,16 @@ import static org.junit.Assert.fail;
 
 public class DirectoriesTest
 {
+    public static final String TABLE_NAME = "FakeTable";
+    public static final String SNAPSHOT1 = "snapshot1";
+    public static final String SNAPSHOT2 = "snapshot2";
+
+    public static final String LEGACY_SNAPSHOT_NAME = "42";
     private static File tempDataDir;
     private static final String KS = "ks";
     private static String[] TABLES;
     private static Set<TableMetadata> CFM;
-    private static Map<String, List<File>> files;
+    private static Map<String, List<File>> sstablesByTableName;
 
     @BeforeClass
     public static void beforeClass()
@@ -83,14 +93,11 @@ public class DirectoriesTest
     {
         TABLES = new String[] { "cf1", "ks" };
         CFM = new HashSet<>(TABLES.length);
-        files = new HashMap<>();
+        sstablesByTableName = new HashMap<>();
 
         for (String table : TABLES)
         {
-            CFM.add(TableMetadata.builder(KS, table)
-                                 .addPartitionKeyColumn("thekey", UTF8Type.instance)
-                                 .addClusteringColumn("thecolumn", UTF8Type.instance)
-                                 .build());
+            CFM.add(createFakeTable(table));
         }
 
         tempDataDir = FileUtils.createTempFile("cassandra", "unittest");
@@ -116,33 +123,91 @@ public class DirectoriesTest
     {
         for (TableMetadata cfm : CFM)
         {
-            List<File> fs = new ArrayList<>();
-            files.put(cfm.name, fs);
-            File dir = cfDir(cfm);
-            dir.mkdirs();
+            List<File> allSStables = new ArrayList<>();
+            sstablesByTableName.put(cfm.name, allSStables);
+            File tableDir = cfDir(cfm);
+            tableDir.mkdirs();
 
-            createFakeSSTable(dir, cfm.name, 1, fs);
-            createFakeSSTable(dir, cfm.name, 2, fs);
+            allSStables.addAll(createFakeSSTable(tableDir, cfm.name, 1));
+            allSStables.addAll(createFakeSSTable(tableDir, cfm.name, 2));
 
-            File backupDir = new File(dir, Directories.BACKUPS_SUBDIR);
+            File backupDir = new File(tableDir, Directories.BACKUPS_SUBDIR);
             backupDir.mkdir();
-            createFakeSSTable(backupDir, cfm.name, 1, fs);
+            allSStables.addAll(createFakeSSTable(backupDir, cfm.name, 1));
 
-            File snapshotDir = new File(dir, Directories.SNAPSHOT_SUBDIR + File.separator + "42");
+            File snapshotDir = new File(tableDir, Directories.SNAPSHOT_SUBDIR + File.separator + LEGACY_SNAPSHOT_NAME);
             snapshotDir.mkdirs();
-            createFakeSSTable(snapshotDir, cfm.name, 1, fs);
+            allSStables.addAll(createFakeSSTable(snapshotDir, cfm.name, 1));
+        }
+    }
+
+    class FakeSnapshot {
+        final TableMetadata table;
+        final String tag;
+        final File snapshotDir;
+        final SnapshotManifest manifest;
+
+        FakeSnapshot(TableMetadata table, String tag, File snapshotDir, SnapshotManifest manifest)
+        {
+            this.table = table;
+            this.tag = tag;
+            this.snapshotDir = snapshotDir;
+            this.manifest = manifest;
+        }
+
+        public TableSnapshot asTableSnapshot()
+        {
+            Instant createdAt = manifest == null ? null : manifest.createdAt;
+            Instant expiresAt = manifest == null ? null : manifest.expiresAt;
+            return new TableSnapshot(table.keyspace, table.name, tag, createdAt, expiresAt, Collections.singleton(snapshotDir), null);
+        }
+    }
+
+    private TableMetadata createFakeTable(String table)
+    {
+        return TableMetadata.builder(KS, table)
+                            .addPartitionKeyColumn("thekey", UTF8Type.instance)
+                            .addClusteringColumn("thecolumn", UTF8Type.instance)
+                            .build();
+    }
+
+    public FakeSnapshot createFakeSnapshot(TableMetadata table, String tag, boolean createManifest) throws IOException
+    {
+        File tableDir = cfDir(table);
+        tableDir.mkdirs();
+        File snapshotDir = new File(tableDir, Directories.SNAPSHOT_SUBDIR + File.separator + tag);
+        snapshotDir.mkdirs();
+
+        Descriptor sstableDesc = new Descriptor(snapshotDir, KS, table.name, 1, SSTableFormat.Type.BIG);
+        createFakeSSTable(sstableDesc);
+
+        SnapshotManifest manifest = null;
+        if (createManifest)
+        {
+            File manifestFile = Directories.getSnapshotManifestFile(snapshotDir);
+            manifest = new SnapshotManifest(Collections.singletonList(sstableDesc.filenameFor(Component.DATA)), new Duration("1m"));
+            manifest.serializeToJsonFile(manifestFile);
         }
+
+        return new FakeSnapshot(table, tag, snapshotDir, manifest);
     }
 
-    private static void createFakeSSTable(File dir, String cf, int gen, List<File> addTo) throws IOException
+    private static List<File> createFakeSSTable(File dir, String cf, int gen) throws IOException
     {
         Descriptor desc = new Descriptor(dir, KS, cf, gen, SSTableFormat.Type.BIG);
+        return createFakeSSTable(desc);
+    }
+
+    private static List<File> createFakeSSTable(Descriptor desc) throws IOException
+    {
+        List<File> components = new ArrayList<>(3);
         for (Component c : new Component[]{ Component.DATA, Component.PRIMARY_INDEX, Component.FILTER })
         {
             File f = new File(desc.filenameFor(c));
             f.createNewFile();
-            addTo.add(f);
+            components.add(f);
         }
+        return components;
     }
 
     private static File cfDir(TableMetadata metadata)
@@ -172,8 +237,8 @@ public class DirectoriesTest
             assertEquals(cfDir(cfm), directories.getDirectoryForNewSSTables());
 
             Descriptor desc = new Descriptor(cfDir(cfm), KS, cfm.name, 1, SSTableFormat.Type.BIG);
-            File snapshotDir = new File(cfDir(cfm),  File.separator + Directories.SNAPSHOT_SUBDIR + File.separator + "42");
-            assertEquals(snapshotDir.getCanonicalFile(), Directories.getSnapshotDirectory(desc, "42"));
+            File snapshotDir = new File(cfDir(cfm),  File.separator + Directories.SNAPSHOT_SUBDIR + File.separator + LEGACY_SNAPSHOT_NAME);
+            assertEquals(snapshotDir.getCanonicalFile(), Directories.getSnapshotDirectory(desc, LEGACY_SNAPSHOT_NAME));
 
             File backupsDir = new File(cfDir(cfm),  File.separator + Directories.BACKUPS_SUBDIR);
             assertEquals(backupsDir.getCanonicalFile(), Directories.getBackupsDirectory(desc));
@@ -181,6 +246,84 @@ public class DirectoriesTest
     }
 
     @Test
+    public void testListSnapshots() throws Exception {
+        // Initial state
+        TableMetadata fakeTable = createFakeTable(TABLE_NAME);
+        Directories directories = new Directories(fakeTable, toDataDirectories(tempDataDir));
+        assertThat(directories.listSnapshots()).isEmpty();
+
+        // Create snapshot with and without manifest
+        FakeSnapshot snapshot1 = createFakeSnapshot(fakeTable, SNAPSHOT1, true);
+        FakeSnapshot snapshot2 = createFakeSnapshot(fakeTable, SNAPSHOT2, false);
+
+        // Both snapshots should be present
+        Map<String, TableSnapshot> snapshots = directories.listSnapshots();
+        assertThat(snapshots.keySet()).isEqualTo(Sets.newHashSet(SNAPSHOT1, SNAPSHOT2));
+        assertThat(snapshots.get(SNAPSHOT1)).isEqualTo(snapshot1.asTableSnapshot());
+        assertThat(snapshots.get(SNAPSHOT2)).isEqualTo(snapshot2.asTableSnapshot());
+
+        // Now remove snapshot1
+        FileUtils.deleteRecursive(snapshot1.snapshotDir);
+
+        // Only snapshot 2 should be present
+        snapshots = directories.listSnapshots();
+        assertThat(snapshots.keySet()).isEqualTo(Sets.newHashSet(SNAPSHOT2));
+        assertThat(snapshots.get(SNAPSHOT2)).isEqualTo(snapshot2.asTableSnapshot());
+    }
+
+    @Test
+    public void testListSnapshotDirsByTag() throws Exception {
+        // Initial state
+        TableMetadata fakeTable = createFakeTable("FakeTable");
+        Directories directories = new Directories(fakeTable, toDataDirectories(tempDataDir));
+        assertThat(directories.listSnapshotDirsByTag()).isEmpty();
+
+        // Create snapshot with and without manifest
+        FakeSnapshot snapshot1 = createFakeSnapshot(fakeTable, SNAPSHOT1, true);
+        FakeSnapshot snapshot2 = createFakeSnapshot(fakeTable, SNAPSHOT2, false);
+
+        // Both snapshots should be present
+        Map<String, Set<File>> snapshotDirs = directories.listSnapshotDirsByTag();
+        assertThat(snapshotDirs.keySet()).isEqualTo(Sets.newHashSet(SNAPSHOT1, SNAPSHOT2));
+        assertThat(snapshotDirs.get(SNAPSHOT1)).allMatch(snapshotDir -> snapshotDir.equals(snapshot1.snapshotDir));
+        assertThat(snapshotDirs.get(SNAPSHOT2)).allMatch(snapshotDir -> snapshotDir.equals(snapshot2.snapshotDir));
+
+        // Now remove snapshot1
+        FileUtils.deleteRecursive(snapshot1.snapshotDir);
+
+        // Only snapshot 2 should be present
+        snapshotDirs = directories.listSnapshotDirsByTag();
+        assertThat(snapshotDirs.keySet()).isEqualTo(Sets.newHashSet(SNAPSHOT2));
+    }
+
+    @Test
+    public void testMaybeManifestLoading() throws Exception {
+        for (TableMetadata cfm : CFM)
+        {
+            String tag = "test";
+            Directories directories = new Directories(cfm, toDataDirectories(tempDataDir));
+            Descriptor parentDesc = new Descriptor(directories.getDirectoryForNewSSTables(), KS, cfm.name, 0, SSTableFormat.Type.BIG);
+            File parentSnapshotDirectory = Directories.getSnapshotDirectory(parentDesc, tag);
+
+            List<String> files = new LinkedList<>();
+            files.add(parentSnapshotDirectory.getAbsolutePath());
+
+            File manifestFile = directories.getSnapshotManifestFile(tag);
+
+            SnapshotManifest manifest = new SnapshotManifest(files, new Duration("1m"));
+            manifest.serializeToJsonFile(manifestFile);
+
+            Set<File> dirs = new HashSet<>();
+
+            dirs.add(manifestFile.getParentFile());
+            dirs.add(new File("buzz"));
+            SnapshotManifest loadedManifest = Directories.maybeLoadManifest(KS, cfm.name, tag, dirs);
+
+            assertEquals(manifest, loadedManifest);
+        }
+    }
+
+    @Test
     public void testSecondaryIndexDirectories()
     {
         TableMetadata.Builder builder =
@@ -228,13 +371,13 @@ public class DirectoriesTest
         assertEquals(40, indexDirectories.trueSnapshotsSize());
 
         // check snapshot details
-        Map<String, Directories.SnapshotSizeDetails> parentSnapshotDetail = parentDirectories.getSnapshotDetails();
+        Map<String, TableSnapshot> parentSnapshotDetail = parentDirectories.listSnapshots();
         assertTrue(parentSnapshotDetail.containsKey("test"));
-        assertEquals(30L, parentSnapshotDetail.get("test").dataSizeBytes);
+        assertEquals(30L, parentSnapshotDetail.get("test").computeTrueSizeBytes());
 
-        Map<String, Directories.SnapshotSizeDetails> indexSnapshotDetail = indexDirectories.getSnapshotDetails();
+        Map<String, TableSnapshot> indexSnapshotDetail = indexDirectories.listSnapshots();
         assertTrue(indexSnapshotDetail.containsKey("test"));
-        assertEquals(40L, indexSnapshotDetail.get("test").dataSizeBytes);
+        assertEquals(40L, indexSnapshotDetail.get("test").computeTrueSizeBytes());
 
         // check backup directory
         File parentBackupDirectory = Directories.getBackupsDirectory(parentDesc);
@@ -270,7 +413,7 @@ public class DirectoriesTest
         Set<File> listed;// List all but no snapshot, backup
         lister = directories.sstableLister(Directories.OnTxnErr.THROW);
         listed = new HashSet<>(lister.listFiles());
-        for (File f : files.get(cfm.name))
+        for (File f : sstablesByTableName.get(cfm.name))
         {
             if (f.getPath().contains(Directories.SNAPSHOT_SUBDIR) || f.getPath().contains(Directories.BACKUPS_SUBDIR))
                 assertFalse(f + " should not be listed", listed.contains(f));
@@ -281,7 +424,7 @@ public class DirectoriesTest
         // List all but including backup (but no snapshot)
         lister = directories.sstableLister(Directories.OnTxnErr.THROW).includeBackups(true);
         listed = new HashSet<>(lister.listFiles());
-        for (File f : files.get(cfm.name))
+        for (File f : sstablesByTableName.get(cfm.name))
         {
             if (f.getPath().contains(Directories.SNAPSHOT_SUBDIR))
                 assertFalse(f + " should not be listed", listed.contains(f));
@@ -292,7 +435,7 @@ public class DirectoriesTest
         // Skip temporary and compacted
         lister = directories.sstableLister(Directories.OnTxnErr.THROW).skipTemporary(true);
         listed = new HashSet<>(lister.listFiles());
-        for (File f : files.get(cfm.name))
+        for (File f : sstablesByTableName.get(cfm.name))
         {
             if (f.getPath().contains(Directories.SNAPSHOT_SUBDIR) || f.getPath().contains(Directories.BACKUPS_SUBDIR))
                 assertFalse(f + " should not be listed", listed.contains(f));
@@ -682,18 +825,17 @@ public class DirectoriesTest
         long totalAvailable = 0L;
 
         for (DataDirectory dataDir : dataDirectories)
-            {
-                Directories.DataDirectoryCandidate candidate = new Directories.DataDirectoryCandidate(dataDir);
-                // exclude directory if its total writeSize does not fit to data directory
-                if (candidate.availableSpace < writeSize)
-                    continue;
-                candidates.add(candidate);
-                totalAvailable += candidate.availableSpace;
-            }
+        {
+            Directories.DataDirectoryCandidate candidate = new Directories.DataDirectoryCandidate(dataDir);
+            // exclude directory if its total writeSize does not fit to data directory
+            if (candidate.availableSpace < writeSize)
+                continue;
+            candidates.add(candidate);
+            totalAvailable += candidate.availableSpace;
+        }
 
         Directories.sortWriteableCandidates(candidates, totalAvailable);
 
         return candidates;
     }
-
 }
diff --git a/test/unit/org/apache/cassandra/db/KeyspaceTest.java b/test/unit/org/apache/cassandra/db/KeyspaceTest.java
index fd15366..44cbbff 100644
--- a/test/unit/org/apache/cassandra/db/KeyspaceTest.java
+++ b/test/unit/org/apache/cassandra/db/KeyspaceTest.java
@@ -415,6 +415,20 @@ public class KeyspaceTest extends CQLTester
     }
 
     @Test
+    public void testSnapshotCreation() throws Throwable {
+        createTable("CREATE TABLE %s (a text, b int, c int, PRIMARY KEY (a, b))");
+
+        execute("INSERT INTO %s (a, b, c) VALUES (?, ?, ?)", "0", 0, 0);
+
+        Keyspace ks = Keyspace.open(KEYSPACE_PER_TEST);
+        String table = getCurrentColumnFamilyStore().name;
+        ks.snapshot("test", table);
+
+        assertTrue(ks.snapshotExists("test"));
+        assertEquals(1, ks.getAllSnapshots().count());
+    }
+
+    @Test
     public void testLimitSSTables() throws Throwable
     {
         createTable("CREATE TABLE %s (a text, b int, c int, PRIMARY KEY (a, b))");
diff --git a/test/unit/org/apache/cassandra/db/SystemKeyspaceTest.java b/test/unit/org/apache/cassandra/db/SystemKeyspaceTest.java
index 09cab7f..a2f3abe 100644
--- a/test/unit/org/apache/cassandra/db/SystemKeyspaceTest.java
+++ b/test/unit/org/apache/cassandra/db/SystemKeyspaceTest.java
@@ -168,7 +168,7 @@ public class SystemKeyspaceTest
         Set<String> snapshottedTableNames = new HashSet<>();
         for (ColumnFamilyStore cfs : Keyspace.open(keyspace).getColumnFamilyStores())
         {
-            if (!cfs.getSnapshotDetails().isEmpty())
+            if (!cfs.listSnapshots().isEmpty())
                 snapshottedTableNames.add(cfs.getTableName());
         }
         return snapshottedTableNames;
diff --git a/test/unit/org/apache/cassandra/service/snapshot/SnapshotManagerTest.java b/test/unit/org/apache/cassandra/service/snapshot/SnapshotManagerTest.java
new file mode 100644
index 0000000..d38f589
--- /dev/null
+++ b/test/unit/org/apache/cassandra/service/snapshot/SnapshotManagerTest.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.snapshot;
+
+import java.io.File;
+import java.io.IOException;
+import java.time.Instant;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Stream;
+
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.service.DefaultFSErrorHandler;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class SnapshotManagerTest
+{
+    static long ONE_DAY_SECS = 86400;
+
+    @BeforeClass
+    public static void beforeClass()
+    {
+        DatabaseDescriptor.daemonInitialization();
+        FileUtils.setFSErrorHandler(new DefaultFSErrorHandler());
+    }
+
+    @ClassRule
+    public static TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+    public Set<File> createFolders() throws IOException {
+        File folder = temporaryFolder.newFolder();
+        Set<File> folders = new HashSet<>();
+        for (String folderName : Arrays.asList("foo", "bar", "buzz")) {
+            File subfolder = new File(folder, folderName);
+            subfolder.mkdir();
+            assertThat(subfolder).exists();
+            folders.add(subfolder);
+        };
+
+        return folders;
+    }
+
+
+    private TableSnapshot generateSnapshotDetails(String tag, Instant expiration) throws Exception {
+        return new TableSnapshot(
+            "ks",
+            "tbl",
+            tag,
+            Instant.EPOCH,
+            expiration,
+            createFolders(),
+            (file) -> 0L
+        );
+    }
+
+    @Test
+    public void testLoadSnapshots() throws Exception {
+        TableSnapshot expired = generateSnapshotDetails("expired", Instant.EPOCH);
+        TableSnapshot nonExpired = generateSnapshotDetails("non-expired", Instant.now().plusSeconds(ONE_DAY_SECS));
+        TableSnapshot nonExpiring = generateSnapshotDetails("non-expiring", null);
+        List<TableSnapshot> snapshots = Arrays.asList(expired, nonExpired, nonExpiring);
+
+        // Create SnapshotManager with 3 snapshots: expired, non-expired and non-expiring
+        SnapshotManager manager = new SnapshotManager(3, 3, snapshots::stream);
+        manager.loadSnapshots();
+
+        // Only expiring snapshots should be loaded
+        assertThat(manager.getExpiringSnapshots()).hasSize(2);
+        assertThat(manager.getExpiringSnapshots()).contains(expired);
+        assertThat(manager.getExpiringSnapshots()).contains(nonExpired);
+    }
+
+    @Test
+    public void testClearExpiredSnapshots() throws Exception {
+        SnapshotManager manager = new SnapshotManager(3, 3, Stream::empty);
+
+        // Add 3 snapshots: expired, non-expired and non-expiring
+        TableSnapshot expired = generateSnapshotDetails("expired", Instant.EPOCH);
+        TableSnapshot nonExpired = generateSnapshotDetails("non-expired", Instant.now().plusMillis(ONE_DAY_SECS));
+        TableSnapshot nonExpiring = generateSnapshotDetails("non-expiring", null);
+        manager.addSnapshot(expired);
+        manager.addSnapshot(nonExpired);
+        manager.addSnapshot(nonExpiring);
+
+        // Only expiring snapshot should be indexed and all should exist
+        assertThat(manager.getExpiringSnapshots()).hasSize(2);
+        assertThat(manager.getExpiringSnapshots()).contains(expired);
+        assertThat(manager.getExpiringSnapshots()).contains(nonExpired);
+        assertThat(expired.exists()).isTrue();
+        assertThat(nonExpired.exists()).isTrue();
+        assertThat(nonExpiring.exists()).isTrue();
+
+        // After clearing expired snapshots, expired snapshot should be removed while the others should remain
+        manager.clearExpiredSnapshots();
+        assertThat(manager.getExpiringSnapshots()).hasSize(1);
+        assertThat(manager.getExpiringSnapshots()).contains(nonExpired);
+        assertThat(expired.exists()).isFalse();
+        assertThat(nonExpired.exists()).isTrue();
+        assertThat(nonExpiring.exists()).isTrue();
+    }
+
+    @Test
+    public void testScheduledCleanup() throws Exception {
+        SnapshotManager manager = new SnapshotManager(0, 1, Stream::empty);
+        try
+        {
+            // Start snapshot manager which should start expired snapshot cleanup thread
+            manager.start();
+
+            // Add 2 expiring snapshots: one to expire in 2 seconds, another in 1 day
+            int TTL_SECS = 2;
+            TableSnapshot toExpire = generateSnapshotDetails("to-expire", Instant.now().plusSeconds(TTL_SECS));
+            TableSnapshot nonExpired = generateSnapshotDetails("non-expired", Instant.now().plusMillis(ONE_DAY_SECS));
+            manager.addSnapshot(toExpire);
+            manager.addSnapshot(nonExpired);
+
+            // Check both snapshots still exist
+            assertThat(toExpire.exists()).isTrue();
+            assertThat(nonExpired.exists()).isTrue();
+            assertThat(manager.getExpiringSnapshots()).hasSize(2);
+            assertThat(manager.getExpiringSnapshots()).contains(toExpire);
+            assertThat(manager.getExpiringSnapshots()).contains(nonExpired);
+
+            // Sleep 4 seconds
+            Thread.sleep((TTL_SECS + 2) * 1000L);
+
+            // Snapshot with ttl=2s should be gone, while other should remain
+            assertThat(manager.getExpiringSnapshots()).hasSize(1);
+            assertThat(manager.getExpiringSnapshots()).contains(nonExpired);
+            assertThat(toExpire.exists()).isFalse();
+            assertThat(nonExpired.exists()).isTrue();
+        }
+        finally
+        {
+            manager.stop();
+        }
+    }
+
+    @Test
+    public void testClearSnapshot() throws Exception
+    {
+        // Given
+        SnapshotManager manager = new SnapshotManager(1, 3, Stream::empty);
+        TableSnapshot expiringSnapshot = generateSnapshotDetails("snapshot", Instant.now().plusMillis(50000));
+        manager.addSnapshot(expiringSnapshot);
+        assertThat(manager.getExpiringSnapshots()).contains(expiringSnapshot);
+        assertThat(expiringSnapshot.exists()).isTrue();
+
+        // When
+        manager.clearSnapshot(expiringSnapshot);
+
+        // Then
+        assertThat(manager.getExpiringSnapshots()).doesNotContain(expiringSnapshot);
+        assertThat(expiringSnapshot.exists()).isFalse();
+    }
+}
diff --git a/test/unit/org/apache/cassandra/service/snapshot/SnapshotManifestTest.java b/test/unit/org/apache/cassandra/service/snapshot/SnapshotManifestTest.java
new file mode 100644
index 0000000..f72d1ec
--- /dev/null
+++ b/test/unit/org/apache/cassandra/service/snapshot/SnapshotManifestTest.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.snapshot;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.time.Instant;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Arrays;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import static org.assertj.core.api.Assertions.assertThatIOException;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import org.apache.cassandra.config.Duration;
+import org.codehaus.jackson.map.ObjectMapper;
+
+public class SnapshotManifestTest
+{
+    @Rule
+    public TemporaryFolder tempFolder = new TemporaryFolder();
+
+    @Test
+    public void testDeserializeFromInvalidFile() throws IOException {
+        File manifestFile = tempFolder.newFile("invalid");
+        assertThatIOException().isThrownBy(
+            () -> {
+                SnapshotManifest.deserializeFromJsonFile(manifestFile);
+            });
+
+        FileOutputStream out = new FileOutputStream(manifestFile);
+        out.write(1);
+        out.write(2);
+        out.write(3);
+        out.close();
+        assertThatIOException().isThrownBy(
+            () -> SnapshotManifest.deserializeFromJsonFile(manifestFile));
+    }
+
+    @Test
+    public void testDeserializeManifest() throws IOException
+    {
+        Map<String, Object> map = new HashMap<>();
+        String createdAt = "2021-07-03T10:37:30Z";
+        String expiresAt = "2021-08-03T10:37:30Z";
+        map.put("created_at", createdAt);
+        map.put("expires_at", expiresAt);
+        map.put("files", Arrays.asList("db1", "db2", "db3"));
+
+        ObjectMapper mapper = new ObjectMapper();
+        File manifestFile = tempFolder.newFile("manifest.json");
+        mapper.writeValue(manifestFile, map);
+        SnapshotManifest manifest = SnapshotManifest.deserializeFromJsonFile(manifestFile);
+
+        assertThat(manifest.getExpiresAt()).isEqualTo(Instant.parse(expiresAt));
+        assertThat(manifest.getCreatedAt()).isEqualTo(Instant.parse(createdAt));
+        assertThat(manifest.getFiles()).contains("db1").contains("db2").contains("db3").hasSize(3);
+    }
+
+    @Test
+    public void testOptionalFields() throws IOException {
+        Map<String, Object> map = new HashMap<>();
+        map.put("files", Arrays.asList("db1", "db2", "db3"));
+        ObjectMapper mapper = new ObjectMapper();
+        File manifestFile = tempFolder.newFile("manifest.json");
+        mapper.writeValue(manifestFile, map);
+        SnapshotManifest manifest = SnapshotManifest.deserializeFromJsonFile(manifestFile);
+
+        assertThat(manifest.getExpiresAt()).isNull();
+        assertThat(manifest.getCreatedAt()).isNull();
+        assertThat(manifest.getFiles()).contains("db1").contains("db2").contains("db3").hasSize(3);
+    }
+
+    @Test
+    public void testIngoredFields() throws IOException {
+        Map<String, Object> map = new HashMap<>();
+        map.put("files", Arrays.asList("db1", "db2", "db3"));
+        map.put("dummy", "dummy");
+        ObjectMapper mapper = new ObjectMapper();
+        File manifestFile = tempFolder.newFile("manifest.json");
+        mapper.writeValue(manifestFile, map);
+        SnapshotManifest manifest = SnapshotManifest.deserializeFromJsonFile(manifestFile);
+        assertThat(manifest.getFiles()).contains("db1").contains("db2").contains("db3").hasSize(3);
+    }
+
+    @Test
+    public void testSerializeAndDeserialize() throws Exception {
+        SnapshotManifest manifest = new SnapshotManifest(Arrays.asList("db1", "db2", "db3"), new Duration("2m"));
+        File manifestFile = tempFolder.newFile("manifest.json");
+        manifest.serializeToJsonFile(manifestFile);
+        manifest = SnapshotManifest.deserializeFromJsonFile(manifestFile);
+        assertThat(manifest.getExpiresAt()).isNotNull();
+        assertThat(manifest.getCreatedAt()).isNotNull();
+        assertThat(manifest.getFiles()).contains("db1").contains("db2").contains("db3").hasSize(3);
+    }
+}
diff --git a/test/unit/org/apache/cassandra/service/snapshot/TableSnapshotTest.java b/test/unit/org/apache/cassandra/service/snapshot/TableSnapshotTest.java
new file mode 100644
index 0000000..460bc0b
--- /dev/null
+++ b/test/unit/org/apache/cassandra/service/snapshot/TableSnapshotTest.java
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.snapshot;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.time.Instant;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.io.util.FileUtils;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class TableSnapshotTest
+{
+    @Before
+    public void setup()
+    {
+        DatabaseDescriptor.daemonInitialization();
+    }
+
+    @ClassRule
+    public static TemporaryFolder tempFolder = new TemporaryFolder();
+
+    public Set<File> createFolders() throws IOException
+    {
+        File folder = tempFolder.newFolder();
+        Set<File> folders = new HashSet<>();
+        for (String folderName : Arrays.asList("foo", "bar", "buzz"))
+        {
+            File subfolder = new File(folder, folderName);
+            subfolder.mkdir();
+            assertThat(subfolder).exists();
+            folders.add(subfolder);
+        }
+
+        return folders;
+    }
+
+    @Test
+    public void testSnapshotExists() throws IOException
+    {
+        Set<File> folders = createFolders();
+
+        TableSnapshot snapshot = new TableSnapshot(
+        "ks",
+        "tbl",
+        "some",
+        null,
+        null,
+        folders,
+        (File file) -> 0L
+        );
+
+        assertThat(snapshot.exists()).isTrue();
+
+        folders.forEach(FileUtils::deleteRecursive);
+
+        assertThat(snapshot.exists()).isFalse();
+    }
+
+    @Test
+    public void testSnapshotExpiring() throws IOException
+    {
+        Set<File> folders = createFolders();
+
+        TableSnapshot snapshot = new TableSnapshot(
+        "ks",
+        "tbl",
+        "some",
+        null,
+        null,
+        folders,
+        (File file) -> 0L
+        );
+
+        assertThat(snapshot.isExpiring()).isFalse();
+        assertThat(snapshot.isExpired(Instant.now())).isFalse();
+
+        snapshot = new TableSnapshot(
+        "ks",
+        "tbl",
+        "some",
+        Instant.now(),
+        null,
+        folders,
+        (File file) -> 0L
+        );
+
+        assertThat(snapshot.isExpiring()).isFalse();
+        assertThat(snapshot.isExpired(Instant.now())).isFalse();
+
+        snapshot = new TableSnapshot(
+        "ks",
+        "tbl",
+        "some",
+        Instant.now(),
+        Instant.now().plusSeconds(1000),
+        folders,
+        (File file) -> 0L
+        );
+
+        assertThat(snapshot.isExpiring()).isTrue();
+        assertThat(snapshot.isExpired(Instant.now())).isFalse();
+
+        snapshot = new TableSnapshot(
+        "ks",
+        "tbl",
+        "some",
+        Instant.now(),
+        Instant.now().minusSeconds(1000),
+        folders,
+        (File file) -> 0L
+        );
+
+        assertThat(snapshot.isExpiring()).isTrue();
+        assertThat(snapshot.isExpired(Instant.now())).isTrue();
+    }
+
+    private Long writeBatchToFile(File file) throws IOException
+    {
+        FileOutputStream out = new FileOutputStream(file);
+        out.write(1);
+        out.write(2);
+        out.write(3);
+        out.close();
+        return 3L;
+    }
+
+    @Test
+    public void testComputeSizeOnDisk() throws IOException
+    {
+        Set<File> folders = createFolders();
+
+        TableSnapshot tableDetails = new TableSnapshot(
+        "ks",
+        "tbl",
+        "some",
+        null,
+        null,
+        folders,
+        (File file) -> {
+            return 0L;
+        }
+        );
+
+        Long res = 0L;
+
+        for (File dir : folders)
+        {
+            writeBatchToFile(new File(dir, "tmp"));
+            res += FileUtils.folderSize(dir);
+        }
+
+        assertThat(tableDetails.computeSizeOnDiskBytes()).isGreaterThan(0L);
+        assertThat(tableDetails.computeSizeOnDiskBytes()).isEqualTo(res);
+    }
+
+    @Test
+    public void testComputeTrueSize() throws IOException
+    {
+        Set<File> folders = createFolders();
+
+        TableSnapshot tableDetails = new TableSnapshot(
+        "ks",
+        "tbl",
+        "some",
+        null,
+        null,
+        folders,
+        File::length
+        );
+
+        Long res = 0L;
+
+        for (File dir : folders)
+        {
+            writeBatchToFile(new File(dir, "tmp"));
+            res += dir.length();
+        }
+
+        assertThat(tableDetails.computeTrueSizeBytes()).isGreaterThan(0L);
+        assertThat(tableDetails.computeTrueSizeBytes()).isEqualTo(res);
+    }
+
+    @Test
+    public void testGetCreatedAt() throws IOException
+    {
+        Set<File> folders = createFolders();
+
+        // When createdAt is not null, getCreatedAt() should return it
+        Instant createdAt = Instant.EPOCH;
+        TableSnapshot withCreatedAt = new TableSnapshot(
+        "ks",
+        "tbl",
+        "some1",
+        createdAt,
+        null,
+        folders,
+        (File file) -> 0L
+        );
+        assertThat(withCreatedAt.getCreatedAt()).isEqualTo(createdAt);
+
+        // When createdAt is  null, it should return the snapshot folder minimum update time
+        TableSnapshot withoutCreatedAt = new TableSnapshot(
+        "ks",
+        "tbl",
+        "some1",
+        null,
+        null,
+        folders,
+        (File file) -> 0L
+        );
+        assertThat(withoutCreatedAt.getCreatedAt()).isEqualTo(Instant.ofEpochMilli(folders.stream().mapToLong(f -> f.lastModified()).min().getAsLong()));
+    }
+
+}

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org