You are viewing a plain-text version of this content; the canonical link to it was not preserved in this copy.
Posted to commits@cassandra.apache.org by ty...@apache.org on 2014/04/04 22:39:40 UTC
[1/2] git commit: Ensure safe resource cleanup when replacing SSTables
Repository: cassandra
Updated Branches:
refs/heads/trunk 0015f37a3 -> 64bc45849
Ensure safe resource cleanup when replacing SSTables
Patch by belliottsmith; reviewed by Tyler Hobbs for CASSANDRA-6912
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/5ebadc11
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/5ebadc11
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/5ebadc11
Branch: refs/heads/trunk
Commit: 5ebadc11e36749e6479f9aba19406db3aacdaf41
Parents: 57b18e6
Author: belliottsmith <gi...@sub.laerad.com>
Authored: Fri Apr 4 15:37:09 2014 -0500
Committer: Tyler Hobbs <ty...@datastax.com>
Committed: Fri Apr 4 15:37:09 2014 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../org/apache/cassandra/db/DataTracker.java | 28 +-
.../cassandra/io/sstable/IndexSummary.java | 2 +-
.../io/sstable/IndexSummaryManager.java | 22 +-
.../cassandra/io/sstable/SSTableReader.java | 318 +++++++++++++------
.../cassandra/utils/AlwaysPresentFilter.java | 3 +-
.../org/apache/cassandra/utils/BloomFilter.java | 3 +-
.../org/apache/cassandra/utils/IFilter.java | 2 +
.../org/apache/cassandra/utils/obs/IBitSet.java | 2 +
.../cassandra/utils/obs/OffHeapBitSet.java | 2 +-
.../apache/cassandra/utils/obs/OpenBitSet.java | 2 +-
.../io/sstable/IndexSummaryManagerTest.java | 2 +-
.../cassandra/io/sstable/SSTableReaderTest.java | 28 +-
13 files changed, 278 insertions(+), 137 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5ebadc11/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 4cfc957..0f1ae93 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -41,6 +41,7 @@
* Properly remove 1.2 sstable support in 2.1 (CASSANDRA-6869)
* Lock counter cells, not partitions (CASSANDRA-6880)
* Track presence of legacy counter shards in sstables (CASSANDRA-6888)
+ * Ensure safe resource cleanup when replacing sstables (CASSANDRA-6912)
Merged from 2.0:
* Allow compaction of system tables during startup (CASSANDRA-6913)
* Restrict Windows to parallel repairs (CASSANDRA-6907)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5ebadc11/src/java/org/apache/cassandra/db/DataTracker.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DataTracker.java b/src/java/org/apache/cassandra/db/DataTracker.java
index c8fc699..9c8f9a0 100644
--- a/src/java/org/apache/cassandra/db/DataTracker.java
+++ b/src/java/org/apache/cassandra/db/DataTracker.java
@@ -192,14 +192,17 @@ public class DataTracker
public boolean markCompacting(Iterable<SSTableReader> sstables)
{
assert sstables != null && !Iterables.isEmpty(sstables);
+ while (true)
+ {
+ View currentView = view.get();
+ Set<SSTableReader> inactive = Sets.difference(ImmutableSet.copyOf(sstables), currentView.compacting);
+ if (inactive.size() < Iterables.size(sstables))
+ return false;
- View currentView = view.get();
- Set<SSTableReader> inactive = Sets.difference(ImmutableSet.copyOf(sstables), currentView.compacting);
- if (inactive.size() < Iterables.size(sstables))
- return false;
-
- View newView = currentView.markCompacting(inactive);
- return view.compareAndSet(currentView, newView);
+ View newView = currentView.markCompacting(inactive);
+ if (view.compareAndSet(currentView, newView))
+ return true;
+ }
}
/**
@@ -333,14 +336,6 @@ public class DataTracker
*/
public void replaceReaders(Collection<SSTableReader> oldSSTables, Collection<SSTableReader> newSSTables)
{
- // data component will be unchanged but the index summary will be a different size
- // (since we save that to make restart fast)
- long sizeIncrease = 0;
- for (SSTableReader sstable : oldSSTables)
- sizeIncrease -= sstable.bytesOnDisk();
- for (SSTableReader sstable : newSSTables)
- sizeIncrease += sstable.bytesOnDisk();
-
View currentView, newView;
do
{
@@ -349,9 +344,6 @@ public class DataTracker
}
while (!view.compareAndSet(currentView, newView));
- StorageMetrics.load.inc(sizeIncrease);
- cfstore.metric.liveDiskSpaceUsed.inc(sizeIncrease);
-
for (SSTableReader sstable : newSSTables)
sstable.setTrackedBy(this);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5ebadc11/src/java/org/apache/cassandra/io/sstable/IndexSummary.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/IndexSummary.java b/src/java/org/apache/cassandra/io/sstable/IndexSummary.java
index f87f356..0696fb7 100644
--- a/src/java/org/apache/cassandra/io/sstable/IndexSummary.java
+++ b/src/java/org/apache/cassandra/io/sstable/IndexSummary.java
@@ -258,7 +258,7 @@ public class IndexSummary implements Closeable
}
@Override
- public void close() throws IOException
+ public void close()
{
bytes.free();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5ebadc11/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java b/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
index b35f5f4..d5b7364 100644
--- a/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
+++ b/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
@@ -19,17 +19,24 @@ package org.apache.cassandra.io.sstable;
import java.io.IOException;
import java.lang.management.ManagementFactory;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
-
import javax.management.MBeanServer;
import javax.management.ObjectName;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.*;
-
-import org.apache.cassandra.config.CFMetaData;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Sets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -409,8 +416,9 @@ public class IndexSummaryManager implements IndexSummaryManagerMBean
logger.debug("Re-sampling index summary for {} from {}/{} to {}/{} of the original number of entries",
sstable, sstable.getIndexSummarySamplingLevel(), Downsampling.BASE_SAMPLING_LEVEL,
entry.newSamplingLevel, Downsampling.BASE_SAMPLING_LEVEL);
- SSTableReader replacement = sstable.cloneWithNewSummarySamplingLevel(entry.newSamplingLevel);
- DataTracker tracker = Keyspace.open(sstable.getKeyspaceName()).getColumnFamilyStore(sstable.getColumnFamilyName()).getDataTracker();
+ ColumnFamilyStore cfs = Keyspace.open(sstable.getKeyspaceName()).getColumnFamilyStore(sstable.getColumnFamilyName());
+ SSTableReader replacement = sstable.cloneWithNewSummarySamplingLevel(cfs, entry.newSamplingLevel);
+ DataTracker tracker = cfs.getDataTracker();
replacedByTracker.put(tracker, sstable);
replacementsByTracker.put(tracker, replacement);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5ebadc11/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
index 82a0bc8..d29d5ac 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
@@ -17,16 +17,34 @@
*/
package org.apache.cassandra.io.sstable;
-import java.io.*;
+import java.io.BufferedInputStream;
+import java.io.Closeable;
+import java.io.DataInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
-import java.util.*;
-import java.util.concurrent.*;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.EnumSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
-import com.clearspring.analytics.stream.cardinality.CardinalityMergeException;
-import com.clearspring.analytics.stream.cardinality.ICardinality;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Predicate;
import com.google.common.collect.Iterators;
@@ -36,28 +54,62 @@ import com.google.common.util.concurrent.RateLimiter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.clearspring.analytics.stream.cardinality.CardinalityMergeException;
+import com.clearspring.analytics.stream.cardinality.ICardinality;
import org.apache.cassandra.cache.CachingOptions;
import org.apache.cassandra.cache.InstrumentingCache;
import org.apache.cassandra.cache.KeyCacheKey;
import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
-import org.apache.cassandra.config.*;
-import org.apache.cassandra.db.*;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.config.Config;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DataRange;
+import org.apache.cassandra.db.DataTracker;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.RowIndexEntry;
+import org.apache.cassandra.db.RowPosition;
+import org.apache.cassandra.db.SystemKeyspace;
import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
import org.apache.cassandra.db.commitlog.ReplayPosition;
import org.apache.cassandra.db.compaction.ICompactionScanner;
import org.apache.cassandra.db.index.SecondaryIndex;
-import org.apache.cassandra.dht.*;
+import org.apache.cassandra.dht.AbstractBounds;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.LocalPartitioner;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
import org.apache.cassandra.io.compress.CompressedRandomAccessReader;
import org.apache.cassandra.io.compress.CompressedThrottledReader;
import org.apache.cassandra.io.compress.CompressionMetadata;
-import org.apache.cassandra.io.sstable.metadata.*;
-import org.apache.cassandra.io.util.*;
+import org.apache.cassandra.io.sstable.metadata.CompactionMetadata;
+import org.apache.cassandra.io.sstable.metadata.MetadataComponent;
+import org.apache.cassandra.io.sstable.metadata.MetadataType;
+import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
+import org.apache.cassandra.io.sstable.metadata.ValidationMetadata;
+import org.apache.cassandra.io.util.DataOutputStreamAndChannel;
+import org.apache.cassandra.io.util.FileDataInput;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.io.util.ICompressedFile;
+import org.apache.cassandra.io.util.RandomAccessReader;
+import org.apache.cassandra.io.util.SegmentedFile;
+import org.apache.cassandra.io.util.ThrottledReader;
import org.apache.cassandra.metrics.RestorableMeter;
+import org.apache.cassandra.metrics.StorageMetrics;
import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.service.CacheService;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.tracing.Tracing;
-import org.apache.cassandra.utils.*;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.CLibrary;
+import org.apache.cassandra.utils.EstimatedHistogram;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.FilterFactory;
+import org.apache.cassandra.utils.IFilter;
+import org.apache.cassandra.utils.Pair;
import static org.apache.cassandra.db.Directories.SECONDARY_INDEX_NAME_SEPARATOR;
@@ -122,15 +174,24 @@ public class SSTableReader extends SSTable implements Closeable
// but it seems like a good extra layer of protection against reference counting bugs to not delete data based on that alone
private final AtomicBoolean isCompacted = new AtomicBoolean(false);
private final AtomicBoolean isSuspect = new AtomicBoolean(false);
- private final AtomicBoolean isReplaced = new AtomicBoolean(false);
- private final SSTableDeletingTask deletingTask;
// not final since we need to be able to change level on a file.
private volatile StatsMetadata sstableMetadata;
private final AtomicLong keyCacheHit = new AtomicLong(0);
private final AtomicLong keyCacheRequest = new AtomicLong(0);
+ /**
+ * To support replacing this SSTableReader with another object that represents the same underlying sstable, but with different associated resources,
+ * we build a linked-list chain of replacements, which we synchronise using a shared lock object so that the list can be maintained safely across multiple threads.
+ * On close we check whether each closeable resource is shared with the adjacent links on either side of us; any resource used by neither adjacent link (where present) is closed.
+ * Once we've made this decision we remove ourselves from the linked list, so that anybody behind/ahead will compare only against other still-open resources.
+ */
+ private Object replaceLock = new Object();
+ private SSTableReader replacedBy;
+ private SSTableReader replaces;
+ private SSTableDeletingTask deletingTask;
+
@VisibleForTesting
public RestorableMeter readMeter;
private ScheduledFuture readMeterSyncFuture;
@@ -275,10 +336,10 @@ public class SSTableReader extends SSTable implements Closeable
statsMetadata);
// special implementation of load to use non-pooled SegmentedFile builders
- SegmentedFile.Builder ibuilder = new BufferedSegmentedFile.Builder();
+ SegmentedFile.Builder ibuilder = SegmentedFile.getBuilder(DatabaseDescriptor.getIndexAccessMode());
SegmentedFile.Builder dbuilder = sstable.compression
- ? new CompressedSegmentedFile.Builder()
- : new BufferedSegmentedFile.Builder();
+ ? SegmentedFile.getCompressedBuilder()
+ : SegmentedFile.getBuilder(DatabaseDescriptor.getDiskAccessMode());
if (!sstable.loadSummary(ibuilder, dbuilder))
sstable.buildSummary(false, ibuilder, dbuilder, false, Downsampling.BASE_SAMPLING_LEVEL);
sstable.ifile = ibuilder.complete(sstable.descriptor.filenameFor(Component.PRIMARY_INDEX));
@@ -480,27 +541,95 @@ public class SSTableReader extends SSTable implements Closeable
return sum;
}
- /**
- * Clean up all opened resources.
- *
- * @throws IOException
- */
- public void close() throws IOException
+ private void tidy(boolean release)
{
if (readMeterSyncFuture != null)
readMeterSyncFuture.cancel(false);
- // if this SSTR was replaced by a new SSTR with a different index summary, the two instances will share
- // resources, so don't force unmapping, clear the FileCacheService entry, or close the BF
- if (!isReplaced.get())
+ assert references.get() == 0;
+
+ synchronized (replaceLock)
{
- // Force finalizing mmapping if necessary
- ifile.cleanup();
- dfile.cleanup();
- // close the BF so it can be opened later.
- bf.close();
+ boolean closeBf = true, closeSummary = true, closeFiles = true;
+
+ if (replacedBy != null)
+ {
+ closeBf = replacedBy.bf != bf;
+ closeSummary = replacedBy.indexSummary != indexSummary;
+ closeFiles = replacedBy.dfile != dfile;
+ }
+
+ if (replaces != null)
+ {
+ closeBf &= replaces.bf != bf;
+ closeSummary &= replaces.indexSummary != indexSummary;
+ closeFiles &= replaces.dfile != dfile;
+ }
+
+ boolean deleteAll = false;
+ if (release && isCompacted.get())
+ {
+ assert replacedBy == null;
+ if (replaces != null)
+ {
+ replaces.replacedBy = null;
+ replaces.deletingTask = deletingTask;
+ replaces.markObsolete();
+ }
+ else
+ {
+ deleteAll = true;
+ }
+ }
+ else
+ {
+ if (replaces != null)
+ replaces.replacedBy = replacedBy;
+ if (replacedBy != null)
+ replacedBy.replaces = replaces;
+ }
+
+ assert references.get() == 0;
+ if (closeBf)
+ bf.close();
+ if (closeSummary)
+ indexSummary.close();
+ if (closeFiles)
+ {
+ ifile.cleanup();
+ dfile.cleanup();
+ }
+ if (deleteAll)
+ {
+ /**
+ * Do the OS a favour and suggest (using an fadvise call) that we
+ * don't want to see pages of this SSTable in memory anymore.
+ *
+ * NOTE: We can't use madvise in java because it requires the address of
+ * the mapping, so instead we always open a file and run fadvise(fd, 0, 0) on it
+ */
+ dropPageCache();
+ deletingTask.schedule();
+ }
}
- indexSummary.close();
+ }
+
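+ /**
+ * Release the resources held by this reader that are not shared with an adjacent reader in its replacement chain.
+ * Closing never deletes the sstable's files; that only happens when the last reference to a compacted reader is released.
+ */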
+ public void close()
+ {
+ tidy(false);
+ }
+
+ public String getFilename()
+ {
+ return dfile.path;
+ }
+
+ public String getIndexFilename()
+ {
+ return ifile.path;
}
public void setTrackedBy(DataTracker tracker)
@@ -726,6 +855,17 @@ public class SSTableReader extends SSTable implements Closeable
}
}
+ public void setReplacedBy(SSTableReader replacement)
+ {
+ synchronized (replaceLock)
+ {
+ assert replacedBy == null;
+ replacedBy = replacement;
+ replacement.replaces = this;
+ replacement.replaceLock = replaceLock;
+ }
+ }
+
/**
* Returns a new SSTableReader with the same properties as this SSTableReader except that a new IndexSummary will
* be built at the target samplingLevel. This (original) SSTableReader instance will be marked as replaced, have
@@ -734,49 +874,59 @@ public class SSTableReader extends SSTable implements Closeable
* @return a new SSTableReader
* @throws IOException
*/
- public SSTableReader cloneWithNewSummarySamplingLevel(int samplingLevel) throws IOException
+ public SSTableReader cloneWithNewSummarySamplingLevel(ColumnFamilyStore parent, int samplingLevel) throws IOException
{
- int minIndexInterval = metadata.getMinIndexInterval();
- int maxIndexInterval = metadata.getMaxIndexInterval();
- double effectiveInterval = indexSummary.getEffectiveIndexInterval();
+ synchronized (replaceLock)
+ {
+ assert replacedBy == null;
- IndexSummary newSummary;
+ int minIndexInterval = metadata.getMinIndexInterval();
+ int maxIndexInterval = metadata.getMaxIndexInterval();
+ double effectiveInterval = indexSummary.getEffectiveIndexInterval();
- // We have to rebuild the summary from the on-disk primary index in three cases:
- // 1. The sampling level went up, so we need to read more entries off disk
- // 2. The min_index_interval changed (in either direction); this changes what entries would be in the summary
- // at full sampling (and consequently at any other sampling level)
- // 3. The max_index_interval was lowered, forcing us to raise the sampling level
- if (samplingLevel > indexSummary.getSamplingLevel() || indexSummary.getMinIndexInterval() != minIndexInterval || effectiveInterval > maxIndexInterval)
- {
- newSummary = buildSummaryAtLevel(samplingLevel);
- }
- else if (samplingLevel < indexSummary.getSamplingLevel())
- {
- // we can use the existing index summary to make a smaller one
- newSummary = IndexSummaryBuilder.downsample(indexSummary, samplingLevel, minIndexInterval, partitioner);
+ IndexSummary newSummary;
+ long oldSize = bytesOnDisk();
- SegmentedFile.Builder ibuilder = SegmentedFile.getBuilder(DatabaseDescriptor.getIndexAccessMode());
- SegmentedFile.Builder dbuilder = compression
- ? SegmentedFile.getCompressedBuilder()
- : SegmentedFile.getBuilder(DatabaseDescriptor.getDiskAccessMode());
- saveSummary(ibuilder, dbuilder, newSummary);
- }
- else
- {
- throw new AssertionError("Attempted to clone SSTableReader with the same index summary sampling level and " +
- "no adjustments to min/max_index_interval");
- }
+ // We have to rebuild the summary from the on-disk primary index in three cases:
+ // 1. The sampling level went up, so we need to read more entries off disk
+ // 2. The min_index_interval changed (in either direction); this changes what entries would be in the summary
+ // at full sampling (and consequently at any other sampling level)
+ // 3. The max_index_interval was lowered, forcing us to raise the sampling level
+ if (samplingLevel > indexSummary.getSamplingLevel() || indexSummary.getMinIndexInterval() != minIndexInterval || effectiveInterval > maxIndexInterval)
+ {
+ newSummary = buildSummaryAtLevel(samplingLevel);
+ }
+ else if (samplingLevel < indexSummary.getSamplingLevel())
+ {
+ // we can use the existing index summary to make a smaller one
+ newSummary = IndexSummaryBuilder.downsample(indexSummary, samplingLevel, minIndexInterval, partitioner);
+
+ SegmentedFile.Builder ibuilder = SegmentedFile.getBuilder(DatabaseDescriptor.getIndexAccessMode());
+ SegmentedFile.Builder dbuilder = compression
+ ? SegmentedFile.getCompressedBuilder()
+ : SegmentedFile.getBuilder(DatabaseDescriptor.getDiskAccessMode());
+ saveSummary(ibuilder, dbuilder, newSummary);
+ }
+ else
+ {
+ throw new AssertionError("Attempted to clone SSTableReader with the same index summary sampling level and " +
+ "no adjustments to min/max_index_interval");
+ }
- markReplaced();
- if (readMeterSyncFuture != null)
- readMeterSyncFuture.cancel(false);
+ long newSize = bytesOnDisk();
+ StorageMetrics.load.inc(newSize - oldSize);
+ parent.metric.liveDiskSpaceUsed.inc(newSize - oldSize);
- SSTableReader replacement = new SSTableReader(descriptor, components, metadata, partitioner, ifile, dfile, newSummary, bf, maxDataAge, sstableMetadata);
- replacement.readMeter = this.readMeter;
- replacement.first = this.first;
- replacement.last = this.last;
- return replacement;
+ if (readMeterSyncFuture != null)
+ readMeterSyncFuture.cancel(false);
+
+ SSTableReader replacement = new SSTableReader(descriptor, components, metadata, partitioner, ifile, dfile, newSummary, bf, maxDataAge, sstableMetadata);
+ replacement.readMeter = this.readMeter;
+ replacement.first = this.first;
+ replacement.last = this.last;
+ setReplacedBy(replacement);
+ return replacement;
+ }
}
private IndexSummary buildSummaryAtLevel(int newSamplingLevel) throws IOException
@@ -1342,12 +1492,6 @@ public class SSTableReader extends SSTable implements Closeable
return dfile.onDiskLength;
}
- public void markReplaced()
- {
- boolean success = isReplaced.compareAndSet(false, true);
- assert success : "Attempted to mark an SSTableReader as replaced more than once";
- }
-
public boolean acquireReference()
{
while (true)
@@ -1368,27 +1512,7 @@ public class SSTableReader extends SSTable implements Closeable
public void releaseReference()
{
if (references.decrementAndGet() == 0)
- {
- FileUtils.closeQuietly(this);
-
- // if this SSTR instance was replaced by another with a different index summary, let the new instance
- // handle clearing the page cache and deleting the files
- if (isCompacted.get())
- {
- assert !isReplaced.get();
-
- /**
- * Do the OS a favour and suggest (using fadvice call) that we
- * don't want to see pages of this SSTable in memory anymore.
- *
- * NOTE: We can't use madvice in java because it requires the address of
- * the mapping, so instead we always open a file and run fadvice(fd, 0, 0) on it
- */
- dropPageCache();
-
- deletingTask.schedule();
- }
- }
+ tidy(true);
assert references.get() >= 0 : "Reference counter " + references.get() + " for " + dfile.path;
}
@@ -1406,6 +1530,10 @@ public class SSTableReader extends SSTable implements Closeable
if (logger.isDebugEnabled())
logger.debug("Marking {} compacted", getFilename());
+ synchronized (replaceLock)
+ {
+ assert replacedBy == null;
+ }
return !isCompacted.getAndSet(true);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5ebadc11/src/java/org/apache/cassandra/utils/AlwaysPresentFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/AlwaysPresentFilter.java b/src/java/org/apache/cassandra/utils/AlwaysPresentFilter.java
index 0f5136b..83d8f3a 100644
--- a/src/java/org/apache/cassandra/utils/AlwaysPresentFilter.java
+++ b/src/java/org/apache/cassandra/utils/AlwaysPresentFilter.java
@@ -17,7 +17,6 @@
*/
package org.apache.cassandra.utils;
-import java.io.IOException;
import java.nio.ByteBuffer;
public class AlwaysPresentFilter implements IFilter
@@ -31,7 +30,7 @@ public class AlwaysPresentFilter implements IFilter
public void clear() { }
- public void close() throws IOException { }
+ public void close() { }
public long serializedSize() { return 0; }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5ebadc11/src/java/org/apache/cassandra/utils/BloomFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/BloomFilter.java b/src/java/org/apache/cassandra/utils/BloomFilter.java
index 9fbb38e..ceba89b 100644
--- a/src/java/org/apache/cassandra/utils/BloomFilter.java
+++ b/src/java/org/apache/cassandra/utils/BloomFilter.java
@@ -17,7 +17,6 @@
*/
package org.apache.cassandra.utils;
-import java.io.IOException;
import java.nio.ByteBuffer;
import com.google.common.annotations.VisibleForTesting;
@@ -112,7 +111,7 @@ public abstract class BloomFilter implements IFilter
bitset.clear();
}
- public void close() throws IOException
+ public void close()
{
bitset.close();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5ebadc11/src/java/org/apache/cassandra/utils/IFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/IFilter.java b/src/java/org/apache/cassandra/utils/IFilter.java
index 10f6df2..91c0e36 100644
--- a/src/java/org/apache/cassandra/utils/IFilter.java
+++ b/src/java/org/apache/cassandra/utils/IFilter.java
@@ -29,4 +29,6 @@ public interface IFilter extends Closeable
void clear();
long serializedSize();
+
+ void close();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5ebadc11/src/java/org/apache/cassandra/utils/obs/IBitSet.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/obs/IBitSet.java b/src/java/org/apache/cassandra/utils/obs/IBitSet.java
index c6fbddd..96aac6b 100644
--- a/src/java/org/apache/cassandra/utils/obs/IBitSet.java
+++ b/src/java/org/apache/cassandra/utils/obs/IBitSet.java
@@ -49,4 +49,6 @@ public interface IBitSet extends Closeable
public long serializedSize(TypeSizes type);
public void clear();
+
+ public void close();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5ebadc11/src/java/org/apache/cassandra/utils/obs/OffHeapBitSet.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/obs/OffHeapBitSet.java b/src/java/org/apache/cassandra/utils/obs/OffHeapBitSet.java
index 29dd848..de8da01 100644
--- a/src/java/org/apache/cassandra/utils/obs/OffHeapBitSet.java
+++ b/src/java/org/apache/cassandra/utils/obs/OffHeapBitSet.java
@@ -139,7 +139,7 @@ public class OffHeapBitSet implements IBitSet
return new OffHeapBitSet(memory);
}
- public void close() throws IOException
+ public void close()
{
bytes.free();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5ebadc11/src/java/org/apache/cassandra/utils/obs/OpenBitSet.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/obs/OpenBitSet.java b/src/java/org/apache/cassandra/utils/obs/OpenBitSet.java
index 5657d41..1d2f690 100644
--- a/src/java/org/apache/cassandra/utils/obs/OpenBitSet.java
+++ b/src/java/org/apache/cassandra/utils/obs/OpenBitSet.java
@@ -387,7 +387,7 @@ public class OpenBitSet implements IBitSet
return (int)((h>>32) ^ h) + 0x98761234;
}
- public void close() throws IOException {
+ public void close() {
// noop, let GC do the cleanup.
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5ebadc11/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java b/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java
index 35fd9bd..9b2b492 100644
--- a/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java
@@ -421,7 +421,7 @@ public class IndexSummaryManagerTest extends SchemaLoader
SSTableReader sstable = original;
for (int samplingLevel = 1; samplingLevel < BASE_SAMPLING_LEVEL; samplingLevel++)
{
- sstable = sstable.cloneWithNewSummarySamplingLevel(samplingLevel);
+ sstable = sstable.cloneWithNewSummarySamplingLevel(cfs, samplingLevel);
assertEquals(samplingLevel, sstable.getIndexSummarySamplingLevel());
int expectedSize = (numRows * samplingLevel) / (sstable.metadata.getMinIndexInterval() * BASE_SAMPLING_LEVEL);
assertEquals(expectedSize, sstable.getIndexSummarySize(), 1);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5ebadc11/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java
index bd50538..8429d37 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java
@@ -23,17 +23,20 @@ package org.apache.cassandra.io.sstable;
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor;
-import org.junit.Assert;
import com.google.common.collect.Sets;
+import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -41,27 +44,34 @@ import org.apache.cassandra.OrderedJUnit4ClassRunner;
import org.apache.cassandra.SchemaLoader;
import org.apache.cassandra.Util;
import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.ColumnFamily;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.IndexExpression;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.db.Row;
+import org.apache.cassandra.db.RowPosition;
import org.apache.cassandra.db.columniterator.IdentityQueryFilter;
-import org.apache.cassandra.db.composites.Composites;
import org.apache.cassandra.db.compaction.CompactionManager;
import org.apache.cassandra.db.compaction.ICompactionScanner;
+import org.apache.cassandra.db.composites.Composites;
import org.apache.cassandra.dht.LocalPartitioner;
import org.apache.cassandra.dht.LocalToken;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.service.CacheService;
import org.apache.cassandra.io.util.FileDataInput;
import org.apache.cassandra.io.util.MmappedSegmentedFile;
import org.apache.cassandra.io.util.SegmentedFile;
+import org.apache.cassandra.service.CacheService;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.Pair;
+import static org.apache.cassandra.Util.cellname;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertFalse;
-import static org.apache.cassandra.Util.cellname;
+import static org.junit.Assert.assertTrue;
@RunWith(OrderedJUnit4ClassRunner.class)
public class SSTableReaderTest extends SchemaLoader
@@ -400,7 +410,7 @@ public class SSTableReaderTest extends SchemaLoader
}));
}
- SSTableReader replacement = sstable.cloneWithNewSummarySamplingLevel(1);
+ SSTableReader replacement = sstable.cloneWithNewSummarySamplingLevel(store, 1);
store.getDataTracker().replaceReaders(Arrays.asList(sstable), Arrays.asList(replacement));
for (Future future : futures)
future.get();
[2/2] git commit: Merge branch 'cassandra-2.1' into trunk
Posted by ty...@apache.org.
Merge branch 'cassandra-2.1' into trunk
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/64bc4584
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/64bc4584
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/64bc4584
Branch: refs/heads/trunk
Commit: 64bc45849fd2a488b766ca9ddfe8456dae50a187
Parents: 0015f37 5ebadc1
Author: Tyler Hobbs <ty...@datastax.com>
Authored: Fri Apr 4 15:37:58 2014 -0500
Committer: Tyler Hobbs <ty...@datastax.com>
Committed: Fri Apr 4 15:37:58 2014 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../org/apache/cassandra/db/DataTracker.java | 28 +-
.../cassandra/io/sstable/IndexSummary.java | 2 +-
.../io/sstable/IndexSummaryManager.java | 22 +-
.../cassandra/io/sstable/SSTableReader.java | 318 +++++++++++++------
.../cassandra/utils/AlwaysPresentFilter.java | 3 +-
.../org/apache/cassandra/utils/BloomFilter.java | 3 +-
.../org/apache/cassandra/utils/IFilter.java | 2 +
.../org/apache/cassandra/utils/obs/IBitSet.java | 2 +
.../cassandra/utils/obs/OffHeapBitSet.java | 2 +-
.../apache/cassandra/utils/obs/OpenBitSet.java | 2 +-
.../io/sstable/IndexSummaryManagerTest.java | 2 +-
.../cassandra/io/sstable/SSTableReaderTest.java | 28 +-
13 files changed, 278 insertions(+), 137 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/64bc4584/CHANGES.txt
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/64bc4584/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java
----------------------------------------------------------------------