You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by be...@apache.org on 2015/01/28 16:20:28 UTC

[4/7] cassandra git commit: Safer Resource Management

Safer Resource Management

patch by benedict; review by marcuse for CASSANDRA-7705


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c75ee416
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c75ee416
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c75ee416

Branch: refs/heads/trunk
Commit: c75ee4160cb8fcdf47c90bfce8bf0d861f32d268
Parents: 9efa017
Author: Benedict Elliott Smith <be...@apache.org>
Authored: Wed Jan 28 14:45:31 2015 +0000
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Wed Jan 28 14:46:16 2015 +0000

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 build.xml                                       |   1 +
 .../apache/cassandra/db/ColumnFamilyStore.java  | 143 +++---
 .../org/apache/cassandra/db/DataTracker.java    |   6 +-
 .../compaction/AbstractCompactionStrategy.java  |   2 +-
 .../db/compaction/CompactionController.java     |  20 +-
 .../db/compaction/CompactionManager.java        |  26 +-
 .../cassandra/db/compaction/CompactionTask.java |   9 +-
 .../cassandra/db/index/SecondaryIndex.java      |  10 +-
 .../apache/cassandra/io/sstable/SSTable.java    |   1 +
 .../cassandra/io/sstable/SSTableLoader.java     |  14 +-
 .../cassandra/io/sstable/SSTableReader.java     | 444 +++++++++----------
 .../cassandra/io/sstable/SSTableRewriter.java   |  15 +-
 .../cassandra/io/sstable/SSTableWriter.java     |   4 +-
 .../cassandra/service/ActiveRepairService.java  |  18 +-
 .../cassandra/streaming/StreamReceiveTask.java  |  10 +-
 .../cassandra/streaming/StreamSession.java      |  28 +-
 .../cassandra/streaming/StreamTransferTask.java |  10 +-
 .../streaming/messages/OutgoingFileMessage.java |  10 +-
 .../cassandra/tools/StandaloneScrubber.java     |   2 +-
 .../apache/cassandra/utils/concurrent/Ref.java  | 134 ++++++
 .../cassandra/utils/concurrent/RefCounted.java  |  94 ++++
 .../utils/concurrent/RefCountedImpl.java        | 132 ++++++
 .../apache/cassandra/utils/concurrent/Refs.java | 219 +++++++++
 .../unit/org/apache/cassandra/SchemaLoader.java |  10 +
 .../org/apache/cassandra/db/KeyCacheTest.java   |   9 +-
 .../db/compaction/AntiCompactionTest.java       |  26 +-
 .../compaction/BlacklistingCompactionsTest.java |  11 +
 .../cassandra/io/sstable/LegacySSTableTest.java |   2 +-
 .../io/sstable/SSTableRewriterTest.java         |   4 +-
 .../streaming/StreamTransferTaskTest.java       |   2 +-
 .../streaming/StreamingTransferTest.java        |  18 +-
 .../utils/concurrent/RefCountedTest.java        |  85 ++++
 33 files changed, 1097 insertions(+), 423 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/c75ee416/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index ff6a26f..d142a68 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.1.3
+ * Safer Resource Management (CASSANDRA-7705)
  * Make sure we compact highly overlapping cold sstables with
    STCS (CASSANDRA-8635)
  * rpc_interface and listen_interface generate NPE on startup when specified interface doesn't exist (CASSANDRA-8677)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c75ee416/build.xml
----------------------------------------------------------------------
diff --git a/build.xml b/build.xml
index 2e5d0ac..d53a0f7 100644
--- a/build.xml
+++ b/build.xml
@@ -1109,6 +1109,7 @@
         <jvmarg value="-Djava.awt.headless=true"/>
         <jvmarg value="-javaagent:${basedir}/lib/jamm-0.3.0.jar" />
         <jvmarg value="-ea"/>
+        <jvmarg value="-Dcassandra.debugrefcount=true"/>
         <jvmarg value="-Xss256k"/>
         <jvmarg value="-Dcassandra.memtable_row_overhead_computation_step=100"/>
         <jvmarg value="-Dcassandra.test.use_prepared=${cassandra.test.use_prepared}"/>

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c75ee416/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 3822648..62aadf9 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -33,6 +33,7 @@ import com.google.common.base.*;
 import com.google.common.collect.*;
 import com.google.common.util.concurrent.*;
 
+import org.apache.cassandra.io.FSWriteError;
 import org.json.simple.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -58,7 +59,6 @@ import org.apache.cassandra.dht.*;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.io.FSReadError;
-import org.apache.cassandra.io.FSWriteError;
 import org.apache.cassandra.io.compress.CompressionParameters;
 import org.apache.cassandra.io.sstable.*;
 import org.apache.cassandra.io.sstable.Descriptor;
@@ -72,8 +72,8 @@ import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.streaming.StreamLockfile;
 import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.utils.*;
+import org.apache.cassandra.utils.concurrent.*;
 import org.apache.cassandra.utils.TopKSampler.SamplerResult;
-import org.apache.cassandra.utils.concurrent.OpOrder;
 import org.apache.cassandra.utils.memory.MemtableAllocator;
 
 import com.clearspring.analytics.stream.Counter;
@@ -767,16 +767,12 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         }
 
         logger.info("Loading new SSTables and building secondary indexes for {}/{}: {}", keyspace.getName(), name, newSSTables);
-        SSTableReader.acquireReferences(newSSTables);
-        data.addSSTables(newSSTables);
-        try
+
+        try (Refs<SSTableReader> refs = Refs.ref(newSSTables))
         {
+            data.addSSTables(newSSTables);
             indexManager.maybeBuildSecondaryIndexes(newSSTables, indexManager.allIndexesNames());
         }
-        finally
-        {
-            SSTableReader.releaseReferences(newSSTables);
-        }
 
         logger.info("Done loading load new SSTables for {}/{}", keyspace.getName(), name);
     }
@@ -788,18 +784,14 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         Set<String> indexes = new HashSet<String>(Arrays.asList(idxNames));
 
         Collection<SSTableReader> sstables = cfs.getSSTables();
-        try
+
+        try (Refs<SSTableReader> refs = Refs.ref(sstables))
         {
             cfs.indexManager.setIndexRemoved(indexes);
-            SSTableReader.acquireReferences(sstables);
             logger.info(String.format("User Requested secondary index re-build for %s/%s indexes", ksName, cfName));
             cfs.indexManager.maybeBuildSecondaryIndexes(sstables, indexes);
             cfs.indexManager.setIndexBuilt(indexes);
         }
-        finally
-        {
-            SSTableReader.releaseReferences(sstables);
-        }
     }
 
     public String getColumnFamilyName()
@@ -1285,13 +1277,13 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
      * @return sstables whose key range overlaps with that of the given sstables, not including itself.
      * (The given sstables may or may not overlap with each other.)
      */
-    public Set<SSTableReader> getOverlappingSSTables(Collection<SSTableReader> sstables)
+    public Collection<SSTableReader> getOverlappingSSTables(Iterable<SSTableReader> sstables)
     {
         logger.debug("Checking for sstables overlapping {}", sstables);
 
         // a normal compaction won't ever have an empty sstables list, but we create a skeleton
         // compaction controller for streaming, and that passes an empty list.
-        if (sstables.isEmpty())
+        if (!sstables.iterator().hasNext())
             return ImmutableSet.of();
 
         DataTracker.SSTableIntervalTree tree = data.getView().intervalTree;
@@ -1310,13 +1302,14 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
     /**
      * like getOverlappingSSTables, but acquires references before returning
      */
-    public Set<SSTableReader> getAndReferenceOverlappingSSTables(Collection<SSTableReader> sstables)
+    public Refs<SSTableReader> getAndReferenceOverlappingSSTables(Iterable<SSTableReader> sstables)
     {
         while (true)
         {
-            Set<SSTableReader> overlapped = getOverlappingSSTables(sstables);
-            if (SSTableReader.acquireReferences(overlapped))
-                return overlapped;
+            Iterable<SSTableReader> overlapped = getOverlappingSSTables(sstables);
+            Refs<SSTableReader> refs = Refs.tryRef(overlapped);
+            if (refs != null)
+                return refs;
         }
     }
 
@@ -1793,38 +1786,6 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         return removeDeletedCF(cf, gcBefore);
     }
 
-    /**
-     * Get the current view and acquires references on all its sstables.
-     * This is a bit tricky because we must ensure that between the time we
-     * get the current view and the time we acquire the references the set of
-     * sstables hasn't changed. Otherwise we could get a view for which an
-     * sstable have been deleted in the meantime.
-     *
-     * At the end of this method, a reference on all the sstables of the
-     * returned view will have been acquired and must thus be released when
-     * appropriate.
-     */
-    private DataTracker.View markCurrentViewReferenced()
-    {
-        while (true)
-        {
-            DataTracker.View currentView = data.getView();
-            if (SSTableReader.acquireReferences(currentView.sstables))
-                return currentView;
-        }
-    }
-
-    /**
-     * Get the current sstables, acquiring references on all of them.
-     * The caller is in charge of releasing the references on the sstables.
-     *
-     * See markCurrentViewReferenced() above.
-     */
-    public Collection<SSTableReader> markCurrentSSTablesReferenced()
-    {
-        return markCurrentViewReferenced().sstables;
-    }
-
     public Set<SSTableReader> getUnrepairedSSTables()
     {
         Set<SSTableReader> unRepairedSSTables = new HashSet<>(getSSTables());
@@ -1851,13 +1812,14 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         return repairedSSTables;
     }
 
-    public ViewFragment selectAndReference(Function<DataTracker.View, List<SSTableReader>> filter)
+    public RefViewFragment selectAndReference(Function<DataTracker.View, List<SSTableReader>> filter)
     {
         while (true)
         {
             ViewFragment view = select(filter);
-            if (view.sstables.isEmpty() || SSTableReader.acquireReferences(view.sstables))
-                return view;
+            Refs<SSTableReader> refs = Refs.tryRef(view.sstables);
+            if (refs != null)
+                return new RefViewFragment(view.sstables, view.memtables, refs);
         }
     }
 
@@ -2253,9 +2215,8 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
     {
         for (ColumnFamilyStore cfs : concatWithIndexes())
         {
-            DataTracker.View currentView = cfs.markCurrentViewReferenced();
             final JSONArray filesJSONArr = new JSONArray();
-            try
+            try (RefViewFragment currentView = cfs.selectAndReference(ALL_SSTABLES))
             {
                 for (SSTableReader ssTable : currentView.sstables)
                 {
@@ -2273,10 +2234,6 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
 
                 writeSnapshotManifest(filesJSONArr, snapshotName);
             }
-            finally
-            {
-                SSTableReader.releaseReferences(currentView.sstables);
-            }
         }
     }
 
@@ -2300,13 +2257,13 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         }
     }
 
-    public List<SSTableReader> getSnapshotSSTableReader(String tag) throws IOException
+    public Refs<SSTableReader> getSnapshotSSTableReader(String tag) throws IOException
     {
         Map<Integer, SSTableReader> active = new HashMap<>();
         for (SSTableReader sstable : data.getView().sstables)
             active.put(sstable.descriptor.generation, sstable);
         Map<Descriptor, Set<Component>> snapshots = directories.sstableLister().snapshots(tag).list();
-        List<SSTableReader> readers = new ArrayList<>(snapshots.size());
+        Refs<SSTableReader> refs = new Refs<>();
         try
         {
             for (Map.Entry<Descriptor, Set<Component>> entries : snapshots.entrySet())
@@ -2314,29 +2271,28 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
                 // Try acquire reference to an active sstable instead of snapshot if it exists,
                 // to avoid opening new sstables. If it fails, use the snapshot reference instead.
                 SSTableReader sstable = active.get(entries.getKey().generation);
-                if (sstable == null || !sstable.acquireReference())
+                if (sstable == null || !refs.tryRef(sstable))
                 {
                     if (logger.isDebugEnabled())
                         logger.debug("using snapshot sstable {}", entries.getKey());
                     sstable = SSTableReader.open(entries.getKey(), entries.getValue(), metadata, partitioner);
                     // This is technically not necessary since it's a snapshot but makes things easier
-                    sstable.acquireReference();
+                    refs.tryRef(sstable);
                 }
                 else if (logger.isDebugEnabled())
                 {
                     logger.debug("using active sstable {}", entries.getKey());
                 }
-                readers.add(sstable);
             }
         }
         catch (IOException | RuntimeException e)
         {
             // In case one of the snapshot sstables fails to open,
             // we must release the references to the ones we opened so far
-            SSTableReader.releaseReferences(readers);
+            refs.release();
             throw e;
         }
-        return readers;
+        return refs;
     }
 
     /**
@@ -2479,37 +2435,27 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
 
     public Iterable<DecoratedKey> keySamples(Range<Token> range)
     {
-        Collection<SSTableReader> sstables = markCurrentSSTablesReferenced();
-        try
+        try (RefViewFragment view = selectAndReference(ALL_SSTABLES))
         {
-            Iterable<DecoratedKey>[] samples = new Iterable[sstables.size()];
+            Iterable<DecoratedKey>[] samples = new Iterable[view.sstables.size()];
             int i = 0;
-            for (SSTableReader sstable: sstables)
+            for (SSTableReader sstable: view.sstables)
             {
                 samples[i++] = sstable.getKeySamples(range);
             }
             return Iterables.concat(samples);
         }
-        finally
-        {
-            SSTableReader.releaseReferences(sstables);
-        }
     }
 
     public long estimatedKeysForRange(Range<Token> range)
     {
-        Collection<SSTableReader> sstables = markCurrentSSTablesReferenced();
-        try
+        try (RefViewFragment view = selectAndReference(ALL_SSTABLES))
         {
             long count = 0;
-            for (SSTableReader sstable : sstables)
+            for (SSTableReader sstable : view.sstables)
                 count += sstable.estimatedKeysForRanges(Collections.singleton(range));
             return count;
         }
-        finally
-        {
-            SSTableReader.releaseReferences(sstables);
-        }
     }
 
     /**
@@ -2888,6 +2834,27 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         }
     }
 
+    public static class RefViewFragment extends ViewFragment implements AutoCloseable
+    {
+        public final Refs<SSTableReader> refs;
+
+        public RefViewFragment(List<SSTableReader> sstables, Iterable<Memtable> memtables, Refs<SSTableReader> refs)
+        {
+            super(sstables, memtables);
+            this.refs = refs;
+        }
+
+        public void release()
+        {
+            refs.release();
+        }
+
+        public void close()
+        {
+            refs.release();
+        }
+    }
+
     /**
      * Returns the creation time of the oldest memtable not fully flushed yet.
      */
@@ -2951,4 +2918,12 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
     {
         fileIndexGenerator.set(0);
     }
+
+    public static final Function<DataTracker.View, List<SSTableReader>> ALL_SSTABLES = new Function<DataTracker.View, List<SSTableReader>>()
+    {
+        public List<SSTableReader> apply(DataTracker.View view)
+        {
+            return new ArrayList<>(view.sstables);
+        }
+    };
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c75ee416/src/java/org/apache/cassandra/db/DataTracker.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DataTracker.java b/src/java/org/apache/cassandra/db/DataTracker.java
index d086b47..f672cf2 100644
--- a/src/java/org/apache/cassandra/db/DataTracker.java
+++ b/src/java/org/apache/cassandra/db/DataTracker.java
@@ -367,7 +367,7 @@ public class DataTracker
         while (!view.compareAndSet(currentView, newView));
         for (SSTableReader sstable : currentView.sstables)
             if (!remaining.contains(sstable))
-                sstable.releaseReference();
+                sstable.sharedRef().release();
         notifySSTablesChanged(remaining, Collections.<SSTableReader>emptySet(), OperationType.UNKNOWN);
     }
 
@@ -406,7 +406,7 @@ public class DataTracker
             sstable.setTrackedBy(this);
 
         for (SSTableReader sstable : oldSSTables)
-            sstable.releaseReference();
+            sstable.sharedRef().release();
     }
 
     private void removeSSTablesFromTracker(Collection<SSTableReader> oldSSTables)
@@ -467,7 +467,7 @@ public class DataTracker
         {
             boolean firstToCompact = sstable.markObsolete();
             assert tolerateCompacted || firstToCompact : sstable + " was already marked compacted";
-            sstable.releaseReference();
+            sstable.sharedRef().release();
         }
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c75ee416/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
index 10abd01..9b25a1b 100644
--- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
@@ -372,7 +372,7 @@ public abstract class AbstractCompactionStrategy
         if (uncheckedTombstoneCompaction)
             return true;
 
-        Set<SSTableReader> overlaps = cfs.getOverlappingSSTables(Collections.singleton(sstable));
+        Collection<SSTableReader> overlaps = cfs.getOverlappingSSTables(Collections.singleton(sstable));
         if (overlaps.isEmpty())
         {
             // there is no overlap, tombstones are safely droppable

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c75ee416/src/java/org/apache/cassandra/db/compaction/CompactionController.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionController.java b/src/java/org/apache/cassandra/db/compaction/CompactionController.java
index f23d39a..5217189 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionController.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionController.java
@@ -33,6 +33,8 @@ import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.utils.AlwaysPresentFilter;
 
+import org.apache.cassandra.utils.concurrent.Refs;
+
 /**
  * Manage compaction options.
  */
@@ -42,8 +44,8 @@ public class CompactionController implements AutoCloseable
 
     public final ColumnFamilyStore cfs;
     private DataTracker.SSTableIntervalTree overlappingTree;
-    private Set<SSTableReader> overlappingSSTables;
-    private final Set<SSTableReader> compacting;
+    private Refs<SSTableReader> overlappingSSTables;
+    private final Iterable<SSTableReader> compacting;
 
     public final int gcBefore;
 
@@ -76,11 +78,13 @@ public class CompactionController implements AutoCloseable
     private void refreshOverlaps()
     {
         if (this.overlappingSSTables != null)
-            SSTableReader.releaseReferences(overlappingSSTables);
+            overlappingSSTables.release();
 
-        Set<SSTableReader> overlapping = compacting == null ? null : cfs.getAndReferenceOverlappingSSTables(compacting);
-        this.overlappingSSTables = overlapping == null ? Collections.<SSTableReader>emptySet() : overlapping;
-        this.overlappingTree = overlapping == null ? null : DataTracker.buildIntervalTree(overlapping);
+        if (compacting == null)
+            overlappingSSTables = Refs.tryRef(Collections.<SSTableReader>emptyList());
+        else
+            overlappingSSTables = cfs.getAndReferenceOverlappingSSTables(compacting);
+        this.overlappingTree = DataTracker.buildIntervalTree(overlappingSSTables);
     }
 
     public Set<SSTableReader> getFullyExpiredSSTables()
@@ -104,7 +108,7 @@ public class CompactionController implements AutoCloseable
      * @param gcBefore
      * @return
      */
-    public static Set<SSTableReader> getFullyExpiredSSTables(ColumnFamilyStore cfStore, Set<SSTableReader> compacting, Set<SSTableReader> overlapping, int gcBefore)
+    public static Set<SSTableReader> getFullyExpiredSSTables(ColumnFamilyStore cfStore, Iterable<SSTableReader> compacting, Iterable<SSTableReader> overlapping, int gcBefore)
     {
         logger.debug("Checking droppable sstables in {}", cfStore);
 
@@ -187,6 +191,6 @@ public class CompactionController implements AutoCloseable
 
     public void close()
     {
-        SSTableReader.releaseReferences(overlappingSSTables);
+        overlappingSSTables.release();
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c75ee416/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 02f5e81..f59938f 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -82,6 +82,8 @@ import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.concurrent.OpOrder;
 import org.apache.cassandra.utils.*;
 
+import org.apache.cassandra.utils.concurrent.Refs;
+
 /**
  * A singleton which manages a private executor of ongoing compactions.
  * <p/>
@@ -370,7 +372,7 @@ public class CompactionManager implements CompactionManagerMBean
 
     public Future<?> submitAntiCompaction(final ColumnFamilyStore cfs,
                                           final Collection<Range<Token>> ranges,
-                                          final Collection<SSTableReader> sstables,
+                                          final Refs<SSTableReader> sstables,
                                           final long repairedAt)
     {
         Runnable runnable = new WrappedRunnable() {
@@ -381,18 +383,12 @@ public class CompactionManager implements CompactionManagerMBean
                 while (!success)
                 {
                     for (SSTableReader compactingSSTable : cfs.getDataTracker().getCompacting())
-                    {
-                        if (sstables.remove(compactingSSTable))
-                            SSTableReader.releaseReferences(Arrays.asList(compactingSSTable));
-                    }
+                        sstables.releaseIfHolds(compactingSSTable);
                     Set<SSTableReader> compactedSSTables = new HashSet<>();
                     for (SSTableReader sstable : sstables)
-                    {
                         if (sstable.isMarkedCompacted())
                             compactedSSTables.add(sstable);
-                    }
-                    sstables.removeAll(compactedSSTables);
-                    SSTableReader.releaseReferences(compactedSSTables);
+                    sstables.release(compactedSSTables);
                     success = sstables.isEmpty() || cfs.getDataTracker().markCompacting(sstables);
                 }
                 performAnticompaction(cfs, ranges, sstables, repairedAt);
@@ -413,7 +409,7 @@ public class CompactionManager implements CompactionManagerMBean
      */
     public void performAnticompaction(ColumnFamilyStore cfs,
                                       Collection<Range<Token>> ranges,
-                                      Collection<SSTableReader> validatedForRepair,
+                                      Refs<SSTableReader> validatedForRepair,
                                       long repairedAt) throws InterruptedException, ExecutionException, IOException
     {
         logger.info("Starting anticompaction for {}.{} on {}/{} sstables", cfs.keyspace.getName(), cfs.getColumnFamilyName(), validatedForRepair.size(), cfs.getSSTables().size());
@@ -454,13 +450,13 @@ public class CompactionManager implements CompactionManagerMBean
             }
             cfs.getDataTracker().notifySSTableRepairedStatusChanged(mutatedRepairStatuses);
             cfs.getDataTracker().unmarkCompacting(Sets.union(nonAnticompacting, mutatedRepairStatuses));
-            SSTableReader.releaseReferences(Sets.union(nonAnticompacting, mutatedRepairStatuses));
+            validatedForRepair.release(Sets.union(nonAnticompacting, mutatedRepairStatuses));
             if (!sstables.isEmpty())
                 doAntiCompaction(cfs, ranges, sstables, repairedAt);
         }
         finally
         {
-            SSTableReader.releaseReferences(sstables);
+            validatedForRepair.release();
             cfs.getDataTracker().unmarkCompacting(sstables);
         }
 
@@ -899,7 +895,7 @@ public class CompactionManager implements CompactionManagerMBean
         if (!cfs.isValid())
             return;
 
-        Collection<SSTableReader> sstables = null;
+        Refs<SSTableReader> sstables = null;
         try
         {
 
@@ -924,7 +920,7 @@ public class CompactionManager implements CompactionManagerMBean
                 // we don't mark validating sstables as compacting in DataTracker, so we have to mark them referenced
                 // instead so they won't be cleaned up if they do get compacted during the validation
                 if (validator.desc.parentSessionId == null || ActiveRepairService.instance.getParentRepairSession(validator.desc.parentSessionId) == null)
-                    sstables = cfs.markCurrentSSTablesReferenced();
+                    sstables = cfs.selectAndReference(ColumnFamilyStore.ALL_SSTABLES).refs;
                 else
                     sstables = ActiveRepairService.instance.getParentRepairSession(validator.desc.parentSessionId).getAndReferenceSSTables(cfs.metadata.cfId);
 
@@ -990,7 +986,7 @@ public class CompactionManager implements CompactionManagerMBean
         finally
         {
             if (sstables != null)
-                SSTableReader.releaseReferences(sstables);
+                sstables.release();
         }
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c75ee416/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
index eda09c0..b6c215e 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
@@ -214,7 +214,14 @@ public class CompactionTask extends AbstractCompactionTask
                 }
                 catch (Throwable t)
                 {
-                    writer.abort();
+                    try
+                    {
+                        writer.abort();
+                    }
+                    catch (Throwable t2)
+                    {
+                        t.addSuppressed(t2);
+                    }
                     throw t;
                 }
                 finally

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c75ee416/src/java/org/apache/cassandra/db/index/SecondaryIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/SecondaryIndex.java b/src/java/org/apache/cassandra/db/index/SecondaryIndex.java
index 62b9e4c..be100f4 100644
--- a/src/java/org/apache/cassandra/db/index/SecondaryIndex.java
+++ b/src/java/org/apache/cassandra/db/index/SecondaryIndex.java
@@ -18,7 +18,6 @@
 package org.apache.cassandra.db.index;
 
 import java.nio.ByteBuffer;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.Set;
@@ -52,6 +51,8 @@ import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.FBUtilities;
 
+import org.apache.cassandra.utils.concurrent.Refs;
+
 /**
  * Abstract base class for different types of secondary indexes.
  *
@@ -202,8 +203,7 @@ public abstract class SecondaryIndex
         logger.info(String.format("Submitting index build of %s for data in %s",
                 getIndexName(), StringUtils.join(baseCfs.getSSTables(), ", ")));
 
-        Collection<SSTableReader> sstables = baseCfs.markCurrentSSTablesReferenced();
-        try
+        try (Refs<SSTableReader> sstables = baseCfs.selectAndReference(ColumnFamilyStore.ALL_SSTABLES).refs)
         {
             SecondaryIndexBuilder builder = new SecondaryIndexBuilder(baseCfs,
                                                                       Collections.singleton(getIndexName()),
@@ -213,10 +213,6 @@ public abstract class SecondaryIndex
             forceBlockingFlush();
             setIndexBuilt();
         }
-        finally
-        {
-            SSTableReader.releaseReferences(sstables);
-        }
         logger.info("Index build of {} complete", getIndexName());
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c75ee416/src/java/org/apache/cassandra/io/sstable/SSTable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTable.java b/src/java/org/apache/cassandra/io/sstable/SSTable.java
index 6eff369..ac0741d 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTable.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTable.java
@@ -38,6 +38,7 @@ import org.apache.cassandra.io.FSWriteError;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.io.util.RandomAccessReader;
 import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.concurrent.RefCounted;
 import org.apache.cassandra.utils.memory.HeapAllocator;
 import org.apache.cassandra.utils.Pair;
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c75ee416/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
index 3d7eea7..1cab8c7 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
@@ -39,6 +39,8 @@ import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.OutputHandler;
 import org.apache.cassandra.utils.Pair;
 
+import org.apache.cassandra.utils.concurrent.Ref;
+
 /**
  * Cassandra SSTable bulk loader.
  * Load an externally created sstable into a cluster.
@@ -130,7 +132,7 @@ public class SSTableLoader implements StreamEventHandler
                         List<Pair<Long, Long>> sstableSections = sstable.getPositionsForRanges(tokenRanges);
                         long estimatedKeys = sstable.estimatedKeysForRanges(tokenRanges);
 
-                        StreamSession.SSTableStreamingSections details = new StreamSession.SSTableStreamingSections(sstable, sstableSections, estimatedKeys, ActiveRepairService.UNREPAIRED_SSTABLE);
+                        StreamSession.SSTableStreamingSections details = new StreamSession.SSTableStreamingSections(sstable, sstable.sharedRef(), sstableSections, estimatedKeys, ActiveRepairService.UNREPAIRED_SSTABLE);
                         streamingDetails.put(endpoint, details);
                     }
 
@@ -176,15 +178,17 @@ public class SSTableLoader implements StreamEventHandler
                 continue;
 
             List<StreamSession.SSTableStreamingSections> endpointDetails = new LinkedList<>();
-
+            List<Ref> refs = new ArrayList<>();
             try
             {
                 // transferSSTables assumes references have been acquired
                 for (StreamSession.SSTableStreamingSections details : streamingDetails.get(remote))
                 {
-                    if (!details.sstable.acquireReference())
+                    Ref ref = details.sstable.tryRef();
+                    if (ref == null)
                         throw new IllegalStateException();
 
+                    refs.add(ref);
                     endpointDetails.add(details);
                 }
 
@@ -192,8 +196,8 @@ public class SSTableLoader implements StreamEventHandler
             }
             finally
             {
-                for (StreamSession.SSTableStreamingSections details : endpointDetails)
-                    details.sstable.releaseReference();
+                for (Ref ref : refs)
+                    ref.release();
             }
         }
         plan.listeners(this, listeners);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c75ee416/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
index 50bf3e3..c51b586 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
@@ -42,7 +42,6 @@ import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -75,7 +74,6 @@ import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.db.RowIndexEntry;
 import org.apache.cassandra.db.RowPosition;
 import org.apache.cassandra.db.SystemKeyspace;
-import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
 import org.apache.cassandra.db.commitlog.ReplayPosition;
 import org.apache.cassandra.db.index.SecondaryIndex;
 import org.apache.cassandra.dht.AbstractBounds;
@@ -114,6 +112,8 @@ import org.apache.cassandra.utils.FilterFactory;
 import org.apache.cassandra.utils.IFilter;
 import org.apache.cassandra.utils.Pair;
 import org.apache.cassandra.utils.concurrent.OpOrder;
+import org.apache.cassandra.utils.concurrent.Ref;
+import org.apache.cassandra.utils.concurrent.RefCounted;
 
 import static org.apache.cassandra.db.Directories.SECONDARY_INDEX_NAME_SEPARATOR;
 
@@ -121,7 +121,7 @@ import static org.apache.cassandra.db.Directories.SECONDARY_INDEX_NAME_SEPARATOR
  * SSTableReaders are open()ed by Keyspace.onStart; after that they are created by SSTableWriter.renameAndOpen.
  * Do not re-call open() on existing SSTable files; use the references kept by ColumnFamilyStore post-start instead.
  */
-public class SSTableReader extends SSTable
+public class SSTableReader extends SSTable implements RefCounted
 {
     private static final Logger logger = LoggerFactory.getLogger(SSTableReader.class);
 
@@ -182,7 +182,6 @@ public class SSTableReader extends SSTable
 
     private final BloomFilterTracker bloomFilterTracker = new BloomFilterTracker();
 
-    private final AtomicInteger references = new AtomicInteger(1);
     // technically isCompacted is not necessary since it should never be unreferenced unless it is also compacted,
     // but it seems like a good extra layer of protection against reference counting bugs to not delete data based on that alone
     private final AtomicBoolean isCompacted = new AtomicBoolean(false);
@@ -194,17 +193,8 @@ public class SSTableReader extends SSTable
     private final AtomicLong keyCacheHit = new AtomicLong(0);
     private final AtomicLong keyCacheRequest = new AtomicLong(0);
 
-    /**
-     * To support replacing this sstablereader with another object that represents that same underlying sstable, but with different associated resources,
-     * we build a linked-list chain of replacement, which we synchronise using a shared object to make maintenance of the list across multiple threads simple.
-     * On close we check if any of the closeable resources differ between any chains either side of us; any that are in neither of the adjacent links (if any) are closed.
-     * Once we've made this decision we remove ourselves from the linked list, so that anybody behind/ahead will compare against only other still opened resources.
-     */
-    private Object replaceLock = new Object();
-    private SSTableReader replacedBy;
-    private SSTableReader replaces;
-    private SSTableDeletingTask deletingTask;
-    private Runnable runOnClose;
+    private final Tidier tidy = new Tidier();
+    private final RefCounted refCounted = RefCounted.Impl.get(tidy);
 
     @VisibleForTesting
     public RestorableMeter readMeter;
@@ -409,7 +399,7 @@ public class SSTableReader extends SSTable
         sstable.ifile = ibuilder.complete(sstable.descriptor.filenameFor(Component.PRIMARY_INDEX));
         sstable.dfile = dbuilder.complete(sstable.descriptor.filenameFor(Component.DATA));
         sstable.bf = FilterFactory.AlwaysPresent;
-
+        sstable.tidy.setup(sstable);
         return sstable;
     }
 
@@ -459,6 +449,7 @@ public class SSTableReader extends SSTable
         if (sstable.getKeyCache() != null)
             logger.debug("key cache contains {}/{} keys", sstable.getKeyCache().size(), sstable.getKeyCache().getCapacity());
 
+        sstable.tidy.setup(sstable);
         return sstable;
     }
 
@@ -555,7 +546,7 @@ public class SSTableReader extends SSTable
         this.maxDataAge = maxDataAge;
         this.openReason = openReason;
 
-        deletingTask = new SSTableDeletingTask(this);
+        tidy.deletingTask = new SSTableDeletingTask(this);
 
         // Don't track read rates for tables in the system keyspace and don't bother trying to load or persist
         // the read meter when in client mode.  Also don't track reads for special operations (like early open)
@@ -600,6 +591,7 @@ public class SSTableReader extends SSTable
         this.dfile = dfile;
         this.indexSummary = indexSummary;
         this.bf = bloomFilter;
+        tidy.setup(this);
     }
 
     public static long getTotalBytes(Iterable<SSTableReader> sstables)
@@ -612,117 +604,6 @@ public class SSTableReader extends SSTable
         return sum;
     }
 
-    private void tidy(boolean release)
-    {
-        if (readMeterSyncFuture != null)
-            readMeterSyncFuture.cancel(false);
-
-        if (references.get() != 0)
-        {
-            throw new IllegalStateException("SSTable is not fully released (" + references.get() + " references)");
-        }
-
-        synchronized (replaceLock)
-        {
-            boolean closeBf = true, closeSummary = true, closeFiles = true, deleteFiles = isCompacted.get();
-
-            if (replacedBy != null)
-            {
-                closeBf = replacedBy.bf != bf;
-                closeSummary = replacedBy.indexSummary != indexSummary;
-                closeFiles = replacedBy.dfile != dfile;
-                // if the replacement sstablereader uses a different path, clean up our paths
-                deleteFiles = !dfile.path.equals(replacedBy.dfile.path);
-            }
-
-            if (replaces != null)
-            {
-                closeBf &= replaces.bf != bf;
-                closeSummary &= replaces.indexSummary != indexSummary;
-                closeFiles &= replaces.dfile != dfile;
-                deleteFiles &= !dfile.path.equals(replaces.dfile.path);
-            }
-
-            boolean deleteAll = false;
-            if (release && isCompacted.get())
-            {
-                assert replacedBy == null;
-                if (replaces != null && !deleteFiles)
-                {
-                    replaces.replacedBy = null;
-                    replaces.deletingTask = deletingTask;
-                    replaces.markObsolete();
-                }
-                else
-                {
-                    deleteAll = true;
-                }
-            }
-            else
-            {
-                if (replaces != null)
-                    replaces.replacedBy = replacedBy;
-                if (replacedBy != null)
-                    replacedBy.replaces = replaces;
-            }
-
-            scheduleTidy(closeBf, closeSummary, closeFiles, deleteFiles, deleteAll);
-        }
-    }
-
-    private void scheduleTidy(final boolean closeBf, final boolean closeSummary, final boolean closeFiles, final boolean deleteFiles, final boolean deleteAll)
-    {
-        if (references.get() != 0)
-            throw new IllegalStateException("SSTable is not fully released (" + references.get() + " references)");
-
-        final ColumnFamilyStore cfs = Schema.instance.getColumnFamilyStoreInstance(metadata.cfId);
-        final OpOrder.Barrier barrier;
-        if (cfs != null)
-        {
-            barrier = cfs.readOrdering.newBarrier();
-            barrier.issue();
-        }
-        else
-            barrier = null;
-
-        ScheduledExecutors.nonPeriodicTasks.execute(new Runnable()
-        {
-            public void run()
-            {
-                if (barrier != null)
-                    barrier.await();
-                if (closeBf)
-                    bf.close();
-                if (closeSummary)
-                    indexSummary.close();
-                if (closeFiles)
-                {
-                    ifile.cleanup();
-                    dfile.cleanup();
-                }
-                if (runOnClose != null)
-                    runOnClose.run();
-                if (deleteAll)
-                {
-                    /**
-                     * Do the OS a favour and suggest (using fadvice call) that we
-                     * don't want to see pages of this SSTable in memory anymore.
-                     *
-                     * NOTE: We can't use madvice in java because it requires the address of
-                     * the mapping, so instead we always open a file and run fadvice(fd, 0, 0) on it
-                     */
-                    dropPageCache();
-                    deletingTask.run();
-                }
-                else if (deleteFiles)
-                {
-                    FileUtils.deleteWithConfirm(new File(dfile.path));
-                    FileUtils.deleteWithConfirm(new File(ifile.path));
-                }
-            }
-        });
-    }
-
     public boolean equals(Object that)
     {
         return that instanceof SSTableReader && ((SSTableReader) that).descriptor.equals(this.descriptor);
@@ -745,7 +626,7 @@ public class SSTableReader extends SSTable
 
     public void setTrackedBy(DataTracker tracker)
     {
-        deletingTask.setTracker(tracker);
+        tidy.deletingTask.setTracker(tracker);
         // under normal operation we can do this at any time, but SSTR is also used outside C* proper,
         // e.g. by BulkLoader, which does not initialize the cache.  As a kludge, we set up the cache
         // here when we know we're being wired into the rest of the server infrastructure.
@@ -817,6 +698,7 @@ public class SSTableReader extends SSTable
         dfile = dbuilder.complete(descriptor.filenameFor(Component.DATA));
         if (saveSummaryIfCreated && (recreateBloomFilter || !summaryLoaded)) // save summary information to disk
             saveSummary(ibuilder, dbuilder);
+        tidy.setup(this);
     }
 
     /**
@@ -968,26 +850,26 @@ public class SSTableReader extends SSTable
 
     public void setReplacedBy(SSTableReader replacement)
     {
-        synchronized (replaceLock)
+        synchronized (tidy.replaceLock)
         {
-            assert replacedBy == null;
-            replacedBy = replacement;
-            replacement.replaces = this;
-            replacement.replaceLock = replaceLock;
+            assert tidy.replacedBy == null;
+            tidy.replacedBy = replacement;
+            replacement.tidy.replaces = this;
+            replacement.tidy.replaceLock = tidy.replaceLock;
         }
     }
 
     public SSTableReader cloneWithNewStart(DecoratedKey newStart, final Runnable runOnClose)
     {
-        synchronized (replaceLock)
+        synchronized (tidy.replaceLock)
         {
-            assert replacedBy == null;
+            assert tidy.replacedBy == null;
 
             if (newStart.compareTo(this.first) > 0)
             {
                 if (newStart.compareTo(this.last) > 0)
                 {
-                    this.runOnClose = new Runnable()
+                    this.tidy.runOnClose = new Runnable()
                     {
                         public void run()
                         {
@@ -1001,7 +883,7 @@ public class SSTableReader extends SSTable
                 {
                     final long dataStart = getPosition(newStart, Operator.GE).position;
                     final long indexStart = getIndexScanPosition(newStart);
-                    this.runOnClose = new Runnable()
+                    this.tidy.runOnClose = new Runnable()
                     {
                         public void run()
                         {
@@ -1034,9 +916,9 @@ public class SSTableReader extends SSTable
      */
     public SSTableReader cloneWithNewSummarySamplingLevel(ColumnFamilyStore parent, int samplingLevel) throws IOException
     {
-        synchronized (replaceLock)
+        synchronized (tidy.replaceLock)
         {
-            assert replacedBy == null;
+            assert tidy.replacedBy == null;
 
             int minIndexInterval = metadata.getMinIndexInterval();
             int maxIndexInterval = metadata.getMaxIndexInterval();
@@ -1671,36 +1553,6 @@ public class SSTableReader extends SSTable
         return dfile.onDiskLength;
     }
 
-    public boolean acquireReference()
-    {
-        while (true)
-        {
-            int n = references.get();
-            if (n <= 0)
-                return false;
-            if (references.compareAndSet(n, n + 1))
-                return true;
-        }
-    }
-
-    @VisibleForTesting
-    public int referenceCount()
-    {
-        return references.get();
-    }
-
-    /**
-     * Release reference to this SSTableReader.
-     * If there is no one referring to this SSTable, and is marked as compacted,
-     * all resources are cleaned up and files are deleted eventually.
-     */
-    public void releaseReference()
-    {
-        if (references.decrementAndGet() == 0)
-            tidy(true);
-        assert references.get() >= 0 : "Reference counter " +  references.get() + " for " + dfile.path;
-    }
-
     /**
      * Mark the sstable as obsolete, i.e., compacted into newer sstables.
      *
@@ -1715,9 +1567,9 @@ public class SSTableReader extends SSTable
         if (logger.isDebugEnabled())
             logger.debug("Marking {} compacted", getFilename());
 
-        synchronized (replaceLock)
+        synchronized (tidy.replaceLock)
         {
-            assert replacedBy == null : getFilename();
+            assert tidy.replacedBy == null : getFilename();
         }
         return !isCompacted.getAndSet(true);
     }
@@ -1821,13 +1673,13 @@ public class SSTableReader extends SSTable
 
     public SSTableReader getCurrentReplacement()
     {
-        synchronized (replaceLock)
+        synchronized (tidy.replaceLock)
         {
-            SSTableReader cur = this, next = replacedBy;
+            SSTableReader cur = this, next = tidy.replacedBy;
             while (next != null)
             {
                 cur = next;
-                next = next.replacedBy;
+                next = next.tidy.replacedBy;
             }
             return cur;
         }
@@ -2013,90 +1865,216 @@ public class SSTableReader extends SSTable
     }
 
     /**
-     * @param sstables
-     * @return true if all desired references were acquired.  Otherwise, it will unreference any partial acquisition, and return false.
+     * Increment the total row read count and read rate for this SSTable.  This should not be incremented for range
+     * slice queries, row cache hits, or non-query reads, like compaction.
      */
-    public static boolean acquireReferences(Iterable<SSTableReader> sstables)
+    public void incrementReadCount()
     {
-        SSTableReader failed = null;
-        for (SSTableReader sstable : sstables)
-        {
-            if (!sstable.acquireReference())
-            {
-                failed = sstable;
-                break;
-            }
-        }
-
-        if (failed == null)
-            return true;
+        if (readMeter != null)
+            readMeter.mark();
+    }
 
-        for (SSTableReader sstable : sstables)
+    public static class SizeComparator implements Comparator<SSTableReader>
+    {
+        public int compare(SSTableReader o1, SSTableReader o2)
         {
-            if (sstable == failed)
-                break;
-            sstable.releaseReference();
+            return Longs.compare(o1.onDiskLength(), o2.onDiskLength());
         }
-        return false;
     }
 
-    public static void releaseReferences(Iterable<SSTableReader> sstables)
+    public Ref tryRef()
     {
-        for (SSTableReader sstable : sstables)
-        {
-            sstable.releaseReference();
-        }
+        return refCounted.tryRef();
     }
 
-    private void dropPageCache()
+    public Ref sharedRef()
     {
-        dropPageCache(dfile.path);
-        dropPageCache(ifile.path);
+        return refCounted.sharedRef();
     }
 
-    private void dropPageCache(String filePath)
+    private static final class Tidier implements Tidy
     {
-        RandomAccessFile file = null;
+        private String name;
+        private CFMetaData metadata;
+        // indexfile and datafile: might be null before a call to load()
+        private SegmentedFile ifile;
+        private SegmentedFile dfile;
 
-        try
-        {
-            file = new RandomAccessFile(filePath, "r");
+        private IndexSummary indexSummary;
+        private IFilter bf;
 
-            int fd = CLibrary.getfd(file.getFD());
+        private AtomicBoolean isCompacted;
 
-            if (fd > 0)
-            {
-                if (logger.isDebugEnabled())
-                    logger.debug(String.format("Dropping page cache of file %s.", filePath));
+        /**
+         * To support replacing this sstablereader with another object that represents the same underlying sstable, but with different associated resources,
+         * we build a linked-list chain of replacement, which we synchronise using a shared object to make maintenance of the list across multiple threads simple.
+         * On close we check if any of the closeable resources differ between any chains either side of us; any that are in neither of the adjacent links (if any) are closed.
+         * Once we've made this decision we remove ourselves from the linked list, so that anybody behind/ahead will compare against only other still opened resources.
+         */
+        private Object replaceLock = new Object();
+        private SSTableReader replacedBy;
+        private SSTableReader replaces;
+        private SSTableDeletingTask deletingTask;
+        private Runnable runOnClose;
 
-                CLibrary.trySkipCache(fd, 0, 0);
-            }
+        @VisibleForTesting
+        public RestorableMeter readMeter;
+        private volatile ScheduledFuture readMeterSyncFuture;
+
+        private void setup(SSTableReader reader)
+        {
+            name = reader.toString();
+            metadata = reader.metadata;
+            ifile = reader.ifile;
+            dfile = reader.dfile;
+            indexSummary = reader.indexSummary;
+            bf = reader.bf;
+            isCompacted = reader.isCompacted;
+            readMeterSyncFuture = reader.readMeterSyncFuture;
         }
-        catch (IOException e)
+
+        public String name()
         {
-            // we don't care if cache cleanup fails
+            return name;
         }
-        finally
+
+        private void dropPageCache()
         {
-            FileUtils.closeQuietly(file);
+            dropPageCache(dfile.path);
+            dropPageCache(ifile.path);
         }
-    }
 
-    /**
-     * Increment the total row read count and read rate for this SSTable.  This should not be incremented for range
-     * slice queries, row cache hits, or non-query reads, like compaction.
-     */
-    public void incrementReadCount()
-    {
-        if (readMeter != null)
-            readMeter.mark();
-    }
+        private void dropPageCache(String filePath)
+        {
+            RandomAccessFile file = null;
 
-    public static class SizeComparator implements Comparator<SSTableReader>
-    {
-        public int compare(SSTableReader o1, SSTableReader o2)
+            try
+            {
+                file = new RandomAccessFile(filePath, "r");
+
+                int fd = CLibrary.getfd(file.getFD());
+
+                if (fd > 0)
+                {
+                    if (logger.isDebugEnabled())
+                        logger.debug(String.format("Dropping page cache of file %s.", filePath));
+
+                    CLibrary.trySkipCache(fd, 0, 0);
+                }
+            }
+            catch (IOException e)
+            {
+                // we don't care if cache cleanup fails
+            }
+            finally
+            {
+                FileUtils.closeQuietly(file);
+            }
+        }
+
+        public void tidy()
         {
-            return Longs.compare(o1.onDiskLength(), o2.onDiskLength());
+            if (readMeterSyncFuture != null)
+                readMeterSyncFuture.cancel(false);
+
+            synchronized (replaceLock)
+            {
+                boolean closeBf = true, closeSummary = true, closeFiles = true, deleteFiles = isCompacted.get();
+
+                if (replacedBy != null)
+                {
+                    closeBf = replacedBy.bf != bf;
+                    closeSummary = replacedBy.indexSummary != indexSummary;
+                    closeFiles = replacedBy.dfile != dfile;
+                    // if the replacement sstablereader uses a different path, clean up our paths
+                    deleteFiles = !dfile.path.equals(replacedBy.dfile.path);
+                }
+
+                if (replaces != null)
+                {
+                    closeBf &= replaces.bf != bf;
+                    closeSummary &= replaces.indexSummary != indexSummary;
+                    closeFiles &= replaces.dfile != dfile;
+                    deleteFiles &= !dfile.path.equals(replaces.dfile.path);
+                }
+
+                boolean deleteAll = false;
+                if (isCompacted.get())
+                {
+                    assert replacedBy == null;
+                    if (replaces != null && !deleteFiles)
+                    {
+                        replaces.tidy.replacedBy = null;
+                        replaces.tidy.deletingTask = deletingTask;
+                        replaces.markObsolete();
+                    }
+                    else
+                    {
+                        deleteAll = true;
+                    }
+                }
+                else
+                {
+                    closeSummary &= indexSummary != null;
+                    if (replaces != null)
+                        replaces.tidy.replacedBy = replacedBy;
+                    if (replacedBy != null)
+                        replacedBy.tidy.replaces = replaces;
+                }
+
+                scheduleTidy(closeBf, closeSummary, closeFiles, deleteFiles, deleteAll);
+            }
+        }
+
+        private void scheduleTidy(final boolean closeBf, final boolean closeSummary, final boolean closeFiles, final boolean deleteFiles, final boolean deleteAll)
+        {
+            final ColumnFamilyStore cfs = Schema.instance.getColumnFamilyStoreInstance(metadata.cfId);
+            final OpOrder.Barrier barrier;
+            if (cfs != null)
+            {
+                barrier = cfs.readOrdering.newBarrier();
+                barrier.issue();
+            }
+            else
+                barrier = null;
+
+            ScheduledExecutors.nonPeriodicTasks.execute(new Runnable()
+            {
+                public void run()
+                {
+                    if (barrier != null)
+                        barrier.await();
+                    if (closeBf)
+                        bf.close();
+                    if (closeSummary)
+                        indexSummary.close();
+                    if (closeFiles)
+                    {
+                        ifile.cleanup();
+                        dfile.cleanup();
+                    }
+                    if (runOnClose != null)
+                        runOnClose.run();
+                    if (deleteAll)
+                    {
+                        /**
+                         * Do the OS a favour and suggest (using an fadvise call) that we
+                         * don't want to see pages of this SSTable in memory anymore.
+                         *
+                         * NOTE: We can't use madvise in Java because it requires the address of
+                         * the mapping, so instead we always open a file and run fadvise(fd, 0, 0) on it
+                         */
+                        dropPageCache();
+                        deletingTask.run();
+                    }
+                    else if (deleteFiles)
+                    {
+                        FileUtils.deleteWithConfirm(new File(dfile.path));
+                        FileUtils.deleteWithConfirm(new File(ifile.path));
+                    }
+                }
+            });
         }
     }
+
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c75ee416/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
index 43ac4b6..7784b18 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
@@ -176,14 +176,14 @@ public class SSTableRewriter
 
     public void abort()
     {
-        switchWriter(null);
+        switchWriter(null, true);
         moveStarts(null, Functions.forMap(originalStarts), true);
 
         // remove already completed SSTables
         for (SSTableReader sstable : finished)
         {
             sstable.markObsolete();
-            sstable.releaseReference();
+            sstable.sharedRef().release();
         }
 
         // abort the writers
@@ -276,6 +276,11 @@ public class SSTableRewriter
 
     public void switchWriter(SSTableWriter newWriter)
     {
+        switchWriter(newWriter, false);
+    }
+
+    private void switchWriter(SSTableWriter newWriter, boolean abort)
+    {
         if (writer == null)
         {
             writer = newWriter;
@@ -283,7 +288,7 @@ public class SSTableRewriter
         }
 
         // we leave it as a tmp file, but we open it and add it to the dataTracker
-        if (writer.getFilePointer() != 0)
+        if (writer.getFilePointer() != 0 && !abort)
         {
             SSTableReader reader = writer.finish(SSTableWriter.FinishType.EARLY, maxAge, -1);
             replaceEarlyOpenedFile(currentlyOpenedEarly, reader);
@@ -333,7 +338,7 @@ public class SSTableRewriter
     private List<SSTableReader> finishAndMaybeThrow(long repairedAt, boolean throwEarly, boolean throwLate)
     {
         List<SSTableReader> newReaders = new ArrayList<>();
-        switchWriter(null);
+        switchWriter(null, false);
 
         if (throwEarly)
             throw new RuntimeException("exception thrown early in finish, for testing");
@@ -377,7 +382,7 @@ public class SSTableRewriter
             {
                 if (reader.getCurrentReplacement() == null)
                     reader.markObsolete();
-                reader.releaseReference();
+                reader.sharedRef().release();
             }
         }
         else

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c75ee416/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
index 5f78132..cc60594 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
@@ -416,7 +416,7 @@ public class SSTableWriter extends SSTable
         if (inclusiveUpperBoundOfReadableData == null)
         {
             // Prevent leaving tmplink files on disk
-            sstable.releaseReference();
+            sstable.sharedRef().release();
             return null;
         }
         int offset = 2;
@@ -428,7 +428,7 @@ public class SSTableWriter extends SSTable
             inclusiveUpperBoundOfReadableData = iwriter.getMaxReadableKey(offset++);
             if (inclusiveUpperBoundOfReadableData == null)
             {
-                sstable.releaseReference();
+                sstable.sharedRef().release();
                 return null;
             }
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c75ee416/src/java/org/apache/cassandra/service/ActiveRepairService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java b/src/java/org/apache/cassandra/service/ActiveRepairService.java
index 36f7c5c..1c5138b 100644
--- a/src/java/org/apache/cassandra/service/ActiveRepairService.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java
@@ -25,6 +25,7 @@ import java.util.*;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Multimap;
 import com.google.common.collect.Sets;
 
@@ -33,7 +34,6 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.concurrent.JMXConfigurableThreadPoolExecutor;
 import org.apache.cassandra.concurrent.NamedThreadFactory;
-import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.dht.Bounds;
@@ -56,6 +56,10 @@ import org.apache.cassandra.repair.messages.SyncComplete;
 import org.apache.cassandra.repair.messages.ValidationComplete;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.UUIDGen;
+import org.apache.cassandra.utils.concurrent.Ref;
+import org.apache.cassandra.utils.concurrent.RefCounted;
+
+import org.apache.cassandra.utils.concurrent.Refs;
 
 /**
  * ActiveRepairService is the starting point for manual "active" repairs.
@@ -376,7 +380,7 @@ public class ActiveRepairService
         List<Future<?>> futures = new ArrayList<>();
         for (Map.Entry<UUID, ColumnFamilyStore> columnFamilyStoreEntry : prs.columnFamilyStores.entrySet())
         {
-            Collection<SSTableReader> sstables = new HashSet<>(prs.getAndReferenceSSTables(columnFamilyStoreEntry.getKey()));
+            Refs<SSTableReader> sstables = prs.getAndReferenceSSTables(columnFamilyStoreEntry.getKey());
             ColumnFamilyStore cfs = columnFamilyStoreEntry.getValue();
             futures.add(CompactionManager.instance.submitAntiCompaction(cfs, prs.ranges, sstables, prs.repairedAt));
         }
@@ -422,10 +426,11 @@ public class ActiveRepairService
             this.repairedAt = repairedAt;
         }
 
-        public Collection<SSTableReader> getAndReferenceSSTables(UUID cfId)
+        public synchronized Refs<SSTableReader> getAndReferenceSSTables(UUID cfId)
         {
             Set<SSTableReader> sstables = sstableMap.get(cfId);
             Iterator<SSTableReader> sstableIterator = sstables.iterator();
+            ImmutableMap.Builder<SSTableReader, Ref> references = ImmutableMap.builder();
             while (sstableIterator.hasNext())
             {
                 SSTableReader sstable = sstableIterator.next();
@@ -435,11 +440,14 @@ public class ActiveRepairService
                 }
                 else
                 {
-                    if (!sstable.acquireReference())
+                    Ref ref = sstable.tryRef();
+                    if (ref == null)
                         sstableIterator.remove();
+                    else
+                        references.put(sstable, ref);
                 }
             }
-            return sstables;
+            return new Refs<>(references.build());
         }
 
         @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c75ee416/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
index aa18954..44b83f9 100644
--- a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
@@ -36,6 +36,8 @@ import org.apache.cassandra.io.sstable.SSTableWriter;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.Pair;
 
+import org.apache.cassandra.utils.concurrent.Refs;
+
 /**
  * Task that manages receiving files for the session for certain ColumnFamily.
  */
@@ -127,18 +129,12 @@ public class StreamReceiveTask extends StreamTask
             lockfile.delete();
             task.sstables.clear();
 
-            if (!SSTableReader.acquireReferences(readers))
-                throw new AssertionError("We shouldn't fail acquiring a reference on a sstable that has just been transferred");
-            try
+            try (Refs<SSTableReader> refs = Refs.ref(readers))
             {
                 // add sstables and build secondary indexes
                 cfs.addSSTables(readers);
                 cfs.indexManager.maybeBuildSecondaryIndexes(readers, cfs.indexManager.allIndexesNames());
             }
-            finally
-            {
-                SSTableReader.releaseReferences(readers);
-            }
 
             task.session.taskCompleted(task);
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c75ee416/src/java/org/apache/cassandra/streaming/StreamSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java b/src/java/org/apache/cassandra/streaming/StreamSession.java
index 2a3cf55..6108dea 100644
--- a/src/java/org/apache/cassandra/streaming/StreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@ -45,6 +45,10 @@ import org.apache.cassandra.streaming.messages.*;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.JVMStabilityInspector;
 import org.apache.cassandra.utils.Pair;
+import org.apache.cassandra.utils.concurrent.RefCounted;
+
+import org.apache.cassandra.utils.concurrent.Ref;
+import org.apache.cassandra.utils.concurrent.Refs;
 
 /**
  * Handles the streaming of one or more sections of one or more sstables to and from a specific
@@ -267,7 +271,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber
         finally
         {
             for (SSTableStreamingSections release : sections)
-                release.sstable.releaseReference();
+                release.ref.release();
         }
     }
 
@@ -289,7 +293,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber
 
     private List<SSTableStreamingSections> getSSTableSectionsForRanges(Collection<Range<Token>> ranges, Collection<ColumnFamilyStore> stores, long overriddenRepairedAt, boolean isIncremental)
     {
-        List<SSTableReader> sstables = new ArrayList<>();
+        Refs<SSTableReader> refs = new Refs<>();
         try
         {
             for (ColumnFamilyStore cfStore : stores)
@@ -297,16 +301,16 @@ public class StreamSession implements IEndpointStateChangeSubscriber
                 List<AbstractBounds<RowPosition>> rowBoundsList = new ArrayList<>(ranges.size());
                 for (Range<Token> range : ranges)
                     rowBoundsList.add(range.toRowBounds());
-                ColumnFamilyStore.ViewFragment view = cfStore.selectAndReference(cfStore.viewFilter(rowBoundsList, !isIncremental));
-                sstables.addAll(view.sstables);
+                refs.addAll(cfStore.selectAndReference(cfStore.viewFilter(rowBoundsList, !isIncremental)).refs);
             }
-            List<SSTableStreamingSections> sections = new ArrayList<>(sstables.size());
-            for (SSTableReader sstable : sstables)
+
+            List<SSTableStreamingSections> sections = new ArrayList<>(refs.size());
+            for (SSTableReader sstable : refs)
             {
                 long repairedAt = overriddenRepairedAt;
                 if (overriddenRepairedAt == ActiveRepairService.UNREPAIRED_SSTABLE)
                     repairedAt = sstable.getSSTableMetadata().repairedAt;
-                sections.add(new SSTableStreamingSections(sstable,
+                sections.add(new SSTableStreamingSections(sstable, refs.get(sstable),
                                                           sstable.getPositionsForRanges(ranges),
                                                           sstable.estimatedKeysForRanges(ranges),
                                                           repairedAt));
@@ -315,7 +319,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber
         }
         catch (Throwable t)
         {
-            SSTableReader.releaseReferences(sstables);
+            refs.release();
             throw t;
         }
     }
@@ -329,7 +333,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber
             if (details.sections.isEmpty())
             {
                 // A reference was acquired on the sstable and we won't stream it
-                details.sstable.releaseReference();
+                details.ref.release();
                 iter.remove();
                 continue;
             }
@@ -341,7 +345,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber
                 task = new StreamTransferTask(this, cfId);
                 transfers.put(cfId, task);
             }
-            task.addTransferFile(details.sstable, details.estimatedKeys, details.sections, details.repairedAt);
+            task.addTransferFile(details.sstable, details.ref, details.estimatedKeys, details.sections, details.repairedAt);
             iter.remove();
         }
     }
@@ -349,13 +353,15 @@ public class StreamSession implements IEndpointStateChangeSubscriber
     public static class SSTableStreamingSections
     {
         public final SSTableReader sstable;
+        public final Ref ref;
         public final List<Pair<Long, Long>> sections;
         public final long estimatedKeys;
         public final long repairedAt;
 
-        public SSTableStreamingSections(SSTableReader sstable, List<Pair<Long, Long>> sections, long estimatedKeys, long repairedAt)
+        public SSTableStreamingSections(SSTableReader sstable, Ref ref, List<Pair<Long, Long>> sections, long estimatedKeys, long repairedAt)
         {
             this.sstable = sstable;
+            this.ref = ref;
             this.sections = sections;
             this.estimatedKeys = estimatedKeys;
             this.repairedAt = repairedAt;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c75ee416/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamTransferTask.java b/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
index b840ee5..b00042e 100644
--- a/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
@@ -26,6 +26,8 @@ import org.apache.cassandra.concurrent.NamedThreadFactory;
 import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.streaming.messages.OutgoingFileMessage;
 import org.apache.cassandra.utils.Pair;
+import org.apache.cassandra.utils.concurrent.Ref;
+import org.apache.cassandra.utils.concurrent.RefCounted;
 
 /**
  * StreamTransferTask sends sections of SSTable files in certain ColumnFamily.
@@ -47,10 +49,10 @@ public class StreamTransferTask extends StreamTask
         super(session, cfId);
     }
 
-    public synchronized void addTransferFile(SSTableReader sstable, long estimatedKeys, List<Pair<Long, Long>> sections, long repairedAt)
+    public synchronized void addTransferFile(SSTableReader sstable, Ref ref, long estimatedKeys, List<Pair<Long, Long>> sections, long repairedAt)
     {
         assert sstable != null && cfId.equals(sstable.metadata.cfId);
-        OutgoingFileMessage message = new OutgoingFileMessage(sstable, sequenceNumber.getAndIncrement(), estimatedKeys, sections, repairedAt);
+        OutgoingFileMessage message = new OutgoingFileMessage(sstable, ref, sequenceNumber.getAndIncrement(), estimatedKeys, sections, repairedAt);
         files.put(message.header.sequenceNumber, message);
         totalSize += message.header.size();
     }
@@ -71,7 +73,7 @@ public class StreamTransferTask extends StreamTask
 
             OutgoingFileMessage file = files.remove(sequenceNumber);
             if (file != null)
-                file.sstable.releaseReference();
+                file.ref.release();
 
             signalComplete = files.isEmpty();
         }
@@ -92,7 +94,7 @@ public class StreamTransferTask extends StreamTask
         timeoutTasks.clear();
 
         for (OutgoingFileMessage file : files.values())
-            file.sstable.releaseReference();
+            file.ref.release();
     }
 
     public synchronized int getTotalNumberOfFiles()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c75ee416/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java b/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
index b012869..5ebf289 100644
--- a/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
@@ -31,6 +31,8 @@ import org.apache.cassandra.streaming.compress.CompressedStreamWriter;
 import org.apache.cassandra.streaming.compress.CompressionInfo;
 import org.apache.cassandra.utils.Pair;
 
+import org.apache.cassandra.utils.concurrent.Ref;
+
 /**
  * OutgoingFileMessage is used to transfer the part(or whole) of a SSTable data file.
  */
@@ -58,13 +60,15 @@ public class OutgoingFileMessage extends StreamMessage
         }
     };
 
-    public FileMessageHeader header;
-    public SSTableReader sstable;
+    public final FileMessageHeader header;
+    public final SSTableReader sstable;
+    public final Ref ref;
 
-    public OutgoingFileMessage(SSTableReader sstable, int sequenceNumber, long estimatedKeys, List<Pair<Long, Long>> sections, long repairedAt)
+    public OutgoingFileMessage(SSTableReader sstable, Ref ref, int sequenceNumber, long estimatedKeys, List<Pair<Long, Long>> sections, long repairedAt)
     {
         super(Type.FILE);
         this.sstable = sstable;
+        this.ref = ref;
 
         CompressionInfo compressionInfo = null;
         if (sstable.compression)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c75ee416/src/java/org/apache/cassandra/tools/StandaloneScrubber.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/StandaloneScrubber.java b/src/java/org/apache/cassandra/tools/StandaloneScrubber.java
index 82e3783..63a3727 100644
--- a/src/java/org/apache/cassandra/tools/StandaloneScrubber.java
+++ b/src/java/org/apache/cassandra/tools/StandaloneScrubber.java
@@ -119,7 +119,7 @@ public class StandaloneScrubber
 
                         // Remove the sstable (it's been copied by scrub and snapshotted)
                         sstable.markObsolete();
-                        sstable.releaseReference();
+                        sstable.sharedRef().release();
                     }
                     catch (Exception e)
                     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c75ee416/src/java/org/apache/cassandra/utils/concurrent/Ref.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/concurrent/Ref.java b/src/java/org/apache/cassandra/utils/concurrent/Ref.java
new file mode 100644
index 0000000..4afceb0
--- /dev/null
+++ b/src/java/org/apache/cassandra/utils/concurrent/Ref.java
@@ -0,0 +1,134 @@
+package org.apache.cassandra.utils.concurrent;
+
+import java.lang.ref.PhantomReference;
+import java.lang.ref.ReferenceQueue;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A single managed reference to a RefCounted object
+ */
+public final class Ref
+{
+    static final Logger logger = LoggerFactory.getLogger(Ref.class);
+    static final boolean DEBUG_ENABLED = System.getProperty("cassandra.debugrefcount", "false").equalsIgnoreCase("true");
+
+    final State state;
+
+    Ref(RefCountedImpl.GlobalState state, boolean isSharedRef)
+    {
+        this.state = new State(state, this, RefCountedImpl.referenceQueue, isSharedRef);
+    }
+
+    /**
+     * Must be called exactly once, when the logical operation for which this Ref was created has terminated.
+     * Failure to abide by this contract will result in an error (eventually) being reported, assuming a
+     * hard reference to the resource it managed is not leaked.
+     */
+    public void release()
+    {
+        state.release(false);
+    }
+
+    /**
+     * A convenience method for reporting:
+     * @return the number of currently extant references globally, including the shared reference
+     */
+    public int globalCount()
+    {
+        return state.globalState.count();
+    }
+
+    // similar to RefCountedState, but tracks only the management of each unique ref created to the managed object
+    // ensures it is only released once, and that it is always released
+    static final class State extends PhantomReference<Ref>
+    {
+        final Debug debug = DEBUG_ENABLED ? new Debug() : null;
+        final boolean isSharedRef;
+        final RefCountedImpl.GlobalState globalState;
+        private volatile int released;
+
+        private static final AtomicIntegerFieldUpdater<State> releasedUpdater = AtomicIntegerFieldUpdater.newUpdater(State.class, "released");
+
+        public State(final RefCountedImpl.GlobalState globalState, Ref reference, ReferenceQueue<? super Ref> q, boolean isSharedRef)
+        {
+            super(reference, q);
+            this.globalState = globalState;
+            this.isSharedRef = isSharedRef;
+            globalState.register(this);
+        }
+
+        void release(boolean leak)
+        {
+            if (!releasedUpdater.compareAndSet(this, 0, 1))
+            {
+                if (!leak)
+                {
+                    String id = this.toString();
+                    logger.error("BAD RELEASE: attempted to release a{} reference ({}) that has already been released", isSharedRef ? " shared" : "", id);
+                    if (DEBUG_ENABLED)
+                        debug.log(id);
+                    throw new IllegalStateException("Attempted to release a reference that has already been released");
+                }
+                return;
+            }
+            globalState.release(this);
+            if (leak)
+            {
+                String id = this.toString();
+                if (isSharedRef)
+                    logger.error("LEAK DETECTED: the shared reference ({}) to {} was not released before the object was garbage collected", id, globalState);
+                else
+                    logger.error("LEAK DETECTED: a reference ({}) to {} was not released before the reference was garbage collected", id, globalState);
+                if (DEBUG_ENABLED)
+                    debug.log(id);
+            }
+            else if (DEBUG_ENABLED)
+            {
+                debug.deallocate();
+            }
+        }
+    }
+
+    static final class Debug
+    {
+        String allocateThread, deallocateThread;
+        StackTraceElement[] allocateTrace, deallocateTrace;
+        Debug()
+        {
+            Thread thread = Thread.currentThread();
+            allocateThread = thread.toString();
+            allocateTrace = thread.getStackTrace();
+        }
+        synchronized void deallocate()
+        {
+            Thread thread = Thread.currentThread();
+            deallocateThread = thread.toString();
+            deallocateTrace = thread.getStackTrace();
+        }
+        synchronized void log(String id)
+        {
+            logger.error("Allocate trace {}:\n{}", id, print(allocateThread, allocateTrace));
+            if (deallocateThread != null)
+                logger.error("Deallocate trace {}:\n{}", id, print(deallocateThread, deallocateTrace));
+        }
+        String print(String thread, StackTraceElement[] trace)
+        {
+            StringBuilder sb = new StringBuilder();
+            sb.append(thread.toString());
+            sb.append("\n");
+            for (StackTraceElement element : trace)
+            {
+                sb.append("\tat ");
+                sb.append(element );
+                sb.append("\n");
+            }
+            return sb.toString();
+        }
+    }
+
+}
+
+

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c75ee416/src/java/org/apache/cassandra/utils/concurrent/RefCounted.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/concurrent/RefCounted.java b/src/java/org/apache/cassandra/utils/concurrent/RefCounted.java
new file mode 100644
index 0000000..7ad51ad
--- /dev/null
+++ b/src/java/org/apache/cassandra/utils/concurrent/RefCounted.java
@@ -0,0 +1,94 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+package org.apache.cassandra.utils.concurrent;
+
+import java.lang.ref.PhantomReference;
+import java.lang.ref.ReferenceQueue;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterators;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.concurrent.NamedThreadFactory;
+
+/**
+ * An object that needs ref counting does the following:
+ *   - defines a Tidy object that will cleanup once it's gone,
+ *     (this must retain no references to the object we're tracking (only its resources and how to clean up))
+ *   - implements RefCounted
+ *   - encapsulates a RefCounted.Impl, to which it proxies all calls to RefCounted behaviours
+ *   - ensures no external access to the encapsulated Impl, and permits no references to it to leak
+ *   - users must ensure that no references to the sharedRef leak, and that none are retained outside of a method scope.
+ *     (to ensure the sharedRef is collected with the object, so that leaks may be detected and corrected)
+ *
+ * This class' functionality is achieved by what may look at first glance like a complex web of references,
+ * but boils down to:
+ *
+ * Target --> Impl --> sharedRef --> [RefState] <--> RefCountedState --> Tidy
+ *                                        ^                ^
+ *                                        |                |
+ * Ref -----------------------------------                 |
+ *                                                         |
+ * Global -------------------------------------------------
+ *
+ * So that, if Target is collected, Impl is collected and, hence, so is sharedRef.
+ *
+ * Once a ref or the sharedRef is collected, the paired RefState's release method is called; if it had
+ * not already been called, it updates the RefCountedState and logs an error.
+ *
+ * Once the RefCountedState has been completely released, the Tidy method is called and it removes the global reference
+ * to itself so it may also be collected.
+ */
+public interface RefCounted
+{
+
+    /**
+     * @return a new Ref() to the managed object, incrementing its refcount, or null if it is already released
+     */
+    public Ref tryRef();
+
+    /**
+     * @return the shared Ref that is created at instantiation of the RefCounted instance.
+     * Once released, if no other refs are extant the object will be tidied; references to
+     * this object should never be retained outside of a method's scope
+     */
+    public Ref sharedRef();
+
+    public static interface Tidy
+    {
+        void tidy();
+        String name();
+    }
+
+    public static class Impl
+    {
+        public static RefCounted get(Tidy tidy)
+        {
+            return new RefCountedImpl(tidy);
+        }
+    }
+}