Posted to commits@cassandra.apache.org by be...@apache.org on 2015/07/07 17:46:12 UTC
[07/12] cassandra git commit: Merge branch 'cassandra-2.0' into cassandra-2.1
Merge branch 'cassandra-2.0' into cassandra-2.1
Conflicts:
CHANGES.txt
src/java/org/apache/cassandra/db/compaction/Scrubber.java
src/java/org/apache/cassandra/io/sstable/SSTableReader.java
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/4c94ef20
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/4c94ef20
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/4c94ef20
Branch: refs/heads/trunk
Commit: 4c94ef20d3562ab8f0a922945d78464d6c475d98
Parents: 4de943f 452d6a4
Author: Benedict Elliott Smith <be...@apache.org>
Authored: Tue Jul 7 16:27:58 2015 +0100
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Tue Jul 7 16:27:58 2015 +0100
----------------------------------------------------------------------
CHANGES.txt | 5 ++
.../cassandra/db/compaction/Scrubber.java | 37 +++++++++++---
.../cassandra/io/sstable/SSTableReader.java | 52 ++++++++++++++------
.../cassandra/tools/StandaloneScrubber.java | 2 +-
.../unit/org/apache/cassandra/db/ScrubTest.java | 25 ++++++++++
5 files changed, 97 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4c94ef20/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 95dc8be,bd1db92..2cbc7c4
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,18 -1,8 +1,23 @@@
-2.0.18
-* Scrub (recover) sstables even when -Index.db is missing, (CASSANDRA-9591)
-
-
-2.0.17
++2.1.9
++Merged from 2.0:
++ * Scrub (recover) sstables even when -Index.db is missing, (CASSANDRA-9591)
++
++
+2.1.8
+ * (cqlsh) Fix bad check for CQL compatibility when DESCRIBE'ing
+ COMPACT STORAGE tables with no clustering columns
+ * Warn when an extra-large partition is compacted (CASSANDRA-9643)
+ * Eliminate strong self-reference chains in sstable ref tidiers (CASSANDRA-9656)
+ * Ensure StreamSession uses canonical sstable reader instances (CASSANDRA-9700)
+ * Ensure memtable book keeping is not corrupted in the event we shrink usage (CASSANDRA-9681)
+ * Update internal python driver for cqlsh (CASSANDRA-9064)
+ * Fix IndexOutOfBoundsException when inserting tuple with too many
+ elements using the string literal notation (CASSANDRA-9559)
+ * Allow JMX over SSL directly from nodetool (CASSANDRA-9090)
+ * Fix incorrect result for IN queries where column not found (CASSANDRA-9540)
+ * Enable describe on indices (CASSANDRA-7814)
+ * ColumnFamilyStore.selectAndReference may block during compaction (CASSANDRA-9637)
+Merged from 2.0:
* Avoid NPE in AuthSuccess#decode (CASSANDRA-9727)
* Add listen_address to system.local (CASSANDRA-9603)
* Bug fixes to resultset metadata construction (CASSANDRA-9636)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4c94ef20/src/java/org/apache/cassandra/db/compaction/Scrubber.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/Scrubber.java
index ce98a13,dc60efa..b1c12e0
--- a/src/java/org/apache/cassandra/db/compaction/Scrubber.java
+++ b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
@@@ -100,8 -95,17 +100,18 @@@ public class Scrubber implements Closea
this.controller = isOffline
? new ScrubController(cfs)
: new CompactionController(cfs, Collections.singleton(sstable), CompactionManager.getDefaultGcBefore(cfs));
- this.isCommutative = cfs.metadata.getDefaultValidator().isCommutative();
+ this.isCommutative = cfs.metadata.isCounter();
- this.expectedBloomFilterSize = Math.max(cfs.metadata.getMinIndexInterval(), (int)(SSTableReader.getApproximateKeyCount(toScrub)));
+
+ boolean hasIndexFile = (new File(sstable.descriptor.filenameFor(Component.PRIMARY_INDEX))).exists();
+ if (!hasIndexFile)
+ {
+ // if there's any corruption in the -Data.db then rows can't be skipped over. but it's worth a shot.
+ outputHandler.warn("Missing component: " + sstable.descriptor.filenameFor(Component.PRIMARY_INDEX));
+ }
+
- this.expectedBloomFilterSize = Math.max(cfs.metadata.getIndexInterval(),
- hasIndexFile ? (int)(SSTableReader.getApproximateKeyCount(toScrub,cfs.metadata)) : 0);
++ this.expectedBloomFilterSize = Math.max(
++ cfs.metadata.getMinIndexInterval(),
++ hasIndexFile ? (int)(SSTableReader.getApproximateKeyCount(toScrub)) : 0);
// loop through each row, deserializing to check for damage.
// we'll also loop through the index at the same time, using the position from the index to recover if the
@@@ -120,14 -128,13 +134,15 @@@
public void scrub()
{
outputHandler.output(String.format("Scrubbing %s (%s bytes)", sstable, dataFile.length()));
+ Set<SSTableReader> oldSSTable = Sets.newHashSet(sstable);
+ SSTableRewriter writer = new SSTableRewriter(cfs, oldSSTable, sstable.maxDataAge, isOffline);
try
{
- nextIndexKey = ByteBufferUtil.readWithShortLength(indexFile);
+ nextIndexKey = indexAvailable() ? ByteBufferUtil.readWithShortLength(indexFile) : null;
+ if (indexAvailable())
{
// throw away variable so we don't have a side effect in the assert
- long firstRowPositionFromIndex = RowIndexEntry.serializer.deserialize(indexFile, sstable.descriptor.version).position;
+ long firstRowPositionFromIndex = sstable.metadata.comparator.rowIndexEntrySerializer().deserialize(indexFile, sstable.descriptor.version).position;
assert firstRowPositionFromIndex == 0 : firstRowPositionFromIndex;
}
@@@ -167,13 -179,25 +182,13 @@@
dataSizeFromIndex = nextRowPositionFromIndex - dataStartFromIndex;
}
- if (!sstable.descriptor.version.hasRowSizeAndColumnCount)
- {
- dataSize = dataSizeFromIndex;
- // avoid an NPE if key is null
- String keyName = key == null ? "(unreadable key)" : ByteBufferUtil.bytesToHex(key.key);
- outputHandler.debug(String.format("row %s is %s bytes", keyName, dataSize));
- }
- else
- {
- if (currentIndexKey != null)
- outputHandler.debug(String.format("Index doublecheck: row %s is %s bytes", ByteBufferUtil.bytesToHex(currentIndexKey), dataSizeFromIndex));
- }
+ dataSize = dataSizeFromIndex;
+ // avoid an NPE if key is null
+ String keyName = key == null ? "(unreadable key)" : ByteBufferUtil.bytesToHex(key.getKey());
+ outputHandler.debug(String.format("row %s is %s bytes", keyName, dataSize));
- assert currentIndexKey != null || indexFile.isEOF();
+ assert currentIndexKey != null || !indexAvailable();
- writer.mark();
try
{
if (key == null)
@@@ -188,11 -212,11 +203,11 @@@
if (dataSize > dataFile.length())
throw new IOError(new IOException("Impossible row size (greater than file length): " + dataSize));
- if (dataStart != dataStartFromIndex)
+ if (indexFile != null && dataStart != dataStartFromIndex)
outputHandler.warn(String.format("Data file row position %d differs from index file row position %d", dataStart, dataStartFromIndex));
- if (dataSize != dataSizeFromIndex)
+ if (indexFile != null && dataSize != dataSizeFromIndex)
- outputHandler.warn(String.format("Data file row size %d differs from index file row size %d", dataSize, dataSizeFromIndex));
+ outputHandler.warn(String.format("Data file row size %d different from index file row size %d", dataSize, dataSizeFromIndex));
SSTableIdentityIterator atoms = new SSTableIdentityIterator(sstable, dataFile, key, dataSize, validateColumns);
if (prevKey != null && prevKey.compareTo(key) > 0)
@@@ -312,10 -331,11 +327,11 @@@
currentRowPositionFromIndex = nextRowPositionFromIndex;
try
{
- nextIndexKey = indexFile.isEOF() ? null : ByteBufferUtil.readWithShortLength(indexFile);
- nextRowPositionFromIndex = indexFile.isEOF()
+ nextIndexKey = !indexAvailable() ? null : ByteBufferUtil.readWithShortLength(indexFile);
+
+ nextRowPositionFromIndex = !indexAvailable()
? dataFile.length()
- : RowIndexEntry.serializer.deserialize(indexFile, sstable.descriptor.version).position;
+ : sstable.metadata.comparator.rowIndexEntrySerializer().deserialize(indexFile, sstable.descriptor.version).position;
}
catch (Throwable th)
{
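
The Scrubber hunks above make the primary index optional: the constructor checks whether the
-Index.db component exists on disk, warns if it is missing, and every later read of the index is
routed through an availability check instead of assuming the file is there. Below is a minimal,
self-contained sketch of that guard pattern; IndexGuardSketch and its members are hypothetical
names for illustration only, not code from this commit, and System.err stands in for the real
OutputHandler.

import java.io.File;

// Sketch of the "scrub without -Index.db" guard pattern: detect the missing component once,
// warn, and fall back to values that do not require the index.
final class IndexGuardSketch
{
    private final File indexFile;

    IndexGuardSketch(File indexFile)
    {
        this.indexFile = indexFile;
        if (!indexAvailable())
            System.err.println("WARN: missing component: " + indexFile);
    }

    boolean indexAvailable()
    {
        return indexFile != null && indexFile.exists();
    }

    // Mirrors the Math.max(...) expression in the constructor hunk: without an index the
    // approximate key count cannot be read, so only the configured minimum is used.
    long expectedBloomFilterSize(int minIndexInterval, long approximateKeyCountFromIndex)
    {
        return Math.max(minIndexInterval, indexAvailable() ? approximateKeyCountFromIndex : 0L);
    }

    public static void main(String[] args)
    {
        IndexGuardSketch guard = new IndexGuardSketch(new File("example-Index.db"));
        System.out.println(guard.expectedBloomFilterSize(128, 10000));
    }
}
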
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4c94ef20/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/sstable/SSTableReader.java
index 7551d46,8919a09..6879834
--- a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
@@@ -448,27 -190,9 +448,27 @@@ public class SSTableReader extends SSTa
IPartitioner partitioner,
boolean validate) throws IOException
{
- long start = System.nanoTime();
- SSTableMetadata sstableMetadata = openMetadata(descriptor, components, partitioner, validate);
+ // Minimum components without which we can't do anything
+ assert components.contains(Component.DATA) : "Data component is missing for sstable" + descriptor;
- assert components.contains(Component.PRIMARY_INDEX) : "Primary index component is missing for sstable " + descriptor;
++ assert !validate || components.contains(Component.PRIMARY_INDEX) : "Primary index component is missing for sstable " + descriptor;
+
+ Map<MetadataType, MetadataComponent> sstableMetadata = descriptor.getMetadataSerializer().deserialize(descriptor,
- EnumSet.of(MetadataType.VALIDATION, MetadataType.STATS));
++ EnumSet.of(MetadataType.VALIDATION, MetadataType.STATS));
+ ValidationMetadata validationMetadata = (ValidationMetadata) sstableMetadata.get(MetadataType.VALIDATION);
+ StatsMetadata statsMetadata = (StatsMetadata) sstableMetadata.get(MetadataType.STATS);
+
+ // Check if sstable is created using same partitioner.
+ // Partitioner can be null, which indicates older version of sstable or no stats available.
+ // In that case, we skip the check.
+ String partitionerName = partitioner.getClass().getCanonicalName();
+ if (validationMetadata != null && !partitionerName.equals(validationMetadata.partitioner))
+ {
+ logger.error(String.format("Cannot open %s; partitioner %s does not match system partitioner %s. Note that the default partitioner starting with Cassandra 1.2 is Murmur3Partitioner, so you will need to edit that to match your old partitioner if upgrading.",
+ descriptor, validationMetadata.partitioner, partitionerName));
+ System.exit(1);
+ }
+ logger.info("Opening {} ({} bytes)", descriptor, new File(descriptor.filenameFor(Component.DATA)).length());
SSTableReader sstable = new SSTableReader(descriptor,
components,
metadata,
@@@ -603,48 -374,35 +603,43 @@@
this.dfile = dfile;
this.indexSummary = indexSummary;
this.bf = bloomFilter;
+ this.setup(false);
}
- /**
- * Clean up all opened resources.
- *
- * @throws IOException
- */
- public void close() throws IOException
+ public static long getTotalBytes(Iterable<SSTableReader> sstables)
{
- if (readMeterSyncFuture != null)
- readMeterSyncFuture.cancel(false);
+ long sum = 0;
+ for (SSTableReader sstable : sstables)
+ sum += sstable.onDiskLength();
+ return sum;
+ }
- // Force finalizing mmapping if necessary
+ public static long getTotalUncompressedBytes(Iterable<SSTableReader> sstables)
+ {
+ long sum = 0;
+ for (SSTableReader sstable : sstables)
+ sum += sstable.uncompressedLength();
- if (null != ifile)
- ifile.cleanup();
+ return sum;
+ }
+
+ public boolean equals(Object that)
+ {
+ return that instanceof SSTableReader && ((SSTableReader) that).descriptor.equals(this.descriptor);
+ }
- dfile.cleanup();
- // close the BF so it can be opened later.
- if (null != bf)
- bf.close();
+ public int hashCode()
+ {
+ return this.descriptor.hashCode();
+ }
- if (null != indexSummary)
- indexSummary.close();
+ public String getFilename()
+ {
+ return dfile.path;
}
- public String getIndexFilename()
- {
- return ifile.path;
- }
-
- public void setTrackedBy(DataTracker tracker)
+ public void setupKeyCache()
{
- deletingTask.setTracker(tracker);
// under normal operation we can do this at any time, but SSTR is also used outside C* proper,
// e.g. by BulkLoader, which does not initialize the cache. As a kludge, we set up the cache
// here when we know we're being wired into the rest of the server infrastructure.
@@@ -659,7 -417,13 +654,13 @@@
load(false, true);
bf = FilterFactory.AlwaysPresent;
}
+ else if (!components.contains(Component.PRIMARY_INDEX))
+ {
+ // avoid any reading of the missing primary index component.
- // this should only happen during StandaloneScrubber
++ // this should only happen for standalone tools
+ load(false, false);
+ }
- else if (!components.contains(Component.FILTER))
+ else if (!components.contains(Component.FILTER) || validation == null)
{
// bf is enabled, but filter component is missing.
load(true, true);
@@@ -708,418 -467,26 +709,427 @@@
? SegmentedFile.getCompressedBuilder()
: SegmentedFile.getBuilder(DatabaseDescriptor.getDiskAccessMode());
- boolean summaryLoaded = loadSummary(this, ibuilder, dbuilder, metadata);
+ boolean summaryLoaded = loadSummary(ibuilder, dbuilder);
+ boolean builtSummary = false;
if (recreateBloomFilter || !summaryLoaded)
- buildSummary(recreateBloomFilter, ibuilder, dbuilder, summaryLoaded);
+ {
+ buildSummary(recreateBloomFilter, ibuilder, dbuilder, summaryLoaded, Downsampling.BASE_SAMPLING_LEVEL);
+ builtSummary = true;
+ }
- ifile = ibuilder.complete(descriptor.filenameFor(Component.PRIMARY_INDEX));
+ if (components.contains(Component.PRIMARY_INDEX))
+ ifile = ibuilder.complete(descriptor.filenameFor(Component.PRIMARY_INDEX));
+
dfile = dbuilder.complete(descriptor.filenameFor(Component.DATA));
- if (saveSummaryIfCreated && (recreateBloomFilter || !summaryLoaded)) // save summary information to disk
- saveSummary(this, ibuilder, dbuilder);
+
+ // Check for an index summary that was downsampled even though the serialization format doesn't support
+ // that. If it was downsampled, rebuild it. See CASSANDRA-8993 for details.
- if (!descriptor.version.hasSamplingLevel && !builtSummary && !validateSummarySamplingLevel())
++ if (!descriptor.version.hasSamplingLevel && !builtSummary && !validateSummarySamplingLevel() && ifile != null)
+ {
+ indexSummary.close();
+ ifile.close();
+ dfile.close();
+
+ logger.info("Detected erroneously downsampled index summary; will rebuild summary at full sampling");
+ FileUtils.deleteWithConfirm(new File(descriptor.filenameFor(Component.SUMMARY)));
+ ibuilder = SegmentedFile.getBuilder(DatabaseDescriptor.getIndexAccessMode());
+ dbuilder = compression
+ ? SegmentedFile.getCompressedBuilder()
+ : SegmentedFile.getBuilder(DatabaseDescriptor.getDiskAccessMode());
+ buildSummary(false, ibuilder, dbuilder, false, Downsampling.BASE_SAMPLING_LEVEL);
+ ifile = ibuilder.complete(descriptor.filenameFor(Component.PRIMARY_INDEX));
+ dfile = dbuilder.complete(descriptor.filenameFor(Component.DATA));
+ saveSummary(ibuilder, dbuilder);
+ }
+ else if (saveSummaryIfCreated && builtSummary)
+ {
+ saveSummary(ibuilder, dbuilder);
+ }
}
- private void buildSummary(boolean recreateBloomFilter, SegmentedFile.Builder ibuilder, SegmentedFile.Builder dbuilder, boolean summaryLoaded) throws IOException
- {
+ /**
+ * Build index summary(and optionally bloom filter) by reading through Index.db file.
+ *
+ * @param recreateBloomFilter true if recreate bloom filter
+ * @param ibuilder
+ * @param dbuilder
+ * @param summaryLoaded true if index summary is already loaded and not need to build again
+ * @throws IOException
+ */
+ private void buildSummary(boolean recreateBloomFilter, SegmentedFile.Builder ibuilder, SegmentedFile.Builder dbuilder, boolean summaryLoaded, int samplingLevel) throws IOException
+ {
+ if (!components.contains(Component.PRIMARY_INDEX))
+ return;
+
// we read the positions in a BRAF so we don't have to worry about an entry spanning a mmap boundary.
RandomAccessReader primaryIndex = RandomAccessReader.open(new File(descriptor.filenameFor(Component.PRIMARY_INDEX)));
-
+
+ try
+ {
+ long indexSize = primaryIndex.length();
+ long histogramCount = sstableMetadata.estimatedRowSize.count();
+ long estimatedKeys = histogramCount > 0 && !sstableMetadata.estimatedRowSize.isOverflowed()
+ ? histogramCount
+ : estimateRowsFromIndex(primaryIndex); // statistics is supposed to be optional
+
+ try(IndexSummaryBuilder summaryBuilder = summaryLoaded ? null : new IndexSummaryBuilder(estimatedKeys, metadata.getMinIndexInterval(), samplingLevel))
+ {
+
+ if (recreateBloomFilter)
+ bf = FilterFactory.getFilter(estimatedKeys, metadata.getBloomFilterFpChance(), true);
+
+ long indexPosition;
+ while ((indexPosition = primaryIndex.getFilePointer()) != indexSize)
+ {
+ ByteBuffer key = ByteBufferUtil.readWithShortLength(primaryIndex);
+ RowIndexEntry indexEntry = metadata.comparator.rowIndexEntrySerializer().deserialize(primaryIndex, descriptor.version);
+ DecoratedKey decoratedKey = partitioner.decorateKey(key);
+ if (first == null)
+ first = decoratedKey;
+ last = decoratedKey;
+
+ if (recreateBloomFilter)
+ bf.add(decoratedKey.getKey());
+
+ // if summary was already read from disk we don't want to re-populate it using primary index
+ if (!summaryLoaded)
+ {
+ summaryBuilder.maybeAddEntry(decoratedKey, indexPosition);
+ ibuilder.addPotentialBoundary(indexPosition);
+ dbuilder.addPotentialBoundary(indexEntry.position);
+ }
+ }
+
+ if (!summaryLoaded)
+ indexSummary = summaryBuilder.build(partitioner);
+ }
+ }
+ finally
+ {
+ FileUtils.closeQuietly(primaryIndex);
+ }
+
+ first = getMinimalKey(first);
+ last = getMinimalKey(last);
+ }
+
+ /**
+ * Load index summary from Summary.db file if it exists.
+ *
+ * if loaded index summary has different index interval from current value stored in schema,
+ * then Summary.db file will be deleted and this returns false to rebuild summary.
+ *
+ * @param ibuilder
+ * @param dbuilder
+ * @return true if index summary is loaded successfully from Summary.db file.
+ */
+ public boolean loadSummary(SegmentedFile.Builder ibuilder, SegmentedFile.Builder dbuilder)
+ {
+ File summariesFile = new File(descriptor.filenameFor(Component.SUMMARY));
+ if (!summariesFile.exists())
+ return false;
+
+ DataInputStream iStream = null;
+ try
+ {
+ iStream = new DataInputStream(new FileInputStream(summariesFile));
+ indexSummary = IndexSummary.serializer.deserialize(
+ iStream, partitioner, descriptor.version.hasSamplingLevel,
+ metadata.getMinIndexInterval(), metadata.getMaxIndexInterval());
+ first = partitioner.decorateKey(ByteBufferUtil.readWithLength(iStream));
+ last = partitioner.decorateKey(ByteBufferUtil.readWithLength(iStream));
+ ibuilder.deserializeBounds(iStream);
+ dbuilder.deserializeBounds(iStream);
+ }
+ catch (IOException e)
+ {
+ if (indexSummary != null)
+ indexSummary.close();
+ logger.debug("Cannot deserialize SSTable Summary File {}: {}", summariesFile.getPath(), e.getMessage());
+ // corrupted; delete it and fall back to creating a new summary
+ FileUtils.closeQuietly(iStream);
+ // delete it and fall back to creating a new summary
+ FileUtils.deleteWithConfirm(summariesFile);
+ return false;
+ }
+ finally
+ {
+ FileUtils.closeQuietly(iStream);
+ }
+
+ return true;
+ }
+
+ /**
+ * Validates that an index summary has full sampling, as expected when the serialization format does not support
+ * persisting the sampling level.
+ * @return true if the summary has full sampling, false otherwise
+ */
+ private boolean validateSummarySamplingLevel()
+ {
+ // We need to check index summary entries against the index to verify that none of them were dropped due to
+ // downsampling. Downsampling can drop any of the first BASE_SAMPLING_LEVEL entries (repeating that drop pattern
+ // for the remainder of the summary). Unfortunately, the first entry to be dropped is the entry at
+ // index (BASE_SAMPLING_LEVEL - 1), so we need to check a full set of BASE_SAMPLING_LEVEL entries.
++ if (ifile == null)
++ return false;
++
+ Iterator<FileDataInput> segments = ifile.iterator(0);
+ int i = 0;
+ int summaryEntriesChecked = 0;
+ int expectedIndexInterval = getMinIndexInterval();
+ while (segments.hasNext())
+ {
+ FileDataInput in = segments.next();
+ try
+ {
+ while (!in.isEOF())
+ {
+ ByteBuffer indexKey = ByteBufferUtil.readWithShortLength(in);
+ if (i % expectedIndexInterval == 0)
+ {
+ ByteBuffer summaryKey = ByteBuffer.wrap(indexSummary.getKey(i / expectedIndexInterval));
+ if (!summaryKey.equals(indexKey))
+ return false;
+ summaryEntriesChecked++;
+
+ if (summaryEntriesChecked == Downsampling.BASE_SAMPLING_LEVEL)
+ return true;
+ }
+ RowIndexEntry.Serializer.skip(in);
+ i++;
+ }
+ }
+ catch (IOException e)
+ {
+ markSuspect();
+ throw new CorruptSSTableException(e, in.getPath());
+ }
+ finally
+ {
+ FileUtils.closeQuietly(in);
+ }
+ }
+
+ return true;
+ }
+
+ /**
+ * Save index summary to Summary.db file.
+ *
+ * @param ibuilder
+ * @param dbuilder
+ */
+ public void saveSummary(SegmentedFile.Builder ibuilder, SegmentedFile.Builder dbuilder)
+ {
+ saveSummary(ibuilder, dbuilder, indexSummary);
+ }
+
+ private void saveSummary(SegmentedFile.Builder ibuilder, SegmentedFile.Builder dbuilder, IndexSummary summary)
+ {
+ File summariesFile = new File(descriptor.filenameFor(Component.SUMMARY));
+ if (summariesFile.exists())
+ FileUtils.deleteWithConfirm(summariesFile);
+
+ DataOutputStreamAndChannel oStream = null;
+ try
+ {
+ oStream = new DataOutputStreamAndChannel(new FileOutputStream(summariesFile));
+ IndexSummary.serializer.serialize(summary, oStream, descriptor.version.hasSamplingLevel);
+ ByteBufferUtil.writeWithLength(first.getKey(), oStream);
+ ByteBufferUtil.writeWithLength(last.getKey(), oStream);
+ ibuilder.serializeBounds(oStream);
+ dbuilder.serializeBounds(oStream);
+ }
+ catch (IOException e)
+ {
+ logger.debug("Cannot save SSTable Summary: ", e);
+
+ // corrupted hence delete it and let it load it now.
+ if (summariesFile.exists())
+ FileUtils.deleteWithConfirm(summariesFile);
+ }
+ finally
+ {
+ FileUtils.closeQuietly(oStream);
+ }
+ }
+
+ public void setReplacedBy(SSTableReader replacement)
+ {
+ synchronized (tidy.global)
+ {
+ assert replacement != null;
+ assert !tidy.isReplaced;
+ tidy.isReplaced = true;
+ }
+ }
+
+ public boolean isReplaced()
+ {
+ return tidy.isReplaced;
+ }
+
+ /**
+ * Clone this reader with the provided start and open reason, and set the clone as replacement.
+ *
+ * @param newFirst the first key for the replacement (which can be different from the original due to the pre-emptive
+ * opening of compaction results).
+ * @param reason the {@code OpenReason} for the replacement.
+ *
+ * @return the cloned reader. That reader is set as a replacement by the method.
+ */
+ private SSTableReader cloneAndReplace(DecoratedKey newFirst, OpenReason reason)
+ {
+ return cloneAndReplace(newFirst, reason, indexSummary.sharedCopy());
+ }
+
+ /**
+ * Clone this reader with the new values and set the clone as replacement.
+ *
+ * @param newFirst the first key for the replacement (which can be different from the original due to the pre-emptive
+ * opening of compaction results).
+ * @param reason the {@code OpenReason} for the replacement.
+ * @param newSummary the index summary for the replacement.
+ *
+ * @return the cloned reader. That reader is set as a replacement by the method.
+ */
+ private SSTableReader cloneAndReplace(DecoratedKey newFirst, OpenReason reason, IndexSummary newSummary)
+ {
+ SSTableReader replacement = internalOpen(descriptor,
+ components,
+ metadata,
+ partitioner,
- ifile.sharedCopy(),
++ ifile != null ? ifile.sharedCopy() : null,
+ dfile.sharedCopy(),
+ newSummary,
+ bf.sharedCopy(),
+ maxDataAge,
+ sstableMetadata,
+ reason);
+ replacement.first = newFirst;
+ replacement.last = last;
+ replacement.isSuspect.set(isSuspect.get());
+ setReplacedBy(replacement);
+ return replacement;
+ }
+
+ // runOnClose must NOT be an anonymous or non-static inner class, nor must it retain a reference chain to this reader
+ public SSTableReader cloneWithNewStart(DecoratedKey newStart, final Runnable runOnClose)
+ {
+ synchronized (tidy.global)
+ {
+ assert openReason != OpenReason.EARLY;
+ // TODO: merge with caller's firstKeyBeyond() work, to save time
+ if (newStart.compareTo(first) > 0)
+ {
+ final long dataStart = getPosition(newStart, Operator.EQ).position;
+ final long indexStart = getIndexScanPosition(newStart);
+ this.tidy.runOnClose = new DropPageCache(dfile, dataStart, ifile, indexStart, runOnClose);
+ }
+
+ return cloneAndReplace(newStart, OpenReason.MOVED_START);
+ }
+ }
+
+ // runOnClose must NOT be an anonymous or non-static inner class, nor must it retain a reference chain to this reader
+ public SSTableReader cloneAsShadowed(final Runnable runOnClose)
+ {
+ synchronized (tidy.global)
+ {
+ assert openReason != OpenReason.EARLY;
+ this.tidy.runOnClose = new DropPageCache(dfile, 0, ifile, 0, runOnClose);
+ return cloneAndReplace(first, OpenReason.SHADOWED);
+ }
+ }
+
+ private static class DropPageCache implements Runnable
+ {
+ final SegmentedFile dfile;
+ final long dfilePosition;
+ final SegmentedFile ifile;
+ final long ifilePosition;
+ final Runnable andThen;
+
+ private DropPageCache(SegmentedFile dfile, long dfilePosition, SegmentedFile ifile, long ifilePosition, Runnable andThen)
+ {
+ this.dfile = dfile;
+ this.dfilePosition = dfilePosition;
+ this.ifile = ifile;
+ this.ifilePosition = ifilePosition;
+ this.andThen = andThen;
+ }
+
+ public void run()
+ {
+ dfile.dropPageCache(dfilePosition);
- ifile.dropPageCache(ifilePosition);
++ if (ifile != null)
++ ifile.dropPageCache(ifilePosition);
+ andThen.run();
+ }
+ }
+
+ /**
+ * Returns a new SSTableReader with the same properties as this SSTableReader except that a new IndexSummary will
+ * be built at the target samplingLevel. This (original) SSTableReader instance will be marked as replaced, have
+ * its DeletingTask removed, and have its periodic read-meter sync task cancelled.
+ * @param samplingLevel the desired sampling level for the index summary on the new SSTableReader
+ * @return a new SSTableReader
+ * @throws IOException
+ */
+ public SSTableReader cloneWithNewSummarySamplingLevel(ColumnFamilyStore parent, int samplingLevel) throws IOException
+ {
+ assert descriptor.version.hasSamplingLevel;
+
+ synchronized (tidy.global)
+ {
+ assert openReason != OpenReason.EARLY;
+
+ int minIndexInterval = metadata.getMinIndexInterval();
+ int maxIndexInterval = metadata.getMaxIndexInterval();
+ double effectiveInterval = indexSummary.getEffectiveIndexInterval();
+
+ IndexSummary newSummary;
+ long oldSize = bytesOnDisk();
+
+ // We have to rebuild the summary from the on-disk primary index in three cases:
+ // 1. The sampling level went up, so we need to read more entries off disk
+ // 2. The min_index_interval changed (in either direction); this changes what entries would be in the summary
+ // at full sampling (and consequently at any other sampling level)
+ // 3. The max_index_interval was lowered, forcing us to raise the sampling level
+ if (samplingLevel > indexSummary.getSamplingLevel() || indexSummary.getMinIndexInterval() != minIndexInterval || effectiveInterval > maxIndexInterval)
+ {
+ newSummary = buildSummaryAtLevel(samplingLevel);
+ }
+ else if (samplingLevel < indexSummary.getSamplingLevel())
+ {
+ // we can use the existing index summary to make a smaller one
+ newSummary = IndexSummaryBuilder.downsample(indexSummary, samplingLevel, minIndexInterval, partitioner);
+
+ SegmentedFile.Builder ibuilder = SegmentedFile.getBuilder(DatabaseDescriptor.getIndexAccessMode());
+ SegmentedFile.Builder dbuilder = compression
+ ? SegmentedFile.getCompressedBuilder()
+ : SegmentedFile.getBuilder(DatabaseDescriptor.getDiskAccessMode());
+ saveSummary(ibuilder, dbuilder, newSummary);
+ }
+ else
+ {
+ throw new AssertionError("Attempted to clone SSTableReader with the same index summary sampling level and " +
+ "no adjustments to min/max_index_interval");
+ }
+
+ long newSize = bytesOnDisk();
+ StorageMetrics.load.inc(newSize - oldSize);
+ parent.metric.liveDiskSpaceUsed.inc(newSize - oldSize);
+
+ return cloneAndReplace(first, OpenReason.METADATA_CHANGE, newSummary);
+ }
+ }
+
+ private IndexSummary buildSummaryAtLevel(int newSamplingLevel) throws IOException
+ {
+ // we read the positions in a BRAF so we don't have to worry about an entry spanning a mmap boundary.
+ RandomAccessReader primaryIndex = RandomAccessReader.open(new File(descriptor.filenameFor(Component.PRIMARY_INDEX)));
try
{
long indexSize = primaryIndex.length();
@@@ -1566,16 -974,22 +1576,19 @@@
{
if (op == Operator.EQ && updateCacheAndStats)
bloomFilterTracker.addFalsePositive();
- // we matched the -1th position: if the operator might match forward, we'll start at the first
- // position. We however need to return the correct index entry for that first position.
- if (op.apply(1) >= 0)
- {
- sampledPosition = 0;
- }
- else
- {
- Tracing.trace("Partition summary allows skipping sstable {}", descriptor.generation);
- return null;
- }
+ Tracing.trace("Check against min and max keys allows skipping sstable {}", descriptor.generation);
+ return null;
}
+ int binarySearchResult = indexSummary.binarySearch(key);
+ long sampledPosition = getIndexScanPositionFromBinarySearchResult(binarySearchResult, indexSummary);
+ int sampledIndex = getIndexSummaryIndexFromBinarySearchResult(binarySearchResult);
+
+ int effectiveInterval = indexSummary.getEffectiveIndexIntervalAfterIndex(sampledIndex);
+
+ if (ifile == null)
+ return null;
+
// scan the on-disk index, starting at the nearest sampled position.
// The check against IndexInterval is to exit the loop in the EQ case when the key looked for is not present
// (bloom filter false positive). But note that for non-EQ cases, we might need to check the first key of the
@@@ -1670,11 -1084,13 +1683,14 @@@
*/
public DecoratedKey firstKeyBeyond(RowPosition token)
{
+ if (token.compareTo(first) < 0)
+ return first;
+
long sampledPosition = getIndexScanPosition(token);
- if (sampledPosition == -1)
- sampledPosition = 0;
+ if (ifile == null)
+ return null;
+
Iterator<FileDataInput> segments = ifile.iterator(sampledPosition);
while (segments.hasNext())
{
@@@ -1948,8 -1391,7 +1964,10 @@@
{
try
{
- return SSTableMetadata.serializer.deserialize(descriptor).right;
+ CompactionMetadata compactionMetadata = (CompactionMetadata) descriptor.getMetadataSerializer().deserialize(descriptor, MetadataType.COMPACTION);
- return compactionMetadata.ancestors;
++ if (compactionMetadata != null)
++ return compactionMetadata.ancestors;
++ return Collections.emptySet();
}
catch (IOException e)
{
@@@ -1995,7 -1441,7 +2013,9 @@@
public RandomAccessReader openIndexReader()
{
- return ifile.createReader();
- return RandomAccessReader.open(new File(getIndexFilename()));
++ if (ifile != null)
++ return ifile.createReader();
++ return null;
}
/**
@@@ -2024,152 -1470,73 +2044,154 @@@
}
/**
- * @param sstables
- * @return true if all desired references were acquired. Otherwise, it will unreference any partial acquisition, and return false.
+ * Increment the total row read count and read rate for this SSTable. This should not be incremented for range
+ * slice queries, row cache hits, or non-query reads, like compaction.
*/
- public static boolean acquireReferences(Iterable<SSTableReader> sstables)
+ public void incrementReadCount()
{
- SSTableReader failed = null;
- for (SSTableReader sstable : sstables)
+ if (readMeter != null)
+ readMeter.mark();
+ }
+
+ public static class SizeComparator implements Comparator<SSTableReader>
+ {
+ public int compare(SSTableReader o1, SSTableReader o2)
{
- if (!sstable.acquireReference())
- {
- failed = sstable;
- break;
- }
+ return Longs.compare(o1.onDiskLength(), o2.onDiskLength());
}
+ }
- if (failed == null)
- return true;
+ public Ref<SSTableReader> tryRef()
+ {
+ return selfRef.tryRef();
+ }
- for (SSTableReader sstable : sstables)
- {
- if (sstable == failed)
- break;
- sstable.releaseReference();
- }
- return false;
+ public Ref<SSTableReader> selfRef()
+ {
+ return selfRef;
}
- public static void releaseReferences(Iterable<SSTableReader> sstables)
+ public Ref<SSTableReader> ref()
{
- for (SSTableReader sstable : sstables)
- {
- sstable.releaseReference();
- }
+ return selfRef.ref();
}
- private void dropPageCache()
+ void setup(boolean isOffline)
{
- dropPageCache(dfile.path);
- if (null != ifile)
- dropPageCache(ifile.path);
+ tidy.setup(this, isOffline);
+ this.readMeter = tidy.global.readMeter;
}
- private void dropPageCache(String filePath)
+ @VisibleForTesting
+ public void overrideReadMeter(RestorableMeter readMeter)
{
- RandomAccessFile file = null;
+ this.readMeter = tidy.global.readMeter = readMeter;
+ }
- try
+ /**
+ * One instance per SSTableReader we create. This references the type-shared tidy, which in turn references
+ * the globally shared tidy, i.e.
+ *
+ * InstanceTidier => DescriptorTypeTidy => GlobalTidy
+ *
+ * We can create many InstanceTidiers (one for every time we reopen an sstable with MOVED_START for example), but there can only be
+ * two DescriptorTypeTidy (FINAL and TEMPLINK) and only one GlobalTidy for one single logical sstable.
+ *
+ * When the InstanceTidier cleans up, it releases its reference to its DescriptorTypeTidy; when all InstanceTidiers
+ * for that type have run, the DescriptorTypeTidy cleans up. DescriptorTypeTidy behaves in the same way towards GlobalTidy.
+ *
+ * For ease, we stash a direct reference to both our type-shared and global tidier
+ */
+ private static final class InstanceTidier implements Tidy
+ {
+ private final Descriptor descriptor;
+ private final CFMetaData metadata;
+ private IFilter bf;
+ private IndexSummary summary;
+
+ private SegmentedFile dfile;
+ private SegmentedFile ifile;
+ private Runnable runOnClose;
+ private boolean isReplaced = false;
+
+ // a reference to our shared per-Descriptor.Type tidy instance, that
+ // we will release when we are ourselves released
+ private Ref<DescriptorTypeTidy> typeRef;
+
+ // a convenience stashing of the shared per-descriptor-type tidy instance itself
+ // and the per-logical-sstable globally shared state that it is linked to
+ private DescriptorTypeTidy type;
+ private GlobalTidy global;
+
+ private boolean setup;
+
+ void setup(SSTableReader reader, boolean isOffline)
+ {
+ this.setup = true;
+ this.bf = reader.bf;
+ this.summary = reader.indexSummary;
+ this.dfile = reader.dfile;
+ this.ifile = reader.ifile;
+ // get a new reference to the shared descriptor-type tidy
+ this.typeRef = DescriptorTypeTidy.get(reader);
+ this.type = typeRef.get();
+ this.global = type.globalRef.get();
+ if (!isOffline)
+ global.ensureReadMeter();
+ }
+
+ InstanceTidier(Descriptor descriptor, CFMetaData metadata)
{
- file = new RandomAccessFile(filePath, "r");
+ this.descriptor = descriptor;
+ this.metadata = metadata;
+ }
- int fd = CLibrary.getfd(file.getFD());
+ public void tidy()
+ {
+ // don't try to cleanup if the sstablereader was never fully constructed
+ if (!setup)
+ return;
- if (fd > 0)
+ final ColumnFamilyStore cfs = Schema.instance.getColumnFamilyStoreInstance(metadata.cfId);
+ final OpOrder.Barrier barrier;
+ if (cfs != null)
{
- if (logger.isDebugEnabled())
- logger.debug(String.format("Dropping page cache of file %s.", filePath));
-
- CLibrary.trySkipCache(fd, 0, 0);
+ barrier = cfs.readOrdering.newBarrier();
+ barrier.issue();
}
+ else
+ barrier = null;
+
+ ScheduledExecutors.nonPeriodicTasks.execute(new Runnable()
+ {
+ public void run()
+ {
+ if (barrier != null)
+ barrier.await();
- bf.close();
++ if (bf != null)
++ bf.close();
+ if (summary != null)
+ summary.close();
+ if (runOnClose != null)
+ runOnClose.run();
+ dfile.close();
- ifile.close();
++ if (ifile != null)
++ ifile.close();
+ typeRef.release();
+ }
+ });
}
- catch (IOException e)
+
+ public String name()
{
- // we don't care if cache cleanup fails
+ return descriptor.toString();
}
- finally
+
+ void releaseSummary()
{
- FileUtils.closeQuietly(file);
+ summary.close();
+ assert summary.isCleanedUp();
+ summary = null;
}
}
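
The SSTableReader hunks take the same approach: wherever the reader previously assumed an index
file, bloom filter or summary was present, the access is now null-guarded (open/load,
cloneAndReplace, DropPageCache.run, openIndexReader, and InstanceTidier.tidy above). A minimal
sketch of that cleanup pattern follows, using only the JDK; OptionalComponentSketch and
closeIfPresent are illustrative names, not part of this commit.

import java.io.Closeable;
import java.io.IOException;

// Sketch of the null-guarded cleanup the tidy()/DropPageCache hunks converge on: optional
// components (index segments, bloom filter, summary) may legitimately be absent and are skipped.
final class OptionalComponentSketch
{
    static void closeIfPresent(Closeable component)
    {
        if (component == null)
            return; // e.g. an index-less sstable opened for scrub recovery
        try
        {
            component.close();
        }
        catch (IOException e)
        {
            // best effort, in the spirit of FileUtils.closeQuietly
        }
    }

    public static void main(String[] args)
    {
        closeIfPresent(null); // safe no-op, analogous to "if (ifile != null) ifile.close()"
    }
}
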
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4c94ef20/src/java/org/apache/cassandra/tools/StandaloneScrubber.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4c94ef20/test/unit/org/apache/cassandra/db/ScrubTest.java
----------------------------------------------------------------------
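
Further up, the SSTableReader.open(...) hunk also adds a partitioner sanity check before the
reader is constructed: if the partitioner recorded in the sstable's validation metadata does not
match the node's partitioner, the sstable is refused. A compact sketch of that check follows;
PartitionerCheckSketch is an illustrative name, and the real hunk logs via the class logger before
calling System.exit(1).

// Sketch of the partitioner compatibility check: compare the partitioner name recorded in the
// sstable's validation metadata against the canonical class name of the system partitioner.
final class PartitionerCheckSketch
{
    static void checkPartitioner(String recordedPartitioner, Object systemPartitioner)
    {
        // a null recorded partitioner means an older sstable or missing stats, so the check is skipped
        String systemName = systemPartitioner.getClass().getCanonicalName();
        if (recordedPartitioner != null && !systemName.equals(recordedPartitioner))
        {
            System.err.println(String.format("Cannot open sstable; partitioner %s does not match system partitioner %s",
                                             recordedPartitioner, systemName));
            System.exit(1);
        }
    }
}
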