You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by be...@apache.org on 2015/01/28 16:20:27 UTC
[3/7] cassandra git commit: Safer Resource Management
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c75ee416/src/java/org/apache/cassandra/utils/concurrent/RefCountedImpl.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/concurrent/RefCountedImpl.java b/src/java/org/apache/cassandra/utils/concurrent/RefCountedImpl.java
new file mode 100644
index 0000000..0de6f40
--- /dev/null
+++ b/src/java/org/apache/cassandra/utils/concurrent/RefCountedImpl.java
@@ -0,0 +1,132 @@
+package org.apache.cassandra.utils.concurrent;
+
+import java.lang.ref.ReferenceQueue;
+import java.util.Collections;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.cassandra.concurrent.NamedThreadFactory;
+
+/**
+ * Default {@link RefCounted} implementation; can be hidden and proxied (like we do for SSTableReader).
+ *
+ * Construction wraps the supplied Tidy in a {@link GlobalState}, creates the shared Ref against that
+ * state, and registers the state in the static globallyExtant set.
+ *
+ * The class also owns the static single-threaded "Reference-Reaper" executor. Its one task blocks on
+ * referenceQueue, calls release(true) on every Ref.State it dequeues, and resubmits itself from a
+ * finally block, so the task is rescheduled whenever run() exits (interruption included).
+ */
+final class RefCountedImpl implements RefCounted
+{
+ private final Ref sharedRef;
+ private final GlobalState state;
+
+ public RefCountedImpl(Tidy tidy)
+ {
+ this.state = new GlobalState(tidy);
+ sharedRef = new Ref(this.state, true);
+ globallyExtant.add(this.state);
+ }
+
+ /**
+ * see {@link RefCounted#tryRef()}
+ *
+ * @return a new Ref when the shared count could be incremented, or null when the count has
+ * already dropped below zero (i.e. the final release has happened)
+ */
+ public Ref tryRef()
+ {
+ return state.ref() ? new Ref(state, false) : null;
+ }
+
+ /**
+ * see {@link RefCounted#sharedRef()}
+ *
+ * @return the Ref created in the constructor; the same instance on every call
+ */
+ public Ref sharedRef()
+ {
+ return sharedRef;
+ }
+
+ // the object that manages the actual cleaning up; this does not reference the RefCounted.Impl
+ // so that we can detect when references are lost to the resource itself, and still cleanup afterwards;
+ // the Tidy object MUST NOT contain any references to the object we are managing
+ static final class GlobalState
+ {
+ // we need to retain a reference to each of the PhantomReference instances
+ // we are using to track individual refs
+ private final ConcurrentLinkedQueue<Ref.State> locallyExtant = new ConcurrentLinkedQueue<>();
+ // the number of live refs minus one: starts at 0, and reaches -1 when the last ref is released (see release() and count())
+ private final AtomicInteger counts = new AtomicInteger();
+ // the object to call to cleanup when our refs are all finished with
+ private final Tidy tidy;
+
+ GlobalState(Tidy tidy)
+ {
+ this.tidy = tidy;
+ }
+
+ void register(Ref.State ref)
+ {
+ locallyExtant.add(ref);
+ }
+
+ // increment ref count if not already tidied, and return success/failure; a lost compareAndSet race simply retries
+ boolean ref()
+ {
+ while (true)
+ {
+ int cur = counts.get();
+ if (cur < 0)
+ return false;
+ if (counts.compareAndSet(cur, cur + 1))
+ return true;
+ }
+ }
+
+ // release a single reference, and cleanup if no more are extant (i.e. when this decrement takes the count to -1)
+ void release(Ref.State ref)
+ {
+ locallyExtant.remove(ref);
+ if (-1 == counts.decrementAndGet())
+ {
+ globallyExtant.remove(this);
+ tidy.tidy();
+ }
+ }
+
+ int count()
+ {
+ return 1 + counts.get(); // counts is offset by one, so add it back
+ }
+
+ public String toString()
+ {
+ return tidy.name();
+ }
+ }
+
+ private static final Set<GlobalState> globallyExtant = Collections.newSetFromMap(new ConcurrentHashMap<GlobalState, Boolean>()); // states added on construction and removed when tidied
+ static final ReferenceQueue<Object> referenceQueue = new ReferenceQueue<>(); // NOTE(review): presumably the queue Ref.State registers itself with - confirm in Ref
+ private static final ExecutorService EXEC = Executors.newFixedThreadPool(1, new NamedThreadFactory("Reference-Reaper"));
+ static
+ {
+ EXEC.execute(new Runnable()
+ {
+ public void run()
+ {
+ try
+ {
+ while (true)
+ {
+ Object obj = referenceQueue.remove(); // blocks until a reference is enqueued
+ if (obj instanceof Ref.State)
+ {
+ ((Ref.State) obj).release(true); // NOTE(review): 'true' presumably marks a GC-detected (leaked) release - confirm in Ref.State
+ }
+ }
+ }
+ catch (InterruptedException e) // nothing to do here: the finally block below resubmits this task
+ {
+ }
+ finally
+ {
+ EXEC.execute(this); // reschedule ourselves whenever the loop exits
+ }
+ }
+ });
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c75ee416/src/java/org/apache/cassandra/utils/concurrent/Refs.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/concurrent/Refs.java b/src/java/org/apache/cassandra/utils/concurrent/Refs.java
new file mode 100644
index 0000000..ed5fcfa
--- /dev/null
+++ b/src/java/org/apache/cassandra/utils/concurrent/Refs.java
@@ -0,0 +1,219 @@
+package org.apache.cassandra.utils.concurrent;
+
+import java.util.*;
+
+import com.google.common.base.Throwables;
+import com.google.common.collect.Iterators;
+
+/**
+ * A collection of managed Ref references to RefCounted objects, and the objects they are referencing.
+ * Care MUST be taken when using this collection, as if a permanent reference to it leaks we will not
+ * be alerted to a lack of reference release.
+ *
+ * All of the java.util.Collection operations that modify the collection are unsupported.
+ *
+ * The collection is backed by a plain HashMap and performs no synchronisation of its own.
+ */
+public final class Refs<T extends RefCounted> extends AbstractCollection<T> implements AutoCloseable
+{
+    // each managed object, mapped to the single Ref this collection holds on it
+    private final Map<T, Ref> references;
+
+    /**
+     * Create an empty collection
+     */
+    public Refs()
+    {
+        this.references = new HashMap<>();
+    }
+
+    /**
+     * Create a collection holding the provided references; the map is copied, so later
+     * modification of the argument is not reflected in this collection
+     * @param references the objects, and the Ref held on each of them
+     */
+    public Refs(Map<T, Ref> references)
+    {
+        this.references = new HashMap<>(references);
+    }
+
+    /**
+     * Release ALL of the references held by this Refs collection; the collection is emptied
+     * even if one or more of the releases fails
+     */
+    public void release()
+    {
+        try
+        {
+            release(references.values());
+        }
+        finally
+        {
+            references.clear();
+        }
+    }
+
+    /**
+     * See {@link Refs#release()}
+     */
+    @Override
+    public void close()
+    {
+        release();
+    }
+
+    /**
+     * @param referenced the object we have a Ref to
+     * @return the Ref to said object, or null if this collection holds none
+     */
+    public Ref get(T referenced)
+    {
+        return references.get(referenced);
+    }
+
+    /**
+     * Release the retained Ref to the provided object, and stop tracking it
+     * @param referenced the object we have a Ref to
+     * @throws IllegalStateException if no Ref to the object is held
+     */
+    public void release(T referenced)
+    {
+        Ref ref = references.remove(referenced);
+        if (ref == null)
+            throw new IllegalStateException("This Refs collection does not hold a reference to " + referenced);
+        ref.release();
+    }
+
+    /**
+     * Release the retained Ref to the provided object, if held, return false otherwise
+     * @param referenced the object we retain a Ref to
+     * @return return true if we held a reference to the object, and false otherwise
+     */
+    public boolean releaseIfHolds(T referenced)
+    {
+        Ref ref = references.remove(referenced);
+        if (ref == null)
+            return false;
+        ref.release();
+        return true;
+    }
+
+    /**
+     * Release a retained Ref to all of the provided objects; if any is not held, an exception will be thrown.
+     * The references that ARE held are always released, whether or not some of the objects were missing.
+     * @param release the objects whose references should be released
+     * @throws IllegalStateException if a Ref was not held for one or more of the objects
+     */
+    public void release(Collection<T> release)
+    {
+        List<Ref> refs = new ArrayList<>();
+        List<T> notPresent = null;
+        for (T obj : release)
+        {
+            Ref ref = references.remove(obj);
+            if (ref != null)
+            {
+                refs.add(ref);
+                continue;
+            }
+            if (notPresent == null)
+                notPresent = new ArrayList<>();
+            notPresent.add(obj);
+        }
+
+        // built before releasing so it can be attached to a release failure; the constructor
+        // already captures the stack trace, so no explicit fillInStackTrace() is needed
+        IllegalStateException notPresentFail = null;
+        if (notPresent != null)
+        {
+            notPresentFail = new IllegalStateException("Could not release references to " + notPresent
+                                                       + " as references to these objects were not held");
+        }
+        try
+        {
+            release(refs);
+        }
+        catch (Throwable t)
+        {
+            if (notPresentFail != null)
+                t.addSuppressed(notPresentFail);
+            // a failed release must reach the caller; previously it was silently dropped here.
+            // release(Iterable) only ever throws unchecked throwables, so this rethrows t itself
+            throw Throwables.propagate(t);
+        }
+        if (notPresentFail != null)
+            throw notPresentFail;
+    }
+
+    /**
+     * Attempt to take a reference to the provided object; if it has already been released, false is returned
+     * and the collection is left unchanged. If a Ref to the object was already held, the older Ref is released
+     * so that only one is ever retained.
+     * @param t object to acquire a reference to
+     * @return true iff success
+     */
+    public boolean tryRef(T t)
+    {
+        Ref ref = t.tryRef();
+        if (ref == null)
+            return false;
+        Ref displaced = references.put(t, ref);
+        if (displaced != null)
+            displaced.release(); // release dup
+        return true;
+    }
+
+    /**
+     * @return a read-only iterator over the referenced objects
+     */
+    @Override
+    public Iterator<T> iterator()
+    {
+        return Iterators.unmodifiableIterator(references.keySet().iterator());
+    }
+
+    /**
+     * @return the number of objects a Ref is currently held for
+     */
+    @Override
+    public int size()
+    {
+        return references.size();
+    }
+
+    /**
+     * Merge two sets of references, ensuring only one reference is retained between the two sets.
+     * Ownership of every Ref moves to this collection, and the argument is left empty.
+     * @param add the collection to absorb
+     * @return this
+     */
+    public Refs<T> addAll(Refs<T> add)
+    {
+        // merging a collection into itself must be a no-op; without this guard every Ref
+        // would be classed as overlap, released, and the collection emptied
+        if (add == this)
+            return this;
+
+        List<Ref> overlap = new ArrayList<>();
+        for (Map.Entry<T, Ref> e : add.references.entrySet())
+        {
+            if (this.references.containsKey(e.getKey()))
+                overlap.add(e.getValue());
+            else
+                this.references.put(e.getKey(), e.getValue());
+        }
+        add.references.clear();
+        release(overlap);
+        return this;
+    }
+
+    /**
+     * Acquire a reference to all of the provided objects, or none
+     * @param reference the objects to acquire a reference to
+     * @return the acquired references, or null if a reference could not be taken to one of the objects
+     * (in which case every reference taken so far has been released again)
+     */
+    public static <T extends RefCounted> Refs<T> tryRef(Iterable<T> reference)
+    {
+        Map<T, Ref> refs = new HashMap<>();
+        for (T rc : reference)
+        {
+            Ref ref = rc.tryRef();
+            if (ref == null)
+            {
+                release(refs.values());
+                return null;
+            }
+            // the same object may appear more than once; retain a single Ref to it and
+            // release the one it displaces, which would otherwise never be released
+            Ref displaced = refs.put(rc, ref);
+            if (displaced != null)
+                displaced.release(); // release dup
+        }
+        return new Refs<T>(refs);
+    }
+
+    /**
+     * Acquire a reference to all of the provided objects
+     * @param reference the objects to acquire a reference to
+     * @return the acquired references; never null
+     * @throws IllegalStateException if a reference could not be taken to one of the objects
+     */
+    public static <T extends RefCounted> Refs<T> ref(Iterable<T> reference)
+    {
+        Refs<T> refs = tryRef(reference);
+        if (refs != null)
+            return refs;
+        throw new IllegalStateException("Could not acquire a reference to every one of the provided objects");
+    }
+
+    /**
+     * Release every provided Ref, continuing past failures; the first failure is rethrown
+     * once all have been attempted, with any later failures attached as suppressed
+     */
+    private static void release(Iterable<Ref> refs)
+    {
+        Throwable fail = null;
+        for (Ref ref : refs)
+        {
+            try
+            {
+                ref.release();
+            }
+            catch (Throwable t)
+            {
+                if (fail == null)
+                    fail = t;
+                else
+                    fail.addSuppressed(t);
+            }
+        }
+        if (fail != null)
+            throw Throwables.propagate(fail);
+    }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c75ee416/test/unit/org/apache/cassandra/SchemaLoader.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/SchemaLoader.java b/test/unit/org/apache/cassandra/SchemaLoader.java
index 4a1c104..ce65d5a 100644
--- a/test/unit/org/apache/cassandra/SchemaLoader.java
+++ b/test/unit/org/apache/cassandra/SchemaLoader.java
@@ -21,6 +21,7 @@ import java.io.File;
import java.nio.ByteBuffer;
import java.util.*;
+import org.junit.After;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.slf4j.Logger;
@@ -64,6 +65,15 @@ public class SchemaLoader
MigrationManager.announceNewKeyspace(ksm);
}
+ @After
+ public void leakDetect() throws InterruptedException
+ {
+ // after each test: request three GC passes, then pause for 10ms
+ for (int pass = 0; pass < 3; pass++)
+ System.gc();
+ Thread.sleep(10);
+ }
+
public static void prepareServer()
{
// Cleanup first
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c75ee416/test/unit/org/apache/cassandra/db/KeyCacheTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/KeyCacheTest.java b/test/unit/org/apache/cassandra/db/KeyCacheTest.java
index 1bc7caf..1f7024e 100644
--- a/test/unit/org/apache/cassandra/db/KeyCacheTest.java
+++ b/test/unit/org/apache/cassandra/db/KeyCacheTest.java
@@ -38,6 +38,7 @@ import org.apache.cassandra.io.sstable.SSTableReader;
import org.apache.cassandra.service.CacheService;
import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.concurrent.Refs;
import static org.junit.Assert.assertEquals;
public class KeyCacheTest extends SchemaLoader
@@ -151,8 +152,9 @@ public class KeyCacheTest extends SchemaLoader
assertKeyCacheSize(2, KEYSPACE1, COLUMN_FAMILY1);
Set<SSTableReader> readers = cfs.getDataTracker().getSSTables();
- for (SSTableReader reader : readers)
- reader.acquireReference();
+ Refs<SSTableReader> refs = Refs.tryRef(readers);
+ if (refs == null)
+ throw new IllegalStateException();
Util.compactAll(cfs, Integer.MAX_VALUE).get();
// after compaction cache should have entries for new SSTables,
@@ -160,8 +162,7 @@ public class KeyCacheTest extends SchemaLoader
// if we had 2 keys in cache previously it should become 4
assertKeyCacheSize(4, KEYSPACE1, COLUMN_FAMILY1);
- for (SSTableReader reader : readers)
- reader.releaseReference();
+ refs.release();
Uninterruptibles.sleepUninterruptibly(10, TimeUnit.MILLISECONDS);;
while (ScheduledExecutors.nonPeriodicTasks.getActiveCount() + ScheduledExecutors.nonPeriodicTasks.getQueue().size() > 0);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c75ee416/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java b/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
index a09d8b4..d9442c7 100644
--- a/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
@@ -17,6 +17,7 @@
*/
package org.apache.cassandra.db.compaction;
+import org.apache.cassandra.utils.concurrent.Refs;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -63,9 +64,11 @@ public class AntiCompactionTest extends SchemaLoader
Range<Token> range = new Range<Token>(new BytesToken("0".getBytes()), new BytesToken("4".getBytes()));
List<Range<Token>> ranges = Arrays.asList(range);
- SSTableReader.acquireReferences(sstables);
+ Refs<SSTableReader> refs = Refs.tryRef(sstables);
+ if (refs == null)
+ throw new IllegalStateException();
long repairedAt = 1000;
- CompactionManager.instance.performAnticompaction(store, ranges, sstables, repairedAt);
+ CompactionManager.instance.performAnticompaction(store, ranges, refs, repairedAt);
assertEquals(2, store.getSSTables().size());
int repairedKeys = 0;
@@ -93,7 +96,7 @@ public class AntiCompactionTest extends SchemaLoader
for (SSTableReader sstable : store.getSSTables())
{
assertFalse(sstable.isMarkedCompacted());
- assertEquals(1, sstable.referenceCount());
+ assertEquals(1, sstable.sharedRef().globalCount());
}
assertEquals(0, store.getDataTracker().getCompacting().size());
assertEquals(repairedKeys, 4);
@@ -110,8 +113,7 @@ public class AntiCompactionTest extends SchemaLoader
long origSize = s.bytesOnDisk();
Range<Token> range = new Range<Token>(new BytesToken(ByteBufferUtil.bytes(0)), new BytesToken(ByteBufferUtil.bytes(500)));
Collection<SSTableReader> sstables = cfs.getSSTables();
- SSTableReader.acquireReferences(sstables);
- CompactionManager.instance.performAnticompaction(cfs, Arrays.asList(range), sstables, 12345);
+ CompactionManager.instance.performAnticompaction(cfs, Arrays.asList(range), Refs.tryRef(sstables), 12345);
long sum = 0;
for (SSTableReader x : cfs.getSSTables())
sum += x.bytesOnDisk();
@@ -149,12 +151,13 @@ public class AntiCompactionTest extends SchemaLoader
Range<Token> range = new Range<Token>(new BytesToken("-10".getBytes()), new BytesToken("-1".getBytes()));
List<Range<Token>> ranges = Arrays.asList(range);
- SSTableReader.acquireReferences(sstables);
- CompactionManager.instance.performAnticompaction(store, ranges, sstables, 1);
-
+ Refs<SSTableReader> refs = Refs.tryRef(sstables);
+ if (refs == null)
+ throw new IllegalStateException();
+ CompactionManager.instance.performAnticompaction(store, ranges, refs, 1);
assertThat(store.getSSTables().size(), is(1));
assertThat(Iterables.get(store.getSSTables(), 0).isRepaired(), is(false));
- assertThat(Iterables.get(store.getSSTables(), 0).referenceCount(), is(1));
+ assertThat(Iterables.get(store.getSSTables(), 0).sharedRef().globalCount(), is(1));
assertThat(store.getDataTracker().getCompacting().size(), is(0));
}
@@ -167,12 +170,11 @@ public class AntiCompactionTest extends SchemaLoader
Range<Token> range = new Range<Token>(new BytesToken("0".getBytes()), new BytesToken("9999".getBytes()));
List<Range<Token>> ranges = Arrays.asList(range);
- SSTableReader.acquireReferences(sstables);
- CompactionManager.instance.performAnticompaction(store, ranges, sstables, 1);
+ CompactionManager.instance.performAnticompaction(store, ranges, Refs.tryRef(sstables), 1);
assertThat(store.getSSTables().size(), is(1));
assertThat(Iterables.get(store.getSSTables(), 0).isRepaired(), is(true));
- assertThat(Iterables.get(store.getSSTables(), 0).referenceCount(), is(1));
+ assertThat(Iterables.get(store.getSSTables(), 0).sharedRef().globalCount(), is(1));
assertThat(store.getDataTracker().getCompacting().size(), is(0));
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c75ee416/test/unit/org/apache/cassandra/db/compaction/BlacklistingCompactionsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/BlacklistingCompactionsTest.java b/test/unit/org/apache/cassandra/db/compaction/BlacklistingCompactionsTest.java
index e6626ea..08e3fb3 100644
--- a/test/unit/org/apache/cassandra/db/compaction/BlacklistingCompactionsTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/BlacklistingCompactionsTest.java
@@ -26,6 +26,8 @@ import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
+import org.junit.After;
+import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -44,6 +46,15 @@ public class BlacklistingCompactionsTest extends SchemaLoader
{
public static final String KEYSPACE = "Keyspace1";
+ @After
+ public void leakDetect() throws InterruptedException
+ {
+ // three back-to-back GC requests followed by a 10ms sleep, run after every test
+ for (int remaining = 3; remaining > 0; remaining--)
+ System.gc();
+ Thread.sleep(10);
+ }
+
@BeforeClass
public static void closeStdErr()
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c75ee416/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java b/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java
index 5341a4b..8bef669 100644
--- a/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java
@@ -118,7 +118,7 @@ public class LegacySSTableTest extends SchemaLoader
ranges.add(new Range<>(p.getMinimumToken(), p.getToken(ByteBufferUtil.bytes("100"))));
ranges.add(new Range<>(p.getToken(ByteBufferUtil.bytes("100")), p.getMinimumToken()));
ArrayList<StreamSession.SSTableStreamingSections> details = new ArrayList<>();
- details.add(new StreamSession.SSTableStreamingSections(sstable,
+ details.add(new StreamSession.SSTableStreamingSections(sstable, sstable.tryRef(),
sstable.getPositionsForRanges(ranges),
sstable.estimatedKeysForRanges(ranges), sstable.getSSTableMetadata().repairedAt));
new StreamPlan("LegacyStreamingTest").transferFiles(FBUtilities.getBroadcastAddress(), details)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c75ee416/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
index fbd627b..acf8c90 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
@@ -200,7 +200,7 @@ public class SSTableRewriterTest extends SchemaLoader
assertTrue(s != s2);
assertFileCounts(dir.list(), 2, 3);
s.markObsolete();
- s.releaseReference();
+ s.sharedRef().release();
Thread.sleep(1000);
assertFileCounts(dir.list(), 0, 3);
writer.abort(false);
@@ -705,7 +705,7 @@ public class SSTableRewriterTest extends SchemaLoader
for (SSTableReader sstable : cfs.getSSTables())
{
assertFalse(sstable.isMarkedCompacted());
- assertEquals(1, sstable.referenceCount());
+ assertEquals(1, sstable.sharedRef().globalCount());
liveDescriptors.add(sstable.descriptor.generation);
}
for (File dir : cfs.directories.getCFDirectories())
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c75ee416/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java b/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
index a528f10..f84ae11 100644
--- a/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
+++ b/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
@@ -64,7 +64,7 @@ public class StreamTransferTaskTest extends SchemaLoader
{
List<Range<Token>> ranges = new ArrayList<>();
ranges.add(new Range<>(sstable.first.getToken(), sstable.last.getToken()));
- task.addTransferFile(sstable, 1, sstable.getPositionsForRanges(ranges), 0);
+ task.addTransferFile(sstable, sstable.sharedRef(), 1, sstable.getPositionsForRanges(ranges), 0);
}
assertEquals(2, task.getTotalNumberOfFiles());
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c75ee416/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java b/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
index 5d44210..a5112b7 100644
--- a/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
+++ b/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
@@ -52,6 +52,7 @@ import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.CounterId;
import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.concurrent.Refs;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import static org.apache.cassandra.Util.cellname;
@@ -203,15 +204,15 @@ public class StreamingTransferTest extends SchemaLoader
private void transfer(SSTableReader sstable, List<Range<Token>> ranges) throws Exception
{
- new StreamPlan("StreamingTransferTest").transferFiles(LOCAL, makeStreamingDetails(ranges, Arrays.asList(sstable))).execute().get();
+ new StreamPlan("StreamingTransferTest").transferFiles(LOCAL, makeStreamingDetails(ranges, Refs.tryRef(Arrays.asList(sstable)))).execute().get();
}
- private Collection<StreamSession.SSTableStreamingSections> makeStreamingDetails(List<Range<Token>> ranges, Collection<SSTableReader> sstables)
+ private Collection<StreamSession.SSTableStreamingSections> makeStreamingDetails(List<Range<Token>> ranges, Refs<SSTableReader> sstables)
{
ArrayList<StreamSession.SSTableStreamingSections> details = new ArrayList<>();
for (SSTableReader sstable : sstables)
{
- details.add(new StreamSession.SSTableStreamingSections(sstable,
+ details.add(new StreamSession.SSTableStreamingSections(sstable, sstables.get(sstable),
sstable.getPositionsForRanges(ranges),
sstable.estimatedKeysForRanges(ranges), sstable.getSSTableMetadata().repairedAt));
}
@@ -375,9 +376,9 @@ public class StreamingTransferTest extends SchemaLoader
ranges.add(new Range<>(p.getMinimumToken(), p.getToken(ByteBufferUtil.bytes("test"))));
ranges.add(new Range<>(p.getToken(ByteBufferUtil.bytes("transfer2")), p.getMinimumToken()));
// Acquiring references, transferSSTables needs it
- sstable.acquireReference();
- sstable2.acquireReference();
- new StreamPlan("StreamingTransferTest").transferFiles(LOCAL, makeStreamingDetails(ranges, Arrays.asList(sstable, sstable2))).execute().get();
+ Refs<SSTableReader> refs = Refs.tryRef(Arrays.asList(sstable, sstable2));
+ assert refs != null;
+ new StreamPlan("StreamingTransferTest").transferFiles(LOCAL, makeStreamingDetails(ranges, refs)).execute().get();
// confirm that the sstables were transferred and registered and that 2 keys arrived
ColumnFamilyStore cfstore = Keyspace.open(keyspaceName).getColumnFamilyStore(cfname);
@@ -428,10 +429,11 @@ public class StreamingTransferTest extends SchemaLoader
ranges.add(new Range<>(secondtolast.getKey().getToken(), p.getMinimumToken()));
// Acquiring references, transferSSTables needs it
- if (!SSTableReader.acquireReferences(ssTableReaders))
+ Refs<SSTableReader> refs = Refs.tryRef(ssTableReaders);
+ if (refs == null)
throw new AssertionError();
- new StreamPlan("StreamingTransferTest").transferFiles(LOCAL, makeStreamingDetails(ranges, ssTableReaders)).execute().get();
+ new StreamPlan("StreamingTransferTest").transferFiles(LOCAL, makeStreamingDetails(ranges, refs)).execute().get();
// check that only two keys were transferred
for (Map.Entry<DecoratedKey,String> entry : Arrays.asList(first, last))
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c75ee416/test/unit/org/apache/cassandra/utils/concurrent/RefCountedTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/utils/concurrent/RefCountedTest.java b/test/unit/org/apache/cassandra/utils/concurrent/RefCountedTest.java
new file mode 100644
index 0000000..fe22d21
--- /dev/null
+++ b/test/unit/org/apache/cassandra/utils/concurrent/RefCountedTest.java
@@ -0,0 +1,85 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+package org.apache.cassandra.utils.concurrent;
+
+import org.junit.Test;
+
+import junit.framework.Assert;
+
+/**
+ * Tests that a RefCounted resource is tidied even when references to it are leaked,
+ * and that releasing the same Ref twice is rejected.
+ */
+public class RefCountedTest
+{
+
+    /**
+     * Tidy implementation that simply records whether tidy() has been invoked
+     */
+    private static final class Tidier implements RefCounted.Tidy
+    {
+        // volatile: in the leak tests tidy() is expected to run on a thread other than
+        // the test thread that reads this flag, so the write must be made visible
+        volatile boolean tidied;
+
+        public void tidy()
+        {
+            tidied = true;
+        }
+
+        public String name()
+        {
+            return "test tidy";
+        }
+    }
+
+    /**
+     * Takes an extra Ref and drops it without releasing it, releases the shared Ref,
+     * and expects the resource to have been tidied after GC and a one second wait
+     */
+    @Test
+    public void testLeak() throws InterruptedException
+    {
+        Tidier tidier = new Tidier();
+        RefCounted obj = RefCounted.Impl.get(tidier);
+        obj.tryRef(); // deliberately discarded without release
+        obj.sharedRef().release();
+        System.gc();
+        System.gc();
+        Thread.sleep(1000);
+        Assert.assertTrue(tidier.tidied);
+    }
+
+    /**
+     * Drops the RefCounted object itself without releasing anything, and expects the
+     * resource to have been tidied after GC and a one second wait
+     */
+    @Test
+    public void testSeriousLeak() throws InterruptedException
+    {
+        Tidier tidier = new Tidier();
+        RefCounted.Impl.get(tidier);
+        for (int pass = 0; pass < 4; pass++)
+            System.gc();
+        Thread.sleep(1000);
+        Assert.assertTrue(tidier.tidied);
+    }
+
+    /**
+     * Releasing the shared Ref a second time must throw
+     */
+    @Test
+    public void testDoubleRelease() throws InterruptedException
+    {
+        RefCounted obj = RefCounted.Impl.get(new Tidier());
+        // the first release is legitimate and sits outside the try, so that a failure
+        // here is reported instead of being mistaken for the expected exception
+        obj.sharedRef().release();
+        try
+        {
+            obj.sharedRef().release();
+        }
+        catch (Exception expected)
+        {
+            return;
+        }
+        Assert.fail("releasing the same Ref twice should have thrown");
+    }
+}