Posted to commits@cassandra.apache.org by be...@apache.org on 2015/02/11 16:23:39 UTC

[4/7] cassandra git commit: Safer Resource Management++

Safer Resource Management++

patch by benedict; reviewed by marcus for CASSANDRA-8707


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/61384c57
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/61384c57
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/61384c57

Branch: refs/heads/trunk
Commit: 61384c57546da3d411630c64c4aa89d90cac98f7
Parents: 708b0ce
Author: Benedict Elliott Smith <be...@apache.org>
Authored: Wed Feb 11 14:56:23 2015 +0000
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Wed Feb 11 14:56:23 2015 +0000

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../org/apache/cassandra/db/DataTracker.java    |   6 +-
 .../cassandra/db/compaction/CompactionTask.java |   4 -
 .../SizeTieredCompactionStrategy.java           |  10 +-
 .../io/compress/CompressionMetadata.java        |  19 +-
 .../cassandra/io/sstable/IndexSummary.java      |  47 +-
 .../io/sstable/IndexSummaryBuilder.java         |   3 +-
 .../io/sstable/IndexSummaryManager.java         |   6 +-
 .../io/sstable/SSTableDeletingTask.java         |  36 +-
 .../cassandra/io/sstable/SSTableLoader.java     |   4 +-
 .../cassandra/io/sstable/SSTableReader.java     | 590 ++++++++++++-------
 .../cassandra/io/sstable/SSTableRewriter.java   |  10 +-
 .../cassandra/io/sstable/SSTableWriter.java     |  45 +-
 .../io/util/BufferedPoolingSegmentedFile.java   |  12 +-
 .../io/util/BufferedSegmentedFile.java          |  23 +-
 .../io/util/CompressedPoolingSegmentedFile.java |  29 +-
 .../io/util/CompressedSegmentedFile.java        |  32 +-
 .../org/apache/cassandra/io/util/Memory.java    |   8 +-
 .../cassandra/io/util/MmappedSegmentedFile.java |  55 +-
 .../cassandra/io/util/PoolingSegmentedFile.java |  35 +-
 .../apache/cassandra/io/util/SegmentedFile.java |  40 +-
 .../cassandra/service/ActiveRepairService.java  |   4 +-
 .../cassandra/streaming/StreamSession.java      |   2 +-
 .../streaming/messages/OutgoingFileMessage.java |   2 +-
 .../cassandra/tools/StandaloneScrubber.java     |   2 +-
 .../cassandra/utils/AlwaysPresentFilter.java    |   5 +
 .../org/apache/cassandra/utils/BloomFilter.java |  20 +-
 .../org/apache/cassandra/utils/IFilter.java     |   7 +-
 .../cassandra/utils/Murmur3BloomFilter.java     |  14 +-
 .../apache/cassandra/utils/concurrent/Ref.java  | 208 ++++++-
 .../cassandra/utils/concurrent/RefCounted.java  |  52 +-
 .../utils/concurrent/RefCountedImpl.java        | 132 -----
 .../apache/cassandra/utils/concurrent/Refs.java |  26 +-
 .../cassandra/utils/obs/OffHeapBitSet.java      |   5 +
 .../cassandra/db/ColumnFamilyStoreTest.java     |   5 +-
 .../db/compaction/AntiCompactionTest.java       |   6 +-
 .../SizeTieredCompactionStrategyTest.java       |  30 +-
 .../io/sstable/IndexSummaryManagerTest.java     |  37 +-
 .../cassandra/io/sstable/LegacySSTableTest.java |   3 +-
 .../cassandra/io/sstable/SSTableReaderTest.java |  11 +-
 .../io/sstable/SSTableRewriterTest.java         |  13 +-
 .../streaming/StreamTransferTaskTest.java       |   2 +-
 .../cassandra/tools/SSTableExportTest.java      |   1 +
 .../cassandra/tools/SSTableImportTest.java      |   7 +
 .../apache/cassandra/utils/BloomFilterTest.java |  22 +-
 .../cassandra/utils/SerializationsTest.java     |   5 +-
 .../utils/concurrent/RefCountedTest.java        |  12 +-
 47 files changed, 991 insertions(+), 657 deletions(-)
----------------------------------------------------------------------
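
The heart of this change is the Ref/Tidy idiom from org.apache.cassandra.utils.concurrent: a shared resource is guarded by a reference count, and a Tidy callback runs exactly once, when the last reference is released. The following is a minimal, self-contained sketch of that idiom only — not the actual Ref implementation, which additionally tracks each individual reference so that leaks can be detected and reported:

import java.util.concurrent.atomic.AtomicInteger;

// Minimal sketch of the Ref/Tidy idiom; illustrative only.
final class SimpleRef<T>
{
    interface Tidy
    {
        void tidy();   // cleanup, run exactly once
        String name(); // identifies the resource when debugging
    }

    private final T referent;
    private final Tidy tidy;
    private final AtomicInteger counts = new AtomicInteger(1);

    SimpleRef(T referent, Tidy tidy)
    {
        this.referent = referent;
        this.tidy = tidy;
    }

    T get()
    {
        return referent;
    }

    // acquire another reference, or return null if the resource is already gone
    SimpleRef<T> tryRef()
    {
        while (true)
        {
            int cur = counts.get();
            if (cur <= 0)
                return null;
            if (counts.compareAndSet(cur, cur + 1))
                return this;
        }
    }

    void release()
    {
        if (counts.decrementAndGet() == 0)
            tidy.tidy();
    }
}

In the diffs below, SSTableReader exposes exactly this shape via selfRef(), ref() and tryRef().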


http://git-wip-us.apache.org/repos/asf/cassandra/blob/61384c57/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 248139f..b323f18 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.1.4
+ * Safer Resource Management++ (CASSANDRA-8707)
  * Write partition size estimates into a system table (CASSANDRA-7688)
  * cqlsh: Fix keys() and full() collection indexes in DESCRIBE output
    (CASSANDRA-8154)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/61384c57/src/java/org/apache/cassandra/db/DataTracker.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DataTracker.java b/src/java/org/apache/cassandra/db/DataTracker.java
index 5ec06bc..acf9f92 100644
--- a/src/java/org/apache/cassandra/db/DataTracker.java
+++ b/src/java/org/apache/cassandra/db/DataTracker.java
@@ -367,7 +367,7 @@ public class DataTracker
         while (!view.compareAndSet(currentView, newView));
         for (SSTableReader sstable : currentView.sstables)
             if (!remaining.contains(sstable))
-                sstable.sharedRef().release();
+                sstable.selfRef().release();
         notifySSTablesChanged(remaining, Collections.<SSTableReader>emptySet(), OperationType.UNKNOWN);
     }
 
@@ -406,7 +406,7 @@ public class DataTracker
             sstable.setTrackedBy(this);
 
         for (SSTableReader sstable : oldSSTables)
-            sstable.sharedRef().release();
+            sstable.selfRef().release();
     }
 
     private void removeSSTablesFromTracker(Collection<SSTableReader> oldSSTables)
@@ -467,7 +467,7 @@ public class DataTracker
         {
             boolean firstToCompact = sstable.markObsolete();
             assert tolerateCompacted || firstToCompact : sstable + " was already marked compacted";
-            sstable.sharedRef().release();
+            sstable.selfRef().release();
         }
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/61384c57/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
index b6c215e..4d9b463 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
@@ -18,10 +18,8 @@
 package org.apache.cassandra.db.compaction;
 
 import java.io.File;
-import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
@@ -31,7 +29,6 @@ import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 
 import com.google.common.base.Predicate;
-import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Sets;
 import org.apache.commons.lang3.StringUtils;
@@ -40,7 +37,6 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.RowIndexEntry;
 import org.apache.cassandra.db.SystemKeyspace;
 import org.apache.cassandra.db.compaction.CompactionManager.CompactionExecutorStatsCollector;
 import org.apache.cassandra.io.sstable.SSTableReader;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/61384c57/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
index fbd715c..8b1610e 100644
--- a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
@@ -144,8 +144,8 @@ public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy
         // calculate the total reads/sec across all sstables
         double totalReads = 0.0;
         for (SSTableReader sstr : sstables)
-            if (sstr.readMeter != null)
-                totalReads += sstr.readMeter.twoHourRate();
+            if (sstr.getReadMeter() != null)
+                totalReads += sstr.getReadMeter().twoHourRate();
 
         // if this is a system table with no read meters or we don't have any read rates yet, just return them all
         if (totalReads == 0.0)
@@ -159,11 +159,11 @@ public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy
         while (cutoffIndex < sstables.size())
         {
             SSTableReader sstable = sstables.get(cutoffIndex);
-            if (sstable.readMeter == null)
+            if (sstable.getReadMeter() == null)
             {
                 throw new AssertionError("If you're seeing this exception, please attach your logs to CASSANDRA-8238 to help us debug. "+sstable);
             }
-            double reads = sstable.readMeter.twoHourRate();
+            double reads = sstable.getReadMeter().twoHourRate();
             if (totalColdReads + reads > maxColdReads)
                 break;
 
@@ -307,7 +307,7 @@ public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy
     private static double hotness(SSTableReader sstr)
     {
         // system tables don't have read meters, just use 0.0 for the hotness
-        return sstr.readMeter == null ? 0.0 : sstr.readMeter.twoHourRate() / sstr.estimatedKeys();
+        return sstr.getReadMeter() == null ? 0.0 : sstr.getReadMeter().twoHourRate() / sstr.estimatedKeys();
     }
 
     public synchronized AbstractCompactionTask getNextBackgroundTask(int gcBefore)
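
The hotness and cold-cutoff logic touched here is small enough to sketch in isolation. Below is a simplified, illustrative rendering; ReaderStats is a hypothetical stand-in for SSTableReader with its read meter inlined, and coldReadsToOmit corresponds to the strategy's cold_reads_to_omit option. It mirrors the loop above that drops the coldest sstables until the omitted reads would exceed the configured fraction:

import java.util.List;

// Simplified sketch of the cold-sstable cutoff loop above; ReaderStats is a
// hypothetical stand-in for SSTableReader with its read meter inlined.
final class ColdCutoffSketch
{
    static final class ReaderStats
    {
        final double twoHourRate;   // reads/sec over the last two hours
        final long estimatedKeys;

        ReaderStats(double twoHourRate, long estimatedKeys)
        {
            this.twoHourRate = twoHourRate;
            this.estimatedKeys = estimatedKeys;
        }

        // hotness as used above: reads/sec normalised by key count
        double hotness()
        {
            return estimatedKeys == 0 ? 0.0 : twoHourRate / estimatedKeys;
        }
    }

    // given sstables sorted coldest-first, returns the index of the first
    // sstable to keep: we drop the coldest prefix whose combined read rate
    // stays within coldReadsToOmit of the total
    static int cutoffIndex(List<ReaderStats> sortedColdestFirst, double coldReadsToOmit)
    {
        double totalReads = 0.0;
        for (ReaderStats r : sortedColdestFirst)
            totalReads += r.twoHourRate;

        double maxColdReads = totalReads * coldReadsToOmit;
        double totalColdReads = 0.0;
        int cutoff = 0;
        while (cutoff < sortedColdestFirst.size())
        {
            double reads = sortedColdestFirst.get(cutoff).twoHourRate;
            if (totalColdReads + reads > maxColdReads)
                break;
            totalColdReads += reads;
            cutoff++;
        }
        return cutoff;
    }
}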

http://git-wip-us.apache.org/repos/asf/cassandra/blob/61384c57/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java b/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
index a40048a..aaf1656 100644
--- a/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
@@ -325,18 +325,15 @@ public class CompressionMetadata
         public CompressionMetadata open(long dataLength, long compressedLength, SSTableWriter.FinishType finishType)
         {
             RefCountedMemory offsets;
-            switch (finishType)
+            if (finishType.isFinal)
             {
-                case EARLY:
-                    offsets = this.offsets;
-                    break;
-                case NORMAL:
-                case FINISH_EARLY:
-                    offsets = this.offsets.copy(count * 8L);
-                    this.offsets.unreference();
-                    break;
-                default:
-                    throw new AssertionError();
+                // we now know how many offsets we have and can resize the offsets properly
+                offsets = this.offsets.copy(count * 8L);
+                this.offsets.unreference();
+            }
+            else
+            {
+                offsets = this.offsets;
             }
             return new CompressionMetadata(filePath, parameters, offsets, count * 8L, dataLength, compressedLength, Descriptor.Version.CURRENT.hasPostCompressionAdlerChecksums);
         }
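
The isFinal branch above implements a simple pattern: the offsets region is over-allocated while writing, and only once the final entry count is known is it copied down to its exact size, releasing the oversized original. A self-contained sketch of the same idea, with a plain ByteBuffer standing in for RefCountedMemory:

import java.nio.ByteBuffer;

// Illustrative sketch of the "shrink on final open" idea above: the offsets
// region is over-allocated while writing, then copied down to its exact size
// once the final entry count is known.
final class OffsetsSketch
{
    private ByteBuffer offsets = ByteBuffer.allocateDirect(1024 * 8); // over-allocated
    private int count;

    void addOffset(long offset)
    {
        offsets.putLong(count++ * 8, offset);
    }

    ByteBuffer open(boolean isFinal)
    {
        if (!isFinal)
            return offsets; // early open: keep sharing the oversized buffer

        // we now know how many offsets we have and can resize them properly
        ByteBuffer exact = ByteBuffer.allocateDirect(count * 8);
        ByteBuffer src = offsets.duplicate();
        src.limit(count * 8);
        exact.put(src);
        exact.flip();
        offsets = null; // the oversized original would be unreferenced/freed here
        return exact;
    }
}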

http://git-wip-us.apache.org/repos/asf/cassandra/blob/61384c57/src/java/org/apache/cassandra/io/sstable/IndexSummary.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/IndexSummary.java b/src/java/org/apache/cassandra/io/sstable/IndexSummary.java
index f53a7e4..0cde124 100644
--- a/src/java/org/apache/cassandra/io/sstable/IndexSummary.java
+++ b/src/java/org/apache/cassandra/io/sstable/IndexSummary.java
@@ -17,21 +17,19 @@
  */
 package org.apache.cassandra.io.sstable;
 
-import java.io.Closeable;
 import java.io.DataInputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import org.apache.cassandra.cache.RefCountedMemory;
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.RowPosition;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.io.util.Memory;
 import org.apache.cassandra.io.util.MemoryOutputStream;
 import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.concurrent.WrappedSharedCloseable;
 
 import static org.apache.cassandra.io.sstable.Downsampling.BASE_SAMPLING_LEVEL;
 
@@ -45,10 +43,8 @@ import static org.apache.cassandra.io.sstable.Downsampling.BASE_SAMPLING_LEVEL;
  *     (This is necessary because keys can have different lengths.)
  *  2.  A sequence of (DecoratedKey, position) pairs, where position is the offset into the actual index file.
  */
-public class IndexSummary implements Closeable
+public class IndexSummary extends WrappedSharedCloseable
 {
-    private static final Logger logger = LoggerFactory.getLogger(IndexSummary.class);
-
     public static final IndexSummarySerializer serializer = new IndexSummarySerializer();
 
     /**
@@ -60,7 +56,7 @@ public class IndexSummary implements Closeable
     private final IPartitioner partitioner;
     private final int summarySize;
     private final int sizeAtFullSampling;
-    private final RefCountedMemory bytes;
+    private final Memory bytes;
 
     /**
      * A value between 1 and BASE_SAMPLING_LEVEL that represents how many of the original
@@ -70,17 +66,29 @@ public class IndexSummary implements Closeable
      */
     private final int samplingLevel;
 
-    public IndexSummary(IPartitioner partitioner, RefCountedMemory memory, int summarySize, int sizeAtFullSampling,
+    public IndexSummary(IPartitioner partitioner, Memory bytes, int summarySize, int sizeAtFullSampling,
                         int minIndexInterval, int samplingLevel)
     {
+        super(bytes);
         this.partitioner = partitioner;
         this.minIndexInterval = minIndexInterval;
         this.summarySize = summarySize;
         this.sizeAtFullSampling = sizeAtFullSampling;
-        this.bytes = memory;
+        this.bytes = bytes;
         this.samplingLevel = samplingLevel;
     }
 
+    private IndexSummary(IndexSummary copy)
+    {
+        super(copy);
+        this.partitioner = copy.partitioner;
+        this.minIndexInterval = copy.minIndexInterval;
+        this.summarySize = copy.summarySize;
+        this.sizeAtFullSampling = copy.sizeAtFullSampling;
+        this.bytes = copy.bytes;
+        this.samplingLevel = copy.samplingLevel;
+    }
+
     // binary search is notoriously more difficult to get right than it looks; this is lifted from
     // Harmony's Collections implementation
     public int binarySearch(RowPosition key)
@@ -137,7 +145,7 @@ public class IndexSummary implements Closeable
         long start = getPositionInSummary(index);
         long end = calculateEnd(index);
         byte[] entry = new byte[(int)(end - start)];
-        bytes.getBytes(start, entry, 0, (int)(end - start));
+        bytes.getBytes(start, entry, 0, (int) (end - start));
         return entry;
     }
 
@@ -206,6 +214,11 @@ public class IndexSummary implements Closeable
         return Downsampling.getEffectiveIndexIntervalAfterIndex(index, samplingLevel, minIndexInterval);
     }
 
+    public IndexSummary sharedCopy()
+    {
+        return new IndexSummary(this);
+    }
+
     public static class IndexSummarySerializer
     {
         public void serialize(IndexSummary t, DataOutputPlus out, boolean withSamplingLevel) throws IOException
@@ -256,16 +269,4 @@ public class IndexSummary implements Closeable
             return new IndexSummary(partitioner, memory, summarySize, fullSamplingSummarySize, minIndexInterval, samplingLevel);
         }
     }
-
-    @Override
-    public void close()
-    {
-        bytes.unreference();
-    }
-
-    public IndexSummary readOnlyClone()
-    {
-        bytes.reference();
-        return this;
-    }
 }
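
The move from readOnlyClone() (which bumped a count and returned this) to sharedCopy() gives each consumer its own instance while still sharing the underlying Memory and its cleanup. A minimal sketch of the mechanism, under the simplifying assumption that cleanup can be modelled as a single Runnable; the real base class is org.apache.cassandra.utils.concurrent.WrappedSharedCloseable, which is built on Ref and so also supports leak detection:

import java.util.concurrent.atomic.AtomicInteger;

// Minimal sketch of the sharedCopy() mechanism: all copies share one
// reference count and one cleanup action, so the underlying memory is
// freed only when the last copy is closed.
class SharedCloseableSketch implements AutoCloseable
{
    private final AtomicInteger refs;  // shared across all copies
    private final Runnable free;       // frees the underlying resource

    protected SharedCloseableSketch(Runnable free)
    {
        this.refs = new AtomicInteger(1);
        this.free = free;
    }

    // the copy constructor a sharedCopy() would call: a new handle over
    // the same resource, sharing the count and cleanup
    protected SharedCloseableSketch(SharedCloseableSketch copy)
    {
        copy.refs.incrementAndGet();
        this.refs = copy.refs;
        this.free = copy.free;
    }

    public void close()
    {
        if (refs.decrementAndGet() == 0)
            free.run(); // last copy out frees the memory
    }
}

The private IndexSummary(IndexSummary copy) constructor in the diff above plays exactly the role of the copy constructor here.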

http://git-wip-us.apache.org/repos/asf/cassandra/blob/61384c57/src/java/org/apache/cassandra/io/sstable/IndexSummaryBuilder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/IndexSummaryBuilder.java b/src/java/org/apache/cassandra/io/sstable/IndexSummaryBuilder.java
index 8e9cc30..df326d7 100644
--- a/src/java/org/apache/cassandra/io/sstable/IndexSummaryBuilder.java
+++ b/src/java/org/apache/cassandra/io/sstable/IndexSummaryBuilder.java
@@ -28,6 +28,7 @@ import org.apache.cassandra.cache.RefCountedMemory;
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.io.util.Memory;
 
 import static org.apache.cassandra.io.sstable.Downsampling.BASE_SAMPLING_LEVEL;
 import static org.apache.cassandra.io.sstable.SSTable.getMinimalKey;
@@ -148,7 +149,7 @@ public class IndexSummaryBuilder
 
         // first we write out the position in the *summary* for each key in the summary,
         // then we write out (key, actual index position) pairs
-        RefCountedMemory memory = new RefCountedMemory(offheapSize + (length * 4));
+        Memory memory = Memory.allocate(offheapSize + (length * 4));
         int idxPosition = 0;
         int keyPosition = length * 4;
         for (int i = 0; i < length; i++)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/61384c57/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java b/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
index 65b25a4..4144c32 100644
--- a/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
+++ b/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
@@ -266,9 +266,9 @@ public class IndexSummaryManager implements IndexSummaryManagerMBean
         double totalReadsPerSec = 0.0;
         for (SSTableReader sstable : nonCompacting)
         {
-            if (sstable.readMeter != null)
+            if (sstable.getReadMeter() != null)
             {
-                Double readRate = sstable.readMeter.fifteenMinuteRate();
+                Double readRate = sstable.getReadMeter().fifteenMinuteRate();
                 totalReadsPerSec += readRate;
                 readRates.put(sstable, readRate);
             }
@@ -314,7 +314,7 @@ public class IndexSummaryManager implements IndexSummaryManagerMBean
             int minIndexInterval = sstable.metadata.getMinIndexInterval();
             int maxIndexInterval = sstable.metadata.getMaxIndexInterval();
 
-            double readsPerSec = sstable.readMeter == null ? 0.0 : sstable.readMeter.fifteenMinuteRate();
+            double readsPerSec = sstable.getReadMeter() == null ? 0.0 : sstable.getReadMeter().fifteenMinuteRate();
             long idealSpace = Math.round(remainingSpace * (readsPerSec / totalReadsPerSec));
 
             // figure out how many entries our idealSpace would buy us, and pick a new sampling level based on that

http://git-wip-us.apache.org/repos/asf/cassandra/blob/61384c57/src/java/org/apache/cassandra/io/sstable/SSTableDeletingTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableDeletingTask.java b/src/java/org/apache/cassandra/io/sstable/SSTableDeletingTask.java
index fb1cbb3..3da6906 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableDeletingTask.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableDeletingTask.java
@@ -40,31 +40,42 @@ public class SSTableDeletingTask implements Runnable
     // and delete will fail (on Windows) until it is (we only force the unmapping on SUN VMs).
     // Additionally, we need to make sure to delete the data file first, so on restart the others
     // will be recognized as GCable.
-    private static final Set<SSTableDeletingTask> failedTasks = new CopyOnWriteArraySet<SSTableDeletingTask>();
+    private static final Set<SSTableDeletingTask> failedTasks = new CopyOnWriteArraySet<>();
 
     private final SSTableReader referent;
     private final Descriptor desc;
     private final Set<Component> components;
     private DataTracker tracker;
 
-    public SSTableDeletingTask(SSTableReader referent)
+    /**
+     * realDescriptor is the actual descriptor for the sstable; the descriptor inside
+     * referent can be 'faked' as FINAL for early-opened files. We need the real one
+     * to be able to remove the files.
+     */
+    public SSTableDeletingTask(Descriptor realDescriptor, SSTableReader referent)
     {
         this.referent = referent;
-        if (referent.openReason == SSTableReader.OpenReason.EARLY)
+        this.desc = realDescriptor;
+        switch (desc.type)
         {
-            this.desc = referent.descriptor.asType(Descriptor.Type.TEMPLINK);
-            this.components = Sets.newHashSet(Component.DATA, Component.PRIMARY_INDEX);
-        }
-        else
-        {
-            this.desc = referent.descriptor;
-            this.components = referent.components;
+            case FINAL:
+                this.components = referent.components;
+                break;
+            case TEMPLINK:
+                this.components = Sets.newHashSet(Component.DATA, Component.PRIMARY_INDEX);
+                break;
+            default:
+                throw new IllegalStateException();
         }
     }
 
     public void setTracker(DataTracker tracker)
     {
-        this.tracker = tracker;
+        // the tracker is used only to notify listeners of deletion of the sstable;
+        // since deletion of a non-final file is not really deletion of the sstable,
+        // we don't want to notify the listeners in that case
+        if (desc.type == Descriptor.Type.FINAL)
+            this.tracker = tracker;
     }
 
     public void schedule()
@@ -79,9 +90,6 @@ public class SSTableDeletingTask implements Runnable
         if (tracker != null)
             tracker.notifyDeleting(referent);
 
-        if (referent.readMeter != null)
-            SystemKeyspace.clearSSTableReadMeter(referent.getKeyspaceName(), referent.getColumnFamilyName(), referent.descriptor.generation);
-
         // If we can't successfully delete the DATA component, set the task to be retried later: see above
         File datafile = new File(desc.filenameFor(Component.DATA));
         if (!datafile.delete())
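
For context, the class comment at the top of this file explains why deletion can fail and be retried (memory-mapped files on Windows, data file deleted first). A simplified, self-contained sketch of that retry pattern, with component bookkeeping and tracker notification elided:

import java.io.File;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.Executor;

// Simplified sketch of the retry-on-failure deletion pattern used above.
final class DeletionRetrySketch implements Runnable
{
    static final Set<DeletionRetrySketch> failedTasks = new CopyOnWriteArraySet<>();

    private final File dataFile;

    DeletionRetrySketch(File dataFile)
    {
        this.dataFile = dataFile;
    }

    public void run()
    {
        // delete the data file first, so that on restart the remaining
        // components are recognised as garbage-collectable
        if (!dataFile.delete())
        {
            // e.g. still memory-mapped on Windows; park for a later retry
            failedTasks.add(this);
            return;
        }
        failedTasks.remove(this);
        // ... delete the remaining components here ...
    }

    // a later sweep resubmits any tasks that previously failed
    static void rescheduleFailedTasks(Executor executor)
    {
        for (DeletionRetrySketch task : failedTasks)
            executor.execute(task);
    }
}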

http://git-wip-us.apache.org/repos/asf/cassandra/blob/61384c57/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
index 06f71d8..cd23ae2 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
@@ -209,8 +209,8 @@ public class SSTableLoader implements StreamEventHandler
     {
         for (SSTableReader sstable : sstables)
         {
-            sstable.sharedRef().release();
-            assert sstable.sharedRef().globalCount() == 0;
+            sstable.selfRef().release();
+            assert sstable.selfRef().globalCount() == 0;
         }
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/61384c57/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
index f34939a..a28eb44 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
@@ -36,11 +36,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -118,10 +114,62 @@ import org.apache.cassandra.utils.concurrent.RefCounted;
 import static org.apache.cassandra.db.Directories.SECONDARY_INDEX_NAME_SEPARATOR;
 
 /**
- * SSTableReaders are open()ed by Keyspace.onStart; after that they are created by SSTableWriter.renameAndOpen.
- * Do not re-call open() on existing SSTable files; use the references kept by ColumnFamilyStore post-start instead.
+ * An SSTableReader can be constructed in a number of places, but typically is either
+ * read from disk at startup, or constructed from a flushed memtable, or after compaction
+ * to replace some existing sstables. However once created, an sstablereader may also be modified.
+ *
+ * A reader's OpenReason describes its current stage in its lifecycle, as follows:
+ *
+ * NORMAL
+ * From:       None        => Reader has been read from disk, either at startup or from a flushed memtable
+ *             EARLY       => Reader is the final result of a compaction
+ *             MOVED_START => Reader WAS being compacted, but this failed and it has been restored to NORMAL status
+ *
+ * EARLY
+ * From:       None        => Reader is a compaction replacement that is either incomplete and has been opened
+ *                            to represent its partial result status, or has been finished but the compaction
+ *                            it is a part of has not yet completed fully
+ *             EARLY       => Same as from None, only it is not the first time it has been
+ *             EARLY       => Same as from None, only it is not the first time it has been opened
+ * MOVED_START
+ * From:       NORMAL      => Reader is being compacted. This compaction has not finished, but the compaction result
+ *                            is either partially or fully opened, to either partially or fully replace this reader.
+ *                            This reader's start key has been updated to represent this, so that reads only hit
+ *                            one or the other reader.
+ *
+ * METADATA_CHANGE
+ * From:       NORMAL      => Reader has seen low traffic and the amount of memory available for index summaries is
+ *                            constrained, so its index summary has been downsampled.
+ *         METADATA_CHANGE => Same as from NORMAL
+ *
+ * Note that in parallel to this, there are two different Descriptor types: TEMPLINK and FINAL; the latter corresponds
+ * to NORMAL state readers and all readers that replace a NORMAL one. TEMPLINK is used for EARLY state readers and
+ * no others.
+ *
+ * When a reader is being compacted, if the result is large its replacement may be opened as EARLY before compaction
+ * completes in order to present the result to consumers earlier. In this case the reader will itself be changed to
+ * a MOVED_START state, where its start no longer represents its on-disk minimum key. This is to permit reads to be
+ * directed to only one reader when the two represent the same data. The EARLY file can represent a compaction result
+ * that is either partially complete and still in-progress, or a complete and immutable sstable that is part of a larger
+ * macro compaction action that has not yet fully completed.
+ *
+ * Currently ALL compaction results at least briefly go through an EARLY open state prior to completion, regardless
+ * of whether early opening is enabled.
+ *
+ * Since a reader can be created multiple times over the same shared underlying resources, and the exact resources
+ * it shares between each instance differ subtly, we track the lifetime of any underlying resource with its own
+ * reference count, which each instance takes a Ref to. Each instance then tracks references to itself, and once these
+ * all expire it releases its Refs to these underlying resources.
+ *
+ * There is some shared cleanup behaviour needed only once all sstablereaders in a certain stage of their lifecycle
+ * (i.e. EARLY or NORMAL opening) have expired, and some that must only occur once all readers of any kind over a
+ * single logical sstable have expired. These are managed by the DescriptorTypeTidy and GlobalTidy classes at the
+ * bottom, and are effectively treated as just another shared resource to which each instance tracks its own Ref,
+ * ensuring all of these resources are cleaned up safely and can be debugged if they are not.
+ *
+ * TODO: fill in details about DataTracker and lifecycle interactions for tools, and for compaction strategies
  */
-public class SSTableReader extends SSTable implements RefCounted
+public class SSTableReader extends SSTable implements RefCounted<SSTableReader>
 {
     private static final Logger logger = LoggerFactory.getLogger(SSTableReader.class);
 
@@ -166,7 +214,8 @@ public class SSTableReader extends SSTable implements RefCounted
     {
         NORMAL,
         EARLY,
-        METADATA_CHANGE
+        METADATA_CHANGE,
+        MOVED_START
     }
 
     public final OpenReason openReason;
@@ -174,7 +223,6 @@ public class SSTableReader extends SSTable implements RefCounted
     // indexfile and datafile: might be null before a call to load()
     private SegmentedFile ifile;
     private SegmentedFile dfile;
-
     private IndexSummary indexSummary;
     private IFilter bf;
 
@@ -184,8 +232,7 @@ public class SSTableReader extends SSTable implements RefCounted
 
     // technically isCompacted is not necessary since it should never be unreferenced unless it is also compacted,
     // but it seems like a good extra layer of protection against reference counting bugs to not delete data based on that alone
-    private final AtomicBoolean isCompacted = new AtomicBoolean(false);
-    private final AtomicBoolean isSuspect = new AtomicBoolean(false);
+    private AtomicBoolean isSuspect = new AtomicBoolean(false);
 
     // not final since we need to be able to change level on a file.
     private volatile StatsMetadata sstableMetadata;
@@ -193,12 +240,10 @@ public class SSTableReader extends SSTable implements RefCounted
     private final AtomicLong keyCacheHit = new AtomicLong(0);
     private final AtomicLong keyCacheRequest = new AtomicLong(0);
 
-    private final Tidier tidy = new Tidier();
-    private final RefCounted refCounted = RefCounted.Impl.get(tidy);
+    private final InstanceTidier tidy = new InstanceTidier(descriptor, metadata);
+    private final Ref<SSTableReader> selfRef = new Ref<>(this, tidy);
 
-    @VisibleForTesting
-    public RestorableMeter readMeter;
-    private ScheduledFuture readMeterSyncFuture;
+    private RestorableMeter readMeter;
 
     /**
      * Calculate approximate key count.
@@ -399,7 +444,7 @@ public class SSTableReader extends SSTable implements RefCounted
         sstable.ifile = ibuilder.complete(sstable.descriptor.filenameFor(Component.PRIMARY_INDEX));
         sstable.dfile = dbuilder.complete(sstable.descriptor.filenameFor(Component.DATA));
         sstable.bf = FilterFactory.AlwaysPresent;
-        sstable.tidy.setup(sstable);
+        sstable.setup();
         return sstable;
     }
 
@@ -443,13 +488,13 @@ public class SSTableReader extends SSTable implements RefCounted
         sstable.load(validationMetadata);
         logger.debug("INDEX LOAD TIME for {}: {} ms.", descriptor, TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start));
 
+        sstable.setup();
         if (validate)
             sstable.validate();
 
         if (sstable.getKeyCache() != null)
             logger.debug("key cache contains {}/{} keys", sstable.getKeyCache().size(), sstable.getKeyCache().getCapacity());
 
-        sstable.tidy.setup(sstable);
         return sstable;
     }
 
@@ -545,32 +590,6 @@ public class SSTableReader extends SSTable implements RefCounted
         this.sstableMetadata = sstableMetadata;
         this.maxDataAge = maxDataAge;
         this.openReason = openReason;
-
-        tidy.deletingTask = new SSTableDeletingTask(this);
-
-        // Don't track read rates for tables in the system keyspace and don't bother trying to load or persist
-        // the read meter when in client mode.  Also don't track reads for special operations (like early open)
-        // this is to avoid overflowing the executor queue (see CASSANDRA-8066)
-        if (Keyspace.SYSTEM_KS.equals(desc.ksname) || Config.isClientMode() || openReason != OpenReason.NORMAL)
-        {
-            readMeter = null;
-            readMeterSyncFuture = null;
-            return;
-        }
-
-        readMeter = SystemKeyspace.getSSTableReadMeter(desc.ksname, desc.cfname, desc.generation);
-        // sync the average read rate to system.sstable_activity every five minutes, starting one minute from now
-        readMeterSyncFuture = syncExecutor.scheduleAtFixedRate(new Runnable()
-        {
-            public void run()
-            {
-                if (!isCompacted.get())
-                {
-                    meterSyncThrottle.acquire();
-                    SystemKeyspace.persistSSTableReadMeter(desc.ksname, desc.cfname, desc.generation, readMeter);
-                }
-            }
-        }, 1, 5, TimeUnit.MINUTES);
     }
 
     private SSTableReader(Descriptor desc,
@@ -586,12 +605,11 @@ public class SSTableReader extends SSTable implements RefCounted
                           OpenReason openReason)
     {
         this(desc, components, metadata, partitioner, maxDataAge, sstableMetadata, openReason);
-
         this.ifile = ifile;
         this.dfile = dfile;
         this.indexSummary = indexSummary;
         this.bf = bloomFilter;
-        tidy.setup(this);
+        this.setup();
     }
 
     public static long getTotalBytes(Iterable<SSTableReader> sstables)
@@ -626,7 +644,7 @@ public class SSTableReader extends SSTable implements RefCounted
 
     public void setTrackedBy(DataTracker tracker)
     {
-        tidy.deletingTask.setTracker(tracker);
+        tidy.type.deletingTask.setTracker(tracker);
         // under normal operation we can do this at any time, but SSTR is also used outside C* proper,
         // e.g. by BulkLoader, which does not initialize the cache.  As a kludge, we set up the cache
         // here when we know we're being wired into the rest of the server infrastructure.
@@ -698,7 +716,6 @@ public class SSTableReader extends SSTable implements RefCounted
         dfile = dbuilder.complete(descriptor.filenameFor(Component.DATA));
         if (saveSummaryIfCreated && (recreateBloomFilter || !summaryLoaded)) // save summary information to disk
             saveSummary(ibuilder, dbuilder);
-        tidy.setup(this);
     }
 
     /**
@@ -792,6 +809,8 @@ public class SSTableReader extends SSTable implements RefCounted
         }
         catch (IOException e)
         {
+            if (indexSummary != null)
+                indexSummary.close();
             logger.debug("Cannot deserialize SSTable Summary File {}: {}", summariesFile.getPath(), e.getMessage());
             // corrupted; delete it and fall back to creating a new summary
             FileUtils.closeQuietly(iStream);
@@ -850,20 +869,21 @@ public class SSTableReader extends SSTable implements RefCounted
 
     public void setReplacedBy(SSTableReader replacement)
     {
-        synchronized (tidy.replaceLock)
+        synchronized (tidy.global)
         {
-            assert tidy.replacedBy == null;
-            tidy.replacedBy = replacement;
-            replacement.tidy.replaces = this;
-            replacement.tidy.replaceLock = tidy.replaceLock;
+            assert replacement != null;
+            assert !tidy.isReplaced;
+            assert tidy.global.live == this;
+            tidy.isReplaced = true;
+            tidy.global.live = replacement;
         }
     }
 
     public SSTableReader cloneWithNewStart(DecoratedKey newStart, final Runnable runOnClose)
     {
-        synchronized (tidy.replaceLock)
+        synchronized (tidy.global)
         {
-            assert tidy.replacedBy == null;
+            assert openReason != OpenReason.EARLY;
 
             if (newStart.compareTo(this.first) > 0)
             {
@@ -895,10 +915,9 @@ public class SSTableReader extends SSTable implements RefCounted
                 }
             }
 
-            SSTableReader replacement = new SSTableReader(descriptor, components, metadata, partitioner, ifile, dfile, indexSummary.readOnlyClone(), bf, maxDataAge, sstableMetadata,
-                    openReason == OpenReason.EARLY ? openReason : OpenReason.METADATA_CHANGE);
-            replacement.readMeterSyncFuture = this.readMeterSyncFuture;
-            replacement.readMeter = this.readMeter;
+            SSTableReader replacement = new SSTableReader(descriptor, components, metadata, partitioner, ifile.sharedCopy(),
+                                                          dfile.sharedCopy(), indexSummary.sharedCopy(), bf.sharedCopy(),
+                                                          maxDataAge, sstableMetadata, OpenReason.MOVED_START);
             replacement.first = this.last.compareTo(newStart) > 0 ? newStart : this.last;
             replacement.last = this.last;
             setReplacedBy(replacement);
@@ -916,9 +935,9 @@ public class SSTableReader extends SSTable implements RefCounted
      */
     public SSTableReader cloneWithNewSummarySamplingLevel(ColumnFamilyStore parent, int samplingLevel) throws IOException
     {
-        synchronized (tidy.replaceLock)
+        synchronized (tidy.global)
         {
-            assert tidy.replacedBy == null;
+            assert openReason != OpenReason.EARLY;
 
             int minIndexInterval = metadata.getMinIndexInterval();
             int maxIndexInterval = metadata.getMaxIndexInterval();
@@ -957,10 +976,9 @@ public class SSTableReader extends SSTable implements RefCounted
             StorageMetrics.load.inc(newSize - oldSize);
             parent.metric.liveDiskSpaceUsed.inc(newSize - oldSize);
 
-            SSTableReader replacement = new SSTableReader(descriptor, components, metadata, partitioner, ifile, dfile, newSummary, bf, maxDataAge, sstableMetadata,
-                    openReason == OpenReason.EARLY ? openReason : OpenReason.METADATA_CHANGE);
-            replacement.readMeterSyncFuture = this.readMeterSyncFuture;
-            replacement.readMeter = this.readMeter;
+            SSTableReader replacement = new SSTableReader(descriptor, components, metadata, partitioner, ifile.sharedCopy(),
+                                                          dfile.sharedCopy(), newSummary, bf.sharedCopy(), maxDataAge,
+                                                          sstableMetadata, OpenReason.METADATA_CHANGE);
             replacement.first = this.first;
             replacement.last = this.last;
             setReplacedBy(replacement);
@@ -992,6 +1010,11 @@ public class SSTableReader extends SSTable implements RefCounted
         }
     }
 
+    public RestorableMeter getReadMeter()
+    {
+        return readMeter;
+    }
+
     public int getIndexSummarySamplingLevel()
     {
         return indexSummary.getSamplingLevel();
@@ -1014,14 +1037,17 @@ public class SSTableReader extends SSTable implements RefCounted
 
     public void releaseSummary() throws IOException
     {
-        indexSummary.close();
+        tidy.releaseSummary();
         indexSummary = null;
     }
 
     private void validate()
     {
         if (this.first.compareTo(this.last) > 0)
+        {
+            selfRef().release();
             throw new IllegalStateException(String.format("SSTable first key %s > last key %s", this.first, this.last));
+        }
     }
 
     /**
@@ -1567,16 +1593,16 @@ public class SSTableReader extends SSTable implements RefCounted
         if (logger.isDebugEnabled())
             logger.debug("Marking {} compacted", getFilename());
 
-        synchronized (tidy.replaceLock)
+        synchronized (tidy.global)
         {
-            assert tidy.replacedBy == null : getFilename();
+            assert !tidy.isReplaced;
         }
-        return !isCompacted.getAndSet(true);
+        return !tidy.global.isCompacted.getAndSet(true);
     }
 
     public boolean isMarkedCompacted()
     {
-        return isCompacted.get();
+        return tidy.global.isCompacted.get();
     }
 
     public void markSuspect()
@@ -1673,16 +1699,7 @@ public class SSTableReader extends SSTable implements RefCounted
 
     public SSTableReader getCurrentReplacement()
     {
-        synchronized (tidy.replaceLock)
-        {
-            SSTableReader cur = this, next = tidy.replacedBy;
-            while (next != null)
-            {
-                cur = next;
-                next = next.tidy.replacedBy;
-            }
-            return cur;
-        }
+        return tidy.global.live;
     }
 
     /**
@@ -1882,199 +1899,314 @@ public class SSTableReader extends SSTable implements RefCounted
         }
     }
 
-    public Ref tryRef()
+    public Ref<SSTableReader> tryRef()
     {
-        return refCounted.tryRef();
+        return selfRef.tryRef();
     }
 
-    public Ref sharedRef()
+    public Ref<SSTableReader> selfRef()
     {
-        return refCounted.sharedRef();
+        return selfRef;
     }
 
-    private static final class Tidier implements Tidy
+    public Ref<SSTableReader> ref()
     {
-        private String name;
-        private CFMetaData metadata;
-        // indexfile and datafile: might be null before a call to load()
-        private SegmentedFile ifile;
-        private SegmentedFile dfile;
+        return selfRef.ref();
+    }
 
-        private IndexSummary indexSummary;
-        private IFilter bf;
+    void setup()
+    {
+        tidy.setup(this);
+        this.readMeter = tidy.global.readMeter;
+    }
+
+    @VisibleForTesting
+    public void overrideReadMeter(RestorableMeter readMeter)
+    {
+        this.readMeter = tidy.global.readMeter = readMeter;
+    }
 
-        private AtomicBoolean isCompacted;
+    /**
+     * One instance per SSTableReader we create. This references the type-shared tidy, which in turn references
+     * the globally shared tidy, i.e.
+     *
+     * InstanceTidier => DescriptorTypeTidy => GlobalTidy
+     *
+     * We can create many InstanceTidiers (one for every time we reopen an sstable, with MOVED_START for example),
+     * but there can be at most two DescriptorTypeTidy (FINAL and TEMPLINK) and only one GlobalTidy for a single
+     * logical sstable.
+     *
+     * When the InstanceTidier cleans up, it releases its reference to its DescriptorTypeTidy; when all InstanceTidiers
+     * for that type have run, the DescriptorTypeTidy cleans up. DescriptorTypeTidy behaves the same way towards GlobalTidy.
+     *
+     * For convenience, we stash a direct reference to both our type-shared and global tidiers.
+     */
+    private static final class InstanceTidier implements Tidy
+    {
+        private final Descriptor descriptor;
+        private final CFMetaData metadata;
+        private IFilter bf;
+        private IndexSummary summary;
 
-        /**
-         * To support replacing this sstablereader with another object that represents that same underlying sstable, but with different associated resources,
-         * we build a linked-list chain of replacement, which we synchronise using a shared object to make maintenance of the list across multiple threads simple.
-         * On close we check if any of the closeable resources differ between any chains either side of us; any that are in neither of the adjacent links (if any) are closed.
-         * Once we've made this decision we remove ourselves from the linked list, so that anybody behind/ahead will compare against only other still opened resources.
-         */
-        private Object replaceLock = new Object();
-        private SSTableReader replacedBy;
-        private SSTableReader replaces;
-        private SSTableDeletingTask deletingTask;
+        private SegmentedFile dfile;
+        private SegmentedFile ifile;
         private Runnable runOnClose;
+        private boolean isReplaced = false;
+
+        // a reference to our shared per-Descriptor.Type tidy instance, that
+        // we will release when we are ourselves released
+        private Ref<DescriptorTypeTidy> typeRef;
+
+        // a convenience stashing of the shared per-descriptor-type tidy instance itself
+        // and the per-logical-sstable globally shared state that it is linked to
+        private DescriptorTypeTidy type;
+        private GlobalTidy global;
+
+        private boolean setup;
+
+        void setup(SSTableReader reader)
+        {
+            this.setup = true;
+            this.bf = reader.bf;
+            this.summary = reader.indexSummary;
+            this.dfile = reader.dfile;
+            this.ifile = reader.ifile;
+            // get a new reference to the shared descriptor-type tidy
+            this.typeRef = DescriptorTypeTidy.get(reader);
+            this.type = typeRef.get();
+            this.global = type.globalRef.get();
+        }
 
-        @VisibleForTesting
-        public RestorableMeter readMeter;
-        private volatile ScheduledFuture readMeterSyncFuture;
+        InstanceTidier(Descriptor descriptor, CFMetaData metadata)
+        {
+            this.descriptor = descriptor;
+            this.metadata = metadata;
+        }
 
-        private void setup(SSTableReader reader)
+        public void tidy()
         {
-            name = reader.toString();
-            metadata = reader.metadata;
-            ifile = reader.ifile;
-            dfile = reader.dfile;
-            indexSummary = reader.indexSummary;
-            bf = reader.bf;
-            isCompacted = reader.isCompacted;
-            readMeterSyncFuture = reader.readMeterSyncFuture;
+            // don't try to cleanup if the sstablereader was never fully constructed
+            if (!setup)
+                return;
+
+            final ColumnFamilyStore cfs = Schema.instance.getColumnFamilyStoreInstance(metadata.cfId);
+            final OpOrder.Barrier barrier;
+            if (cfs != null)
+            {
+                barrier = cfs.readOrdering.newBarrier();
+                barrier.issue();
+            }
+            else
+                barrier = null;
+
+            ScheduledExecutors.nonPeriodicTasks.execute(new Runnable()
+            {
+                public void run()
+                {
+                    if (barrier != null)
+                        barrier.await();
+                    bf.close();
+                    dfile.close();
+                    ifile.close();
+                    if (summary != null)
+                        summary.close();
+                    if (runOnClose != null)
+                        runOnClose.run();
+                    typeRef.release();
+                }
+            });
         }
 
         public String name()
         {
-            return name;
+            return descriptor.toString();
         }
 
-        private void dropPageCache()
+        void releaseSummary()
         {
-            dropPageCache(dfile.path);
-            dropPageCache(ifile.path);
+            summary.close();
+            assert summary.isCleanedUp();
+            summary = null;
         }
+    }
+
+    /**
+     * One shared between all instances of a given Descriptor.Type.
+     * It does only two things: deletes the sstable files for the type,
+     * if necessary, and holds the shared reference to the globally shared state.
+     *
+     * All InstanceTidiers, on setup(), ask the static get() method for their shared state,
+     * and stash a reference to it to be released when they are. Once all such references are
+     * released, the shared tidy will be performed.
+     */
+    static final class DescriptorTypeTidy implements Tidy
+    {
+        // keyed by REAL descriptor (TEMPLINK/FINAL), mapping to the shared DescriptorTypeTidy for that descriptor
+        static final ConcurrentMap<Descriptor, Ref<DescriptorTypeTidy>> lookup = new ConcurrentHashMap<>();
 
-        private void dropPageCache(String filePath)
+        private final Descriptor desc;
+        private final Ref<GlobalTidy> globalRef;
+        private final SSTableDeletingTask deletingTask;
+
+        DescriptorTypeTidy(Descriptor desc, SSTableReader sstable)
         {
-            RandomAccessFile file = null;
+            this.desc = desc;
+            this.deletingTask = new SSTableDeletingTask(desc, sstable);
+            // get a new reference to the shared global tidy
+            this.globalRef = GlobalTidy.get(sstable);
+        }
 
-            try
+        public void tidy()
+        {
+            lookup.remove(desc);
+            boolean isCompacted = globalRef.get().isCompacted.get();
+            globalRef.release();
+            switch (desc.type)
             {
-                file = new RandomAccessFile(filePath, "r");
+                case FINAL:
+                    if (isCompacted)
+                        deletingTask.run();
+                    break;
+                case TEMPLINK:
+                    deletingTask.run();
+                    break;
+                default:
+                    throw new IllegalStateException();
+            }
+        }
 
-                int fd = CLibrary.getfd(file.getFD());
+        public String name()
+        {
+            return desc.toString();
+        }
 
-                if (fd > 0)
-                {
-                    if (logger.isDebugEnabled())
-                        logger.debug(String.format("Dropping page cache of file %s.", filePath));
+        // get a new reference to the shared DescriptorTypeTidy for this sstable
+        public static Ref<DescriptorTypeTidy> get(SSTableReader sstable)
+        {
+            Descriptor desc = sstable.descriptor;
+            if (sstable.openReason == OpenReason.EARLY)
+                desc = desc.asType(Descriptor.Type.TEMPLINK);
+            Ref<DescriptorTypeTidy> refc = lookup.get(desc);
+            if (refc != null)
+                return refc.ref();
+            final DescriptorTypeTidy tidy = new DescriptorTypeTidy(desc, sstable);
+            refc = new Ref<>(tidy, tidy);
+            Ref<?> ex = lookup.putIfAbsent(desc, refc);
+            assert ex == null;
+            return refc;
+        }
+    }
 
-                    CLibrary.trySkipCache(fd, 0, 0);
-                }
-            }
-            catch (IOException e)
+    /**
+     * One instance per logical sstable. This tracks both shared cleanup and some shared state related
+     * to the sstable's lifecycle. All DescriptorTypeTidy instances, on construction, obtain a reference to us
+     * via our static get(). There should only ever be at most two such references extant at any one time,
+     * since only TMPLINK and FINAL type descriptors should be open as readers. When all files of both
+     * kinds have been released, this shared tidy will be performed.
+     */
+    static final class GlobalTidy implements Tidy
+    {
+        // keyed by FINAL descriptor, mapping to the shared GlobalTidy for that descriptor
+        static final ConcurrentMap<Descriptor, Ref<GlobalTidy>> lookup = new ConcurrentHashMap<>();
+
+        private final Descriptor desc;
+        // a single convenience property for getting the most recent version of an sstable, not related to tidying
+        private SSTableReader live;
+        // the readMeter that is shared between all instances of the sstable, and can be overridden in all of them
+        // at once, for testing purposes
+        private RestorableMeter readMeter;
+        // the scheduled persistence of the readMeter, that we will cancel once all instances of this logical
+        // sstable have been released
+        private final ScheduledFuture readMeterSyncFuture;
+        // shared state managing if the logical sstable has been compacted; this is used in cleanup both here
+        // and in the FINAL type tidier
+        private final AtomicBoolean isCompacted;
+
+        GlobalTidy(final SSTableReader reader)
+        {
+            this.desc = reader.descriptor;
+            this.isCompacted = new AtomicBoolean();
+            this.live = reader;
+            // Don't track read rates for tables in the system keyspace and don't bother trying to load or persist
+            // the read meter when in client mode.
+            if (Keyspace.SYSTEM_KS.equals(desc.ksname) || Config.isClientMode())
             {
-                // we don't care if cache cleanup fails
+                readMeter = null;
+                readMeterSyncFuture = null;
+                return;
             }
-            finally
+
+            readMeter = SystemKeyspace.getSSTableReadMeter(desc.ksname, desc.cfname, desc.generation);
+            // sync the average read rate to system.sstable_activity every five minutes, starting one minute from now
+            readMeterSyncFuture = syncExecutor.scheduleAtFixedRate(new Runnable()
             {
-                FileUtils.closeQuietly(file);
-            }
+                public void run()
+                {
+                    if (!isCompacted.get())
+                    {
+                        meterSyncThrottle.acquire();
+                        SystemKeyspace.persistSSTableReadMeter(desc.ksname, desc.cfname, desc.generation, readMeter);
+                    }
+                }
+            }, 1, 5, TimeUnit.MINUTES);
         }
 
         public void tidy()
         {
+            lookup.remove(desc);
             if (readMeterSyncFuture != null)
-                readMeterSyncFuture.cancel(false);
+                readMeterSyncFuture.cancel(true);
+            if (isCompacted.get())
+                SystemKeyspace.clearSSTableReadMeter(desc.ksname, desc.cfname, desc.generation);
+            // ideally we don't want to drop the page cache for the file until all instances have been released, i.e. now
+            dropPageCache(desc.filenameFor(Component.DATA));
+            dropPageCache(desc.filenameFor(Component.PRIMARY_INDEX));
+        }
 
-            synchronized (replaceLock)
-            {
-                boolean closeBf = true, closeSummary = true, closeFiles = true, deleteFiles = isCompacted.get();
+        public String name()
+        {
+            return desc.toString();
+        }
 
-                if (replacedBy != null)
-                {
-                    closeBf = replacedBy.bf != bf;
-                    closeSummary = replacedBy.indexSummary != indexSummary;
-                    closeFiles = replacedBy.dfile != dfile;
-                    // if the replacement sstablereader uses a different path, clean up our paths
-                    deleteFiles = !dfile.path.equals(replacedBy.dfile.path);
-                }
+        // get a new reference to the shared GlobalTidy for this sstable
+        public static Ref<GlobalTidy> get(SSTableReader sstable)
+        {
+            Descriptor descriptor = sstable.descriptor;
+            Ref<GlobalTidy> refc = lookup.get(descriptor);
+            if (refc != null)
+                return refc.ref();
+            final GlobalTidy tidy = new GlobalTidy(sstable);
+            refc = new Ref<>(tidy, tidy);
+            Ref<?> ex = lookup.putIfAbsent(descriptor, refc);
+            assert ex == null;
+            return refc;
+        }
+    }
 
-                if (replaces != null)
-                {
-                    closeBf &= replaces.bf != bf;
-                    closeSummary &= replaces.indexSummary != indexSummary;
-                    closeFiles &= replaces.dfile != dfile;
-                    deleteFiles &= !dfile.path.equals(replaces.dfile.path);
-                }
+    private static void dropPageCache(String filePath)
+    {
+        RandomAccessFile file = null;
 
-                boolean deleteAll = false;
-                if (isCompacted.get())
-                {
-                    assert replacedBy == null;
-                    if (replaces != null && !deleteFiles)
-                    {
-                        replaces.tidy.replacedBy = null;
-                        replaces.tidy.deletingTask = deletingTask;
-                        replaces.markObsolete();
-                    }
-                    else
-                    {
-                        deleteAll = true;
-                    }
-                }
-                else
-                {
-                    closeSummary &= indexSummary != null;
-                    if (replaces != null)
-                        replaces.tidy.replacedBy = replacedBy;
-                    if (replacedBy != null)
-                        replacedBy.tidy.replaces = replaces;
-                }
+        try
+        {
+            file = new RandomAccessFile(filePath, "r");
 
-                scheduleTidy(closeBf, closeSummary, closeFiles, deleteFiles, deleteAll);
-            }
-        }
+            int fd = CLibrary.getfd(file.getFD());
 
-        private void scheduleTidy(final boolean closeBf, final boolean closeSummary, final boolean closeFiles, final boolean deleteFiles, final boolean deleteAll)
-        {
-            final ColumnFamilyStore cfs = Schema.instance.getColumnFamilyStoreInstance(metadata.cfId);
-            final OpOrder.Barrier barrier;
-            if (cfs != null)
+            if (fd > 0)
             {
-                barrier = cfs.readOrdering.newBarrier();
-                barrier.issue();
-            }
-            else
-                barrier = null;
+                if (logger.isDebugEnabled())
+                    logger.debug(String.format("Dropping page cache of file %s.", filePath));
 
-            ScheduledExecutors.nonPeriodicTasks.execute(new Runnable()
-            {
-                public void run()
-                {
-                    if (barrier != null)
-                        barrier.await();
-                    if (closeBf)
-                        bf.close();
-                    if (closeSummary)
-                        indexSummary.close();
-                    if (closeFiles)
-                    {
-                        ifile.cleanup();
-                        dfile.cleanup();
-                    }
-                    if (runOnClose != null)
-                        runOnClose.run();
-                    if (deleteAll)
-                    {
-                        /**
-                         * Do the OS a favour and suggest (using fadvise call) that we
-                         * don't want to see pages of this SSTable in memory anymore.
-                         *
-                         * NOTE: We can't use madvise in java because it requires the address of
-                         * the mapping, so instead we always open a file and run fadvise(fd, 0, 0) on it
-                         */
-                        dropPageCache();
-                        deletingTask.run();
-                    }
-                    else if (deleteFiles)
-                    {
-                        FileUtils.deleteWithConfirm(new File(dfile.path));
-                        FileUtils.deleteWithConfirm(new File(ifile.path));
-                    }
-                }
-            });
+                CLibrary.trySkipCache(fd, 0, 0);
+            }
+        }
+        catch (IOException e)
+        {
+            // we don't care if cache cleanup fails
+        }
+        finally
+        {
+            FileUtils.closeQuietly(file);
         }
     }
-
 }
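
The GlobalTidy idiom above deserves a note: all SSTableReader instances for one
descriptor share a single tidy object, handed out as Ref instances from a static
lookup map, so the cleanup (read meter persistence, page cache drop) runs exactly
once, when the last reference is released. Below is a minimal, self-contained
sketch of the same idiom; the names (SharedTidy, acquire) are illustrative, not
Cassandra APIs, and where GlobalTidy.get asserts that putIfAbsent succeeds
(presumably relying on external synchronization), the sketch retries on races
instead.

    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;
    import java.util.concurrent.atomic.AtomicInteger;

    public final class SharedTidy
    {
        private static final ConcurrentMap<String, SharedTidy> LOOKUP = new ConcurrentHashMap<>();

        private final String name;
        private final AtomicInteger refs = new AtomicInteger(1); // creator holds the first reference

        private SharedTidy(String name) { this.name = name; }

        // get (or create) the shared instance for name; callers must release() exactly once
        public static SharedTidy acquire(String name)
        {
            while (true)
            {
                SharedTidy existing = LOOKUP.get(name);
                if (existing == null)
                {
                    SharedTidy fresh = new SharedTidy(name);
                    if (LOOKUP.putIfAbsent(name, fresh) == null)
                        return fresh;
                }
                else if (existing.tryRef())
                {
                    return existing;
                }
                // we raced the final release(); loop and try again
            }
        }

        private boolean tryRef()
        {
            int n;
            while ((n = refs.get()) > 0)
                if (refs.compareAndSet(n, n + 1))
                    return true;
            return false; // count already hit zero: instance is being tidied
        }

        public void release()
        {
            if (refs.decrementAndGet() == 0)
            {
                LOOKUP.remove(name);
                tidy(); // runs exactly once, after the last reference is gone
            }
        }

        private void tidy()
        {
            System.out.println("tidying " + name); // stand-in for the real cleanup
        }

        public static void main(String[] args)
        {
            SharedTidy a = acquire("sstable-1");
            SharedTidy b = acquire("sstable-1"); // same underlying instance
            a.release();
            b.release(); // only now does tidy() run
        }
    }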

http://git-wip-us.apache.org/repos/asf/cassandra/blob/61384c57/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
index 7784b18..6356d4d 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
@@ -183,14 +183,14 @@ public class SSTableRewriter
         for (SSTableReader sstable : finished)
         {
             sstable.markObsolete();
-            sstable.sharedRef().release();
+            sstable.selfRef().release();
         }
 
         // abort the writers
         for (Finished finished : finishedEarly)
         {
             boolean opened = finished.reader != null;
-            finished.writer.abort(!opened);
+            finished.writer.abort();
             if (opened)
             {
                 // if we've already been opened, add ourselves to the discard pile
@@ -361,7 +361,7 @@ public class SSTableRewriter
             }
             else
             {
-                f.writer.abort(true);
+                f.writer.abort();
                 assert f.reader == null;
             }
         }
@@ -380,9 +380,9 @@ public class SSTableRewriter
         {
             for (SSTableReader reader : discard)
             {
-                if (reader.getCurrentReplacement() == null)
+                if (reader.getCurrentReplacement() == reader)
                     reader.markObsolete();
-                reader.sharedRef().release();
+                reader.selfRef().release();
             }
         }
         else
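
Note the semantic change in the discard path above: the comparison moving from
null to reader suggests getCurrentReplacement() now returns the reader itself
when it has not been replaced, so the "safe to mark obsolete" test becomes an
identity comparison. A hypothetical sketch of that self-as-sentinel idiom
(Reader, replaceWith and safeToMarkObsolete are illustrative names, not the
SSTableReader API):

    class Reader
    {
        // "this" as the sentinel: an unreplaced reader is its own current replacement
        private volatile Reader replacedBy = this;

        Reader getCurrentReplacement()
        {
            Reader next = replacedBy;
            return next == this ? this : next.getCurrentReplacement(); // walk to the live end of the chain
        }

        void replaceWith(Reader newer)
        {
            replacedBy = newer;
        }

        boolean safeToMarkObsolete()
        {
            return getCurrentReplacement() == this; // mirrors the check in the hunk above
        }
    }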

http://git-wip-us.apache.org/repos/asf/cassandra/blob/61384c57/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
index cc60594..d430314 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
@@ -17,7 +17,6 @@
  */
 package org.apache.cassandra.io.sstable;
 
-import java.io.Closeable;
 import java.io.DataInput;
 import java.io.File;
 import java.io.FileOutputStream;
@@ -339,16 +338,12 @@ public class SSTableWriter extends SSTable
      */
     public void abort()
     {
-        abort(true);
-    }
-    public void abort(boolean closeBf)
-    {
         assert descriptor.type.isTemporary;
         if (iwriter == null && dataFile == null)
             return;
 
         if (iwriter != null)
-            iwriter.abort(closeBf);
+            iwriter.abort();
 
         if (dataFile != null)
             dataFile.abort();
@@ -407,7 +402,7 @@ public class SSTableWriter extends SSTable
                                                            components, metadata,
                                                            partitioner, ifile,
                                                            dfile, iwriter.summary.build(partitioner, exclusiveUpperBoundOfReadableIndex),
-                                                           iwriter.bf, maxDataAge, sstableMetadata, SSTableReader.OpenReason.EARLY);
+                                                           iwriter.bf.sharedCopy(), maxDataAge, sstableMetadata, SSTableReader.OpenReason.EARLY);
 
         // now it's open, find the ACTUAL last readable key (i.e. for which the data file has also been flushed)
         sstable.first = getMinimalKey(first);
@@ -416,7 +411,7 @@ public class SSTableWriter extends SSTable
         if (inclusiveUpperBoundOfReadableData == null)
         {
             // Prevent leaving tmplink files on disk
-            sstable.sharedRef().release();
+            sstable.selfRef().release();
             return null;
         }
         int offset = 2;
@@ -428,7 +423,7 @@ public class SSTableWriter extends SSTable
             inclusiveUpperBoundOfReadableData = iwriter.getMaxReadableKey(offset++);
             if (inclusiveUpperBoundOfReadableData == null)
             {
-                sstable.sharedRef().release();
+                sstable.selfRef().release();
                 return null;
             }
         }
@@ -438,14 +433,17 @@ public class SSTableWriter extends SSTable
 
     public static enum FinishType
     {
-        NORMAL(SSTableReader.OpenReason.NORMAL),
-        EARLY(SSTableReader.OpenReason.EARLY), // no renaming
-        FINISH_EARLY(SSTableReader.OpenReason.NORMAL); // tidy up an EARLY finish
+        CLOSE(null, true),
+        NORMAL(SSTableReader.OpenReason.NORMAL, true),
+        EARLY(SSTableReader.OpenReason.EARLY, false), // no renaming
+        FINISH_EARLY(SSTableReader.OpenReason.NORMAL, true); // tidy up an EARLY finish
         final SSTableReader.OpenReason openReason;
 
-        FinishType(SSTableReader.OpenReason openReason)
+        public final boolean isFinal;
+        FinishType(SSTableReader.OpenReason openReason, boolean isFinal)
         {
             this.openReason = openReason;
+            this.isFinal = isFinal;
         }
     }
 
@@ -461,6 +459,7 @@ public class SSTableWriter extends SSTable
 
     public SSTableReader finish(FinishType finishType, long maxDataAge, long repairedAt)
     {
+        assert finishType != FinishType.CLOSE;
         Pair<Descriptor, StatsMetadata> p;
 
         p = close(finishType, repairedAt < 0 ? this.repairedAt : repairedAt);
@@ -480,16 +479,16 @@ public class SSTableWriter extends SSTable
                                                            ifile,
                                                            dfile,
                                                            iwriter.summary.build(partitioner),
-                                                           iwriter.bf,
+                                                           iwriter.bf.sharedCopy(),
                                                            maxDataAge,
                                                            metadata,
                                                            finishType.openReason);
         sstable.first = getMinimalKey(first);
         sstable.last = getMinimalKey(last);
 
-        switch (finishType)
+        if (finishType.isFinal)
         {
-            case NORMAL: case FINISH_EARLY:
+            iwriter.bf.close();
             // try to save the summaries to disk
             sstable.saveSummary(iwriter.builder, dbuilder);
             iwriter = null;
@@ -501,16 +500,18 @@ public class SSTableWriter extends SSTable
 // Close the writer and return the descriptor for the new sstable and its metadata
     public Pair<Descriptor, StatsMetadata> close()
     {
-        return close(FinishType.NORMAL, this.repairedAt);
+        return close(FinishType.CLOSE, this.repairedAt);
     }
 
     private Pair<Descriptor, StatsMetadata> close(FinishType type, long repairedAt)
     {
         switch (type)
         {
-            case EARLY: case NORMAL:
+            case EARLY: case CLOSE: case NORMAL:
             iwriter.close();
             dataFile.close();
+            if (type == FinishType.CLOSE)
+                iwriter.bf.close();
         }
 
         // write sstable statistics
@@ -521,9 +522,8 @@ public class SSTableWriter extends SSTable
 
         // remove the 'tmp' marker from all components
         Descriptor descriptor = this.descriptor;
-        switch (type)
+        if (type.isFinal)
         {
-            case NORMAL: case FINISH_EARLY:
             dataFile.writeFullChecksum(descriptor);
             writeMetadata(descriptor, metadataComponents);
             // save the table of components
@@ -629,11 +629,10 @@ public class SSTableWriter extends SSTable
             builder.addPotentialBoundary(indexPosition);
         }
 
-        public void abort(boolean closeBf)
+        public void abort()
         {
             indexFile.abort();
-            if (closeBf)
-                bf.close();
+            bf.close();
         }
 
         /**
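
The FinishType change swaps two switch fall-throughs for a boolean attribute on
the enum constants: EARLY is now the only non-final finish, and the new CLOSE
type (openReason null, and asserted never to reach finish()) lets close() share
the close(FinishType, long) path without opening a reader. A compact sketch of
the enum-with-attributes pattern, with OpenReason reduced to a String stand-in:

    enum FinishType
    {
        CLOSE(null, true),      // close the writer without opening a reader
        NORMAL("NORMAL", true),
        EARLY("EARLY", false),  // no renaming; tmp files stay in place
        FINISH_EARLY("NORMAL", true);

        final String openReason; // stand-in for SSTableReader.OpenReason
        final boolean isFinal;

        FinishType(String openReason, boolean isFinal)
        {
            this.openReason = openReason;
            this.isFinal = isFinal;
        }

        public static void main(String[] args)
        {
            // call sites test the attribute instead of enumerating cases
            for (FinishType t : values())
                System.out.println(t + (t.isFinal ? ": rename, checksum, save summary" : ": leave tmp files"));
        }
    }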

http://git-wip-us.apache.org/repos/asf/cassandra/blob/61384c57/src/java/org/apache/cassandra/io/util/BufferedPoolingSegmentedFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/BufferedPoolingSegmentedFile.java b/src/java/org/apache/cassandra/io/util/BufferedPoolingSegmentedFile.java
index 57f465f..8334965 100644
--- a/src/java/org/apache/cassandra/io/util/BufferedPoolingSegmentedFile.java
+++ b/src/java/org/apache/cassandra/io/util/BufferedPoolingSegmentedFile.java
@@ -25,7 +25,17 @@ public class BufferedPoolingSegmentedFile extends PoolingSegmentedFile
 {
     public BufferedPoolingSegmentedFile(String path, long length)
     {
-        super(path, length);
+        super(new Cleanup(path), path, length);
+    }
+
+    private BufferedPoolingSegmentedFile(BufferedPoolingSegmentedFile copy)
+    {
+        super(copy);
+    }
+
+    public BufferedPoolingSegmentedFile sharedCopy()
+    {
+        return new BufferedPoolingSegmentedFile(this);
     }
 
     public static class Builder extends SegmentedFile.Builder
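
BufferedPoolingSegmentedFile is the first of several classes below to adopt the
same two-part shape: a private copy constructor plus a public sharedCopy(), where
every copy shares one Cleanup (the RefCounted.Tidy), so the underlying resources
are released once, when the last copy is closed. A hedged sketch of that shape
(Resource is an illustrative name; in the real hierarchy the shared ref count
lives in SharedCloseableImpl, reached via the super(...) calls):

    class Resource // stands in for a SegmentedFile subclass
    {
        final String path;
        final Cleanup cleanup; // shared by every copy; its tidy() runs exactly once

        Resource(String path)
        {
            this.path = path;
            this.cleanup = new Cleanup(path); // real code hands this to super(cleanup)
        }

        private Resource(Resource copy) // copy constructor: share state and the ref count
        {
            this.path = copy.path;
            this.cleanup = copy.cleanup; // real code also shares the count via super(copy)
        }

        public Resource sharedCopy()
        {
            return new Resource(this);
        }

        static class Cleanup // implements RefCounted.Tidy in the real hierarchy
        {
            final String path;
            Cleanup(String path) { this.path = path; }
            public void tidy() { /* release handles, caches, metadata; called once */ }
            public String name() { return path; }
        }
    }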

http://git-wip-us.apache.org/repos/asf/cassandra/blob/61384c57/src/java/org/apache/cassandra/io/util/BufferedSegmentedFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/BufferedSegmentedFile.java b/src/java/org/apache/cassandra/io/util/BufferedSegmentedFile.java
index 2f715da..c29bbf3 100644
--- a/src/java/org/apache/cassandra/io/util/BufferedSegmentedFile.java
+++ b/src/java/org/apache/cassandra/io/util/BufferedSegmentedFile.java
@@ -20,12 +20,30 @@ package org.apache.cassandra.io.util;
 import java.io.File;
 
 import org.apache.cassandra.io.sstable.SSTableWriter;
+import org.apache.cassandra.utils.concurrent.SharedCloseable;
 
 public class BufferedSegmentedFile extends SegmentedFile
 {
     public BufferedSegmentedFile(String path, long length)
     {
-        super(path, length);
+        super(new Cleanup(path), path, length);
+    }
+
+    private BufferedSegmentedFile(BufferedSegmentedFile copy)
+    {
+        super(copy);
+    }
+
+    private static class Cleanup extends SegmentedFile.Cleanup
+    {
+        protected Cleanup(String path)
+        {
+            super(path);
+        }
+        public void tidy() throws Exception
+        {
+            // nothing to release: plain buffered files hold no pooled readers or off-heap resources
+        }
     }
 
     public static class Builder extends SegmentedFile.Builder
@@ -49,7 +67,8 @@ public class BufferedSegmentedFile extends SegmentedFile
         return reader;
     }
 
-    public void cleanup()
+    public BufferedSegmentedFile sharedCopy()
     {
+        return new BufferedSegmentedFile(this);
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/61384c57/src/java/org/apache/cassandra/io/util/CompressedPoolingSegmentedFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/CompressedPoolingSegmentedFile.java b/src/java/org/apache/cassandra/io/util/CompressedPoolingSegmentedFile.java
index 11d091a..94d23bf 100644
--- a/src/java/org/apache/cassandra/io/util/CompressedPoolingSegmentedFile.java
+++ b/src/java/org/apache/cassandra/io/util/CompressedPoolingSegmentedFile.java
@@ -28,10 +28,31 @@ public class CompressedPoolingSegmentedFile extends PoolingSegmentedFile impleme
 
     public CompressedPoolingSegmentedFile(String path, CompressionMetadata metadata)
     {
-        super(path, metadata.dataLength, metadata.compressedFileLength);
+        super(new Cleanup(path, metadata), path, metadata.dataLength, metadata.compressedFileLength);
         this.metadata = metadata;
     }
 
+    private CompressedPoolingSegmentedFile(CompressedPoolingSegmentedFile copy)
+    {
+        super(copy);
+        this.metadata = copy.metadata;
+    }
+
+    protected static final class Cleanup extends PoolingSegmentedFile.Cleanup
+    {
+        final CompressionMetadata metadata;
+        protected Cleanup(String path, CompressionMetadata metadata)
+        {
+            super(path);
+            this.metadata = metadata;
+        }
+        public void tidy() throws Exception
+        {
+            super.tidy();
+            metadata.close();
+        }
+    }
+
     public static class Builder extends CompressedSegmentedFile.Builder
     {
         public Builder(CompressedSequentialWriter writer)
@@ -59,10 +80,8 @@ public class CompressedPoolingSegmentedFile extends PoolingSegmentedFile impleme
         return metadata;
     }
 
-    @Override
-    public void cleanup()
+    public CompressedPoolingSegmentedFile sharedCopy()
     {
-        super.cleanup();
-        metadata.close();
+        return new CompressedPoolingSegmentedFile(this);
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/61384c57/src/java/org/apache/cassandra/io/util/CompressedSegmentedFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/CompressedSegmentedFile.java b/src/java/org/apache/cassandra/io/util/CompressedSegmentedFile.java
index b788715..0c20bb9 100644
--- a/src/java/org/apache/cassandra/io/util/CompressedSegmentedFile.java
+++ b/src/java/org/apache/cassandra/io/util/CompressedSegmentedFile.java
@@ -28,10 +28,35 @@ public class CompressedSegmentedFile extends SegmentedFile implements ICompresse
 
     public CompressedSegmentedFile(String path, CompressionMetadata metadata)
     {
-        super(path, metadata.dataLength, metadata.compressedFileLength);
+        super(new Cleanup(path, metadata), path, metadata.dataLength, metadata.compressedFileLength);
         this.metadata = metadata;
     }
 
+    private CompressedSegmentedFile(CompressedSegmentedFile copy)
+    {
+        super(copy);
+        this.metadata = copy.metadata;
+    }
+
+    private static final class Cleanup extends SegmentedFile.Cleanup
+    {
+        final CompressionMetadata metadata;
+        protected Cleanup(String path, CompressionMetadata metadata)
+        {
+            super(path);
+            this.metadata = metadata;
+        }
+        public void tidy() throws Exception
+        {
+            metadata.close();
+        }
+    }
+
+    public CompressedSegmentedFile sharedCopy()
+    {
+        return new CompressedSegmentedFile(this);
+    }
+
     public static class Builder extends SegmentedFile.Builder
     {
         protected final CompressedSequentialWriter writer;
@@ -70,9 +95,4 @@ public class CompressedSegmentedFile extends SegmentedFile implements ICompresse
     {
         return metadata;
     }
-
-    public void cleanup()
-    {
-        metadata.close();
-    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/61384c57/src/java/org/apache/cassandra/io/util/Memory.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/Memory.java b/src/java/org/apache/cassandra/io/util/Memory.java
index 5306433..3874669 100644
--- a/src/java/org/apache/cassandra/io/util/Memory.java
+++ b/src/java/org/apache/cassandra/io/util/Memory.java
@@ -17,6 +17,7 @@
  */
 package org.apache.cassandra.io.util;
 
+import java.io.Closeable;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
 
@@ -30,7 +31,7 @@ import sun.nio.ch.DirectBuffer;
 /**
  * An off-heap region of memory that must be manually free'd when no longer needed.
  */
-public class Memory
+public class Memory implements AutoCloseable
 {
     private static final Unsafe unsafe = NativeAllocator.unsafe;
     private static final IAllocator allocator = DatabaseDescriptor.getoffHeapMemoryAllocator();
@@ -302,6 +303,11 @@ public class Memory
         peer = 0;
     }
 
+    public void close() throws Exception
+    {
+        free();
+    }
+
     public long size()
     {
         assert peer != 0;
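
Making Memory implement AutoCloseable lets callers scope an off-heap region with
try-with-resources instead of remembering to call free() on every exit path
(though since the patch declares close() throws Exception, call sites must still
catch or propagate it). A small sketch with a stand-in resource (OffHeapRegion is
hypothetical):

    class OffHeapRegion implements AutoCloseable
    {
        private long peer = 0xDEADBEEFL; // pretend native pointer

        public void close() // Memory.close() likewise just delegates to free()
        {
            peer = 0;
            System.out.println("region freed");
        }

        public static void main(String[] args)
        {
            try (OffHeapRegion region = new OffHeapRegion())
            {
                // use the region; it is freed on scope exit, even if an exception is thrown
            }
        }
    }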

http://git-wip-us.apache.org/repos/asf/cassandra/blob/61384c57/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java b/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java
index 3b2cc98..8b4ae9d 100644
--- a/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java
+++ b/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java
@@ -47,10 +47,21 @@ public class MmappedSegmentedFile extends SegmentedFile
 
     public MmappedSegmentedFile(String path, long length, Segment[] segments)
     {
-        super(path, length);
+        super(new Cleanup(path, segments), path, length);
         this.segments = segments;
     }
 
+    private MmappedSegmentedFile(MmappedSegmentedFile copy)
+    {
+        super(copy);
+        this.segments = copy.segments;
+    }
+
+    public MmappedSegmentedFile sharedCopy()
+    {
+        return new MmappedSegmentedFile(this);
+    }
+
     /**
      * @return The segment entry for the given position.
      */
@@ -85,31 +96,41 @@ public class MmappedSegmentedFile extends SegmentedFile
         return file;
     }
 
-    public void cleanup()
+    private static final class Cleanup extends SegmentedFile.Cleanup
     {
-        if (!FileUtils.isCleanerAvailable())
-            return;
+        final Segment[] segments;
+        protected Cleanup(String path, Segment[] segments)
+        {
+            super(path);
+            this.segments = segments;
+        }
+
+        public void tidy()
+        {
+            if (!FileUtils.isCleanerAvailable())
+                return;
 
         /*
          * Try forcing the unmapping of segments using undocumented unsafe sun APIs.
          * If this fails (non Sun JVM), we'll have to wait for the GC to finalize the mapping.
          * If this works and a thread tries to access any segment, hell will unleash on earth.
          */
-        try
-        {
-            for (Segment segment : segments)
+            try
             {
-                if (segment.right == null)
-                    continue;
-                FileUtils.clean(segment.right);
+                for (Segment segment : segments)
+                {
+                    if (segment.right == null)
+                        continue;
+                    FileUtils.clean(segment.right);
+                }
+                logger.debug("All segments have been unmapped successfully");
+            }
+            catch (Exception e)
+            {
+                JVMStabilityInspector.inspectThrowable(e);
+                // This is not supposed to happen
+                logger.error("Error while unmapping segments", e);
             }
-            logger.debug("All segments have been unmapped successfully");
-        }
-        catch (Exception e)
-        {
-            JVMStabilityInspector.inspectThrowable(e);
-            // This is not supposed to happen
-            logger.error("Error while unmapping segments", e);
         }
     }
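
The unmapping moved into Cleanup.tidy() rests on FileUtils.clean, which on
Sun/Oracle JVMs of this era force-releases a mapping instead of waiting for the
GC to finalize it. A sketch of that underlying idiom on JDK 8 (non-portable
sun.* APIs, hence the isCleanerAvailable() probe in the real code; example.db is
a placeholder path):

    import java.io.RandomAccessFile;
    import java.nio.MappedByteBuffer;
    import java.nio.channels.FileChannel;

    class Unmap
    {
        static void clean(MappedByteBuffer buffer)
        {
            // the undocumented route: grab the buffer's Cleaner and run it now
            sun.misc.Cleaner cleaner = ((sun.nio.ch.DirectBuffer) buffer).cleaner();
            if (cleaner != null)
                cleaner.clean(); // after this, any access to the buffer is undefined behaviour
        }

        public static void main(String[] args) throws Exception
        {
            try (RandomAccessFile raf = new RandomAccessFile("example.db", "r"))
            {
                MappedByteBuffer map = raf.getChannel().map(FileChannel.MapMode.READ_ONLY, 0, raf.length());
                // ... read from map ...
                clean(map); // release the mapping immediately
            }
        }
    }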
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/61384c57/src/java/org/apache/cassandra/io/util/PoolingSegmentedFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/PoolingSegmentedFile.java b/src/java/org/apache/cassandra/io/util/PoolingSegmentedFile.java
index 01f4e31..daca22f 100644
--- a/src/java/org/apache/cassandra/io/util/PoolingSegmentedFile.java
+++ b/src/java/org/apache/cassandra/io/util/PoolingSegmentedFile.java
@@ -21,15 +21,35 @@ import org.apache.cassandra.service.FileCacheService;
 
 public abstract class PoolingSegmentedFile extends SegmentedFile
 {
-    final FileCacheService.CacheKey cacheKey = new FileCacheService.CacheKey();
-    protected PoolingSegmentedFile(String path, long length)
+    final FileCacheService.CacheKey cacheKey;
+    protected PoolingSegmentedFile(Cleanup cleanup, String path, long length)
     {
-        super(path, length);
+        this(cleanup, path, length, length);
     }
 
-    protected PoolingSegmentedFile(String path, long length, long onDiskLength)
+    protected PoolingSegmentedFile(Cleanup cleanup, String path, long length, long onDiskLength)
     {
-        super(path, length, onDiskLength);
+        super(cleanup, path, length, onDiskLength);
+        cacheKey = cleanup.cacheKey;
+    }
+
+    public PoolingSegmentedFile(PoolingSegmentedFile copy)
+    {
+        super(copy);
+        cacheKey = copy.cacheKey;
+    }
+
+    protected static class Cleanup extends SegmentedFile.Cleanup
+    {
+        final FileCacheService.CacheKey cacheKey = new FileCacheService.CacheKey();
+        protected Cleanup(String path)
+        {
+            super(path);
+        }
+        public void tidy() throws Exception
+        {
+            FileCacheService.instance.invalidate(cacheKey, path);
+        }
     }
 
     public FileDataInput getSegment(long position)
@@ -49,9 +69,4 @@ public abstract class PoolingSegmentedFile extends SegmentedFile
     {
         FileCacheService.instance.put(cacheKey, reader);
     }
-
-    public void cleanup()
-    {
-        FileCacheService.instance.invalidate(cacheKey, path);
-    }
 }
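
Moving the FileCacheService.CacheKey onto the Cleanup does two jobs: every
sharedCopy recycles pooled readers under the same key, and the cache entry is
invalidated exactly once, from tidy(), after the last copy is released. A
stand-in sketch of that per-key pooling (FileCache here is illustrative, not the
FileCacheService API):

    import java.util.Map;
    import java.util.Queue;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentLinkedQueue;

    class FileCache
    {
        // one reader pool per cache key; the key is shared by all copies of a file
        private final Map<Object, Queue<String>> pools = new ConcurrentHashMap<>();

        void put(Object key, String reader)
        {
            pools.computeIfAbsent(key, k -> new ConcurrentLinkedQueue<>()).add(reader);
        }

        String get(Object key)
        {
            Queue<String> pool = pools.get(key);
            return pool == null ? null : pool.poll(); // null means open a fresh reader
        }

        void invalidate(Object key)
        {
            pools.remove(key); // called once, from Cleanup.tidy(), when the last copy closes
        }
    }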

http://git-wip-us.apache.org/repos/asf/cassandra/blob/61384c57/src/java/org/apache/cassandra/io/util/SegmentedFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/SegmentedFile.java b/src/java/org/apache/cassandra/io/util/SegmentedFile.java
index badae56..d557b72 100644
--- a/src/java/org/apache/cassandra/io/util/SegmentedFile.java
+++ b/src/java/org/apache/cassandra/io/util/SegmentedFile.java
@@ -31,6 +31,8 @@ import org.apache.cassandra.io.FSReadError;
 import org.apache.cassandra.io.compress.CompressedSequentialWriter;
 import org.apache.cassandra.io.sstable.SSTableWriter;
 import org.apache.cassandra.utils.Pair;
+import org.apache.cassandra.utils.concurrent.RefCounted;
+import org.apache.cassandra.utils.concurrent.SharedCloseableImpl;
 
 /**
  * Abstracts a read-only file that has been split into segments, each of which can be represented by an independent
@@ -41,7 +43,7 @@ import org.apache.cassandra.utils.Pair;
  * would need to be longer than 2GB, that segment will not be mmap'd, and a new RandomAccessFile will be created for
  * each access to that segment.
  */
-public abstract class SegmentedFile
+public abstract class SegmentedFile extends SharedCloseableImpl
 {
     public final String path;
     public final long length;
@@ -53,18 +55,43 @@ public abstract class SegmentedFile
     /**
      * Use getBuilder to get a Builder to construct a SegmentedFile.
      */
-    SegmentedFile(String path, long length)
+    SegmentedFile(Cleanup cleanup, String path, long length)
     {
-        this(path, length, length);
+        this(cleanup, path, length, length);
     }
 
-    protected SegmentedFile(String path, long length, long onDiskLength)
+    protected SegmentedFile(Cleanup cleanup, String path, long length, long onDiskLength)
     {
+        super(cleanup);
         this.path = new File(path).getAbsolutePath();
         this.length = length;
         this.onDiskLength = onDiskLength;
     }
 
+    public SegmentedFile(SegmentedFile copy)
+    {
+        super(copy);
+        path = copy.path;
+        length = copy.length;
+        onDiskLength = copy.onDiskLength;
+    }
+
+    protected static abstract class Cleanup implements RefCounted.Tidy
+    {
+        final String path;
+        protected Cleanup(String path)
+        {
+            this.path = path;
+        }
+
+        public String name()
+        {
+            return path;
+        }
+    }
+
+    public abstract SegmentedFile sharedCopy();
+
     /**
      * @return A SegmentedFile.Builder.
      */
@@ -96,11 +123,6 @@ public abstract class SegmentedFile
     }
 
     /**
-     * Do whatever action is needed to reclaim resources used by this SegmentedFile.
-     */
-    public abstract void cleanup();
-
-    /**
      * Collects potential segmentation points in an underlying file, and builds a SegmentedFile to represent it.
      */
     public static abstract class Builder

http://git-wip-us.apache.org/repos/asf/cassandra/blob/61384c57/src/java/org/apache/cassandra/service/ActiveRepairService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java b/src/java/org/apache/cassandra/service/ActiveRepairService.java
index 15e7641..bf1cdd6 100644
--- a/src/java/org/apache/cassandra/service/ActiveRepairService.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java
@@ -432,7 +432,7 @@ public class ActiveRepairService
         {
             Set<SSTableReader> sstables = sstableMap.get(cfId);
             Iterator<SSTableReader> sstableIterator = sstables.iterator();
-            ImmutableMap.Builder<SSTableReader, Ref> references = ImmutableMap.builder();
+            ImmutableMap.Builder<SSTableReader, Ref<SSTableReader>> references = ImmutableMap.builder();
             while (sstableIterator.hasNext())
             {
                 SSTableReader sstable = sstableIterator.next();
@@ -442,7 +442,7 @@ public class ActiveRepairService
                 }
                 else
                 {
-                    Ref ref = sstable.tryRef();
+                    Ref<SSTableReader> ref = sstable.tryRef();
                     if (ref == null)
                         sstableIterator.remove();
                     else

http://git-wip-us.apache.org/repos/asf/cassandra/blob/61384c57/src/java/org/apache/cassandra/streaming/StreamSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java b/src/java/org/apache/cassandra/streaming/StreamSession.java
index 6108dea..a9f5075 100644
--- a/src/java/org/apache/cassandra/streaming/StreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@ -353,7 +353,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber
     public static class SSTableStreamingSections
     {
         public final SSTableReader sstable;
-        public final Ref ref;
+        public final Ref<SSTableReader> ref;
         public final List<Pair<Long, Long>> sections;
         public final long estimatedKeys;
         public final long repairedAt;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/61384c57/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java b/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
index 5ebf289..069e97f 100644
--- a/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
@@ -62,7 +62,7 @@ public class OutgoingFileMessage extends StreamMessage
 
     public final FileMessageHeader header;
     public final SSTableReader sstable;
-    public final Ref ref;
+    public final Ref<SSTableReader> ref;
 
     public OutgoingFileMessage(SSTableReader sstable, Ref ref, int sequenceNumber, long estimatedKeys, List<Pair<Long, Long>> sections, long repairedAt)
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/61384c57/src/java/org/apache/cassandra/tools/StandaloneScrubber.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/StandaloneScrubber.java b/src/java/org/apache/cassandra/tools/StandaloneScrubber.java
index 1bc2674..d420218 100644
--- a/src/java/org/apache/cassandra/tools/StandaloneScrubber.java
+++ b/src/java/org/apache/cassandra/tools/StandaloneScrubber.java
@@ -121,7 +121,7 @@ public class StandaloneScrubber
 
                         // Remove the sstable (it's been copied by scrub and snapshotted)
                         sstable.markObsolete();
-                        sstable.sharedRef().release();
+                        sstable.selfRef().release();
                     }
                     catch (Exception e)
                     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/61384c57/src/java/org/apache/cassandra/utils/AlwaysPresentFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/AlwaysPresentFilter.java b/src/java/org/apache/cassandra/utils/AlwaysPresentFilter.java
index cc162d4..1a029e5 100644
--- a/src/java/org/apache/cassandra/utils/AlwaysPresentFilter.java
+++ b/src/java/org/apache/cassandra/utils/AlwaysPresentFilter.java
@@ -32,6 +32,11 @@ public class AlwaysPresentFilter implements IFilter
 
     public void close() { }
 
+    public IFilter sharedCopy()
+    {
+        return this;
+    }
+
     public long serializedSize() { return 0; }
 
     @Override