You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by be...@apache.org on 2015/01/28 16:20:28 UTC
[4/7] cassandra git commit: Safer Resource Management
Safer Resource Management
patch by benedict; review by marcuse for CASSANDRA-7705
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c75ee416
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c75ee416
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c75ee416
Branch: refs/heads/trunk
Commit: c75ee4160cb8fcdf47c90bfce8bf0d861f32d268
Parents: 9efa017
Author: Benedict Elliott Smith <be...@apache.org>
Authored: Wed Jan 28 14:45:31 2015 +0000
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Wed Jan 28 14:46:16 2015 +0000
----------------------------------------------------------------------
CHANGES.txt | 1 +
build.xml | 1 +
.../apache/cassandra/db/ColumnFamilyStore.java | 143 +++---
.../org/apache/cassandra/db/DataTracker.java | 6 +-
.../compaction/AbstractCompactionStrategy.java | 2 +-
.../db/compaction/CompactionController.java | 20 +-
.../db/compaction/CompactionManager.java | 26 +-
.../cassandra/db/compaction/CompactionTask.java | 9 +-
.../cassandra/db/index/SecondaryIndex.java | 10 +-
.../apache/cassandra/io/sstable/SSTable.java | 1 +
.../cassandra/io/sstable/SSTableLoader.java | 14 +-
.../cassandra/io/sstable/SSTableReader.java | 444 +++++++++----------
.../cassandra/io/sstable/SSTableRewriter.java | 15 +-
.../cassandra/io/sstable/SSTableWriter.java | 4 +-
.../cassandra/service/ActiveRepairService.java | 18 +-
.../cassandra/streaming/StreamReceiveTask.java | 10 +-
.../cassandra/streaming/StreamSession.java | 28 +-
.../cassandra/streaming/StreamTransferTask.java | 10 +-
.../streaming/messages/OutgoingFileMessage.java | 10 +-
.../cassandra/tools/StandaloneScrubber.java | 2 +-
.../apache/cassandra/utils/concurrent/Ref.java | 134 ++++++
.../cassandra/utils/concurrent/RefCounted.java | 94 ++++
.../utils/concurrent/RefCountedImpl.java | 132 ++++++
.../apache/cassandra/utils/concurrent/Refs.java | 219 +++++++++
.../unit/org/apache/cassandra/SchemaLoader.java | 10 +
.../org/apache/cassandra/db/KeyCacheTest.java | 9 +-
.../db/compaction/AntiCompactionTest.java | 26 +-
.../compaction/BlacklistingCompactionsTest.java | 11 +
.../cassandra/io/sstable/LegacySSTableTest.java | 2 +-
.../io/sstable/SSTableRewriterTest.java | 4 +-
.../streaming/StreamTransferTaskTest.java | 2 +-
.../streaming/StreamingTransferTest.java | 18 +-
.../utils/concurrent/RefCountedTest.java | 85 ++++
33 files changed, 1097 insertions(+), 423 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c75ee416/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index ff6a26f..d142a68 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.1.3
+ * Safer Resource Management (CASSANDRA-7705)
* Make sure we compact highly overlapping cold sstables with
STCS (CASSANDRA-8635)
* rpc_interface and listen_interface generate NPE on startup when specified interface doesn't exist (CASSANDRA-8677)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c75ee416/build.xml
----------------------------------------------------------------------
diff --git a/build.xml b/build.xml
index 2e5d0ac..d53a0f7 100644
--- a/build.xml
+++ b/build.xml
@@ -1109,6 +1109,7 @@
<jvmarg value="-Djava.awt.headless=true"/>
<jvmarg value="-javaagent:${basedir}/lib/jamm-0.3.0.jar" />
<jvmarg value="-ea"/>
+ <jvmarg value="-Dcassandra.debugrefcount=true"/>
<jvmarg value="-Xss256k"/>
<jvmarg value="-Dcassandra.memtable_row_overhead_computation_step=100"/>
<jvmarg value="-Dcassandra.test.use_prepared=${cassandra.test.use_prepared}"/>
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c75ee416/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 3822648..62aadf9 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -33,6 +33,7 @@ import com.google.common.base.*;
import com.google.common.collect.*;
import com.google.common.util.concurrent.*;
+import org.apache.cassandra.io.FSWriteError;
import org.json.simple.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -58,7 +59,6 @@ import org.apache.cassandra.dht.*;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.io.FSReadError;
-import org.apache.cassandra.io.FSWriteError;
import org.apache.cassandra.io.compress.CompressionParameters;
import org.apache.cassandra.io.sstable.*;
import org.apache.cassandra.io.sstable.Descriptor;
@@ -72,8 +72,8 @@ import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.streaming.StreamLockfile;
import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.utils.*;
+import org.apache.cassandra.utils.concurrent.*;
import org.apache.cassandra.utils.TopKSampler.SamplerResult;
-import org.apache.cassandra.utils.concurrent.OpOrder;
import org.apache.cassandra.utils.memory.MemtableAllocator;
import com.clearspring.analytics.stream.Counter;
@@ -767,16 +767,12 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
}
logger.info("Loading new SSTables and building secondary indexes for {}/{}: {}", keyspace.getName(), name, newSSTables);
- SSTableReader.acquireReferences(newSSTables);
- data.addSSTables(newSSTables);
- try
+
+ try (Refs<SSTableReader> refs = Refs.ref(newSSTables))
{
+ data.addSSTables(newSSTables);
indexManager.maybeBuildSecondaryIndexes(newSSTables, indexManager.allIndexesNames());
}
- finally
- {
- SSTableReader.releaseReferences(newSSTables);
- }
logger.info("Done loading load new SSTables for {}/{}", keyspace.getName(), name);
}
@@ -788,18 +784,14 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
Set<String> indexes = new HashSet<String>(Arrays.asList(idxNames));
Collection<SSTableReader> sstables = cfs.getSSTables();
- try
+
+ try (Refs<SSTableReader> refs = Refs.ref(sstables))
{
cfs.indexManager.setIndexRemoved(indexes);
- SSTableReader.acquireReferences(sstables);
logger.info(String.format("User Requested secondary index re-build for %s/%s indexes", ksName, cfName));
cfs.indexManager.maybeBuildSecondaryIndexes(sstables, indexes);
cfs.indexManager.setIndexBuilt(indexes);
}
- finally
- {
- SSTableReader.releaseReferences(sstables);
- }
}
public String getColumnFamilyName()
@@ -1285,13 +1277,13 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
* @return sstables whose key range overlaps with that of the given sstables, not including itself.
* (The given sstables may or may not overlap with each other.)
*/
- public Set<SSTableReader> getOverlappingSSTables(Collection<SSTableReader> sstables)
+ public Collection<SSTableReader> getOverlappingSSTables(Iterable<SSTableReader> sstables)
{
logger.debug("Checking for sstables overlapping {}", sstables);
// a normal compaction won't ever have an empty sstables list, but we create a skeleton
// compaction controller for streaming, and that passes an empty list.
- if (sstables.isEmpty())
+ if (!sstables.iterator().hasNext())
return ImmutableSet.of();
DataTracker.SSTableIntervalTree tree = data.getView().intervalTree;
@@ -1310,13 +1302,14 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
/**
* like getOverlappingSSTables, but acquires references before returning
*/
- public Set<SSTableReader> getAndReferenceOverlappingSSTables(Collection<SSTableReader> sstables)
+ public Refs<SSTableReader> getAndReferenceOverlappingSSTables(Iterable<SSTableReader> sstables)
{
while (true)
{
- Set<SSTableReader> overlapped = getOverlappingSSTables(sstables);
- if (SSTableReader.acquireReferences(overlapped))
- return overlapped;
+ Iterable<SSTableReader> overlapped = getOverlappingSSTables(sstables);
+ Refs<SSTableReader> refs = Refs.tryRef(overlapped);
+ if (refs != null)
+ return refs;
}
}
@@ -1793,38 +1786,6 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
return removeDeletedCF(cf, gcBefore);
}
- /**
- * Get the current view and acquires references on all its sstables.
- * This is a bit tricky because we must ensure that between the time we
- * get the current view and the time we acquire the references the set of
- * sstables hasn't changed. Otherwise we could get a view for which an
- * sstable have been deleted in the meantime.
- *
- * At the end of this method, a reference on all the sstables of the
- * returned view will have been acquired and must thus be released when
- * appropriate.
- */
- private DataTracker.View markCurrentViewReferenced()
- {
- while (true)
- {
- DataTracker.View currentView = data.getView();
- if (SSTableReader.acquireReferences(currentView.sstables))
- return currentView;
- }
- }
-
- /**
- * Get the current sstables, acquiring references on all of them.
- * The caller is in charge of releasing the references on the sstables.
- *
- * See markCurrentViewReferenced() above.
- */
- public Collection<SSTableReader> markCurrentSSTablesReferenced()
- {
- return markCurrentViewReferenced().sstables;
- }
-
public Set<SSTableReader> getUnrepairedSSTables()
{
Set<SSTableReader> unRepairedSSTables = new HashSet<>(getSSTables());
@@ -1851,13 +1812,14 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
return repairedSSTables;
}
- public ViewFragment selectAndReference(Function<DataTracker.View, List<SSTableReader>> filter)
+ public RefViewFragment selectAndReference(Function<DataTracker.View, List<SSTableReader>> filter)
{
while (true)
{
ViewFragment view = select(filter);
- if (view.sstables.isEmpty() || SSTableReader.acquireReferences(view.sstables))
- return view;
+ Refs<SSTableReader> refs = Refs.tryRef(view.sstables);
+ if (refs != null)
+ return new RefViewFragment(view.sstables, view.memtables, refs);
}
}
@@ -2253,9 +2215,8 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
{
for (ColumnFamilyStore cfs : concatWithIndexes())
{
- DataTracker.View currentView = cfs.markCurrentViewReferenced();
final JSONArray filesJSONArr = new JSONArray();
- try
+ try (RefViewFragment currentView = cfs.selectAndReference(ALL_SSTABLES))
{
for (SSTableReader ssTable : currentView.sstables)
{
@@ -2273,10 +2234,6 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
writeSnapshotManifest(filesJSONArr, snapshotName);
}
- finally
- {
- SSTableReader.releaseReferences(currentView.sstables);
- }
}
}
@@ -2300,13 +2257,13 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
}
}
- public List<SSTableReader> getSnapshotSSTableReader(String tag) throws IOException
+ public Refs<SSTableReader> getSnapshotSSTableReader(String tag) throws IOException
{
Map<Integer, SSTableReader> active = new HashMap<>();
for (SSTableReader sstable : data.getView().sstables)
active.put(sstable.descriptor.generation, sstable);
Map<Descriptor, Set<Component>> snapshots = directories.sstableLister().snapshots(tag).list();
- List<SSTableReader> readers = new ArrayList<>(snapshots.size());
+ Refs<SSTableReader> refs = new Refs<>();
try
{
for (Map.Entry<Descriptor, Set<Component>> entries : snapshots.entrySet())
@@ -2314,29 +2271,28 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
// Try acquire reference to an active sstable instead of snapshot if it exists,
// to avoid opening new sstables. If it fails, use the snapshot reference instead.
SSTableReader sstable = active.get(entries.getKey().generation);
- if (sstable == null || !sstable.acquireReference())
+ if (sstable == null || !refs.tryRef(sstable))
{
if (logger.isDebugEnabled())
logger.debug("using snapshot sstable {}", entries.getKey());
sstable = SSTableReader.open(entries.getKey(), entries.getValue(), metadata, partitioner);
// This is technically not necessary since it's a snapshot but makes things easier
- sstable.acquireReference();
+ refs.tryRef(sstable);
}
else if (logger.isDebugEnabled())
{
logger.debug("using active sstable {}", entries.getKey());
}
- readers.add(sstable);
}
}
catch (IOException | RuntimeException e)
{
// In case one of the snapshot sstables fails to open,
// we must release the references to the ones we opened so far
- SSTableReader.releaseReferences(readers);
+ refs.release();
throw e;
}
- return readers;
+ return refs;
}
/**
@@ -2479,37 +2435,27 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
public Iterable<DecoratedKey> keySamples(Range<Token> range)
{
- Collection<SSTableReader> sstables = markCurrentSSTablesReferenced();
- try
+ try (RefViewFragment view = selectAndReference(ALL_SSTABLES))
{
- Iterable<DecoratedKey>[] samples = new Iterable[sstables.size()];
+ Iterable<DecoratedKey>[] samples = new Iterable[view.sstables.size()];
int i = 0;
- for (SSTableReader sstable: sstables)
+ for (SSTableReader sstable: view.sstables)
{
samples[i++] = sstable.getKeySamples(range);
}
return Iterables.concat(samples);
}
- finally
- {
- SSTableReader.releaseReferences(sstables);
- }
}
public long estimatedKeysForRange(Range<Token> range)
{
- Collection<SSTableReader> sstables = markCurrentSSTablesReferenced();
- try
+ try (RefViewFragment view = selectAndReference(ALL_SSTABLES))
{
long count = 0;
- for (SSTableReader sstable : sstables)
+ for (SSTableReader sstable : view.sstables)
count += sstable.estimatedKeysForRanges(Collections.singleton(range));
return count;
}
- finally
- {
- SSTableReader.releaseReferences(sstables);
- }
}
/**
@@ -2888,6 +2834,27 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
}
}
+ public static class RefViewFragment extends ViewFragment implements AutoCloseable
+ {
+ public final Refs<SSTableReader> refs;
+
+ public RefViewFragment(List<SSTableReader> sstables, Iterable<Memtable> memtables, Refs<SSTableReader> refs)
+ {
+ super(sstables, memtables);
+ this.refs = refs;
+ }
+
+ public void release()
+ {
+ refs.release();
+ }
+
+ public void close()
+ {
+ refs.release();
+ }
+ }
+
/**
* Returns the creation time of the oldest memtable not fully flushed yet.
*/
@@ -2951,4 +2918,12 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
{
fileIndexGenerator.set(0);
}
+
+ public static final Function<DataTracker.View, List<SSTableReader>> ALL_SSTABLES = new Function<DataTracker.View, List<SSTableReader>>()
+ {
+ public List<SSTableReader> apply(DataTracker.View view)
+ {
+ return new ArrayList<>(view.sstables);
+ }
+ };
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c75ee416/src/java/org/apache/cassandra/db/DataTracker.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DataTracker.java b/src/java/org/apache/cassandra/db/DataTracker.java
index d086b47..f672cf2 100644
--- a/src/java/org/apache/cassandra/db/DataTracker.java
+++ b/src/java/org/apache/cassandra/db/DataTracker.java
@@ -367,7 +367,7 @@ public class DataTracker
while (!view.compareAndSet(currentView, newView));
for (SSTableReader sstable : currentView.sstables)
if (!remaining.contains(sstable))
- sstable.releaseReference();
+ sstable.sharedRef().release();
notifySSTablesChanged(remaining, Collections.<SSTableReader>emptySet(), OperationType.UNKNOWN);
}
@@ -406,7 +406,7 @@ public class DataTracker
sstable.setTrackedBy(this);
for (SSTableReader sstable : oldSSTables)
- sstable.releaseReference();
+ sstable.sharedRef().release();
}
private void removeSSTablesFromTracker(Collection<SSTableReader> oldSSTables)
@@ -467,7 +467,7 @@ public class DataTracker
{
boolean firstToCompact = sstable.markObsolete();
assert tolerateCompacted || firstToCompact : sstable + " was already marked compacted";
- sstable.releaseReference();
+ sstable.sharedRef().release();
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c75ee416/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
index 10abd01..9b25a1b 100644
--- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
@@ -372,7 +372,7 @@ public abstract class AbstractCompactionStrategy
if (uncheckedTombstoneCompaction)
return true;
- Set<SSTableReader> overlaps = cfs.getOverlappingSSTables(Collections.singleton(sstable));
+ Collection<SSTableReader> overlaps = cfs.getOverlappingSSTables(Collections.singleton(sstable));
if (overlaps.isEmpty())
{
// there is no overlap, tombstones are safely droppable
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c75ee416/src/java/org/apache/cassandra/db/compaction/CompactionController.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionController.java b/src/java/org/apache/cassandra/db/compaction/CompactionController.java
index f23d39a..5217189 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionController.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionController.java
@@ -33,6 +33,8 @@ import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.io.sstable.SSTableReader;
import org.apache.cassandra.utils.AlwaysPresentFilter;
+import org.apache.cassandra.utils.concurrent.Refs;
+
/**
* Manage compaction options.
*/
@@ -42,8 +44,8 @@ public class CompactionController implements AutoCloseable
public final ColumnFamilyStore cfs;
private DataTracker.SSTableIntervalTree overlappingTree;
- private Set<SSTableReader> overlappingSSTables;
- private final Set<SSTableReader> compacting;
+ private Refs<SSTableReader> overlappingSSTables;
+ private final Iterable<SSTableReader> compacting;
public final int gcBefore;
@@ -76,11 +78,13 @@ public class CompactionController implements AutoCloseable
private void refreshOverlaps()
{
if (this.overlappingSSTables != null)
- SSTableReader.releaseReferences(overlappingSSTables);
+ overlappingSSTables.release();
- Set<SSTableReader> overlapping = compacting == null ? null : cfs.getAndReferenceOverlappingSSTables(compacting);
- this.overlappingSSTables = overlapping == null ? Collections.<SSTableReader>emptySet() : overlapping;
- this.overlappingTree = overlapping == null ? null : DataTracker.buildIntervalTree(overlapping);
+ if (compacting == null)
+ overlappingSSTables = Refs.tryRef(Collections.<SSTableReader>emptyList());
+ else
+ overlappingSSTables = cfs.getAndReferenceOverlappingSSTables(compacting);
+ this.overlappingTree = DataTracker.buildIntervalTree(overlappingSSTables);
}
public Set<SSTableReader> getFullyExpiredSSTables()
@@ -104,7 +108,7 @@ public class CompactionController implements AutoCloseable
* @param gcBefore
* @return
*/
- public static Set<SSTableReader> getFullyExpiredSSTables(ColumnFamilyStore cfStore, Set<SSTableReader> compacting, Set<SSTableReader> overlapping, int gcBefore)
+ public static Set<SSTableReader> getFullyExpiredSSTables(ColumnFamilyStore cfStore, Iterable<SSTableReader> compacting, Iterable<SSTableReader> overlapping, int gcBefore)
{
logger.debug("Checking droppable sstables in {}", cfStore);
@@ -187,6 +191,6 @@ public class CompactionController implements AutoCloseable
public void close()
{
- SSTableReader.releaseReferences(overlappingSSTables);
+ overlappingSSTables.release();
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c75ee416/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 02f5e81..f59938f 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -82,6 +82,8 @@ import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.concurrent.OpOrder;
import org.apache.cassandra.utils.*;
+import org.apache.cassandra.utils.concurrent.Refs;
+
/**
* A singleton which manages a private executor of ongoing compactions.
* <p/>
@@ -370,7 +372,7 @@ public class CompactionManager implements CompactionManagerMBean
public Future<?> submitAntiCompaction(final ColumnFamilyStore cfs,
final Collection<Range<Token>> ranges,
- final Collection<SSTableReader> sstables,
+ final Refs<SSTableReader> sstables,
final long repairedAt)
{
Runnable runnable = new WrappedRunnable() {
@@ -381,18 +383,12 @@ public class CompactionManager implements CompactionManagerMBean
while (!success)
{
for (SSTableReader compactingSSTable : cfs.getDataTracker().getCompacting())
- {
- if (sstables.remove(compactingSSTable))
- SSTableReader.releaseReferences(Arrays.asList(compactingSSTable));
- }
+ sstables.releaseIfHolds(compactingSSTable);
Set<SSTableReader> compactedSSTables = new HashSet<>();
for (SSTableReader sstable : sstables)
- {
if (sstable.isMarkedCompacted())
compactedSSTables.add(sstable);
- }
- sstables.removeAll(compactedSSTables);
- SSTableReader.releaseReferences(compactedSSTables);
+ sstables.release(compactedSSTables);
success = sstables.isEmpty() || cfs.getDataTracker().markCompacting(sstables);
}
performAnticompaction(cfs, ranges, sstables, repairedAt);
@@ -413,7 +409,7 @@ public class CompactionManager implements CompactionManagerMBean
*/
public void performAnticompaction(ColumnFamilyStore cfs,
Collection<Range<Token>> ranges,
- Collection<SSTableReader> validatedForRepair,
+ Refs<SSTableReader> validatedForRepair,
long repairedAt) throws InterruptedException, ExecutionException, IOException
{
logger.info("Starting anticompaction for {}.{} on {}/{} sstables", cfs.keyspace.getName(), cfs.getColumnFamilyName(), validatedForRepair.size(), cfs.getSSTables().size());
@@ -454,13 +450,13 @@ public class CompactionManager implements CompactionManagerMBean
}
cfs.getDataTracker().notifySSTableRepairedStatusChanged(mutatedRepairStatuses);
cfs.getDataTracker().unmarkCompacting(Sets.union(nonAnticompacting, mutatedRepairStatuses));
- SSTableReader.releaseReferences(Sets.union(nonAnticompacting, mutatedRepairStatuses));
+ validatedForRepair.release(Sets.union(nonAnticompacting, mutatedRepairStatuses));
if (!sstables.isEmpty())
doAntiCompaction(cfs, ranges, sstables, repairedAt);
}
finally
{
- SSTableReader.releaseReferences(sstables);
+ validatedForRepair.release();
cfs.getDataTracker().unmarkCompacting(sstables);
}
@@ -899,7 +895,7 @@ public class CompactionManager implements CompactionManagerMBean
if (!cfs.isValid())
return;
- Collection<SSTableReader> sstables = null;
+ Refs<SSTableReader> sstables = null;
try
{
@@ -924,7 +920,7 @@ public class CompactionManager implements CompactionManagerMBean
// we don't mark validating sstables as compacting in DataTracker, so we have to mark them referenced
// instead so they won't be cleaned up if they do get compacted during the validation
if (validator.desc.parentSessionId == null || ActiveRepairService.instance.getParentRepairSession(validator.desc.parentSessionId) == null)
- sstables = cfs.markCurrentSSTablesReferenced();
+ sstables = cfs.selectAndReference(ColumnFamilyStore.ALL_SSTABLES).refs;
else
sstables = ActiveRepairService.instance.getParentRepairSession(validator.desc.parentSessionId).getAndReferenceSSTables(cfs.metadata.cfId);
@@ -990,7 +986,7 @@ public class CompactionManager implements CompactionManagerMBean
finally
{
if (sstables != null)
- SSTableReader.releaseReferences(sstables);
+ sstables.release();
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c75ee416/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
index eda09c0..b6c215e 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
@@ -214,7 +214,14 @@ public class CompactionTask extends AbstractCompactionTask
}
catch (Throwable t)
{
- writer.abort();
+ try
+ {
+ writer.abort();
+ }
+ catch (Throwable t2)
+ {
+ t.addSuppressed(t2);
+ }
throw t;
}
finally
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c75ee416/src/java/org/apache/cassandra/db/index/SecondaryIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/SecondaryIndex.java b/src/java/org/apache/cassandra/db/index/SecondaryIndex.java
index 62b9e4c..be100f4 100644
--- a/src/java/org/apache/cassandra/db/index/SecondaryIndex.java
+++ b/src/java/org/apache/cassandra/db/index/SecondaryIndex.java
@@ -18,7 +18,6 @@
package org.apache.cassandra.db.index;
import java.nio.ByteBuffer;
-import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.Set;
@@ -52,6 +51,8 @@ import org.apache.cassandra.io.sstable.SSTableReader;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.concurrent.Refs;
+
/**
* Abstract base class for different types of secondary indexes.
*
@@ -202,8 +203,7 @@ public abstract class SecondaryIndex
logger.info(String.format("Submitting index build of %s for data in %s",
getIndexName(), StringUtils.join(baseCfs.getSSTables(), ", ")));
- Collection<SSTableReader> sstables = baseCfs.markCurrentSSTablesReferenced();
- try
+ try (Refs<SSTableReader> sstables = baseCfs.selectAndReference(ColumnFamilyStore.ALL_SSTABLES).refs)
{
SecondaryIndexBuilder builder = new SecondaryIndexBuilder(baseCfs,
Collections.singleton(getIndexName()),
@@ -213,10 +213,6 @@ public abstract class SecondaryIndex
forceBlockingFlush();
setIndexBuilt();
}
- finally
- {
- SSTableReader.releaseReferences(sstables);
- }
logger.info("Index build of {} complete", getIndexName());
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c75ee416/src/java/org/apache/cassandra/io/sstable/SSTable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTable.java b/src/java/org/apache/cassandra/io/sstable/SSTable.java
index 6eff369..ac0741d 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTable.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTable.java
@@ -38,6 +38,7 @@ import org.apache.cassandra.io.FSWriteError;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.io.util.RandomAccessReader;
import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.concurrent.RefCounted;
import org.apache.cassandra.utils.memory.HeapAllocator;
import org.apache.cassandra.utils.Pair;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c75ee416/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
index 3d7eea7..1cab8c7 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
@@ -39,6 +39,8 @@ import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.OutputHandler;
import org.apache.cassandra.utils.Pair;
+import org.apache.cassandra.utils.concurrent.Ref;
+
/**
* Cassandra SSTable bulk loader.
* Load an externally created sstable into a cluster.
@@ -130,7 +132,7 @@ public class SSTableLoader implements StreamEventHandler
List<Pair<Long, Long>> sstableSections = sstable.getPositionsForRanges(tokenRanges);
long estimatedKeys = sstable.estimatedKeysForRanges(tokenRanges);
- StreamSession.SSTableStreamingSections details = new StreamSession.SSTableStreamingSections(sstable, sstableSections, estimatedKeys, ActiveRepairService.UNREPAIRED_SSTABLE);
+ StreamSession.SSTableStreamingSections details = new StreamSession.SSTableStreamingSections(sstable, sstable.sharedRef(), sstableSections, estimatedKeys, ActiveRepairService.UNREPAIRED_SSTABLE);
streamingDetails.put(endpoint, details);
}
@@ -176,15 +178,17 @@ public class SSTableLoader implements StreamEventHandler
continue;
List<StreamSession.SSTableStreamingSections> endpointDetails = new LinkedList<>();
-
+ List<Ref> refs = new ArrayList<>();
try
{
// transferSSTables assumes references have been acquired
for (StreamSession.SSTableStreamingSections details : streamingDetails.get(remote))
{
- if (!details.sstable.acquireReference())
+ Ref ref = details.sstable.tryRef();
+ if (ref == null)
throw new IllegalStateException();
+ refs.add(ref);
endpointDetails.add(details);
}
@@ -192,8 +196,8 @@ public class SSTableLoader implements StreamEventHandler
}
finally
{
- for (StreamSession.SSTableStreamingSections details : endpointDetails)
- details.sstable.releaseReference();
+ for (Ref ref : refs)
+ ref.release();
}
}
plan.listeners(this, listeners);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c75ee416/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
index 50bf3e3..c51b586 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
@@ -42,7 +42,6 @@ import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import com.google.common.annotations.VisibleForTesting;
@@ -75,7 +74,6 @@ import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.RowIndexEntry;
import org.apache.cassandra.db.RowPosition;
import org.apache.cassandra.db.SystemKeyspace;
-import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
import org.apache.cassandra.db.commitlog.ReplayPosition;
import org.apache.cassandra.db.index.SecondaryIndex;
import org.apache.cassandra.dht.AbstractBounds;
@@ -114,6 +112,8 @@ import org.apache.cassandra.utils.FilterFactory;
import org.apache.cassandra.utils.IFilter;
import org.apache.cassandra.utils.Pair;
import org.apache.cassandra.utils.concurrent.OpOrder;
+import org.apache.cassandra.utils.concurrent.Ref;
+import org.apache.cassandra.utils.concurrent.RefCounted;
import static org.apache.cassandra.db.Directories.SECONDARY_INDEX_NAME_SEPARATOR;
@@ -121,7 +121,7 @@ import static org.apache.cassandra.db.Directories.SECONDARY_INDEX_NAME_SEPARATOR
* SSTableReaders are open()ed by Keyspace.onStart; after that they are created by SSTableWriter.renameAndOpen.
* Do not re-call open() on existing SSTable files; use the references kept by ColumnFamilyStore post-start instead.
*/
-public class SSTableReader extends SSTable
+public class SSTableReader extends SSTable implements RefCounted
{
private static final Logger logger = LoggerFactory.getLogger(SSTableReader.class);
@@ -182,7 +182,6 @@ public class SSTableReader extends SSTable
private final BloomFilterTracker bloomFilterTracker = new BloomFilterTracker();
- private final AtomicInteger references = new AtomicInteger(1);
// technically isCompacted is not necessary since it should never be unreferenced unless it is also compacted,
// but it seems like a good extra layer of protection against reference counting bugs to not delete data based on that alone
private final AtomicBoolean isCompacted = new AtomicBoolean(false);
@@ -194,17 +193,8 @@ public class SSTableReader extends SSTable
private final AtomicLong keyCacheHit = new AtomicLong(0);
private final AtomicLong keyCacheRequest = new AtomicLong(0);
- /**
- * To support replacing this sstablereader with another object that represents that same underlying sstable, but with different associated resources,
- * we build a linked-list chain of replacement, which we synchronise using a shared object to make maintenance of the list across multiple threads simple.
- * On close we check if any of the closeable resources differ between any chains either side of us; any that are in neither of the adjacent links (if any) are closed.
- * Once we've made this decision we remove ourselves from the linked list, so that anybody behind/ahead will compare against only other still opened resources.
- */
- private Object replaceLock = new Object();
- private SSTableReader replacedBy;
- private SSTableReader replaces;
- private SSTableDeletingTask deletingTask;
- private Runnable runOnClose;
+ private final Tidier tidy = new Tidier();
+ private final RefCounted refCounted = RefCounted.Impl.get(tidy);
@VisibleForTesting
public RestorableMeter readMeter;
@@ -409,7 +399,7 @@ public class SSTableReader extends SSTable
sstable.ifile = ibuilder.complete(sstable.descriptor.filenameFor(Component.PRIMARY_INDEX));
sstable.dfile = dbuilder.complete(sstable.descriptor.filenameFor(Component.DATA));
sstable.bf = FilterFactory.AlwaysPresent;
-
+ sstable.tidy.setup(sstable);
return sstable;
}
@@ -459,6 +449,7 @@ public class SSTableReader extends SSTable
if (sstable.getKeyCache() != null)
logger.debug("key cache contains {}/{} keys", sstable.getKeyCache().size(), sstable.getKeyCache().getCapacity());
+ sstable.tidy.setup(sstable);
return sstable;
}
@@ -555,7 +546,7 @@ public class SSTableReader extends SSTable
this.maxDataAge = maxDataAge;
this.openReason = openReason;
- deletingTask = new SSTableDeletingTask(this);
+ tidy.deletingTask = new SSTableDeletingTask(this);
// Don't track read rates for tables in the system keyspace and don't bother trying to load or persist
// the read meter when in client mode. Also don't track reads for special operations (like early open)
@@ -600,6 +591,7 @@ public class SSTableReader extends SSTable
this.dfile = dfile;
this.indexSummary = indexSummary;
this.bf = bloomFilter;
+ tidy.setup(this);
}
public static long getTotalBytes(Iterable<SSTableReader> sstables)
@@ -612,117 +604,6 @@ public class SSTableReader extends SSTable
return sum;
}
- private void tidy(boolean release)
- {
- if (readMeterSyncFuture != null)
- readMeterSyncFuture.cancel(false);
-
- if (references.get() != 0)
- {
- throw new IllegalStateException("SSTable is not fully released (" + references.get() + " references)");
- }
-
- synchronized (replaceLock)
- {
- boolean closeBf = true, closeSummary = true, closeFiles = true, deleteFiles = isCompacted.get();
-
- if (replacedBy != null)
- {
- closeBf = replacedBy.bf != bf;
- closeSummary = replacedBy.indexSummary != indexSummary;
- closeFiles = replacedBy.dfile != dfile;
- // if the replacement sstablereader uses a different path, clean up our paths
- deleteFiles = !dfile.path.equals(replacedBy.dfile.path);
- }
-
- if (replaces != null)
- {
- closeBf &= replaces.bf != bf;
- closeSummary &= replaces.indexSummary != indexSummary;
- closeFiles &= replaces.dfile != dfile;
- deleteFiles &= !dfile.path.equals(replaces.dfile.path);
- }
-
- boolean deleteAll = false;
- if (release && isCompacted.get())
- {
- assert replacedBy == null;
- if (replaces != null && !deleteFiles)
- {
- replaces.replacedBy = null;
- replaces.deletingTask = deletingTask;
- replaces.markObsolete();
- }
- else
- {
- deleteAll = true;
- }
- }
- else
- {
- if (replaces != null)
- replaces.replacedBy = replacedBy;
- if (replacedBy != null)
- replacedBy.replaces = replaces;
- }
-
- scheduleTidy(closeBf, closeSummary, closeFiles, deleteFiles, deleteAll);
- }
- }
-
- private void scheduleTidy(final boolean closeBf, final boolean closeSummary, final boolean closeFiles, final boolean deleteFiles, final boolean deleteAll)
- {
- if (references.get() != 0)
- throw new IllegalStateException("SSTable is not fully released (" + references.get() + " references)");
-
- final ColumnFamilyStore cfs = Schema.instance.getColumnFamilyStoreInstance(metadata.cfId);
- final OpOrder.Barrier barrier;
- if (cfs != null)
- {
- barrier = cfs.readOrdering.newBarrier();
- barrier.issue();
- }
- else
- barrier = null;
-
- ScheduledExecutors.nonPeriodicTasks.execute(new Runnable()
- {
- public void run()
- {
- if (barrier != null)
- barrier.await();
- if (closeBf)
- bf.close();
- if (closeSummary)
- indexSummary.close();
- if (closeFiles)
- {
- ifile.cleanup();
- dfile.cleanup();
- }
- if (runOnClose != null)
- runOnClose.run();
- if (deleteAll)
- {
- /**
- * Do the OS a favour and suggest (using fadvice call) that we
- * don't want to see pages of this SSTable in memory anymore.
- *
- * NOTE: We can't use madvice in java because it requires the address of
- * the mapping, so instead we always open a file and run fadvice(fd, 0, 0) on it
- */
- dropPageCache();
- deletingTask.run();
- }
- else if (deleteFiles)
- {
- FileUtils.deleteWithConfirm(new File(dfile.path));
- FileUtils.deleteWithConfirm(new File(ifile.path));
- }
- }
- });
- }
-
public boolean equals(Object that)
{
return that instanceof SSTableReader && ((SSTableReader) that).descriptor.equals(this.descriptor);
@@ -745,7 +626,7 @@ public class SSTableReader extends SSTable
public void setTrackedBy(DataTracker tracker)
{
- deletingTask.setTracker(tracker);
+ tidy.deletingTask.setTracker(tracker);
// under normal operation we can do this at any time, but SSTR is also used outside C* proper,
// e.g. by BulkLoader, which does not initialize the cache. As a kludge, we set up the cache
// here when we know we're being wired into the rest of the server infrastructure.
@@ -817,6 +698,7 @@ public class SSTableReader extends SSTable
dfile = dbuilder.complete(descriptor.filenameFor(Component.DATA));
if (saveSummaryIfCreated && (recreateBloomFilter || !summaryLoaded)) // save summary information to disk
saveSummary(ibuilder, dbuilder);
+ tidy.setup(this);
}
/**
@@ -968,26 +850,26 @@ public class SSTableReader extends SSTable
public void setReplacedBy(SSTableReader replacement)
{
- synchronized (replaceLock)
+ synchronized (tidy.replaceLock)
{
- assert replacedBy == null;
- replacedBy = replacement;
- replacement.replaces = this;
- replacement.replaceLock = replaceLock;
+ assert tidy.replacedBy == null;
+ tidy.replacedBy = replacement;
+ replacement.tidy.replaces = this;
+ replacement.tidy.replaceLock = tidy.replaceLock;
}
}
public SSTableReader cloneWithNewStart(DecoratedKey newStart, final Runnable runOnClose)
{
- synchronized (replaceLock)
+ synchronized (tidy.replaceLock)
{
- assert replacedBy == null;
+ assert tidy.replacedBy == null;
if (newStart.compareTo(this.first) > 0)
{
if (newStart.compareTo(this.last) > 0)
{
- this.runOnClose = new Runnable()
+ this.tidy.runOnClose = new Runnable()
{
public void run()
{
@@ -1001,7 +883,7 @@ public class SSTableReader extends SSTable
{
final long dataStart = getPosition(newStart, Operator.GE).position;
final long indexStart = getIndexScanPosition(newStart);
- this.runOnClose = new Runnable()
+ this.tidy.runOnClose = new Runnable()
{
public void run()
{
@@ -1034,9 +916,9 @@ public class SSTableReader extends SSTable
*/
public SSTableReader cloneWithNewSummarySamplingLevel(ColumnFamilyStore parent, int samplingLevel) throws IOException
{
- synchronized (replaceLock)
+ synchronized (tidy.replaceLock)
{
- assert replacedBy == null;
+ assert tidy.replacedBy == null;
int minIndexInterval = metadata.getMinIndexInterval();
int maxIndexInterval = metadata.getMaxIndexInterval();
@@ -1671,36 +1553,6 @@ public class SSTableReader extends SSTable
return dfile.onDiskLength;
}
- public boolean acquireReference()
- {
- while (true)
- {
- int n = references.get();
- if (n <= 0)
- return false;
- if (references.compareAndSet(n, n + 1))
- return true;
- }
- }
-
- @VisibleForTesting
- public int referenceCount()
- {
- return references.get();
- }
-
- /**
- * Release reference to this SSTableReader.
- * If there is no one referring to this SSTable, and is marked as compacted,
- * all resources are cleaned up and files are deleted eventually.
- */
- public void releaseReference()
- {
- if (references.decrementAndGet() == 0)
- tidy(true);
- assert references.get() >= 0 : "Reference counter " + references.get() + " for " + dfile.path;
- }
-
/**
* Mark the sstable as obsolete, i.e., compacted into newer sstables.
*
@@ -1715,9 +1567,9 @@ public class SSTableReader extends SSTable
if (logger.isDebugEnabled())
logger.debug("Marking {} compacted", getFilename());
- synchronized (replaceLock)
+ synchronized (tidy.replaceLock)
{
- assert replacedBy == null : getFilename();
+ assert tidy.replacedBy == null : getFilename();
}
return !isCompacted.getAndSet(true);
}
@@ -1821,13 +1673,13 @@ public class SSTableReader extends SSTable
public SSTableReader getCurrentReplacement()
{
- synchronized (replaceLock)
+ synchronized (tidy.replaceLock)
{
- SSTableReader cur = this, next = replacedBy;
+ SSTableReader cur = this, next = tidy.replacedBy;
while (next != null)
{
cur = next;
- next = next.replacedBy;
+ next = next.tidy.replacedBy;
}
return cur;
}
@@ -2013,90 +1865,216 @@ public class SSTableReader extends SSTable
}
/**
- * @param sstables
- * @return true if all desired references were acquired. Otherwise, it will unreference any partial acquisition, and return false.
+ * Increment the total row read count and read rate for this SSTable. This should not be incremented for range
+ * slice queries, row cache hits, or non-query reads, like compaction.
*/
- public static boolean acquireReferences(Iterable<SSTableReader> sstables)
+ public void incrementReadCount()
{
- SSTableReader failed = null;
- for (SSTableReader sstable : sstables)
- {
- if (!sstable.acquireReference())
- {
- failed = sstable;
- break;
- }
- }
-
- if (failed == null)
- return true;
+ if (readMeter != null)
+ readMeter.mark();
+ }
- for (SSTableReader sstable : sstables)
+ public static class SizeComparator implements Comparator<SSTableReader>
+ {
+ public int compare(SSTableReader o1, SSTableReader o2)
{
- if (sstable == failed)
- break;
- sstable.releaseReference();
+ return Longs.compare(o1.onDiskLength(), o2.onDiskLength());
}
- return false;
}
- public static void releaseReferences(Iterable<SSTableReader> sstables)
+ public Ref tryRef()
{
- for (SSTableReader sstable : sstables)
- {
- sstable.releaseReference();
- }
+ return refCounted.tryRef();
}
- private void dropPageCache()
+ public Ref sharedRef()
{
- dropPageCache(dfile.path);
- dropPageCache(ifile.path);
+ return refCounted.sharedRef();
}
- private void dropPageCache(String filePath)
+ private static final class Tidier implements Tidy
{
- RandomAccessFile file = null;
+ private String name;
+ private CFMetaData metadata;
+ // indexfile and datafile: might be null before a call to load()
+ private SegmentedFile ifile;
+ private SegmentedFile dfile;
- try
- {
- file = new RandomAccessFile(filePath, "r");
+ private IndexSummary indexSummary;
+ private IFilter bf;
- int fd = CLibrary.getfd(file.getFD());
+ private AtomicBoolean isCompacted;
- if (fd > 0)
- {
- if (logger.isDebugEnabled())
- logger.debug(String.format("Dropping page cache of file %s.", filePath));
+ /**
+ * To support replacing this sstablereader with another object that represents that same underlying sstable, but with different associated resources,
+ * we build a linked-list chain of replacement, which we synchronise using a shared object to make maintenance of the list across multiple threads simple.
+ * On close we check if any of the closeable resources differ between any chains either side of us; any that are in neither of the adjacent links (if any) are closed.
+ * Once we've made this decision we remove ourselves from the linked list, so that anybody behind/ahead will compare against only other still opened resources.
+ */
+ private Object replaceLock = new Object();
+ private SSTableReader replacedBy;
+ private SSTableReader replaces;
+ private SSTableDeletingTask deletingTask;
+ private Runnable runOnClose;
- CLibrary.trySkipCache(fd, 0, 0);
- }
+ @VisibleForTesting
+ public RestorableMeter readMeter;
+ private volatile ScheduledFuture readMeterSyncFuture;
+
+ private void setup(SSTableReader reader)
+ {
+ name = reader.toString();
+ metadata = reader.metadata;
+ ifile = reader.ifile;
+ dfile = reader.dfile;
+ indexSummary = reader.indexSummary;
+ bf = reader.bf;
+ isCompacted = reader.isCompacted;
+ readMeterSyncFuture = reader.readMeterSyncFuture;
}
- catch (IOException e)
+
+ public String name()
{
- // we don't care if cache cleanup fails
+ return name;
}
- finally
+
+ private void dropPageCache()
{
- FileUtils.closeQuietly(file);
+ dropPageCache(dfile.path);
+ dropPageCache(ifile.path);
}
- }
- /**
- * Increment the total row read count and read rate for this SSTable. This should not be incremented for range
- * slice queries, row cache hits, or non-query reads, like compaction.
- */
- public void incrementReadCount()
- {
- if (readMeter != null)
- readMeter.mark();
- }
+ private void dropPageCache(String filePath)
+ {
+ RandomAccessFile file = null;
- public static class SizeComparator implements Comparator<SSTableReader>
- {
- public int compare(SSTableReader o1, SSTableReader o2)
+ try
+ {
+ file = new RandomAccessFile(filePath, "r");
+
+ int fd = CLibrary.getfd(file.getFD());
+
+ if (fd > 0)
+ {
+ if (logger.isDebugEnabled())
+ logger.debug(String.format("Dropping page cache of file %s.", filePath));
+
+ CLibrary.trySkipCache(fd, 0, 0);
+ }
+ }
+ catch (IOException e)
+ {
+ // we don't care if cache cleanup fails
+ }
+ finally
+ {
+ FileUtils.closeQuietly(file);
+ }
+ }
+
+ public void tidy()
{
- return Longs.compare(o1.onDiskLength(), o2.onDiskLength());
+ if (readMeterSyncFuture != null)
+ readMeterSyncFuture.cancel(false);
+
+ synchronized (replaceLock)
+ {
+ boolean closeBf = true, closeSummary = true, closeFiles = true, deleteFiles = isCompacted.get();
+
+ if (replacedBy != null)
+ {
+ closeBf = replacedBy.bf != bf;
+ closeSummary = replacedBy.indexSummary != indexSummary;
+ closeFiles = replacedBy.dfile != dfile;
+ // if the replacement sstablereader uses a different path, clean up our paths
+ deleteFiles = !dfile.path.equals(replacedBy.dfile.path);
+ }
+
+ if (replaces != null)
+ {
+ closeBf &= replaces.bf != bf;
+ closeSummary &= replaces.indexSummary != indexSummary;
+ closeFiles &= replaces.dfile != dfile;
+ deleteFiles &= !dfile.path.equals(replaces.dfile.path);
+ }
+
+ boolean deleteAll = false;
+ if (isCompacted.get())
+ {
+ assert replacedBy == null;
+ if (replaces != null && !deleteFiles)
+ {
+ replaces.tidy.replacedBy = null;
+ replaces.tidy.deletingTask = deletingTask;
+ replaces.markObsolete();
+ }
+ else
+ {
+ deleteAll = true;
+ }
+ }
+ else
+ {
+ closeSummary &= indexSummary != null;
+ if (replaces != null)
+ replaces.tidy.replacedBy = replacedBy;
+ if (replacedBy != null)
+ replacedBy.tidy.replaces = replaces;
+ }
+
+ scheduleTidy(closeBf, closeSummary, closeFiles, deleteFiles, deleteAll);
+ }
+ }
+
+ private void scheduleTidy(final boolean closeBf, final boolean closeSummary, final boolean closeFiles, final boolean deleteFiles, final boolean deleteAll)
+ {
+ final ColumnFamilyStore cfs = Schema.instance.getColumnFamilyStoreInstance(metadata.cfId);
+ final OpOrder.Barrier barrier;
+ if (cfs != null)
+ {
+ barrier = cfs.readOrdering.newBarrier();
+ barrier.issue();
+ }
+ else
+ barrier = null;
+
+ ScheduledExecutors.nonPeriodicTasks.execute(new Runnable()
+ {
+ public void run()
+ {
+ if (barrier != null)
+ barrier.await();
+ if (closeBf)
+ bf.close();
+ if (closeSummary)
+ indexSummary.close();
+ if (closeFiles)
+ {
+ ifile.cleanup();
+ dfile.cleanup();
+ }
+ if (runOnClose != null)
+ runOnClose.run();
+ if (deleteAll)
+ {
+ /**
+ * Do the OS a favour and suggest (using fadvice call) that we
+ * don't want to see pages of this SSTable in memory anymore.
+ *
+ * NOTE: We can't use madvice in java because it requires the address of
+ * the mapping, so instead we always open a file and run fadvice(fd, 0, 0) on it
+ */
+ dropPageCache();
+ deletingTask.run();
+ }
+ else if (deleteFiles)
+ {
+ FileUtils.deleteWithConfirm(new File(dfile.path));
+ FileUtils.deleteWithConfirm(new File(ifile.path));
+ }
+ }
+ });
}
}
+
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c75ee416/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
index 43ac4b6..7784b18 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
@@ -176,14 +176,14 @@ public class SSTableRewriter
public void abort()
{
- switchWriter(null);
+ switchWriter(null, true);
moveStarts(null, Functions.forMap(originalStarts), true);
// remove already completed SSTables
for (SSTableReader sstable : finished)
{
sstable.markObsolete();
- sstable.releaseReference();
+ sstable.sharedRef().release();
}
// abort the writers
@@ -276,6 +276,11 @@ public class SSTableRewriter
public void switchWriter(SSTableWriter newWriter)
{
+ switchWriter(newWriter, false);
+ }
+
+ private void switchWriter(SSTableWriter newWriter, boolean abort)
+ {
if (writer == null)
{
writer = newWriter;
@@ -283,7 +288,7 @@ public class SSTableRewriter
}
// we leave it as a tmp file, but we open it and add it to the dataTracker
- if (writer.getFilePointer() != 0)
+ if (writer.getFilePointer() != 0 && !abort)
{
SSTableReader reader = writer.finish(SSTableWriter.FinishType.EARLY, maxAge, -1);
replaceEarlyOpenedFile(currentlyOpenedEarly, reader);
@@ -333,7 +338,7 @@ public class SSTableRewriter
private List<SSTableReader> finishAndMaybeThrow(long repairedAt, boolean throwEarly, boolean throwLate)
{
List<SSTableReader> newReaders = new ArrayList<>();
- switchWriter(null);
+ switchWriter(null, false);
if (throwEarly)
throw new RuntimeException("exception thrown early in finish, for testing");
@@ -377,7 +382,7 @@ public class SSTableRewriter
{
if (reader.getCurrentReplacement() == null)
reader.markObsolete();
- reader.releaseReference();
+ reader.sharedRef().release();
}
}
else
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c75ee416/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
index 5f78132..cc60594 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
@@ -416,7 +416,7 @@ public class SSTableWriter extends SSTable
if (inclusiveUpperBoundOfReadableData == null)
{
// Prevent leaving tmplink files on disk
- sstable.releaseReference();
+ sstable.sharedRef().release();
return null;
}
int offset = 2;
@@ -428,7 +428,7 @@ public class SSTableWriter extends SSTable
inclusiveUpperBoundOfReadableData = iwriter.getMaxReadableKey(offset++);
if (inclusiveUpperBoundOfReadableData == null)
{
- sstable.releaseReference();
+ sstable.sharedRef().release();
return null;
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c75ee416/src/java/org/apache/cassandra/service/ActiveRepairService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java b/src/java/org/apache/cassandra/service/ActiveRepairService.java
index 36f7c5c..1c5138b 100644
--- a/src/java/org/apache/cassandra/service/ActiveRepairService.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java
@@ -25,6 +25,7 @@ import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
+import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
@@ -33,7 +34,6 @@ import org.slf4j.LoggerFactory;
import org.apache.cassandra.concurrent.JMXConfigurableThreadPoolExecutor;
import org.apache.cassandra.concurrent.NamedThreadFactory;
-import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.compaction.CompactionManager;
import org.apache.cassandra.dht.Bounds;
@@ -56,6 +56,10 @@ import org.apache.cassandra.repair.messages.SyncComplete;
import org.apache.cassandra.repair.messages.ValidationComplete;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.UUIDGen;
+import org.apache.cassandra.utils.concurrent.Ref;
+import org.apache.cassandra.utils.concurrent.RefCounted;
+
+import org.apache.cassandra.utils.concurrent.Refs;
/**
* ActiveRepairService is the starting point for manual "active" repairs.
@@ -376,7 +380,7 @@ public class ActiveRepairService
List<Future<?>> futures = new ArrayList<>();
for (Map.Entry<UUID, ColumnFamilyStore> columnFamilyStoreEntry : prs.columnFamilyStores.entrySet())
{
- Collection<SSTableReader> sstables = new HashSet<>(prs.getAndReferenceSSTables(columnFamilyStoreEntry.getKey()));
+ Refs<SSTableReader> sstables = prs.getAndReferenceSSTables(columnFamilyStoreEntry.getKey());
ColumnFamilyStore cfs = columnFamilyStoreEntry.getValue();
futures.add(CompactionManager.instance.submitAntiCompaction(cfs, prs.ranges, sstables, prs.repairedAt));
}
@@ -422,10 +426,11 @@ public class ActiveRepairService
this.repairedAt = repairedAt;
}
- public Collection<SSTableReader> getAndReferenceSSTables(UUID cfId)
+ public synchronized Refs<SSTableReader> getAndReferenceSSTables(UUID cfId)
{
Set<SSTableReader> sstables = sstableMap.get(cfId);
Iterator<SSTableReader> sstableIterator = sstables.iterator();
+ ImmutableMap.Builder<SSTableReader, Ref> references = ImmutableMap.builder();
while (sstableIterator.hasNext())
{
SSTableReader sstable = sstableIterator.next();
@@ -435,11 +440,14 @@ public class ActiveRepairService
}
else
{
- if (!sstable.acquireReference())
+ Ref ref = sstable.tryRef();
+ if (ref == null)
sstableIterator.remove();
+ else
+ references.put(sstable, ref);
}
}
- return sstables;
+ return new Refs<>(references.build());
}
@Override
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c75ee416/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
index aa18954..44b83f9 100644
--- a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
@@ -36,6 +36,8 @@ import org.apache.cassandra.io.sstable.SSTableWriter;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.Pair;
+import org.apache.cassandra.utils.concurrent.Refs;
+
/**
* Task that manages receiving files for the session for certain ColumnFamily.
*/
@@ -127,18 +129,12 @@ public class StreamReceiveTask extends StreamTask
lockfile.delete();
task.sstables.clear();
- if (!SSTableReader.acquireReferences(readers))
- throw new AssertionError("We shouldn't fail acquiring a reference on a sstable that has just been transferred");
- try
+ try (Refs<SSTableReader> refs = Refs.ref(readers))
{
// add sstables and build secondary indexes
cfs.addSSTables(readers);
cfs.indexManager.maybeBuildSecondaryIndexes(readers, cfs.indexManager.allIndexesNames());
}
- finally
- {
- SSTableReader.releaseReferences(readers);
- }
task.session.taskCompleted(task);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c75ee416/src/java/org/apache/cassandra/streaming/StreamSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java b/src/java/org/apache/cassandra/streaming/StreamSession.java
index 2a3cf55..6108dea 100644
--- a/src/java/org/apache/cassandra/streaming/StreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@ -45,6 +45,10 @@ import org.apache.cassandra.streaming.messages.*;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.JVMStabilityInspector;
import org.apache.cassandra.utils.Pair;
+import org.apache.cassandra.utils.concurrent.RefCounted;
+
+import org.apache.cassandra.utils.concurrent.Ref;
+import org.apache.cassandra.utils.concurrent.Refs;
/**
* Handles the streaming a one or more section of one of more sstables to and from a specific
@@ -267,7 +271,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber
finally
{
for (SSTableStreamingSections release : sections)
- release.sstable.releaseReference();
+ release.ref.release();
}
}
@@ -289,7 +293,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber
private List<SSTableStreamingSections> getSSTableSectionsForRanges(Collection<Range<Token>> ranges, Collection<ColumnFamilyStore> stores, long overriddenRepairedAt, boolean isIncremental)
{
- List<SSTableReader> sstables = new ArrayList<>();
+ Refs<SSTableReader> refs = new Refs<>();
try
{
for (ColumnFamilyStore cfStore : stores)
@@ -297,16 +301,16 @@ public class StreamSession implements IEndpointStateChangeSubscriber
List<AbstractBounds<RowPosition>> rowBoundsList = new ArrayList<>(ranges.size());
for (Range<Token> range : ranges)
rowBoundsList.add(range.toRowBounds());
- ColumnFamilyStore.ViewFragment view = cfStore.selectAndReference(cfStore.viewFilter(rowBoundsList, !isIncremental));
- sstables.addAll(view.sstables);
+ refs.addAll(cfStore.selectAndReference(cfStore.viewFilter(rowBoundsList, !isIncremental)).refs);
}
- List<SSTableStreamingSections> sections = new ArrayList<>(sstables.size());
- for (SSTableReader sstable : sstables)
+
+ List<SSTableStreamingSections> sections = new ArrayList<>(refs.size());
+ for (SSTableReader sstable : refs)
{
long repairedAt = overriddenRepairedAt;
if (overriddenRepairedAt == ActiveRepairService.UNREPAIRED_SSTABLE)
repairedAt = sstable.getSSTableMetadata().repairedAt;
- sections.add(new SSTableStreamingSections(sstable,
+ sections.add(new SSTableStreamingSections(sstable, refs.get(sstable),
sstable.getPositionsForRanges(ranges),
sstable.estimatedKeysForRanges(ranges),
repairedAt));
@@ -315,7 +319,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber
}
catch (Throwable t)
{
- SSTableReader.releaseReferences(sstables);
+ refs.release();
throw t;
}
}
@@ -329,7 +333,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber
if (details.sections.isEmpty())
{
// A reference was acquired on the sstable and we won't stream it
- details.sstable.releaseReference();
+ details.ref.release();
iter.remove();
continue;
}
@@ -341,7 +345,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber
task = new StreamTransferTask(this, cfId);
transfers.put(cfId, task);
}
- task.addTransferFile(details.sstable, details.estimatedKeys, details.sections, details.repairedAt);
+ task.addTransferFile(details.sstable, details.ref, details.estimatedKeys, details.sections, details.repairedAt);
iter.remove();
}
}
@@ -349,13 +353,15 @@ public class StreamSession implements IEndpointStateChangeSubscriber
public static class SSTableStreamingSections
{
public final SSTableReader sstable;
+ public final Ref ref;
public final List<Pair<Long, Long>> sections;
public final long estimatedKeys;
public final long repairedAt;
- public SSTableStreamingSections(SSTableReader sstable, List<Pair<Long, Long>> sections, long estimatedKeys, long repairedAt)
+ public SSTableStreamingSections(SSTableReader sstable, Ref ref, List<Pair<Long, Long>> sections, long estimatedKeys, long repairedAt)
{
this.sstable = sstable;
+ this.ref = ref;
this.sections = sections;
this.estimatedKeys = estimatedKeys;
this.repairedAt = repairedAt;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c75ee416/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamTransferTask.java b/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
index b840ee5..b00042e 100644
--- a/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
@@ -26,6 +26,8 @@ import org.apache.cassandra.concurrent.NamedThreadFactory;
import org.apache.cassandra.io.sstable.SSTableReader;
import org.apache.cassandra.streaming.messages.OutgoingFileMessage;
import org.apache.cassandra.utils.Pair;
+import org.apache.cassandra.utils.concurrent.Ref;
+import org.apache.cassandra.utils.concurrent.RefCounted;
/**
* StreamTransferTask sends sections of SSTable files in certain ColumnFamily.
@@ -47,10 +49,10 @@ public class StreamTransferTask extends StreamTask
super(session, cfId);
}
- public synchronized void addTransferFile(SSTableReader sstable, long estimatedKeys, List<Pair<Long, Long>> sections, long repairedAt)
+ public synchronized void addTransferFile(SSTableReader sstable, Ref ref, long estimatedKeys, List<Pair<Long, Long>> sections, long repairedAt)
{
assert sstable != null && cfId.equals(sstable.metadata.cfId);
- OutgoingFileMessage message = new OutgoingFileMessage(sstable, sequenceNumber.getAndIncrement(), estimatedKeys, sections, repairedAt);
+ OutgoingFileMessage message = new OutgoingFileMessage(sstable, ref, sequenceNumber.getAndIncrement(), estimatedKeys, sections, repairedAt);
files.put(message.header.sequenceNumber, message);
totalSize += message.header.size();
}
@@ -71,7 +73,7 @@ public class StreamTransferTask extends StreamTask
OutgoingFileMessage file = files.remove(sequenceNumber);
if (file != null)
- file.sstable.releaseReference();
+ file.ref.release();
signalComplete = files.isEmpty();
}
@@ -92,7 +94,7 @@ public class StreamTransferTask extends StreamTask
timeoutTasks.clear();
for (OutgoingFileMessage file : files.values())
- file.sstable.releaseReference();
+ file.ref.release();
}
public synchronized int getTotalNumberOfFiles()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c75ee416/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java b/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
index b012869..5ebf289 100644
--- a/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
@@ -31,6 +31,8 @@ import org.apache.cassandra.streaming.compress.CompressedStreamWriter;
import org.apache.cassandra.streaming.compress.CompressionInfo;
import org.apache.cassandra.utils.Pair;
+import org.apache.cassandra.utils.concurrent.Ref;
+
/**
* OutgoingFileMessage is used to transfer the part(or whole) of a SSTable data file.
*/
@@ -58,13 +60,15 @@ public class OutgoingFileMessage extends StreamMessage
}
};
- public FileMessageHeader header;
- public SSTableReader sstable;
+ public final FileMessageHeader header;
+ public final SSTableReader sstable;
+ public final Ref ref;
- public OutgoingFileMessage(SSTableReader sstable, int sequenceNumber, long estimatedKeys, List<Pair<Long, Long>> sections, long repairedAt)
+ public OutgoingFileMessage(SSTableReader sstable, Ref ref, int sequenceNumber, long estimatedKeys, List<Pair<Long, Long>> sections, long repairedAt)
{
super(Type.FILE);
this.sstable = sstable;
+ this.ref = ref;
CompressionInfo compressionInfo = null;
if (sstable.compression)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c75ee416/src/java/org/apache/cassandra/tools/StandaloneScrubber.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/StandaloneScrubber.java b/src/java/org/apache/cassandra/tools/StandaloneScrubber.java
index 82e3783..63a3727 100644
--- a/src/java/org/apache/cassandra/tools/StandaloneScrubber.java
+++ b/src/java/org/apache/cassandra/tools/StandaloneScrubber.java
@@ -119,7 +119,7 @@ public class StandaloneScrubber
// Remove the sstable (it's been copied by scrub and snapshotted)
sstable.markObsolete();
- sstable.releaseReference();
+ sstable.sharedRef().release();
}
catch (Exception e)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c75ee416/src/java/org/apache/cassandra/utils/concurrent/Ref.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/concurrent/Ref.java b/src/java/org/apache/cassandra/utils/concurrent/Ref.java
new file mode 100644
index 0000000..4afceb0
--- /dev/null
+++ b/src/java/org/apache/cassandra/utils/concurrent/Ref.java
@@ -0,0 +1,134 @@
+package org.apache.cassandra.utils.concurrent;
+
+import java.lang.ref.PhantomReference;
+import java.lang.ref.ReferenceQueue;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A single managed reference to a RefCounted object
+ */
+public final class Ref
+{
+ static final Logger logger = LoggerFactory.getLogger(Ref.class);
+ static final boolean DEBUG_ENABLED = System.getProperty("cassandra.debugrefcount", "false").equalsIgnoreCase("true");
+
+ final State state;
+
+ Ref(RefCountedImpl.GlobalState state, boolean isSharedRef)
+ {
+ this.state = new State(state, this, RefCountedImpl.referenceQueue, isSharedRef);
+ }
+
+ /**
+ * Must be called exactly once, when the logical operation for which this Ref was created has terminated.
+ * Failure to abide by this contract will result in an error (eventually) being reported, assuming a
+ * hard reference to the resource it managed is not leaked.
+ */
+ public void release()
+ {
+ state.release(false);
+ }
+
+ /**
+ * A convenience method for reporting:
+ * @return the number of currently extant references globally, including the shared reference
+ */
+ public int globalCount()
+ {
+ return state.globalState.count();
+ }
+
+ // similar to RefCountedState, but tracks only the management of each unique ref created to the managed object
+ // ensures it is only released once, and that it is always released
+ static final class State extends PhantomReference<Ref>
+ {
+ final Debug debug = DEBUG_ENABLED ? new Debug() : null;
+ final boolean isSharedRef;
+ final RefCountedImpl.GlobalState globalState;
+ private volatile int released;
+
+ private static final AtomicIntegerFieldUpdater<State> releasedUpdater = AtomicIntegerFieldUpdater.newUpdater(State.class, "released");
+
+ public State(final RefCountedImpl.GlobalState globalState, Ref reference, ReferenceQueue<? super Ref> q, boolean isSharedRef)
+ {
+ super(reference, q);
+ this.globalState = globalState;
+ this.isSharedRef = isSharedRef;
+ globalState.register(this);
+ }
+
+ void release(boolean leak)
+ {
+ if (!releasedUpdater.compareAndSet(this, 0, 1))
+ {
+ if (!leak)
+ {
+ String id = this.toString();
+ logger.error("BAD RELEASE: attempted to release a{} reference ({}) that has already been released", isSharedRef ? " shared" : "", id);
+ if (DEBUG_ENABLED)
+ debug.log(id);
+ throw new IllegalStateException("Attempted to release a reference that has already been released");
+ }
+ return;
+ }
+ globalState.release(this);
+ if (leak)
+ {
+ String id = this.toString();
+ if (isSharedRef)
+ logger.error("LEAK DETECTED: the shared reference ({}) to {} was not released before the object was garbage collected", id, globalState);
+ else
+ logger.error("LEAK DETECTED: a reference ({}) to {} was not released before the reference was garbage collected", id, globalState);
+ if (DEBUG_ENABLED)
+ debug.log(id);
+ }
+ else if (DEBUG_ENABLED)
+ {
+ debug.deallocate();
+ }
+ }
+ }
+
+ static final class Debug
+ {
+ String allocateThread, deallocateThread;
+ StackTraceElement[] allocateTrace, deallocateTrace;
+ Debug()
+ {
+ Thread thread = Thread.currentThread();
+ allocateThread = thread.toString();
+ allocateTrace = thread.getStackTrace();
+ }
+ synchronized void deallocate()
+ {
+ Thread thread = Thread.currentThread();
+ deallocateThread = thread.toString();
+ deallocateTrace = thread.getStackTrace();
+ }
+ synchronized void log(String id)
+ {
+ logger.error("Allocate trace {}:\n{}", id, print(allocateThread, allocateTrace));
+ if (deallocateThread != null)
+ logger.error("Deallocate trace {}:\n{}", id, print(deallocateThread, deallocateTrace));
+ }
+ String print(String thread, StackTraceElement[] trace)
+ {
+ StringBuilder sb = new StringBuilder();
+ sb.append(thread.toString());
+ sb.append("\n");
+ for (StackTraceElement element : trace)
+ {
+ sb.append("\tat ");
+ sb.append(element );
+ sb.append("\n");
+ }
+ return sb.toString();
+ }
+ }
+
+}
+
+
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c75ee416/src/java/org/apache/cassandra/utils/concurrent/RefCounted.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/concurrent/RefCounted.java b/src/java/org/apache/cassandra/utils/concurrent/RefCounted.java
new file mode 100644
index 0000000..7ad51ad
--- /dev/null
+++ b/src/java/org/apache/cassandra/utils/concurrent/RefCounted.java
@@ -0,0 +1,94 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+package org.apache.cassandra.utils.concurrent;
+
+import java.lang.ref.PhantomReference;
+import java.lang.ref.ReferenceQueue;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterators;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.concurrent.NamedThreadFactory;
+
+/**
+ * An object that needs ref counting does the following:
+ * - defines a Tidy object that will cleanup once it's gone,
+ * (this must retain no references to the object we're tracking (only its resources and how to clean up))
+ * - implements RefCounted
+ * - encapsulates a RefCounted.Impl, to which it proxies all calls to RefCounted behaviours
+ * - ensures no external access to the encapsulated Impl, and permits no references to it to leak
+ * - users must ensure no references to the sharedRef leak, or are retained outside of a method scope either.
+ * (to ensure the sharedRef is collected with the object, so that leaks may be detected and corrected)
+ *
+ * This class' functionality is achieved by what may look at first glance like a complex web of references,
+ * but boils down to:
+ *
+ * Target --> Impl --> sharedRef --> [RefState] <--> RefCountedState --> Tidy
+ * ^ ^
+ * | |
+ * Ref ----------------------------------- |
+ * |
+ * Global -------------------------------------------------
+ *
+ * So that, if Target is collected, Impl is collected and, hence, so is sharedRef.
+ *
+ * Once ref or sharedRef are collected, the paired RefState's release method is called, which if it had
+ * not already been called will update RefCountedState and log an error.
+ *
+ * Once the RefCountedState has been completely released, the Tidy method is called and it removes the global reference
+ * to itself so it may also be collected.
+ */
+public interface RefCounted
+{
+
+ /**
+ * @return the a new Ref() to the managed object, incrementing its refcount, or null if it is already released
+ */
+ public Ref tryRef();
+
+ /**
+ * @return the shared Ref that is created at instantiation of the RefCounted instance.
+ * Once released, if no other refs are extant the object will be tidied; references to
+ * this object should never be retained outside of a method's scope
+ */
+ public Ref sharedRef();
+
+ public static interface Tidy
+ {
+ void tidy();
+ String name();
+ }
+
+ public static class Impl
+ {
+ public static RefCounted get(Tidy tidy)
+ {
+ return new RefCountedImpl(tidy);
+ }
+ }
+}