You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by be...@apache.org on 2015/02/11 16:23:39 UTC
[4/7] cassandra git commit: Safer Resource Management++
Safer Resource Management++
patch by benedict; reviewed by marcus for CASSANDRA-8707
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/61384c57
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/61384c57
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/61384c57
Branch: refs/heads/trunk
Commit: 61384c57546da3d411630c64c4aa89d90cac98f7
Parents: 708b0ce
Author: Benedict Elliott Smith <be...@apache.org>
Authored: Wed Feb 11 14:56:23 2015 +0000
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Wed Feb 11 14:56:23 2015 +0000
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../org/apache/cassandra/db/DataTracker.java | 6 +-
.../cassandra/db/compaction/CompactionTask.java | 4 -
.../SizeTieredCompactionStrategy.java | 10 +-
.../io/compress/CompressionMetadata.java | 19 +-
.../cassandra/io/sstable/IndexSummary.java | 47 +-
.../io/sstable/IndexSummaryBuilder.java | 3 +-
.../io/sstable/IndexSummaryManager.java | 6 +-
.../io/sstable/SSTableDeletingTask.java | 36 +-
.../cassandra/io/sstable/SSTableLoader.java | 4 +-
.../cassandra/io/sstable/SSTableReader.java | 590 ++++++++++++-------
.../cassandra/io/sstable/SSTableRewriter.java | 10 +-
.../cassandra/io/sstable/SSTableWriter.java | 45 +-
.../io/util/BufferedPoolingSegmentedFile.java | 12 +-
.../io/util/BufferedSegmentedFile.java | 23 +-
.../io/util/CompressedPoolingSegmentedFile.java | 29 +-
.../io/util/CompressedSegmentedFile.java | 32 +-
.../org/apache/cassandra/io/util/Memory.java | 8 +-
.../cassandra/io/util/MmappedSegmentedFile.java | 55 +-
.../cassandra/io/util/PoolingSegmentedFile.java | 35 +-
.../apache/cassandra/io/util/SegmentedFile.java | 40 +-
.../cassandra/service/ActiveRepairService.java | 4 +-
.../cassandra/streaming/StreamSession.java | 2 +-
.../streaming/messages/OutgoingFileMessage.java | 2 +-
.../cassandra/tools/StandaloneScrubber.java | 2 +-
.../cassandra/utils/AlwaysPresentFilter.java | 5 +
.../org/apache/cassandra/utils/BloomFilter.java | 20 +-
.../org/apache/cassandra/utils/IFilter.java | 7 +-
.../cassandra/utils/Murmur3BloomFilter.java | 14 +-
.../apache/cassandra/utils/concurrent/Ref.java | 208 ++++++-
.../cassandra/utils/concurrent/RefCounted.java | 52 +-
.../utils/concurrent/RefCountedImpl.java | 132 -----
.../apache/cassandra/utils/concurrent/Refs.java | 26 +-
.../cassandra/utils/obs/OffHeapBitSet.java | 5 +
.../cassandra/db/ColumnFamilyStoreTest.java | 5 +-
.../db/compaction/AntiCompactionTest.java | 6 +-
.../SizeTieredCompactionStrategyTest.java | 30 +-
.../io/sstable/IndexSummaryManagerTest.java | 37 +-
.../cassandra/io/sstable/LegacySSTableTest.java | 3 +-
.../cassandra/io/sstable/SSTableReaderTest.java | 11 +-
.../io/sstable/SSTableRewriterTest.java | 13 +-
.../streaming/StreamTransferTaskTest.java | 2 +-
.../cassandra/tools/SSTableExportTest.java | 1 +
.../cassandra/tools/SSTableImportTest.java | 7 +
.../apache/cassandra/utils/BloomFilterTest.java | 22 +-
.../cassandra/utils/SerializationsTest.java | 5 +-
.../utils/concurrent/RefCountedTest.java | 12 +-
47 files changed, 991 insertions(+), 657 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/61384c57/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 248139f..b323f18 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.1.4
+ * Safer Resource Management++ (CASSANDRA-8707)
* Write partition size estimates into a system table (CASSANDRA-7688)
* cqlsh: Fix keys() and full() collection indexes in DESCRIBE output
(CASSANDRA-8154)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/61384c57/src/java/org/apache/cassandra/db/DataTracker.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DataTracker.java b/src/java/org/apache/cassandra/db/DataTracker.java
index 5ec06bc..acf9f92 100644
--- a/src/java/org/apache/cassandra/db/DataTracker.java
+++ b/src/java/org/apache/cassandra/db/DataTracker.java
@@ -367,7 +367,7 @@ public class DataTracker
while (!view.compareAndSet(currentView, newView));
for (SSTableReader sstable : currentView.sstables)
if (!remaining.contains(sstable))
- sstable.sharedRef().release();
+ sstable.selfRef().release();
notifySSTablesChanged(remaining, Collections.<SSTableReader>emptySet(), OperationType.UNKNOWN);
}
@@ -406,7 +406,7 @@ public class DataTracker
sstable.setTrackedBy(this);
for (SSTableReader sstable : oldSSTables)
- sstable.sharedRef().release();
+ sstable.selfRef().release();
}
private void removeSSTablesFromTracker(Collection<SSTableReader> oldSSTables)
@@ -467,7 +467,7 @@ public class DataTracker
{
boolean firstToCompact = sstable.markObsolete();
assert tolerateCompacted || firstToCompact : sstable + " was already marked compacted";
- sstable.sharedRef().release();
+ sstable.selfRef().release();
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/61384c57/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
index b6c215e..4d9b463 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
@@ -18,10 +18,8 @@
package org.apache.cassandra.db.compaction;
import java.io.File;
-import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
-import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
@@ -31,7 +29,6 @@ import java.util.UUID;
import java.util.concurrent.TimeUnit;
import com.google.common.base.Predicate;
-import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
import org.apache.commons.lang3.StringUtils;
@@ -40,7 +37,6 @@ import org.slf4j.LoggerFactory;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.RowIndexEntry;
import org.apache.cassandra.db.SystemKeyspace;
import org.apache.cassandra.db.compaction.CompactionManager.CompactionExecutorStatsCollector;
import org.apache.cassandra.io.sstable.SSTableReader;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/61384c57/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
index fbd715c..8b1610e 100644
--- a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
@@ -144,8 +144,8 @@ public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy
// calculate the total reads/sec across all sstables
double totalReads = 0.0;
for (SSTableReader sstr : sstables)
- if (sstr.readMeter != null)
- totalReads += sstr.readMeter.twoHourRate();
+ if (sstr.getReadMeter() != null)
+ totalReads += sstr.getReadMeter().twoHourRate();
// if this is a system table with no read meters or we don't have any read rates yet, just return them all
if (totalReads == 0.0)
@@ -159,11 +159,11 @@ public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy
while (cutoffIndex < sstables.size())
{
SSTableReader sstable = sstables.get(cutoffIndex);
- if (sstable.readMeter == null)
+ if (sstable.getReadMeter() == null)
{
throw new AssertionError("If you're seeing this exception, please attach your logs to CASSANDRA-8238 to help us debug. "+sstable);
}
- double reads = sstable.readMeter.twoHourRate();
+ double reads = sstable.getReadMeter().twoHourRate();
if (totalColdReads + reads > maxColdReads)
break;
@@ -307,7 +307,7 @@ public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy
private static double hotness(SSTableReader sstr)
{
// system tables don't have read meters, just use 0.0 for the hotness
- return sstr.readMeter == null ? 0.0 : sstr.readMeter.twoHourRate() / sstr.estimatedKeys();
+ return sstr.getReadMeter() == null ? 0.0 : sstr.getReadMeter().twoHourRate() / sstr.estimatedKeys();
}
public synchronized AbstractCompactionTask getNextBackgroundTask(int gcBefore)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/61384c57/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java b/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
index a40048a..aaf1656 100644
--- a/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
@@ -325,18 +325,15 @@ public class CompressionMetadata
public CompressionMetadata open(long dataLength, long compressedLength, SSTableWriter.FinishType finishType)
{
RefCountedMemory offsets;
- switch (finishType)
+ if (finishType.isFinal)
{
- case EARLY:
- offsets = this.offsets;
- break;
- case NORMAL:
- case FINISH_EARLY:
- offsets = this.offsets.copy(count * 8L);
- this.offsets.unreference();
- break;
- default:
- throw new AssertionError();
+ // we now know how many offsets we have and can resize the offsets properly
+ offsets = this.offsets.copy(count * 8L);
+ this.offsets.unreference();
+ }
+ else
+ {
+ offsets = this.offsets;
}
return new CompressionMetadata(filePath, parameters, offsets, count * 8L, dataLength, compressedLength, Descriptor.Version.CURRENT.hasPostCompressionAdlerChecksums);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/61384c57/src/java/org/apache/cassandra/io/sstable/IndexSummary.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/IndexSummary.java b/src/java/org/apache/cassandra/io/sstable/IndexSummary.java
index f53a7e4..0cde124 100644
--- a/src/java/org/apache/cassandra/io/sstable/IndexSummary.java
+++ b/src/java/org/apache/cassandra/io/sstable/IndexSummary.java
@@ -17,21 +17,19 @@
*/
package org.apache.cassandra.io.sstable;
-import java.io.Closeable;
import java.io.DataInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import org.apache.cassandra.cache.RefCountedMemory;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.RowPosition;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.io.util.Memory;
import org.apache.cassandra.io.util.MemoryOutputStream;
import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.concurrent.WrappedSharedCloseable;
import static org.apache.cassandra.io.sstable.Downsampling.BASE_SAMPLING_LEVEL;
@@ -45,10 +43,8 @@ import static org.apache.cassandra.io.sstable.Downsampling.BASE_SAMPLING_LEVEL;
* (This is necessary because keys can have different lengths.)
* 2. A sequence of (DecoratedKey, position) pairs, where position is the offset into the actual index file.
*/
-public class IndexSummary implements Closeable
+public class IndexSummary extends WrappedSharedCloseable
{
- private static final Logger logger = LoggerFactory.getLogger(IndexSummary.class);
-
public static final IndexSummarySerializer serializer = new IndexSummarySerializer();
/**
@@ -60,7 +56,7 @@ public class IndexSummary implements Closeable
private final IPartitioner partitioner;
private final int summarySize;
private final int sizeAtFullSampling;
- private final RefCountedMemory bytes;
+ private final Memory bytes;
/**
* A value between 1 and BASE_SAMPLING_LEVEL that represents how many of the original
@@ -70,17 +66,29 @@ public class IndexSummary implements Closeable
*/
private final int samplingLevel;
- public IndexSummary(IPartitioner partitioner, RefCountedMemory memory, int summarySize, int sizeAtFullSampling,
+ public IndexSummary(IPartitioner partitioner, Memory bytes, int summarySize, int sizeAtFullSampling,
int minIndexInterval, int samplingLevel)
{
+ super(bytes);
this.partitioner = partitioner;
this.minIndexInterval = minIndexInterval;
this.summarySize = summarySize;
this.sizeAtFullSampling = sizeAtFullSampling;
- this.bytes = memory;
+ this.bytes = bytes;
this.samplingLevel = samplingLevel;
}
+ private IndexSummary(IndexSummary copy)
+ {
+ super(copy);
+ this.partitioner = copy.partitioner;
+ this.minIndexInterval = copy.minIndexInterval;
+ this.summarySize = copy.summarySize;
+ this.sizeAtFullSampling = copy.sizeAtFullSampling;
+ this.bytes = copy.bytes;
+ this.samplingLevel = copy.samplingLevel;
+ }
+
// binary search is notoriously more difficult to get right than it looks; this is lifted from
// Harmony's Collections implementation
public int binarySearch(RowPosition key)
@@ -137,7 +145,7 @@ public class IndexSummary implements Closeable
long start = getPositionInSummary(index);
long end = calculateEnd(index);
byte[] entry = new byte[(int)(end - start)];
- bytes.getBytes(start, entry, 0, (int)(end - start));
+ bytes.getBytes(start, entry, 0, (int) (end - start));
return entry;
}
@@ -206,6 +214,11 @@ public class IndexSummary implements Closeable
return Downsampling.getEffectiveIndexIntervalAfterIndex(index, samplingLevel, minIndexInterval);
}
+ public IndexSummary sharedCopy()
+ {
+ return new IndexSummary(this);
+ }
+
public static class IndexSummarySerializer
{
public void serialize(IndexSummary t, DataOutputPlus out, boolean withSamplingLevel) throws IOException
@@ -256,16 +269,4 @@ public class IndexSummary implements Closeable
return new IndexSummary(partitioner, memory, summarySize, fullSamplingSummarySize, minIndexInterval, samplingLevel);
}
}
-
- @Override
- public void close()
- {
- bytes.unreference();
- }
-
- public IndexSummary readOnlyClone()
- {
- bytes.reference();
- return this;
- }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/61384c57/src/java/org/apache/cassandra/io/sstable/IndexSummaryBuilder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/IndexSummaryBuilder.java b/src/java/org/apache/cassandra/io/sstable/IndexSummaryBuilder.java
index 8e9cc30..df326d7 100644
--- a/src/java/org/apache/cassandra/io/sstable/IndexSummaryBuilder.java
+++ b/src/java/org/apache/cassandra/io/sstable/IndexSummaryBuilder.java
@@ -28,6 +28,7 @@ import org.apache.cassandra.cache.RefCountedMemory;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.TypeSizes;
import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.io.util.Memory;
import static org.apache.cassandra.io.sstable.Downsampling.BASE_SAMPLING_LEVEL;
import static org.apache.cassandra.io.sstable.SSTable.getMinimalKey;
@@ -148,7 +149,7 @@ public class IndexSummaryBuilder
// first we write out the position in the *summary* for each key in the summary,
// then we write out (key, actual index position) pairs
- RefCountedMemory memory = new RefCountedMemory(offheapSize + (length * 4));
+ Memory memory = Memory.allocate(offheapSize + (length * 4));
int idxPosition = 0;
int keyPosition = length * 4;
for (int i = 0; i < length; i++)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/61384c57/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java b/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
index 65b25a4..4144c32 100644
--- a/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
+++ b/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
@@ -266,9 +266,9 @@ public class IndexSummaryManager implements IndexSummaryManagerMBean
double totalReadsPerSec = 0.0;
for (SSTableReader sstable : nonCompacting)
{
- if (sstable.readMeter != null)
+ if (sstable.getReadMeter() != null)
{
- Double readRate = sstable.readMeter.fifteenMinuteRate();
+ Double readRate = sstable.getReadMeter().fifteenMinuteRate();
totalReadsPerSec += readRate;
readRates.put(sstable, readRate);
}
@@ -314,7 +314,7 @@ public class IndexSummaryManager implements IndexSummaryManagerMBean
int minIndexInterval = sstable.metadata.getMinIndexInterval();
int maxIndexInterval = sstable.metadata.getMaxIndexInterval();
- double readsPerSec = sstable.readMeter == null ? 0.0 : sstable.readMeter.fifteenMinuteRate();
+ double readsPerSec = sstable.getReadMeter() == null ? 0.0 : sstable.getReadMeter().fifteenMinuteRate();
long idealSpace = Math.round(remainingSpace * (readsPerSec / totalReadsPerSec));
// figure out how many entries our idealSpace would buy us, and pick a new sampling level based on that
http://git-wip-us.apache.org/repos/asf/cassandra/blob/61384c57/src/java/org/apache/cassandra/io/sstable/SSTableDeletingTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableDeletingTask.java b/src/java/org/apache/cassandra/io/sstable/SSTableDeletingTask.java
index fb1cbb3..3da6906 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableDeletingTask.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableDeletingTask.java
@@ -40,31 +40,42 @@ public class SSTableDeletingTask implements Runnable
// and delete will fail (on Windows) until it is (we only force the unmapping on SUN VMs).
// Additionally, we need to make sure to delete the data file first, so on restart the others
// will be recognized as GCable.
- private static final Set<SSTableDeletingTask> failedTasks = new CopyOnWriteArraySet<SSTableDeletingTask>();
+ private static final Set<SSTableDeletingTask> failedTasks = new CopyOnWriteArraySet<>();
private final SSTableReader referent;
private final Descriptor desc;
private final Set<Component> components;
private DataTracker tracker;
- public SSTableDeletingTask(SSTableReader referent)
+ /**
+ * realDescriptor is the actual descriptor for the sstable; the descriptor inside
+ * referent can be 'faked' as FINAL for early-opened files. We need the real one
+ * to be able to remove the files.
+ */
+ public SSTableDeletingTask(Descriptor realDescriptor, SSTableReader referent)
{
this.referent = referent;
- if (referent.openReason == SSTableReader.OpenReason.EARLY)
+ this.desc = realDescriptor;
+ switch (desc.type)
{
- this.desc = referent.descriptor.asType(Descriptor.Type.TEMPLINK);
- this.components = Sets.newHashSet(Component.DATA, Component.PRIMARY_INDEX);
- }
- else
- {
- this.desc = referent.descriptor;
- this.components = referent.components;
+ case FINAL:
+ this.components = referent.components;
+ break;
+ case TEMPLINK:
+ this.components = Sets.newHashSet(Component.DATA, Component.PRIMARY_INDEX);
+ break;
+ default:
+ throw new IllegalStateException();
}
}
public void setTracker(DataTracker tracker)
{
- this.tracker = tracker;
+ // the tracker is used only to notify listeners of deletion of the sstable;
+ // since deletion of a non-final file is not really deletion of the sstable,
+ // we don't want to notify the listeners in this event
+ if (desc.type == Descriptor.Type.FINAL)
+ this.tracker = tracker;
}
public void schedule()
@@ -79,9 +90,6 @@ public class SSTableDeletingTask implements Runnable
if (tracker != null)
tracker.notifyDeleting(referent);
- if (referent.readMeter != null)
- SystemKeyspace.clearSSTableReadMeter(referent.getKeyspaceName(), referent.getColumnFamilyName(), referent.descriptor.generation);
-
// If we can't successfully delete the DATA component, set the task to be retried later: see above
File datafile = new File(desc.filenameFor(Component.DATA));
if (!datafile.delete())
http://git-wip-us.apache.org/repos/asf/cassandra/blob/61384c57/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
index 06f71d8..cd23ae2 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
@@ -209,8 +209,8 @@ public class SSTableLoader implements StreamEventHandler
{
for (SSTableReader sstable : sstables)
{
- sstable.sharedRef().release();
- assert sstable.sharedRef().globalCount() == 0;
+ sstable.selfRef().release();
+ assert sstable.selfRef().globalCount() == 0;
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/61384c57/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
index f34939a..a28eb44 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
@@ -36,11 +36,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
@@ -118,10 +114,62 @@ import org.apache.cassandra.utils.concurrent.RefCounted;
import static org.apache.cassandra.db.Directories.SECONDARY_INDEX_NAME_SEPARATOR;
/**
- * SSTableReaders are open()ed by Keyspace.onStart; after that they are created by SSTableWriter.renameAndOpen.
- * Do not re-call open() on existing SSTable files; use the references kept by ColumnFamilyStore post-start instead.
+ * An SSTableReader can be constructed in a number of places, but typically is either
+ * read from disk at startup, or constructed from a flushed memtable, or after compaction
+ * to replace some existing sstables. However once created, an sstablereader may also be modified.
+ *
+ * A reader's OpenReason describes its current stage in its lifecycle, as follows:
+ *
+ * NORMAL
+ * From: None => Reader has been read from disk, either at startup or from a flushed memtable
+ * EARLY => Reader is the final result of a compaction
+ * MOVED_START => Reader WAS being compacted, but this failed and it has been restored to NORMAL status
+ *
+ * EARLY
+ * From: None => Reader is a compaction replacement that is either incomplete and has been opened
+ * to represent its partial result status, or has been finished but the compaction
+ * it is a part of has not yet completed fully
+ * EARLY => Same as from None, except that it is not the first time it has been opened
+ *
+ * MOVED_START
+ * From: NORMAL => Reader is being compacted. This compaction has not finished, but the compaction result
+ * is either partially or fully opened, to either partially or fully replace this reader.
+ * This reader's start key has been updated to represent this, so that reads only hit
+ * one or the other reader.
+ *
+ * METADATA_CHANGE
+ * From: NORMAL => Reader has seen low traffic and the amount of memory available for index summaries is
+ * constrained, so its index summary has been downsampled.
+ * METADATA_CHANGE => Same
+ *
+ * Note that in parallel to this, there are two different Descriptor types: TEMPLINK and FINAL. The latter corresponds
+ * to NORMAL state readers and all readers that replace a NORMAL one. TEMPLINK is used for EARLY state readers and
+ * no others.
+ *
+ * When a reader is being compacted, if the result is large its replacement may be opened as EARLY before compaction
+ * completes in order to present the result to consumers earlier. In this case the reader will itself be changed to
+ * a MOVED_START state, where its start no longer represents its on-disk minimum key. This is to permit reads to be
+ * directed to only one reader when the two represent the same data. The EARLY file can represent a compaction result
+ * that is either partially complete and still in-progress, or a complete and immutable sstable that is part of a larger
+ * macro compaction action that has not yet fully completed.
+ *
+ * Currently ALL compaction results at least briefly go through an EARLY open state prior to completion, regardless
+ * of whether early opening is enabled.
+ *
+ * Since a reader can be created multiple times over the same shared underlying resources, and the exact resources
+ * it shares between each instance differ subtly, we track the lifetime of any underlying resource with its own
+ * reference count, which each instance takes a Ref to. Each instance then tracks references to itself, and once these
+ * all expire it releases its Refs to these underlying resources.
+ *
+ * Some shared cleanup behaviour is needed only once all sstablereaders in a certain stage of their lifecycle
+ * (i.e. EARLY or NORMAL opening) have expired, and some must only occur once all readers of any kind over a single logical
+ * sstable have expired. These are managed by the TypeTidy and GlobalTidy classes at the bottom, and are effectively
+ * treated as one more resource to which each instance holds its own Ref, so that all of these resources are
+ * cleaned up safely, or can be debugged if they are not.
+ *
+ * TODO: fill in details about DataTracker and lifecycle interactions for tools, and for compaction strategies
*/
-public class SSTableReader extends SSTable implements RefCounted
+public class SSTableReader extends SSTable implements RefCounted<SSTableReader>
{
private static final Logger logger = LoggerFactory.getLogger(SSTableReader.class);
@@ -166,7 +214,8 @@ public class SSTableReader extends SSTable implements RefCounted
{
NORMAL,
EARLY,
- METADATA_CHANGE
+ METADATA_CHANGE,
+ MOVED_START
}
public final OpenReason openReason;
@@ -174,7 +223,6 @@ public class SSTableReader extends SSTable implements RefCounted
// indexfile and datafile: might be null before a call to load()
private SegmentedFile ifile;
private SegmentedFile dfile;
-
private IndexSummary indexSummary;
private IFilter bf;
@@ -184,8 +232,7 @@ public class SSTableReader extends SSTable implements RefCounted
// technically isCompacted is not necessary since it should never be unreferenced unless it is also compacted,
// but it seems like a good extra layer of protection against reference counting bugs to not delete data based on that alone
- private final AtomicBoolean isCompacted = new AtomicBoolean(false);
- private final AtomicBoolean isSuspect = new AtomicBoolean(false);
+ private AtomicBoolean isSuspect = new AtomicBoolean(false);
// not final since we need to be able to change level on a file.
private volatile StatsMetadata sstableMetadata;
@@ -193,12 +240,10 @@ public class SSTableReader extends SSTable implements RefCounted
private final AtomicLong keyCacheHit = new AtomicLong(0);
private final AtomicLong keyCacheRequest = new AtomicLong(0);
- private final Tidier tidy = new Tidier();
- private final RefCounted refCounted = RefCounted.Impl.get(tidy);
+ private final InstanceTidier tidy = new InstanceTidier(descriptor, metadata);
+ private final Ref<SSTableReader> selfRef = new Ref<>(this, tidy);
- @VisibleForTesting
- public RestorableMeter readMeter;
- private ScheduledFuture readMeterSyncFuture;
+ private RestorableMeter readMeter;
/**
* Calculate approximate key count.
@@ -399,7 +444,7 @@ public class SSTableReader extends SSTable implements RefCounted
sstable.ifile = ibuilder.complete(sstable.descriptor.filenameFor(Component.PRIMARY_INDEX));
sstable.dfile = dbuilder.complete(sstable.descriptor.filenameFor(Component.DATA));
sstable.bf = FilterFactory.AlwaysPresent;
- sstable.tidy.setup(sstable);
+ sstable.setup();
return sstable;
}
@@ -443,13 +488,13 @@ public class SSTableReader extends SSTable implements RefCounted
sstable.load(validationMetadata);
logger.debug("INDEX LOAD TIME for {}: {} ms.", descriptor, TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start));
+ sstable.setup();
if (validate)
sstable.validate();
if (sstable.getKeyCache() != null)
logger.debug("key cache contains {}/{} keys", sstable.getKeyCache().size(), sstable.getKeyCache().getCapacity());
- sstable.tidy.setup(sstable);
return sstable;
}
@@ -545,32 +590,6 @@ public class SSTableReader extends SSTable implements RefCounted
this.sstableMetadata = sstableMetadata;
this.maxDataAge = maxDataAge;
this.openReason = openReason;
-
- tidy.deletingTask = new SSTableDeletingTask(this);
-
- // Don't track read rates for tables in the system keyspace and don't bother trying to load or persist
- // the read meter when in client mode. Also don't track reads for special operations (like early open)
- // this is to avoid overflowing the executor queue (see CASSANDRA-8066)
- if (Keyspace.SYSTEM_KS.equals(desc.ksname) || Config.isClientMode() || openReason != OpenReason.NORMAL)
- {
- readMeter = null;
- readMeterSyncFuture = null;
- return;
- }
-
- readMeter = SystemKeyspace.getSSTableReadMeter(desc.ksname, desc.cfname, desc.generation);
- // sync the average read rate to system.sstable_activity every five minutes, starting one minute from now
- readMeterSyncFuture = syncExecutor.scheduleAtFixedRate(new Runnable()
- {
- public void run()
- {
- if (!isCompacted.get())
- {
- meterSyncThrottle.acquire();
- SystemKeyspace.persistSSTableReadMeter(desc.ksname, desc.cfname, desc.generation, readMeter);
- }
- }
- }, 1, 5, TimeUnit.MINUTES);
}
private SSTableReader(Descriptor desc,
@@ -586,12 +605,11 @@ public class SSTableReader extends SSTable implements RefCounted
OpenReason openReason)
{
this(desc, components, metadata, partitioner, maxDataAge, sstableMetadata, openReason);
-
this.ifile = ifile;
this.dfile = dfile;
this.indexSummary = indexSummary;
this.bf = bloomFilter;
- tidy.setup(this);
+ this.setup();
}
public static long getTotalBytes(Iterable<SSTableReader> sstables)
@@ -626,7 +644,7 @@ public class SSTableReader extends SSTable implements RefCounted
public void setTrackedBy(DataTracker tracker)
{
- tidy.deletingTask.setTracker(tracker);
+ tidy.type.deletingTask.setTracker(tracker);
// under normal operation we can do this at any time, but SSTR is also used outside C* proper,
// e.g. by BulkLoader, which does not initialize the cache. As a kludge, we set up the cache
// here when we know we're being wired into the rest of the server infrastructure.
@@ -698,7 +716,6 @@ public class SSTableReader extends SSTable implements RefCounted
dfile = dbuilder.complete(descriptor.filenameFor(Component.DATA));
if (saveSummaryIfCreated && (recreateBloomFilter || !summaryLoaded)) // save summary information to disk
saveSummary(ibuilder, dbuilder);
- tidy.setup(this);
}
/**
@@ -792,6 +809,8 @@ public class SSTableReader extends SSTable implements RefCounted
}
catch (IOException e)
{
+ if (indexSummary != null)
+ indexSummary.close();
logger.debug("Cannot deserialize SSTable Summary File {}: {}", summariesFile.getPath(), e.getMessage());
// corrupted; delete it and fall back to creating a new summary
FileUtils.closeQuietly(iStream);
@@ -850,20 +869,21 @@ public class SSTableReader extends SSTable implements RefCounted
public void setReplacedBy(SSTableReader replacement)
{
- synchronized (tidy.replaceLock)
+ synchronized (tidy.global)
{
- assert tidy.replacedBy == null;
- tidy.replacedBy = replacement;
- replacement.tidy.replaces = this;
- replacement.tidy.replaceLock = tidy.replaceLock;
+ assert replacement != null;
+ assert !tidy.isReplaced;
+ assert tidy.global.live == this;
+ tidy.isReplaced = true;
+ tidy.global.live = replacement;
}
}
public SSTableReader cloneWithNewStart(DecoratedKey newStart, final Runnable runOnClose)
{
- synchronized (tidy.replaceLock)
+ synchronized (tidy.global)
{
- assert tidy.replacedBy == null;
+ assert openReason != OpenReason.EARLY;
if (newStart.compareTo(this.first) > 0)
{
@@ -895,10 +915,9 @@ public class SSTableReader extends SSTable implements RefCounted
}
}
- SSTableReader replacement = new SSTableReader(descriptor, components, metadata, partitioner, ifile, dfile, indexSummary.readOnlyClone(), bf, maxDataAge, sstableMetadata,
- openReason == OpenReason.EARLY ? openReason : OpenReason.METADATA_CHANGE);
- replacement.readMeterSyncFuture = this.readMeterSyncFuture;
- replacement.readMeter = this.readMeter;
+ SSTableReader replacement = new SSTableReader(descriptor, components, metadata, partitioner, ifile.sharedCopy(),
+ dfile.sharedCopy(), indexSummary.sharedCopy(), bf.sharedCopy(),
+ maxDataAge, sstableMetadata, OpenReason.MOVED_START);
replacement.first = this.last.compareTo(newStart) > 0 ? newStart : this.last;
replacement.last = this.last;
setReplacedBy(replacement);
@@ -916,9 +935,9 @@ public class SSTableReader extends SSTable implements RefCounted
*/
public SSTableReader cloneWithNewSummarySamplingLevel(ColumnFamilyStore parent, int samplingLevel) throws IOException
{
- synchronized (tidy.replaceLock)
+ synchronized (tidy.global)
{
- assert tidy.replacedBy == null;
+ assert openReason != OpenReason.EARLY;
int minIndexInterval = metadata.getMinIndexInterval();
int maxIndexInterval = metadata.getMaxIndexInterval();
@@ -957,10 +976,9 @@ public class SSTableReader extends SSTable implements RefCounted
StorageMetrics.load.inc(newSize - oldSize);
parent.metric.liveDiskSpaceUsed.inc(newSize - oldSize);
- SSTableReader replacement = new SSTableReader(descriptor, components, metadata, partitioner, ifile, dfile, newSummary, bf, maxDataAge, sstableMetadata,
- openReason == OpenReason.EARLY ? openReason : OpenReason.METADATA_CHANGE);
- replacement.readMeterSyncFuture = this.readMeterSyncFuture;
- replacement.readMeter = this.readMeter;
+ SSTableReader replacement = new SSTableReader(descriptor, components, metadata, partitioner, ifile.sharedCopy(),
+ dfile.sharedCopy(), newSummary, bf.sharedCopy(), maxDataAge,
+ sstableMetadata, OpenReason.METADATA_CHANGE);
replacement.first = this.first;
replacement.last = this.last;
setReplacedBy(replacement);
@@ -992,6 +1010,11 @@ public class SSTableReader extends SSTable implements RefCounted
}
}
+ public RestorableMeter getReadMeter()
+ {
+ return readMeter;
+ }
+
public int getIndexSummarySamplingLevel()
{
return indexSummary.getSamplingLevel();
@@ -1014,14 +1037,17 @@ public class SSTableReader extends SSTable implements RefCounted
public void releaseSummary() throws IOException
{
- indexSummary.close();
+ tidy.releaseSummary();
indexSummary = null;
}
private void validate()
{
if (this.first.compareTo(this.last) > 0)
+ {
+ selfRef().release();
throw new IllegalStateException(String.format("SSTable first key %s > last key %s", this.first, this.last));
+ }
}
/**
@@ -1567,16 +1593,16 @@ public class SSTableReader extends SSTable implements RefCounted
if (logger.isDebugEnabled())
logger.debug("Marking {} compacted", getFilename());
- synchronized (tidy.replaceLock)
+ synchronized (tidy.global)
{
- assert tidy.replacedBy == null : getFilename();
+ assert !tidy.isReplaced;
}
- return !isCompacted.getAndSet(true);
+ return !tidy.global.isCompacted.getAndSet(true);
}
public boolean isMarkedCompacted()
{
- return isCompacted.get();
+ return tidy.global.isCompacted.get();
}
public void markSuspect()
@@ -1673,16 +1699,7 @@ public class SSTableReader extends SSTable implements RefCounted
public SSTableReader getCurrentReplacement()
{
- synchronized (tidy.replaceLock)
- {
- SSTableReader cur = this, next = tidy.replacedBy;
- while (next != null)
- {
- cur = next;
- next = next.tidy.replacedBy;
- }
- return cur;
- }
+ return tidy.global.live;
}
/**
@@ -1882,199 +1899,314 @@ public class SSTableReader extends SSTable implements RefCounted
}
}
- public Ref tryRef()
+ public Ref<SSTableReader> tryRef()
{
- return refCounted.tryRef();
+ return selfRef.tryRef();
}
- public Ref sharedRef()
+ public Ref<SSTableReader> selfRef()
{
- return refCounted.sharedRef();
+ return selfRef;
}
- private static final class Tidier implements Tidy
+ public Ref<SSTableReader> ref()
{
- private String name;
- private CFMetaData metadata;
- // indexfile and datafile: might be null before a call to load()
- private SegmentedFile ifile;
- private SegmentedFile dfile;
+ return selfRef.ref();
+ }
- private IndexSummary indexSummary;
- private IFilter bf;
+ void setup()
+ {
+ tidy.setup(this);
+ this.readMeter = tidy.global.readMeter;
+ }
+
+ @VisibleForTesting
+ public void overrideReadMeter(RestorableMeter readMeter)
+ {
+ this.readMeter = tidy.global.readMeter = readMeter;
+ }
- private AtomicBoolean isCompacted;
+ /**
+ * One instance per SSTableReader we create. This references the type-shared tidy, which in turn references
+ * the globally shared tidy, i.e.
+ *
+ * InstanceTidier => DescriptorTypeTidy => GlobalTidy
+ *
+ * We can create many InstanceTidiers (one for every time we reopen an sstable with MOVED_START for example), but there can only be
+ * two DescriptorTypeTidy instances (FINAL and TEMPLINK) and only one GlobalTidy per logical sstable.
+ *
+ * When the InstanceTidier cleans up, it releases its reference to its DescriptorTypeTidy; when all InstanceTidiers
+ * for that type have run, the DescriptorTypeTidy cleans up. DescriptorTypeTidy behaves in the same way towards GlobalTidy.
+ *
+ * For ease, we stash a direct reference to both our type-shared and global tidier
+ */
+ private static final class InstanceTidier implements Tidy
+ {
+ private final Descriptor descriptor;
+ private final CFMetaData metadata;
+ private IFilter bf;
+ private IndexSummary summary;
- /**
- * To support replacing this sstablereader with another object that represents that same underlying sstable, but with different associated resources,
- * we build a linked-list chain of replacement, which we synchronise using a shared object to make maintenance of the list across multiple threads simple.
- * On close we check if any of the closeable resources differ between any chains either side of us; any that are in neither of the adjacent links (if any) are closed.
- * Once we've made this decision we remove ourselves from the linked list, so that anybody behind/ahead will compare against only other still opened resources.
- */
- private Object replaceLock = new Object();
- private SSTableReader replacedBy;
- private SSTableReader replaces;
- private SSTableDeletingTask deletingTask;
+ private SegmentedFile dfile;
+ private SegmentedFile ifile;
private Runnable runOnClose;
+ private boolean isReplaced = false;
+
+ // a reference to our shared per-Descriptor.Type tidy instance, that
+ // we will release when we are ourselves released
+ private Ref<DescriptorTypeTidy> typeRef;
+
+ // a convenience stashing of the shared per-descriptor-type tidy instance itself
+ // and the per-logical-sstable globally shared state that it is linked to
+ private DescriptorTypeTidy type;
+ private GlobalTidy global;
+
+ private boolean setup;
+
+ void setup(SSTableReader reader)
+ {
+ this.setup = true;
+ this.bf = reader.bf;
+ this.summary = reader.indexSummary;
+ this.dfile = reader.dfile;
+ this.ifile = reader.ifile;
+ // get a new reference to the shared descriptor-type tidy
+ this.typeRef = DescriptorTypeTidy.get(reader);
+ this.type = typeRef.get();
+ this.global = type.globalRef.get();
+ }
- @VisibleForTesting
- public RestorableMeter readMeter;
- private volatile ScheduledFuture readMeterSyncFuture;
+ InstanceTidier(Descriptor descriptor, CFMetaData metadata)
+ {
+ this.descriptor = descriptor;
+ this.metadata = metadata;
+ }
- private void setup(SSTableReader reader)
+ public void tidy()
{
- name = reader.toString();
- metadata = reader.metadata;
- ifile = reader.ifile;
- dfile = reader.dfile;
- indexSummary = reader.indexSummary;
- bf = reader.bf;
- isCompacted = reader.isCompacted;
- readMeterSyncFuture = reader.readMeterSyncFuture;
+ // don't try to cleanup if the sstablereader was never fully constructed
+ if (!setup)
+ return;
+
+ final ColumnFamilyStore cfs = Schema.instance.getColumnFamilyStoreInstance(metadata.cfId);
+ final OpOrder.Barrier barrier;
+ if (cfs != null)
+ {
+ barrier = cfs.readOrdering.newBarrier();
+ barrier.issue();
+ }
+ else
+ barrier = null;
+
+ ScheduledExecutors.nonPeriodicTasks.execute(new Runnable()
+ {
+ public void run()
+ {
+ if (barrier != null)
+ barrier.await();
+ bf.close();
+ dfile.close();
+ ifile.close();
+ if (summary != null)
+ summary.close();
+ if (runOnClose != null)
+ runOnClose.run();
+ typeRef.release();
+ }
+ });
}
public String name()
{
- return name;
+ return descriptor.toString();
}
- private void dropPageCache()
+ void releaseSummary()
{
- dropPageCache(dfile.path);
- dropPageCache(ifile.path);
+ summary.close();
+ assert summary.isCleanedUp();
+ summary = null;
}
+ }
+
+ /**
+ * One instance, shared between all InstanceTidiers of a given Descriptor.Type.
+ * Does only two things: deletes the sstable files for the type, if necessary;
+ * and holds the shared reference to the globally shared state.
+ *
+ * All InstanceTidiers, on setup(), ask the static get() method for their shared state,
+ * and stash a reference to it to be released when they are. Once all such references are
+ * released, the shared tidy will be performed.
+ */
+ static final class DescriptorTypeTidy implements Tidy
+ {
+ // keyed by REAL descriptor (TMPLINK/FINAL), mapping to the shared DescriptorTypeTidy for that descriptor
+ static final ConcurrentMap<Descriptor, Ref<DescriptorTypeTidy>> lookup = new ConcurrentHashMap<>();
- private void dropPageCache(String filePath)
+ private final Descriptor desc;
+ private final Ref<GlobalTidy> globalRef;
+ private final SSTableDeletingTask deletingTask;
+
+ DescriptorTypeTidy(Descriptor desc, SSTableReader sstable)
{
- RandomAccessFile file = null;
+ this.desc = desc;
+ this.deletingTask = new SSTableDeletingTask(desc, sstable);
+ // get a new reference to the shared global tidy
+ this.globalRef = GlobalTidy.get(sstable);
+ }
- try
+ public void tidy()
+ {
+ lookup.remove(desc);
+ boolean isCompacted = globalRef.get().isCompacted.get();
+ globalRef.release();
+ switch (desc.type)
{
- file = new RandomAccessFile(filePath, "r");
+ case FINAL:
+ if (isCompacted)
+ deletingTask.run();
+ break;
+ case TEMPLINK:
+ deletingTask.run();
+ break;
+ default:
+ throw new IllegalStateException();
+ }
+ }
- int fd = CLibrary.getfd(file.getFD());
+ public String name()
+ {
+ return desc.toString();
+ }
- if (fd > 0)
- {
- if (logger.isDebugEnabled())
- logger.debug(String.format("Dropping page cache of file %s.", filePath));
+ // get a new reference to the shared DescriptorTypeTidy for this sstable
+ public static Ref<DescriptorTypeTidy> get(SSTableReader sstable)
+ {
+ Descriptor desc = sstable.descriptor;
+ if (sstable.openReason == OpenReason.EARLY)
+ desc = desc.asType(Descriptor.Type.TEMPLINK);
+ Ref<DescriptorTypeTidy> refc = lookup.get(desc);
+ if (refc != null)
+ return refc.ref();
+ final DescriptorTypeTidy tidy = new DescriptorTypeTidy(desc, sstable);
+ refc = new Ref<>(tidy, tidy);
+ Ref<?> ex = lookup.putIfAbsent(desc, refc);
+ assert ex == null;
+ return refc;
+ }
+ }
- CLibrary.trySkipCache(fd, 0, 0);
- }
- }
- catch (IOException e)
+ /**
+ * One instance per logical sstable. This both tracks shared cleanup and some shared state related
+ * to the sstable's lifecycle. All DescriptorTypeTidy instances, on construction, obtain a reference to us
+ * via our static get(). There should only ever be at most two such references extant at any one time,
+ * since only TMPLINK and FINAL type descriptors should be open as readers. When all files of both
+ * kinds have been released, this shared tidy will be performed.
+ */
+ static final class GlobalTidy implements Tidy
+ {
+ // keyed by FINAL descriptor, mapping to the shared GlobalTidy for that descriptor
+ static final ConcurrentMap<Descriptor, Ref<GlobalTidy>> lookup = new ConcurrentHashMap<>();
+
+ private final Descriptor desc;
+ // a single convenience property for getting the most recent version of an sstable, not related to tidying
+ private SSTableReader live;
+ // the readMeter that is shared between all instances of the sstable, and can be overridden in all of them
+ // at once also, for testing purposes
+ private RestorableMeter readMeter;
+ // the scheduled persistence of the readMeter, that we will cancel once all instances of this logical
+ // sstable have been released
+ private final ScheduledFuture readMeterSyncFuture;
+ // shared state managing if the logical sstable has been compacted; this is used in cleanup both here
+ // and in the FINAL type tidier
+ private final AtomicBoolean isCompacted;
+
+ GlobalTidy(final SSTableReader reader)
+ {
+ this.desc = reader.descriptor;
+ this.isCompacted = new AtomicBoolean();
+ this.live = reader;
+ // Don't track read rates for tables in the system keyspace and don't bother trying to load or persist
+ // the read meter when in client mode.
+ if (Keyspace.SYSTEM_KS.equals(desc.ksname) || Config.isClientMode())
{
- // we don't care if cache cleanup fails
+ readMeter = null;
+ readMeterSyncFuture = null;
+ return;
}
- finally
+
+ readMeter = SystemKeyspace.getSSTableReadMeter(desc.ksname, desc.cfname, desc.generation);
+ // sync the average read rate to system.sstable_activity every five minutes, starting one minute from now
+ readMeterSyncFuture = syncExecutor.scheduleAtFixedRate(new Runnable()
{
- FileUtils.closeQuietly(file);
- }
+ public void run()
+ {
+ if (!isCompacted.get())
+ {
+ meterSyncThrottle.acquire();
+ SystemKeyspace.persistSSTableReadMeter(desc.ksname, desc.cfname, desc.generation, readMeter);
+ }
+ }
+ }, 1, 5, TimeUnit.MINUTES);
}
public void tidy()
{
+ lookup.remove(desc);
if (readMeterSyncFuture != null)
- readMeterSyncFuture.cancel(false);
+ readMeterSyncFuture.cancel(true);
+ if (isCompacted.get())
+ SystemKeyspace.clearSSTableReadMeter(desc.ksname, desc.cfname, desc.generation);
+ // don't ideally want to dropPageCache for the file until all instances have been released
+ dropPageCache(desc.filenameFor(Component.DATA));
+ dropPageCache(desc.filenameFor(Component.PRIMARY_INDEX));
+ }
- synchronized (replaceLock)
- {
- boolean closeBf = true, closeSummary = true, closeFiles = true, deleteFiles = isCompacted.get();
+ public String name()
+ {
+ return desc.toString();
+ }
- if (replacedBy != null)
- {
- closeBf = replacedBy.bf != bf;
- closeSummary = replacedBy.indexSummary != indexSummary;
- closeFiles = replacedBy.dfile != dfile;
- // if the replacement sstablereader uses a different path, clean up our paths
- deleteFiles = !dfile.path.equals(replacedBy.dfile.path);
- }
+ // get a new reference to the shared GlobalTidy for this sstable
+ public static Ref<GlobalTidy> get(SSTableReader sstable)
+ {
+ Descriptor descriptor = sstable.descriptor;
+ Ref<GlobalTidy> refc = lookup.get(descriptor);
+ if (refc != null)
+ return refc.ref();
+ final GlobalTidy tidy = new GlobalTidy(sstable);
+ refc = new Ref<>(tidy, tidy);
+ Ref<?> ex = lookup.putIfAbsent(descriptor, refc);
+ assert ex == null;
+ return refc;
+ }
+ }
- if (replaces != null)
- {
- closeBf &= replaces.bf != bf;
- closeSummary &= replaces.indexSummary != indexSummary;
- closeFiles &= replaces.dfile != dfile;
- deleteFiles &= !dfile.path.equals(replaces.dfile.path);
- }
+ private static void dropPageCache(String filePath)
+ {
+ RandomAccessFile file = null;
- boolean deleteAll = false;
- if (isCompacted.get())
- {
- assert replacedBy == null;
- if (replaces != null && !deleteFiles)
- {
- replaces.tidy.replacedBy = null;
- replaces.tidy.deletingTask = deletingTask;
- replaces.markObsolete();
- }
- else
- {
- deleteAll = true;
- }
- }
- else
- {
- closeSummary &= indexSummary != null;
- if (replaces != null)
- replaces.tidy.replacedBy = replacedBy;
- if (replacedBy != null)
- replacedBy.tidy.replaces = replaces;
- }
+ try
+ {
+ file = new RandomAccessFile(filePath, "r");
- scheduleTidy(closeBf, closeSummary, closeFiles, deleteFiles, deleteAll);
- }
- }
+ int fd = CLibrary.getfd(file.getFD());
- private void scheduleTidy(final boolean closeBf, final boolean closeSummary, final boolean closeFiles, final boolean deleteFiles, final boolean deleteAll)
- {
- final ColumnFamilyStore cfs = Schema.instance.getColumnFamilyStoreInstance(metadata.cfId);
- final OpOrder.Barrier barrier;
- if (cfs != null)
+ if (fd > 0)
{
- barrier = cfs.readOrdering.newBarrier();
- barrier.issue();
- }
- else
- barrier = null;
+ if (logger.isDebugEnabled())
+ logger.debug(String.format("Dropping page cache of file %s.", filePath));
- ScheduledExecutors.nonPeriodicTasks.execute(new Runnable()
- {
- public void run()
- {
- if (barrier != null)
- barrier.await();
- if (closeBf)
- bf.close();
- if (closeSummary)
- indexSummary.close();
- if (closeFiles)
- {
- ifile.cleanup();
- dfile.cleanup();
- }
- if (runOnClose != null)
- runOnClose.run();
- if (deleteAll)
- {
- /**
- * Do the OS a favour and suggest (using fadvice call) that we
- * don't want to see pages of this SSTable in memory anymore.
- *
- * NOTE: We can't use madvice in java because it requires the address of
- * the mapping, so instead we always open a file and run fadvice(fd, 0, 0) on it
- */
- dropPageCache();
- deletingTask.run();
- }
- else if (deleteFiles)
- {
- FileUtils.deleteWithConfirm(new File(dfile.path));
- FileUtils.deleteWithConfirm(new File(ifile.path));
- }
- }
- });
+ CLibrary.trySkipCache(fd, 0, 0);
+ }
+ }
+ catch (IOException e)
+ {
+ // we don't care if cache cleanup fails
+ }
+ finally
+ {
+ FileUtils.closeQuietly(file);
}
}
-
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/61384c57/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
index 7784b18..6356d4d 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
@@ -183,14 +183,14 @@ public class SSTableRewriter
for (SSTableReader sstable : finished)
{
sstable.markObsolete();
- sstable.sharedRef().release();
+ sstable.selfRef().release();
}
// abort the writers
for (Finished finished : finishedEarly)
{
boolean opened = finished.reader != null;
- finished.writer.abort(!opened);
+ finished.writer.abort();
if (opened)
{
// if we've already been opened, add ourselves to the discard pile
@@ -361,7 +361,7 @@ public class SSTableRewriter
}
else
{
- f.writer.abort(true);
+ f.writer.abort();
assert f.reader == null;
}
}
@@ -380,9 +380,9 @@ public class SSTableRewriter
{
for (SSTableReader reader : discard)
{
- if (reader.getCurrentReplacement() == null)
+ if (reader.getCurrentReplacement() == reader)
reader.markObsolete();
- reader.sharedRef().release();
+ reader.selfRef().release();
}
}
else
http://git-wip-us.apache.org/repos/asf/cassandra/blob/61384c57/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
index cc60594..d430314 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
@@ -17,7 +17,6 @@
*/
package org.apache.cassandra.io.sstable;
-import java.io.Closeable;
import java.io.DataInput;
import java.io.File;
import java.io.FileOutputStream;
@@ -339,16 +338,12 @@ public class SSTableWriter extends SSTable
*/
public void abort()
{
- abort(true);
- }
- public void abort(boolean closeBf)
- {
assert descriptor.type.isTemporary;
if (iwriter == null && dataFile == null)
return;
if (iwriter != null)
- iwriter.abort(closeBf);
+ iwriter.abort();
if (dataFile!= null)
dataFile.abort();
@@ -407,7 +402,7 @@ public class SSTableWriter extends SSTable
components, metadata,
partitioner, ifile,
dfile, iwriter.summary.build(partitioner, exclusiveUpperBoundOfReadableIndex),
- iwriter.bf, maxDataAge, sstableMetadata, SSTableReader.OpenReason.EARLY);
+ iwriter.bf.sharedCopy(), maxDataAge, sstableMetadata, SSTableReader.OpenReason.EARLY);
// now it's open, find the ACTUAL last readable key (i.e. for which the data file has also been flushed)
sstable.first = getMinimalKey(first);
@@ -416,7 +411,7 @@ public class SSTableWriter extends SSTable
if (inclusiveUpperBoundOfReadableData == null)
{
// Prevent leaving tmplink files on disk
- sstable.sharedRef().release();
+ sstable.selfRef().release();
return null;
}
int offset = 2;
@@ -428,7 +423,7 @@ public class SSTableWriter extends SSTable
inclusiveUpperBoundOfReadableData = iwriter.getMaxReadableKey(offset++);
if (inclusiveUpperBoundOfReadableData == null)
{
- sstable.sharedRef().release();
+ sstable.selfRef().release();
return null;
}
}
@@ -438,14 +433,17 @@ public class SSTableWriter extends SSTable
public static enum FinishType
{
- NORMAL(SSTableReader.OpenReason.NORMAL),
- EARLY(SSTableReader.OpenReason.EARLY), // no renaming
- FINISH_EARLY(SSTableReader.OpenReason.NORMAL); // tidy up an EARLY finish
+ CLOSE(null, true),
+ NORMAL(SSTableReader.OpenReason.NORMAL, true),
+ EARLY(SSTableReader.OpenReason.EARLY, false), // no renaming
+ FINISH_EARLY(SSTableReader.OpenReason.NORMAL, true); // tidy up an EARLY finish
final SSTableReader.OpenReason openReason;
- FinishType(SSTableReader.OpenReason openReason)
+ public final boolean isFinal;
+ FinishType(SSTableReader.OpenReason openReason, boolean isFinal)
{
this.openReason = openReason;
+ this.isFinal = isFinal;
}
}
@@ -461,6 +459,7 @@ public class SSTableWriter extends SSTable
public SSTableReader finish(FinishType finishType, long maxDataAge, long repairedAt)
{
+ assert finishType != FinishType.CLOSE;
Pair<Descriptor, StatsMetadata> p;
p = close(finishType, repairedAt < 0 ? this.repairedAt : repairedAt);
@@ -480,16 +479,16 @@ public class SSTableWriter extends SSTable
ifile,
dfile,
iwriter.summary.build(partitioner),
- iwriter.bf,
+ iwriter.bf.sharedCopy(),
maxDataAge,
metadata,
finishType.openReason);
sstable.first = getMinimalKey(first);
sstable.last = getMinimalKey(last);
- switch (finishType)
+ if (finishType.isFinal)
{
- case NORMAL: case FINISH_EARLY:
+ iwriter.bf.close();
// try to save the summaries to disk
sstable.saveSummary(iwriter.builder, dbuilder);
iwriter = null;
@@ -501,16 +500,18 @@ public class SSTableWriter extends SSTable
// Close the writer and return the descriptor to the new sstable and it's metadata
public Pair<Descriptor, StatsMetadata> close()
{
- return close(FinishType.NORMAL, this.repairedAt);
+ return close(FinishType.CLOSE, this.repairedAt);
}
private Pair<Descriptor, StatsMetadata> close(FinishType type, long repairedAt)
{
switch (type)
{
- case EARLY: case NORMAL:
+ case EARLY: case CLOSE: case NORMAL:
iwriter.close();
dataFile.close();
+ if (type == FinishType.CLOSE)
+ iwriter.bf.close();
}
// write sstable statistics
@@ -521,9 +522,8 @@ public class SSTableWriter extends SSTable
// remove the 'tmp' marker from all components
Descriptor descriptor = this.descriptor;
- switch (type)
+ if (type.isFinal)
{
- case NORMAL: case FINISH_EARLY:
dataFile.writeFullChecksum(descriptor);
writeMetadata(descriptor, metadataComponents);
// save the table of components
@@ -629,11 +629,10 @@ public class SSTableWriter extends SSTable
builder.addPotentialBoundary(indexPosition);
}
- public void abort(boolean closeBf)
+ public void abort()
{
indexFile.abort();
- if (closeBf)
- bf.close();
+ bf.close();
}
/**
http://git-wip-us.apache.org/repos/asf/cassandra/blob/61384c57/src/java/org/apache/cassandra/io/util/BufferedPoolingSegmentedFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/BufferedPoolingSegmentedFile.java b/src/java/org/apache/cassandra/io/util/BufferedPoolingSegmentedFile.java
index 57f465f..8334965 100644
--- a/src/java/org/apache/cassandra/io/util/BufferedPoolingSegmentedFile.java
+++ b/src/java/org/apache/cassandra/io/util/BufferedPoolingSegmentedFile.java
@@ -25,7 +25,17 @@ public class BufferedPoolingSegmentedFile extends PoolingSegmentedFile
{
public BufferedPoolingSegmentedFile(String path, long length)
{
- super(path, length);
+ super(new Cleanup(path), path, length);
+ }
+
+ private BufferedPoolingSegmentedFile(BufferedPoolingSegmentedFile copy)
+ {
+ super(copy);
+ }
+
+ public BufferedPoolingSegmentedFile sharedCopy()
+ {
+ return new BufferedPoolingSegmentedFile(this);
}
public static class Builder extends SegmentedFile.Builder
http://git-wip-us.apache.org/repos/asf/cassandra/blob/61384c57/src/java/org/apache/cassandra/io/util/BufferedSegmentedFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/BufferedSegmentedFile.java b/src/java/org/apache/cassandra/io/util/BufferedSegmentedFile.java
index 2f715da..c29bbf3 100644
--- a/src/java/org/apache/cassandra/io/util/BufferedSegmentedFile.java
+++ b/src/java/org/apache/cassandra/io/util/BufferedSegmentedFile.java
@@ -20,12 +20,30 @@ package org.apache.cassandra.io.util;
import java.io.File;
import org.apache.cassandra.io.sstable.SSTableWriter;
+import org.apache.cassandra.utils.concurrent.SharedCloseable;
public class BufferedSegmentedFile extends SegmentedFile
{
public BufferedSegmentedFile(String path, long length)
{
- super(path, length);
+ super(new Cleanup(path), path, length);
+ }
+
+ private BufferedSegmentedFile(BufferedSegmentedFile copy)
+ {
+ super(copy);
+ }
+
+ private static class Cleanup extends SegmentedFile.Cleanup
+ {
+ protected Cleanup(String path)
+ {
+ super(path);
+ }
+ public void tidy() throws Exception
+ {
+
+ }
}
public static class Builder extends SegmentedFile.Builder
@@ -49,7 +67,8 @@ public class BufferedSegmentedFile extends SegmentedFile
return reader;
}
- public void cleanup()
+ public BufferedSegmentedFile sharedCopy()
{
+ return new BufferedSegmentedFile(this);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/61384c57/src/java/org/apache/cassandra/io/util/CompressedPoolingSegmentedFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/CompressedPoolingSegmentedFile.java b/src/java/org/apache/cassandra/io/util/CompressedPoolingSegmentedFile.java
index 11d091a..94d23bf 100644
--- a/src/java/org/apache/cassandra/io/util/CompressedPoolingSegmentedFile.java
+++ b/src/java/org/apache/cassandra/io/util/CompressedPoolingSegmentedFile.java
@@ -28,10 +28,31 @@ public class CompressedPoolingSegmentedFile extends PoolingSegmentedFile impleme
public CompressedPoolingSegmentedFile(String path, CompressionMetadata metadata)
{
- super(path, metadata.dataLength, metadata.compressedFileLength);
+ super(new Cleanup(path, metadata), path, metadata.dataLength, metadata.compressedFileLength);
this.metadata = metadata;
}
+ private CompressedPoolingSegmentedFile(CompressedPoolingSegmentedFile copy)
+ {
+ super(copy);
+ this.metadata = copy.metadata;
+ }
+
+ protected static final class Cleanup extends PoolingSegmentedFile.Cleanup
+ {
+ final CompressionMetadata metadata;
+ protected Cleanup(String path, CompressionMetadata metadata)
+ {
+ super(path);
+ this.metadata = metadata;
+ }
+ public void tidy() throws Exception
+ {
+ super.tidy();
+ metadata.close();
+ }
+ }
+
public static class Builder extends CompressedSegmentedFile.Builder
{
public Builder(CompressedSequentialWriter writer)
@@ -59,10 +80,8 @@ public class CompressedPoolingSegmentedFile extends PoolingSegmentedFile impleme
return metadata;
}
- @Override
- public void cleanup()
+ public CompressedPoolingSegmentedFile sharedCopy()
{
- super.cleanup();
- metadata.close();
+ return new CompressedPoolingSegmentedFile(this);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/61384c57/src/java/org/apache/cassandra/io/util/CompressedSegmentedFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/CompressedSegmentedFile.java b/src/java/org/apache/cassandra/io/util/CompressedSegmentedFile.java
index b788715..0c20bb9 100644
--- a/src/java/org/apache/cassandra/io/util/CompressedSegmentedFile.java
+++ b/src/java/org/apache/cassandra/io/util/CompressedSegmentedFile.java
@@ -28,10 +28,35 @@ public class CompressedSegmentedFile extends SegmentedFile implements ICompresse
public CompressedSegmentedFile(String path, CompressionMetadata metadata)
{
- super(path, metadata.dataLength, metadata.compressedFileLength);
+ super(new Cleanup(path, metadata), path, metadata.dataLength, metadata.compressedFileLength);
this.metadata = metadata;
}
+ private CompressedSegmentedFile(CompressedSegmentedFile copy)
+ {
+ super(copy);
+ this.metadata = copy.metadata;
+ }
+
+ private static final class Cleanup extends SegmentedFile.Cleanup
+ {
+ final CompressionMetadata metadata;
+ protected Cleanup(String path, CompressionMetadata metadata)
+ {
+ super(path);
+ this.metadata = metadata;
+ }
+ public void tidy() throws Exception
+ {
+ metadata.close();
+ }
+ }
+
+ public CompressedSegmentedFile sharedCopy()
+ {
+ return new CompressedSegmentedFile(this);
+ }
+
public static class Builder extends SegmentedFile.Builder
{
protected final CompressedSequentialWriter writer;
@@ -70,9 +95,4 @@ public class CompressedSegmentedFile extends SegmentedFile implements ICompresse
{
return metadata;
}
-
- public void cleanup()
- {
- metadata.close();
- }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/61384c57/src/java/org/apache/cassandra/io/util/Memory.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/Memory.java b/src/java/org/apache/cassandra/io/util/Memory.java
index 5306433..3874669 100644
--- a/src/java/org/apache/cassandra/io/util/Memory.java
+++ b/src/java/org/apache/cassandra/io/util/Memory.java
@@ -17,6 +17,7 @@
*/
package org.apache.cassandra.io.util;
+import java.io.Closeable;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
@@ -30,7 +31,7 @@ import sun.nio.ch.DirectBuffer;
/**
* An off-heap region of memory that must be manually free'd when no longer needed.
*/
-public class Memory
+public class Memory implements AutoCloseable
{
private static final Unsafe unsafe = NativeAllocator.unsafe;
private static final IAllocator allocator = DatabaseDescriptor.getoffHeapMemoryAllocator();
@@ -302,6 +303,11 @@ public class Memory
peer = 0;
}
+ public void close() throws Exception
+ {
+ free();
+ }
+
public long size()
{
assert peer != 0;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/61384c57/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java b/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java
index 3b2cc98..8b4ae9d 100644
--- a/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java
+++ b/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java
@@ -47,10 +47,21 @@ public class MmappedSegmentedFile extends SegmentedFile
public MmappedSegmentedFile(String path, long length, Segment[] segments)
{
- super(path, length);
+ super(new Cleanup(path, segments), path, length);
this.segments = segments;
}
+ private MmappedSegmentedFile(MmappedSegmentedFile copy)
+ {
+ super(copy);
+ this.segments = copy.segments;
+ }
+
+ public MmappedSegmentedFile sharedCopy()
+ {
+ return new MmappedSegmentedFile(this);
+ }
+
/**
* @return The segment entry for the given position.
*/
@@ -85,31 +96,41 @@ public class MmappedSegmentedFile extends SegmentedFile
return file;
}
- public void cleanup()
+ private static final class Cleanup extends SegmentedFile.Cleanup
{
- if (!FileUtils.isCleanerAvailable())
- return;
+ final Segment[] segments;
+ protected Cleanup(String path, Segment[] segments)
+ {
+ super(path);
+ this.segments = segments;
+ }
+
+ public void tidy()
+ {
+ if (!FileUtils.isCleanerAvailable())
+ return;
/*
* Try forcing the unmapping of segments using undocumented unsafe sun APIs.
* If this fails (non Sun JVM), we'll have to wait for the GC to finalize the mapping.
* If this works and a thread tries to access any segment, hell will unleash on earth.
*/
- try
- {
- for (Segment segment : segments)
+ try
{
- if (segment.right == null)
- continue;
- FileUtils.clean(segment.right);
+ for (Segment segment : segments)
+ {
+ if (segment.right == null)
+ continue;
+ FileUtils.clean(segment.right);
+ }
+ logger.debug("All segments have been unmapped successfully");
+ }
+ catch (Exception e)
+ {
+ JVMStabilityInspector.inspectThrowable(e);
+ // This is not supposed to happen
+ logger.error("Error while unmapping segments", e);
}
- logger.debug("All segments have been unmapped successfully");
- }
- catch (Exception e)
- {
- JVMStabilityInspector.inspectThrowable(e);
- // This is not supposed to happen
- logger.error("Error while unmapping segments", e);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/61384c57/src/java/org/apache/cassandra/io/util/PoolingSegmentedFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/PoolingSegmentedFile.java b/src/java/org/apache/cassandra/io/util/PoolingSegmentedFile.java
index 01f4e31..daca22f 100644
--- a/src/java/org/apache/cassandra/io/util/PoolingSegmentedFile.java
+++ b/src/java/org/apache/cassandra/io/util/PoolingSegmentedFile.java
@@ -21,15 +21,35 @@ import org.apache.cassandra.service.FileCacheService;
public abstract class PoolingSegmentedFile extends SegmentedFile
{
- final FileCacheService.CacheKey cacheKey = new FileCacheService.CacheKey();
- protected PoolingSegmentedFile(String path, long length)
+ final FileCacheService.CacheKey cacheKey;
+ protected PoolingSegmentedFile(Cleanup cleanup, String path, long length)
{
- super(path, length);
+ this(cleanup, path, length, length);
}
- protected PoolingSegmentedFile(String path, long length, long onDiskLength)
+ protected PoolingSegmentedFile(Cleanup cleanup, String path, long length, long onDiskLength)
{
- super(path, length, onDiskLength);
+ super(cleanup, path, length, onDiskLength);
+ cacheKey = cleanup.cacheKey;
+ }
+
+ public PoolingSegmentedFile(PoolingSegmentedFile copy)
+ {
+ super(copy);
+ cacheKey = copy.cacheKey;
+ }
+
+ protected static class Cleanup extends SegmentedFile.Cleanup
+ {
+ final FileCacheService.CacheKey cacheKey = new FileCacheService.CacheKey();
+ protected Cleanup(String path)
+ {
+ super(path);
+ }
+ public void tidy() throws Exception
+ {
+ FileCacheService.instance.invalidate(cacheKey, path);
+ }
}
public FileDataInput getSegment(long position)
@@ -49,9 +69,4 @@ public abstract class PoolingSegmentedFile extends SegmentedFile
{
FileCacheService.instance.put(cacheKey, reader);
}
-
- public void cleanup()
- {
- FileCacheService.instance.invalidate(cacheKey, path);
- }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/61384c57/src/java/org/apache/cassandra/io/util/SegmentedFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/SegmentedFile.java b/src/java/org/apache/cassandra/io/util/SegmentedFile.java
index badae56..d557b72 100644
--- a/src/java/org/apache/cassandra/io/util/SegmentedFile.java
+++ b/src/java/org/apache/cassandra/io/util/SegmentedFile.java
@@ -31,6 +31,8 @@ import org.apache.cassandra.io.FSReadError;
import org.apache.cassandra.io.compress.CompressedSequentialWriter;
import org.apache.cassandra.io.sstable.SSTableWriter;
import org.apache.cassandra.utils.Pair;
+import org.apache.cassandra.utils.concurrent.RefCounted;
+import org.apache.cassandra.utils.concurrent.SharedCloseableImpl;
/**
* Abstracts a read-only file that has been split into segments, each of which can be represented by an independent
@@ -41,7 +43,7 @@ import org.apache.cassandra.utils.Pair;
* would need to be longer than 2GB, that segment will not be mmap'd, and a new RandomAccessFile will be created for
* each access to that segment.
*/
-public abstract class SegmentedFile
+public abstract class SegmentedFile extends SharedCloseableImpl
{
public final String path;
public final long length;
@@ -53,18 +55,43 @@ public abstract class SegmentedFile
/**
* Use getBuilder to get a Builder to construct a SegmentedFile.
*/
- SegmentedFile(String path, long length)
+ SegmentedFile(Cleanup cleanup, String path, long length)
{
- this(path, length, length);
+ this(cleanup, path, length, length);
}
- protected SegmentedFile(String path, long length, long onDiskLength)
+ protected SegmentedFile(Cleanup cleanup, String path, long length, long onDiskLength)
{
+ super(cleanup);
this.path = new File(path).getAbsolutePath();
this.length = length;
this.onDiskLength = onDiskLength;
}
+ public SegmentedFile(SegmentedFile copy)
+ {
+ super(copy);
+ path = copy.path;
+ length = copy.length;
+ onDiskLength = copy.onDiskLength;
+ }
+
+ protected static abstract class Cleanup implements RefCounted.Tidy
+ {
+ final String path;
+ protected Cleanup(String path)
+ {
+ this.path = path;
+ }
+
+ public String name()
+ {
+ return path;
+ }
+ }
+
+ public abstract SegmentedFile sharedCopy();
+
/**
* @return A SegmentedFile.Builder.
*/
@@ -96,11 +123,6 @@ public abstract class SegmentedFile
}
/**
- * Do whatever action is needed to reclaim ressources used by this SegmentedFile.
- */
- public abstract void cleanup();
-
- /**
* Collects potential segmentation points in an underlying file, and builds a SegmentedFile to represent it.
*/
public static abstract class Builder
http://git-wip-us.apache.org/repos/asf/cassandra/blob/61384c57/src/java/org/apache/cassandra/service/ActiveRepairService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java b/src/java/org/apache/cassandra/service/ActiveRepairService.java
index 15e7641..bf1cdd6 100644
--- a/src/java/org/apache/cassandra/service/ActiveRepairService.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java
@@ -432,7 +432,7 @@ public class ActiveRepairService
{
Set<SSTableReader> sstables = sstableMap.get(cfId);
Iterator<SSTableReader> sstableIterator = sstables.iterator();
- ImmutableMap.Builder<SSTableReader, Ref> references = ImmutableMap.builder();
+ ImmutableMap.Builder<SSTableReader, Ref<SSTableReader>> references = ImmutableMap.builder();
while (sstableIterator.hasNext())
{
SSTableReader sstable = sstableIterator.next();
@@ -442,7 +442,7 @@ public class ActiveRepairService
}
else
{
- Ref ref = sstable.tryRef();
+ Ref<SSTableReader> ref = sstable.tryRef();
if (ref == null)
sstableIterator.remove();
else
http://git-wip-us.apache.org/repos/asf/cassandra/blob/61384c57/src/java/org/apache/cassandra/streaming/StreamSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java b/src/java/org/apache/cassandra/streaming/StreamSession.java
index 6108dea..a9f5075 100644
--- a/src/java/org/apache/cassandra/streaming/StreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@ -353,7 +353,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber
public static class SSTableStreamingSections
{
public final SSTableReader sstable;
- public final Ref ref;
+ public final Ref<SSTableReader> ref;
public final List<Pair<Long, Long>> sections;
public final long estimatedKeys;
public final long repairedAt;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/61384c57/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java b/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
index 5ebf289..069e97f 100644
--- a/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
@@ -62,7 +62,7 @@ public class OutgoingFileMessage extends StreamMessage
public final FileMessageHeader header;
public final SSTableReader sstable;
- public final Ref ref;
+ public final Ref<SSTableReader> ref;
- public OutgoingFileMessage(SSTableReader sstable, Ref ref, int sequenceNumber, long estimatedKeys, List<Pair<Long, Long>> sections, long repairedAt)
+ public OutgoingFileMessage(SSTableReader sstable, Ref<SSTableReader> ref, int sequenceNumber, long estimatedKeys, List<Pair<Long, Long>> sections, long repairedAt)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/61384c57/src/java/org/apache/cassandra/tools/StandaloneScrubber.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/StandaloneScrubber.java b/src/java/org/apache/cassandra/tools/StandaloneScrubber.java
index 1bc2674..d420218 100644
--- a/src/java/org/apache/cassandra/tools/StandaloneScrubber.java
+++ b/src/java/org/apache/cassandra/tools/StandaloneScrubber.java
@@ -121,7 +121,7 @@ public class StandaloneScrubber
// Remove the sstable (it's been copied by scrub and snapshotted)
sstable.markObsolete();
- sstable.sharedRef().release();
+ sstable.selfRef().release();
}
catch (Exception e)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/61384c57/src/java/org/apache/cassandra/utils/AlwaysPresentFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/AlwaysPresentFilter.java b/src/java/org/apache/cassandra/utils/AlwaysPresentFilter.java
index cc162d4..1a029e5 100644
--- a/src/java/org/apache/cassandra/utils/AlwaysPresentFilter.java
+++ b/src/java/org/apache/cassandra/utils/AlwaysPresentFilter.java
@@ -32,6 +32,11 @@ public class AlwaysPresentFilter implements IFilter
public void close() { }
+ public IFilter sharedCopy()
+ {
+ return this;
+ }
+
public long serializedSize() { return 0; }
@Override