Posted to commits@cassandra.apache.org by be...@apache.org on 2015/01/28 16:20:27 UTC

[3/7] cassandra git commit: Safer Resource Management

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c75ee416/src/java/org/apache/cassandra/utils/concurrent/RefCountedImpl.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/concurrent/RefCountedImpl.java b/src/java/org/apache/cassandra/utils/concurrent/RefCountedImpl.java
new file mode 100644
index 0000000..0de6f40
--- /dev/null
+++ b/src/java/org/apache/cassandra/utils/concurrent/RefCountedImpl.java
@@ -0,0 +1,132 @@
+package org.apache.cassandra.utils.concurrent;
+
+import java.lang.ref.ReferenceQueue;
+import java.util.Collections;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.cassandra.concurrent.NamedThreadFactory;
+
+// default implementation; can be hidden and proxied (like we do for SSTableReader)
+final class RefCountedImpl implements RefCounted
+{
+    private final Ref sharedRef;
+    private final GlobalState state;
+
+    public RefCountedImpl(Tidy tidy)
+    {
+        this.state = new GlobalState(tidy);
+        sharedRef = new Ref(this.state, true);
+        globallyExtant.add(this.state);
+    }
+
+    /**
+     * see {@link RefCounted#tryRef()}
+     */
+    public Ref tryRef()
+    {
+        return state.ref() ? new Ref(state, false) : null;
+    }
+
+    /**
+     * see {@link RefCounted#sharedRef()}
+     */
+    public Ref sharedRef()
+    {
+        return sharedRef;
+    }
+
+    // the object that manages the actual cleaning up; this does not reference the RefCounted.Impl
+    // so that we can detect when references are lost to the resource itself, and still cleanup afterwards
+    // the Tidy object MUST not contain any references to the object we are managing
+    static final class GlobalState
+    {
+        // we need to retain a reference to each of the PhantomReference instances
+        // we are using to track individual refs
+        private final ConcurrentLinkedQueue<Ref.State> locallyExtant = new ConcurrentLinkedQueue<>();
+        // the number of live refs
+        private final AtomicInteger counts = new AtomicInteger();
+        // the object to call to cleanup when our refs are all finished with
+        private final Tidy tidy;
+
+        GlobalState(Tidy tidy)
+        {
+            this.tidy = tidy;
+        }
+
+        void register(Ref.State ref)
+        {
+            locallyExtant.add(ref);
+        }
+
+        // increment ref count if not already tidied, and return success/failure
+        boolean ref()
+        {
+            while (true)
+            {
+                int cur = counts.get();
+                if (cur < 0)
+                    return false;
+                if (counts.compareAndSet(cur, cur + 1))
+                    return true;
+            }
+        }
+
+        // release a single reference, and cleanup if no more are extant
+        void release(Ref.State ref)
+        {
+            locallyExtant.remove(ref);
+            if (-1 == counts.decrementAndGet())
+            {
+                globallyExtant.remove(this);
+                tidy.tidy();
+            }
+        }
+
+        int count()
+        {
+            return 1 + counts.get();
+        }
+
+        public String toString()
+        {
+            return tidy.name();
+        }
+    }
+
+    private static final Set<GlobalState> globallyExtant = Collections.newSetFromMap(new ConcurrentHashMap<GlobalState, Boolean>());
+    static final ReferenceQueue<Object> referenceQueue = new ReferenceQueue<>();
+    private static final ExecutorService EXEC = Executors.newFixedThreadPool(1, new NamedThreadFactory("Reference-Reaper"));
+    static
+    {
+        EXEC.execute(new Runnable()
+        {
+            public void run()
+            {
+                try
+                {
+                    while (true)
+                    {
+                        Object obj = referenceQueue.remove();
+                        if (obj instanceof Ref.State)
+                        {
+                            ((Ref.State) obj).release(true);
+                        }
+                    }
+                }
+                catch (InterruptedException e)
+                {
+                }
+                finally
+                {
+                    EXEC.execute(this);
+                }
+            }
+        });
+    }
+}
+
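
A minimal usage sketch of the new reference-counting API, put together only from
what this patch and RefCountedTest (further down in this mail) show:
RefCounted.Impl.get(Tidy), tryRef(), sharedRef() and Ref.release(). The
TrackedResource and Cleanup names are illustrative and not part of the patch; the
point to note is that the Tidy implementation must never reference the resource it
cleans up, so the Reference-Reaper thread above can still detect when the resource
itself leaks.

    import org.apache.cassandra.utils.concurrent.Ref;
    import org.apache.cassandra.utils.concurrent.RefCounted;

    final class TrackedResource
    {
        // a static nested Tidy, so it cannot capture the TrackedResource it cleans
        // up after; an inner class would defeat the leak detection described above
        private static final class Cleanup implements RefCounted.Tidy
        {
            public void tidy()   { /* close files, unmap buffers, etc. */ }
            public String name() { return "TrackedResource"; }
        }

        private final RefCounted counts = RefCounted.Impl.get(new Cleanup());

        // callers take a ref before using the resource and release it when done;
        // null means the shared ref has already been released
        Ref use()
        {
            return counts.tryRef();
        }

        // the owner drops the shared ref; tidy() runs once the last ref is released,
        // or once the Reference-Reaper reclaims any ref that leaked without a release
        void retire()
        {
            counts.sharedRef().release();
        }
    }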

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c75ee416/src/java/org/apache/cassandra/utils/concurrent/Refs.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/concurrent/Refs.java b/src/java/org/apache/cassandra/utils/concurrent/Refs.java
new file mode 100644
index 0000000..ed5fcfa
--- /dev/null
+++ b/src/java/org/apache/cassandra/utils/concurrent/Refs.java
@@ -0,0 +1,219 @@
+package org.apache.cassandra.utils.concurrent;
+
+import java.util.*;
+
+import com.google.common.base.Throwables;
+import com.google.common.collect.Iterators;
+
+/**
+ * A collection of managed Ref references to RefCounted objects, and the objects they are referencing.
+ * Care MUST be taken when using this collection: if a permanent reference to it leaks, we will not
+ * be alerted that the references it holds were never released.
+ *
+ * All of the java.util.Collection operations that modify the collection are unsupported.
+ */
+public final class Refs<T extends RefCounted> extends AbstractCollection<T> implements AutoCloseable
+{
+    private final Map<T, Ref> references;
+
+    public Refs()
+    {
+        this.references = new HashMap<>();
+    }
+
+    public Refs(Map<T, Ref> references)
+    {
+        this.references = new HashMap<>(references);
+    }
+
+    /**
+     * Release ALL of the references held by this Refs collection
+     */
+    public void release()
+    {
+        try
+        {
+            release(references.values());
+        }
+        finally
+        {
+            references.clear();
+        }
+    }
+
+    /**
+     * See {@link Refs#release()}
+     */
+    public void close()
+    {
+        release();
+    }
+
+    /**
+     * @param referenced the object we have a Ref to
+     * @return the Ref to said object
+     */
+    public Ref get(T referenced)
+    {
+        return references.get(referenced);
+    }
+
+    /**
+     * @param referenced the object we have a Ref to
+     */
+    public void release(T referenced)
+    {
+        Ref ref = references.remove(referenced);
+        if (ref == null)
+            throw new IllegalStateException("This Refs collection does not hold a reference to " + referenced);
+        ref.release();
+    }
+
+    /**
+     * Release the retained Ref to the provided object, if we hold one
+     * @param referenced the object we retain a Ref to
+     * @return true if we held a reference to the object, and false otherwise
+     */
+    public boolean releaseIfHolds(T referenced)
+    {
+        Ref ref = references.remove(referenced);
+        if (ref != null)
+            ref.release();
+        return ref != null;
+    }
+
+    /**
+     * Release a retained Ref to all of the provided objects; if any is not held, an exception will be thrown
+     * @param release the objects to release our references to
+     */
+    public void release(Collection<T> release)
+    {
+        List<Ref> refs = new ArrayList<>();
+        List<T> notPresent = null;
+        for (T obj : release)
+        {
+            Ref ref = references.remove(obj);
+            if (ref == null)
+            {
+                if (notPresent == null)
+                    notPresent = new ArrayList<>();
+                notPresent.add(obj);
+            }
+            else
+            {
+                refs.add(ref);
+            }
+        }
+
+        IllegalStateException notPresentFail = null;
+        if (notPresent != null)
+        {
+            notPresentFail = new IllegalStateException("Could not release references to " + notPresent
+                                                       + " as references to these objects were not held");
+            notPresentFail.fillInStackTrace();
+        }
+        try
+        {
+            release(refs);
+        }
+        catch (Throwable t)
+        {
+            if (notPresentFail != null)
+                t.addSuppressed(notPresentFail);
+        }
+        if (notPresentFail != null)
+            throw notPresentFail;
+    }
+
+    /**
+     * Attempt to take a reference to the provided object; if it has already been released, false will be returned
+     * @param t object to acquire a reference to
+     * @return true iff success
+     */
+    public boolean tryRef(T t)
+    {
+        Ref ref = t.tryRef();
+        if (ref == null)
+            return false;
+        ref = references.put(t, ref);
+        if (ref != null)
+            ref.release(); // release dup
+        return true;
+    }
+
+    public Iterator<T> iterator()
+    {
+        return Iterators.unmodifiableIterator(references.keySet().iterator());
+    }
+
+    public int size()
+    {
+        return references.size();
+    }
+
+    /**
+     * Merge two sets of references, ensuring only one reference is retained between the two sets
+     */
+    public Refs<T> addAll(Refs<T> add)
+    {
+        List<Ref> overlap = new ArrayList<>();
+        for (Map.Entry<T, Ref> e : add.references.entrySet())
+        {
+            if (this.references.containsKey(e.getKey()))
+                overlap.add(e.getValue());
+            else
+                this.references.put(e.getKey(), e.getValue());
+        }
+        add.references.clear();
+        release(overlap);
+        return this;
+    }
+
+    /**
+     * Acquire a reference to all of the provided objects, or none
+     */
+    public static <T extends RefCounted> Refs<T> tryRef(Iterable<T> reference)
+    {
+        HashMap<T, Ref> refs = new HashMap<>();
+        for (T rc : reference)
+        {
+            Ref ref = rc.tryRef();
+            if (ref == null)
+            {
+                release(refs.values());
+                return null;
+            }
+            refs.put(rc, ref);
+        }
+        return new Refs<T>(refs);
+    }
+
+    public static <T extends RefCounted> Refs<T> ref(Iterable<T> reference)
+    {
+        Refs<T> refs = tryRef(reference);
+        if (refs != null)
+            return refs;
+        throw new IllegalStateException();
+    }
+
+    private static void release(Iterable<Ref> refs)
+    {
+        Throwable fail = null;
+        for (Ref ref : refs)
+        {
+            try
+            {
+                ref.release();
+            }
+            catch (Throwable t)
+            {
+                if (fail == null)
+                    fail = t;
+                else
+                    fail.addSuppressed(t);
+            }
+        }
+        if (fail != null)
+            throw Throwables.propagate(fail);
+    }
+}
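
In the same spirit, a sketch of how callers are expected to use Refs, modelled on
the test changes below (KeyCacheTest, AntiCompactionTest and the streaming tests);
the ColumnFamilyStore and SSTableReader calls are the ones those tests use, while
the class and method names here are illustrative only.

    import java.util.Collection;

    import org.apache.cassandra.db.ColumnFamilyStore;
    import org.apache.cassandra.io.sstable.SSTableReader;
    import org.apache.cassandra.utils.concurrent.Refs;

    final class RefsUsageSketch
    {
        static void readPinned(ColumnFamilyStore cfs)
        {
            Collection<SSTableReader> readers = cfs.getDataTracker().getSSTables();

            // ref() takes a reference to every reader, or throws IllegalStateException
            // if any has already been released; tryRef() returns null in that case
            try (Refs<SSTableReader> refs = Refs.ref(readers))
            {
                for (SSTableReader reader : refs)
                {
                    // ... read from the sstable; the refs keep it from being tidied ...
                }
            }   // Refs is AutoCloseable, so every ref taken above is released here
        }
    }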

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c75ee416/test/unit/org/apache/cassandra/SchemaLoader.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/SchemaLoader.java b/test/unit/org/apache/cassandra/SchemaLoader.java
index 4a1c104..ce65d5a 100644
--- a/test/unit/org/apache/cassandra/SchemaLoader.java
+++ b/test/unit/org/apache/cassandra/SchemaLoader.java
@@ -21,6 +21,7 @@ import java.io.File;
 import java.nio.ByteBuffer;
 import java.util.*;
 
+import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.slf4j.Logger;
@@ -64,6 +65,15 @@ public class SchemaLoader
             MigrationManager.announceNewKeyspace(ksm);
     }
 
+    @After
+    public void leakDetect() throws InterruptedException
+    {
+        System.gc();
+        System.gc();
+        System.gc();
+        Thread.sleep(10);
+    }
+
     public static void prepareServer()
     {
         // Cleanup first

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c75ee416/test/unit/org/apache/cassandra/db/KeyCacheTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/KeyCacheTest.java b/test/unit/org/apache/cassandra/db/KeyCacheTest.java
index 1bc7caf..1f7024e 100644
--- a/test/unit/org/apache/cassandra/db/KeyCacheTest.java
+++ b/test/unit/org/apache/cassandra/db/KeyCacheTest.java
@@ -38,6 +38,7 @@ import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.service.CacheService;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
+import org.apache.cassandra.utils.concurrent.Refs;
 import static org.junit.Assert.assertEquals;
 
 public class KeyCacheTest extends SchemaLoader
@@ -151,8 +152,9 @@ public class KeyCacheTest extends SchemaLoader
         assertKeyCacheSize(2, KEYSPACE1, COLUMN_FAMILY1);
 
         Set<SSTableReader> readers = cfs.getDataTracker().getSSTables();
-        for (SSTableReader reader : readers)
-            reader.acquireReference();
+        Refs<SSTableReader> refs = Refs.tryRef(readers);
+        if (refs == null)
+            throw new IllegalStateException();
 
         Util.compactAll(cfs, Integer.MAX_VALUE).get();
         // after compaction cache should have entries for new SSTables,
@@ -160,8 +162,7 @@ public class KeyCacheTest extends SchemaLoader
         // if we had 2 keys in cache previously it should become 4
         assertKeyCacheSize(4, KEYSPACE1, COLUMN_FAMILY1);
 
-        for (SSTableReader reader : readers)
-            reader.releaseReference();
+        refs.release();
 
         Uninterruptibles.sleepUninterruptibly(10, TimeUnit.MILLISECONDS);;
         while (ScheduledExecutors.nonPeriodicTasks.getActiveCount() + ScheduledExecutors.nonPeriodicTasks.getQueue().size() > 0);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c75ee416/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java b/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
index a09d8b4..d9442c7 100644
--- a/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
@@ -17,6 +17,7 @@
  */
 package org.apache.cassandra.db.compaction;
 
+import org.apache.cassandra.utils.concurrent.Refs;
 import static org.hamcrest.CoreMatchers.is;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -63,9 +64,11 @@ public class AntiCompactionTest extends SchemaLoader
         Range<Token> range = new Range<Token>(new BytesToken("0".getBytes()), new BytesToken("4".getBytes()));
         List<Range<Token>> ranges = Arrays.asList(range);
 
-        SSTableReader.acquireReferences(sstables);
+        Refs<SSTableReader> refs = Refs.tryRef(sstables);
+        if (refs == null)
+            throw new IllegalStateException();
         long repairedAt = 1000;
-        CompactionManager.instance.performAnticompaction(store, ranges, sstables, repairedAt);
+        CompactionManager.instance.performAnticompaction(store, ranges, refs, repairedAt);
 
         assertEquals(2, store.getSSTables().size());
         int repairedKeys = 0;
@@ -93,7 +96,7 @@ public class AntiCompactionTest extends SchemaLoader
         for (SSTableReader sstable : store.getSSTables())
         {
             assertFalse(sstable.isMarkedCompacted());
-            assertEquals(1, sstable.referenceCount());
+            assertEquals(1, sstable.sharedRef().globalCount());
         }
         assertEquals(0, store.getDataTracker().getCompacting().size());
         assertEquals(repairedKeys, 4);
@@ -110,8 +113,7 @@ public class AntiCompactionTest extends SchemaLoader
         long origSize = s.bytesOnDisk();
         Range<Token> range = new Range<Token>(new BytesToken(ByteBufferUtil.bytes(0)), new BytesToken(ByteBufferUtil.bytes(500)));
         Collection<SSTableReader> sstables = cfs.getSSTables();
-        SSTableReader.acquireReferences(sstables);
-        CompactionManager.instance.performAnticompaction(cfs, Arrays.asList(range), sstables, 12345);
+        CompactionManager.instance.performAnticompaction(cfs, Arrays.asList(range), Refs.tryRef(sstables), 12345);
         long sum = 0;
         for (SSTableReader x : cfs.getSSTables())
             sum += x.bytesOnDisk();
@@ -149,12 +151,13 @@ public class AntiCompactionTest extends SchemaLoader
         Range<Token> range = new Range<Token>(new BytesToken("-10".getBytes()), new BytesToken("-1".getBytes()));
         List<Range<Token>> ranges = Arrays.asList(range);
 
-        SSTableReader.acquireReferences(sstables);
-        CompactionManager.instance.performAnticompaction(store, ranges, sstables, 1);
-
+        Refs<SSTableReader> refs = Refs.tryRef(sstables);
+        if (refs == null)
+            throw new IllegalStateException();
+        CompactionManager.instance.performAnticompaction(store, ranges, refs, 1);
         assertThat(store.getSSTables().size(), is(1));
         assertThat(Iterables.get(store.getSSTables(), 0).isRepaired(), is(false));
-        assertThat(Iterables.get(store.getSSTables(), 0).referenceCount(), is(1));
+        assertThat(Iterables.get(store.getSSTables(), 0).sharedRef().globalCount(), is(1));
         assertThat(store.getDataTracker().getCompacting().size(), is(0));
     }
 
@@ -167,12 +170,11 @@ public class AntiCompactionTest extends SchemaLoader
         Range<Token> range = new Range<Token>(new BytesToken("0".getBytes()), new BytesToken("9999".getBytes()));
         List<Range<Token>> ranges = Arrays.asList(range);
 
-        SSTableReader.acquireReferences(sstables);
-        CompactionManager.instance.performAnticompaction(store, ranges, sstables, 1);
+        CompactionManager.instance.performAnticompaction(store, ranges, Refs.tryRef(sstables), 1);
 
         assertThat(store.getSSTables().size(), is(1));
         assertThat(Iterables.get(store.getSSTables(), 0).isRepaired(), is(true));
-        assertThat(Iterables.get(store.getSSTables(), 0).referenceCount(), is(1));
+        assertThat(Iterables.get(store.getSSTables(), 0).sharedRef().globalCount(), is(1));
         assertThat(store.getDataTracker().getCompacting().size(), is(0));
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c75ee416/test/unit/org/apache/cassandra/db/compaction/BlacklistingCompactionsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/BlacklistingCompactionsTest.java b/test/unit/org/apache/cassandra/db/compaction/BlacklistingCompactionsTest.java
index e6626ea..08e3fb3 100644
--- a/test/unit/org/apache/cassandra/db/compaction/BlacklistingCompactionsTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/BlacklistingCompactionsTest.java
@@ -26,6 +26,8 @@ import java.util.Collection;
 import java.util.HashSet;
 import java.util.Set;
 
+import org.junit.After;
+import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
@@ -44,6 +46,15 @@ public class BlacklistingCompactionsTest extends SchemaLoader
 {
     public static final String KEYSPACE = "Keyspace1";
 
+    @After
+    public void leakDetect() throws InterruptedException
+    {
+        System.gc();
+        System.gc();
+        System.gc();
+        Thread.sleep(10);
+    }
+
     @BeforeClass
     public static void closeStdErr()
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c75ee416/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java b/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java
index 5341a4b..8bef669 100644
--- a/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java
@@ -118,7 +118,7 @@ public class LegacySSTableTest extends SchemaLoader
         ranges.add(new Range<>(p.getMinimumToken(), p.getToken(ByteBufferUtil.bytes("100"))));
         ranges.add(new Range<>(p.getToken(ByteBufferUtil.bytes("100")), p.getMinimumToken()));
         ArrayList<StreamSession.SSTableStreamingSections> details = new ArrayList<>();
-        details.add(new StreamSession.SSTableStreamingSections(sstable,
+        details.add(new StreamSession.SSTableStreamingSections(sstable, sstable.tryRef(),
                                                                sstable.getPositionsForRanges(ranges),
                                                                sstable.estimatedKeysForRanges(ranges), sstable.getSSTableMetadata().repairedAt));
         new StreamPlan("LegacyStreamingTest").transferFiles(FBUtilities.getBroadcastAddress(), details)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c75ee416/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
index fbd627b..acf8c90 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
@@ -200,7 +200,7 @@ public class SSTableRewriterTest extends SchemaLoader
         assertTrue(s != s2);
         assertFileCounts(dir.list(), 2, 3);
         s.markObsolete();
-        s.releaseReference();
+        s.sharedRef().release();
         Thread.sleep(1000);
         assertFileCounts(dir.list(), 0, 3);
         writer.abort(false);
@@ -705,7 +705,7 @@ public class SSTableRewriterTest extends SchemaLoader
         for (SSTableReader sstable : cfs.getSSTables())
         {
             assertFalse(sstable.isMarkedCompacted());
-            assertEquals(1, sstable.referenceCount());
+            assertEquals(1, sstable.sharedRef().globalCount());
             liveDescriptors.add(sstable.descriptor.generation);
         }
         for (File dir : cfs.directories.getCFDirectories())

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c75ee416/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java b/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
index a528f10..f84ae11 100644
--- a/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
+++ b/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
@@ -64,7 +64,7 @@ public class StreamTransferTaskTest extends SchemaLoader
         {
             List<Range<Token>> ranges = new ArrayList<>();
             ranges.add(new Range<>(sstable.first.getToken(), sstable.last.getToken()));
-            task.addTransferFile(sstable, 1, sstable.getPositionsForRanges(ranges), 0);
+            task.addTransferFile(sstable, sstable.sharedRef(), 1, sstable.getPositionsForRanges(ranges), 0);
         }
         assertEquals(2, task.getTotalNumberOfFiles());
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c75ee416/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java b/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
index 5d44210..a5112b7 100644
--- a/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
+++ b/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
@@ -52,6 +52,7 @@ import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.CounterId;
 import org.apache.cassandra.utils.FBUtilities;
 
+import org.apache.cassandra.utils.concurrent.Refs;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 import static org.apache.cassandra.Util.cellname;
@@ -203,15 +204,15 @@ public class StreamingTransferTest extends SchemaLoader
 
     private void transfer(SSTableReader sstable, List<Range<Token>> ranges) throws Exception
     {
-        new StreamPlan("StreamingTransferTest").transferFiles(LOCAL, makeStreamingDetails(ranges, Arrays.asList(sstable))).execute().get();
+        new StreamPlan("StreamingTransferTest").transferFiles(LOCAL, makeStreamingDetails(ranges, Refs.tryRef(Arrays.asList(sstable)))).execute().get();
     }
 
-    private Collection<StreamSession.SSTableStreamingSections> makeStreamingDetails(List<Range<Token>> ranges, Collection<SSTableReader> sstables)
+    private Collection<StreamSession.SSTableStreamingSections> makeStreamingDetails(List<Range<Token>> ranges, Refs<SSTableReader> sstables)
     {
         ArrayList<StreamSession.SSTableStreamingSections> details = new ArrayList<>();
         for (SSTableReader sstable : sstables)
         {
-            details.add(new StreamSession.SSTableStreamingSections(sstable,
+            details.add(new StreamSession.SSTableStreamingSections(sstable, sstables.get(sstable),
                                                                    sstable.getPositionsForRanges(ranges),
                                                                    sstable.estimatedKeysForRanges(ranges), sstable.getSSTableMetadata().repairedAt));
         }
@@ -375,9 +376,9 @@ public class StreamingTransferTest extends SchemaLoader
         ranges.add(new Range<>(p.getMinimumToken(), p.getToken(ByteBufferUtil.bytes("test"))));
         ranges.add(new Range<>(p.getToken(ByteBufferUtil.bytes("transfer2")), p.getMinimumToken()));
         // Acquiring references, transferSSTables needs it
-        sstable.acquireReference();
-        sstable2.acquireReference();
-        new StreamPlan("StreamingTransferTest").transferFiles(LOCAL, makeStreamingDetails(ranges, Arrays.asList(sstable, sstable2))).execute().get();
+        Refs<SSTableReader> refs = Refs.tryRef(Arrays.asList(sstable, sstable2));
+        assert refs != null;
+        new StreamPlan("StreamingTransferTest").transferFiles(LOCAL, makeStreamingDetails(ranges, refs)).execute().get();
 
         // confirm that the sstables were transferred and registered and that 2 keys arrived
         ColumnFamilyStore cfstore = Keyspace.open(keyspaceName).getColumnFamilyStore(cfname);
@@ -428,10 +429,11 @@ public class StreamingTransferTest extends SchemaLoader
         ranges.add(new Range<>(secondtolast.getKey().getToken(), p.getMinimumToken()));
 
         // Acquiring references, transferSSTables needs it
-        if (!SSTableReader.acquireReferences(ssTableReaders))
+        Refs<SSTableReader> refs = Refs.tryRef(ssTableReaders);
+        if (refs == null)
             throw new AssertionError();
 
-        new StreamPlan("StreamingTransferTest").transferFiles(LOCAL, makeStreamingDetails(ranges, ssTableReaders)).execute().get();
+        new StreamPlan("StreamingTransferTest").transferFiles(LOCAL, makeStreamingDetails(ranges, refs)).execute().get();
 
         // check that only two keys were transferred
         for (Map.Entry<DecoratedKey,String> entry : Arrays.asList(first, last))

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c75ee416/test/unit/org/apache/cassandra/utils/concurrent/RefCountedTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/utils/concurrent/RefCountedTest.java b/test/unit/org/apache/cassandra/utils/concurrent/RefCountedTest.java
new file mode 100644
index 0000000..fe22d21
--- /dev/null
+++ b/test/unit/org/apache/cassandra/utils/concurrent/RefCountedTest.java
@@ -0,0 +1,85 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+package org.apache.cassandra.utils.concurrent;
+
+import org.junit.Test;
+
+import junit.framework.Assert;
+
+public class RefCountedTest
+{
+
+    private static final class Tidier implements RefCounted.Tidy
+    {
+        boolean tidied;
+
+        public void tidy()
+        {
+            tidied = true;
+        }
+
+        public String name()
+        {
+            return "test tidy";
+        }
+    }
+
+    @Test
+    public void testLeak() throws InterruptedException
+    {
+        Tidier tidier = new Tidier();
+        RefCounted obj = RefCounted.Impl.get(tidier);
+        obj.tryRef();
+        obj.sharedRef().release();
+        System.gc();
+        System.gc();
+        Thread.sleep(1000);
+        Assert.assertTrue(tidier.tidied);
+    }
+
+    @Test
+    public void testSeriousLeak() throws InterruptedException
+    {
+        Tidier tidier = new Tidier();
+        RefCounted.Impl.get(tidier);
+        System.gc();
+        System.gc();
+        System.gc();
+        System.gc();
+        Thread.sleep(1000);
+        Assert.assertTrue(tidier.tidied);
+    }
+
+    @Test
+    public void testDoubleRelease() throws InterruptedException
+    {
+        Tidier tidier = null;
+        try
+        {
+            tidier = new Tidier();
+            RefCounted obj = RefCounted.Impl.get(tidier);
+            obj.sharedRef().release();
+            obj.sharedRef().release();
+            Assert.assertTrue(false);
+        }
+        catch (Exception e)
+        {
+        }
+    }
+}