You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by be...@apache.org on 2014/12/19 15:31:24 UTC
[1/3] cassandra git commit: Fix regression in SSTableRewriter causing
some rows to become unreadable during compaction
Repository: cassandra
Updated Branches:
refs/heads/trunk 7783f130c -> 9a51c3c3a
Fix regression in SSTableRewriter causing some rows to become unreadable during compaction
patch by marcus and benedict for CASSANDRA-8429
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/871f0039
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/871f0039
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/871f0039
Branch: refs/heads/trunk
Commit: 871f0039c5bf89be343039478c64ce835b04b5cf
Parents: bedd97f
Author: Benedict Elliott Smith <be...@apache.org>
Authored: Fri Dec 19 14:04:38 2014 +0000
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Fri Dec 19 14:24:47 2014 +0000
----------------------------------------------------------------------
CHANGES.txt | 2 +
.../db/compaction/CompactionManager.java | 3 -
.../io/compress/CompressedSequentialWriter.java | 12 +-
.../io/compress/CompressionMetadata.java | 31 +--
.../cassandra/io/sstable/SSTableReader.java | 26 +--
.../cassandra/io/sstable/SSTableRewriter.java | 189 +++++++++----------
.../cassandra/io/sstable/SSTableWriter.java | 118 +++++++-----
.../io/util/BufferedPoolingSegmentedFile.java | 9 +-
.../io/util/BufferedSegmentedFile.java | 9 +-
.../io/util/CompressedPoolingSegmentedFile.java | 11 +-
.../io/util/CompressedSegmentedFile.java | 16 +-
.../cassandra/io/util/MmappedSegmentedFile.java | 8 +-
.../apache/cassandra/io/util/SegmentedFile.java | 12 +-
.../io/sstable/SSTableRewriterTest.java | 71 ++++++-
14 files changed, 278 insertions(+), 239 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/871f0039/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index e5a8f05..ac28d78 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,6 @@
2.1.3
+ * Fix regression in SSTableRewriter causing some rows to become unreadable
+ during compaction (CASSANDRA-8429)
* Run major compactions for repaired/unrepaired in parallel (CASSANDRA-8510)
* (cqlsh) Fix compression options in DESCRIBE TABLE output when compression
is disabled (CASSANDRA-8288)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/871f0039/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 9f5951c..872ebed 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -1035,9 +1035,6 @@ public class CompactionManager implements CompactionManagerMBean
unrepairedKeyCount++;
}
}
- // we have the same readers being rewritten by both writers, so we ask the first one NOT to close them
- // so that the second one can do so safely, without leaving us with references < 0 or any other ugliness
- // add repaired table with a non-null timestamp field to be saved in SSTableMetadata#repairedAt
anticompactedSSTables.addAll(repairedSSTableWriter.finish(repairedAt));
anticompactedSSTables.addAll(unRepairedSSTableWriter.finish(ActiveRepairService.UNREPAIRED_SSTABLE));
cfs.getDataTracker().markCompactedSSTablesReplaced(sstableAsSet, anticompactedSSTables, OperationType.ANTICOMPACTION);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/871f0039/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java b/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
index e533b1e..d3c41fa 100644
--- a/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
@@ -27,6 +27,7 @@ import org.apache.cassandra.io.FSReadError;
import org.apache.cassandra.io.FSWriteError;
import org.apache.cassandra.io.sstable.CorruptSSTableException;
import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.SSTableWriter;
import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
import org.apache.cassandra.io.util.DataIntegrityMetadata;
import org.apache.cassandra.io.util.FileMark;
@@ -139,15 +140,10 @@ public class CompressedSequentialWriter extends SequentialWriter
chunkOffset += compressedLength + 4;
}
- public CompressionMetadata openEarly()
+ public CompressionMetadata open(SSTableWriter.FinishType finishType)
{
- return metadataWriter.openEarly(originalSize, chunkOffset);
- }
-
- public CompressionMetadata openAfterClose()
- {
- assert current == originalSize;
- return metadataWriter.openAfterClose(current, chunkOffset);
+ assert finishType != SSTableWriter.FinishType.NORMAL || current == originalSize;
+ return metadataWriter.open(originalSize, chunkOffset, finishType);
}
@Override
http://git-wip-us.apache.org/repos/asf/cassandra/blob/871f0039/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java b/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
index c922963..173722f 100644
--- a/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
@@ -47,10 +47,10 @@ import org.apache.cassandra.io.IVersionedSerializer;
import org.apache.cassandra.io.sstable.Component;
import org.apache.cassandra.io.sstable.CorruptSSTableException;
import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.SSTableWriter;
import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.io.util.Memory;
-import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.Pair;
/**
@@ -217,7 +217,7 @@ public class CompressionMetadata
throw new CorruptSSTableException(new EOFException(), indexFilePath);
long chunkOffset = chunkOffsets.getLong(idx);
- long nextChunkOffset = (idx + 8 == chunkOffsets.size())
+ long nextChunkOffset = (idx + 8 == chunkOffsetsSize)
? compressedFileLength
: chunkOffsets.getLong(idx + 8);
@@ -319,18 +319,25 @@ public class CompressionMetadata
}
}
- public CompressionMetadata openEarly(long dataLength, long compressedLength)
+ public CompressionMetadata open(long dataLength, long compressedLength, SSTableWriter.FinishType finishType)
{
+ RefCountedMemory offsets;
+ switch (finishType)
+ {
+ case EARLY:
+ offsets = this.offsets;
+ break;
+ case NORMAL:
+ case FINISH_EARLY:
+ offsets = this.offsets.copy(count * 8L);
+ this.offsets.unreference();
+ break;
+ default:
+ throw new AssertionError();
+ }
return new CompressionMetadata(filePath, parameters, offsets, count * 8L, dataLength, compressedLength, Descriptor.Version.CURRENT.hasPostCompressionAdlerChecksums);
}
- public CompressionMetadata openAfterClose(long dataLength, long compressedLength)
- {
- RefCountedMemory newOffsets = offsets.copy(count * 8L);
- offsets.unreference();
- return new CompressionMetadata(filePath, parameters, newOffsets, count * 8L, dataLength, compressedLength, Descriptor.Version.CURRENT.hasPostCompressionAdlerChecksums);
- }
-
/**
* Get a chunk offset by it's index.
*
@@ -360,8 +367,8 @@ public class CompressionMetadata
out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(filePath)));
assert chunks == count;
writeHeader(out, dataLength, chunks);
- for (int i = 0 ; i < count ; i++)
- out.writeLong(offsets.getLong(i * 8));
+ for (int i = 0 ; i < count ; i++)
+ out.writeLong(offsets.getLong(i * 8));
}
finally
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/871f0039/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
index bd20226..217a109 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
@@ -201,7 +201,6 @@ public class SSTableReader extends SSTable
private Object replaceLock = new Object();
private SSTableReader replacedBy;
private SSTableReader replaces;
- private SSTableReader sharesBfWith;
private SSTableDeletingTask deletingTask;
private Runnable runOnClose;
@@ -575,7 +574,7 @@ public class SSTableReader extends SSTable
synchronized (replaceLock)
{
- boolean closeBf = true, closeSummary = true, closeFiles = true, deleteFiles = false;
+ boolean closeBf = true, closeSummary = true, closeFiles = true, deleteFiles = isCompacted.get();
if (replacedBy != null)
{
@@ -594,19 +593,11 @@ public class SSTableReader extends SSTable
deleteFiles &= !dfile.path.equals(replaces.dfile.path);
}
- if (sharesBfWith != null)
- {
- closeBf &= sharesBfWith.bf != bf;
- closeSummary &= sharesBfWith.indexSummary != indexSummary;
- closeFiles &= sharesBfWith.dfile != dfile;
- deleteFiles &= !dfile.path.equals(sharesBfWith.dfile.path);
- }
-
boolean deleteAll = false;
if (release && isCompacted.get())
{
assert replacedBy == null;
- if (replaces != null)
+ if (replaces != null && !deleteFiles)
{
replaces.replacedBy = null;
replaces.deletingTask = deletingTask;
@@ -936,19 +927,6 @@ public class SSTableReader extends SSTable
}
}
- /**
- * this is used to avoid closing the bloom filter multiple times when finishing an SSTableRewriter
- *
- * note that the reason we don't use replacedBy is that we are not yet actually replaced
- *
- * @param newReader
- */
- public void sharesBfWith(SSTableReader newReader)
- {
- assert openReason.equals(OpenReason.EARLY);
- this.sharesBfWith = newReader;
- }
-
public SSTableReader cloneWithNewStart(DecoratedKey newStart, final Runnable runOnClose)
{
synchronized (replaceLock)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/871f0039/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
index cd9435d..43ac4b6 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
@@ -17,18 +17,11 @@
*/
package org.apache.cassandra.io.sstable;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
+import java.util.*;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Functions;
-import com.google.common.collect.Lists;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ColumnFamilyStore;
@@ -38,7 +31,6 @@ import org.apache.cassandra.db.RowIndexEntry;
import org.apache.cassandra.db.compaction.AbstractCompactedRow;
import org.apache.cassandra.utils.CLibrary;
import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.Pair;
/**
* Wraps one or more writers as output for rewriting one or more readers: every sstable_preemptive_open_interval_in_mb
@@ -84,8 +76,10 @@ public class SSTableRewriter
private SSTableReader currentlyOpenedEarly; // the reader for the most recent (re)opening of the target file
private long currentlyOpenedEarlyAt; // the position (in MB) in the target file we last (re)opened at
- private final List<SSTableReader> finishedOpenedEarly = new ArrayList<>(); // the 'finished' tmplink sstables
- private final List<Pair<SSTableWriter, SSTableReader>> finishedWriters = new ArrayList<>();
+ private final Queue<Finished> finishedEarly = new ArrayDeque<>();
+ // as writers are closed from finishedEarly, their last readers are moved
+ // into discard, so that abort can cleanup after us safely
+ private final List<SSTableReader> discard = new ArrayList<>();
private final boolean isOffline; // true for operations that are performed without Cassandra running (prevents updates of DataTracker)
private SSTableWriter writer;
@@ -183,42 +177,30 @@ public class SSTableRewriter
public void abort()
{
switchWriter(null);
-
moveStarts(null, Functions.forMap(originalStarts), true);
- List<SSTableReader> close = Lists.newArrayList(finishedOpenedEarly);
-
- for (Pair<SSTableWriter, SSTableReader> w : finishedWriters)
- {
- // we should close the bloom filter if we have not opened an sstable reader from this
- // writer (it will get closed when we release the sstable reference below):
- w.left.abort(w.right == null);
- if (isOffline && w.right != null)
- {
- // the pairs get removed from finishedWriters when they are closedAndOpened in finish(), the ones left need to be removed here:
- w.right.markObsolete();
- w.right.releaseReference();
- }
- }
-
- // also remove already completed SSTables
- for (SSTableReader sstable : close)
- sstable.markObsolete();
-
+ // remove already completed SSTables
for (SSTableReader sstable : finished)
{
sstable.markObsolete();
sstable.releaseReference();
}
- // releases reference in replaceReaders
- if (!isOffline)
+ // abort the writers
+ for (Finished finished : finishedEarly)
{
- dataTracker.replaceEarlyOpenedFiles(close, Collections.<SSTableReader>emptyList());
- dataTracker.unmarkCompacting(close);
+ boolean opened = finished.reader != null;
+ finished.writer.abort(!opened);
+ if (opened)
+ {
+ // if we've already been opened, add ourselves to the discard pile
+ discard.add(finished.reader);
+ finished.reader.markObsolete();
+ }
}
- }
+ replaceWithFinishedReaders(Collections.<SSTableReader>emptyList());
+ }
/**
* Replace the readers we are rewriting with cloneWithNewStart, reclaiming any page cache that is no longer
@@ -274,8 +256,6 @@ public class SSTableRewriter
rewriting.addAll(replaceWith);
}
-
-
private void replaceEarlyOpenedFile(SSTableReader toReplace, SSTableReader replaceWith)
{
if (isOffline)
@@ -301,15 +281,19 @@ public class SSTableRewriter
writer = newWriter;
return;
}
- // we leave it as a tmp file, but we open it early and add it to the dataTracker
- SSTableReader reader = writer.openEarly(maxAge);
- if (reader != null)
+
+ // we leave it as a tmp file, but we open it and add it to the dataTracker
+ if (writer.getFilePointer() != 0)
{
- finishedOpenedEarly.add(reader);
+ SSTableReader reader = writer.finish(SSTableWriter.FinishType.EARLY, maxAge, -1);
replaceEarlyOpenedFile(currentlyOpenedEarly, reader);
moveStarts(reader, Functions.constant(reader.last), false);
+ finishedEarly.add(new Finished(writer, reader));
+ }
+ else
+ {
+ writer.abort();
}
- finishedWriters.add(Pair.create(writer, reader));
currentlyOpenedEarly = null;
currentlyOpenedEarlyAt = 0;
writer = newWriter;
@@ -337,85 +321,82 @@ public class SSTableRewriter
*/
public List<SSTableReader> finish(long repairedAt)
{
- List<Pair<SSTableReader, SSTableReader>> toReplace = new ArrayList<>();
+ return finishAndMaybeThrow(repairedAt, false, false);
+ }
+
+ @VisibleForTesting
+ void finishAndThrow(boolean throwEarly)
+ {
+ finishAndMaybeThrow(-1, throwEarly, !throwEarly);
+ }
+
+ private List<SSTableReader> finishAndMaybeThrow(long repairedAt, boolean throwEarly, boolean throwLate)
+ {
+ List<SSTableReader> newReaders = new ArrayList<>();
switchWriter(null);
- // make real sstables of the written ones:
- Iterator<Pair<SSTableWriter, SSTableReader>> it = finishedWriters.iterator();
- while(it.hasNext())
+
+ if (throwEarly)
+ throw new RuntimeException("exception thrown early in finish, for testing");
+
+ while (!finishedEarly.isEmpty())
{
- Pair<SSTableWriter, SSTableReader> w = it.next();
- if (w.left.getFilePointer() > 0)
+ Finished f = finishedEarly.poll();
+ if (f.writer.getFilePointer() > 0)
{
- SSTableReader newReader = repairedAt < 0 ? w.left.closeAndOpenReader(maxAge) : w.left.closeAndOpenReader(maxAge, repairedAt);
- finished.add(newReader);
+ if (f.reader != null)
+ discard.add(f.reader);
- if (w.right != null)
- {
- w.right.sharesBfWith(newReader);
- if (isOffline)
- {
- // remove the tmplink files if we are offline - no one is using them
- w.right.markObsolete();
- w.right.releaseReference();
- }
- }
- // w.right is the tmplink-reader we added when switching writer, replace with the real sstable.
- toReplace.add(Pair.create(w.right, newReader));
+ SSTableReader newReader = f.writer.finish(SSTableWriter.FinishType.FINISH_EARLY, maxAge, repairedAt);
+
+ if (f.reader != null)
+ f.reader.setReplacedBy(newReader);
+
+ finished.add(newReader);
+ newReaders.add(newReader);
}
else
{
- assert w.right == null;
- w.left.abort(true);
+ f.writer.abort(true);
+ assert f.reader == null;
}
- it.remove();
}
- if (!isOffline)
- {
- for (Pair<SSTableReader, SSTableReader> replace : toReplace)
- replaceEarlyOpenedFile(replace.left, replace.right);
- dataTracker.unmarkCompacting(finished);
- }
+ if (throwLate)
+ throw new RuntimeException("exception thrown after all sstables finished, for testing");
+
+ replaceWithFinishedReaders(newReaders);
return finished;
}
- @VisibleForTesting
- void finishAndThrow(boolean early)
+ // cleanup all our temporary readers and swap in our new ones
+ private void replaceWithFinishedReaders(List<SSTableReader> finished)
{
- List<Pair<SSTableReader, SSTableReader>> toReplace = new ArrayList<>();
- switchWriter(null);
- if (early)
- throw new RuntimeException("exception thrown early in finish");
- // make real sstables of the written ones:
- Iterator<Pair<SSTableWriter, SSTableReader>> it = finishedWriters.iterator();
- while(it.hasNext())
+ if (isOffline)
{
- Pair<SSTableWriter, SSTableReader> w = it.next();
- if (w.left.getFilePointer() > 0)
- {
- SSTableReader newReader = w.left.closeAndOpenReader(maxAge);
- finished.add(newReader);
-
- if (w.right != null)
- {
- w.right.sharesBfWith(newReader);
- if (isOffline)
- {
- w.right.markObsolete();
- w.right.releaseReference();
- }
- }
- // w.right is the tmplink-reader we added when switching writer, replace with the real sstable.
- toReplace.add(Pair.create(w.right, newReader));
- }
- else
+ for (SSTableReader reader : discard)
{
- assert w.right == null;
- w.left.abort(true);
+ if (reader.getCurrentReplacement() == null)
+ reader.markObsolete();
+ reader.releaseReference();
}
- it.remove();
}
+ else
+ {
+ dataTracker.replaceEarlyOpenedFiles(discard, finished);
+ dataTracker.unmarkCompacting(discard);
+ }
+ discard.clear();
+ }
- throw new RuntimeException("exception thrown after all sstables finished");
+ private static final class Finished
+ {
+ final SSTableWriter writer;
+ final SSTableReader reader;
+
+ private Finished(SSTableWriter writer, SSTableReader reader)
+ {
+ this.writer = writer;
+ this.reader = reader;
+ }
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/871f0039/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
index ec64561..595012d 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
@@ -380,6 +380,18 @@ public class SSTableWriter extends SSTable
last = lastWrittenKey = getMinimalKey(last);
}
+ private Descriptor makeTmpLinks()
+ {
+ // create temp links if they don't already exist
+ Descriptor link = descriptor.asType(Descriptor.Type.TEMPLINK);
+ if (!new File(link.filenameFor(Component.PRIMARY_INDEX)).exists())
+ {
+ FileUtils.createHardLink(new File(descriptor.filenameFor(Component.PRIMARY_INDEX)), new File(link.filenameFor(Component.PRIMARY_INDEX)));
+ FileUtils.createHardLink(new File(descriptor.filenameFor(Component.DATA)), new File(link.filenameFor(Component.DATA)));
+ }
+ return link;
+ }
+
public SSTableReader openEarly(long maxDataAge)
{
StatsMetadata sstableMetadata = (StatsMetadata) sstableMetadataCollector.finalizeMetadata(partitioner.getClass().getCanonicalName(),
@@ -391,17 +403,10 @@ public class SSTableWriter extends SSTable
if (exclusiveUpperBoundOfReadableIndex == null)
return null;
- // create temp links if they don't already exist
- Descriptor link = descriptor.asType(Descriptor.Type.TEMPLINK);
- if (!new File(link.filenameFor(Component.PRIMARY_INDEX)).exists())
- {
- FileUtils.createHardLink(new File(descriptor.filenameFor(Component.PRIMARY_INDEX)), new File(link.filenameFor(Component.PRIMARY_INDEX)));
- FileUtils.createHardLink(new File(descriptor.filenameFor(Component.DATA)), new File(link.filenameFor(Component.DATA)));
- }
-
+ Descriptor link = makeTmpLinks();
// open the reader early, giving it a FINAL descriptor type so that it is indistinguishable for other consumers
- SegmentedFile ifile = iwriter.builder.openEarly(link.filenameFor(Component.PRIMARY_INDEX));
- SegmentedFile dfile = dbuilder.openEarly(link.filenameFor(Component.DATA));
+ SegmentedFile ifile = iwriter.builder.complete(link.filenameFor(Component.PRIMARY_INDEX), FinishType.EARLY);
+ SegmentedFile dfile = dbuilder.complete(link.filenameFor(Component.DATA), FinishType.EARLY);
SSTableReader sstable = SSTableReader.internalOpen(descriptor.asType(Descriptor.Type.FINAL),
components, metadata,
partitioner, ifile,
@@ -435,6 +440,19 @@ public class SSTableWriter extends SSTable
return sstable;
}
+ public static enum FinishType
+ {
+ NORMAL(SSTableReader.OpenReason.NORMAL),
+ EARLY(SSTableReader.OpenReason.EARLY), // no renaming
+ FINISH_EARLY(SSTableReader.OpenReason.NORMAL); // tidy up an EARLY finish
+ final SSTableReader.OpenReason openReason;
+
+ FinishType(SSTableReader.OpenReason openReason)
+ {
+ this.openReason = openReason;
+ }
+ }
+
public SSTableReader closeAndOpenReader()
{
return closeAndOpenReader(System.currentTimeMillis());
@@ -442,68 +460,84 @@ public class SSTableWriter extends SSTable
public SSTableReader closeAndOpenReader(long maxDataAge)
{
- return closeAndOpenReader(maxDataAge, this.repairedAt);
+ return finish(FinishType.NORMAL, maxDataAge, this.repairedAt);
}
- public SSTableReader closeAndOpenReader(long maxDataAge, long repairedAt)
+ public SSTableReader finish(FinishType finishType, long maxDataAge, long repairedAt)
{
- Pair<Descriptor, StatsMetadata> p = close(repairedAt);
- Descriptor newdesc = p.left;
- StatsMetadata sstableMetadata = p.right;
+ Pair<Descriptor, StatsMetadata> p;
+
+ p = close(finishType, repairedAt);
+ Descriptor desc = p.left;
+ StatsMetadata metadata = p.right;
+
+ if (finishType == FinishType.EARLY)
+ desc = makeTmpLinks();
// finalize in-memory state for the reader
- SegmentedFile ifile = iwriter.builder.complete(newdesc.filenameFor(Component.PRIMARY_INDEX));
- SegmentedFile dfile = dbuilder.complete(newdesc.filenameFor(Component.DATA));
- SSTableReader sstable = SSTableReader.internalOpen(newdesc,
+ SegmentedFile ifile = iwriter.builder.complete(desc.filenameFor(Component.PRIMARY_INDEX), finishType);
+ SegmentedFile dfile = dbuilder.complete(desc.filenameFor(Component.DATA), finishType);
+ SSTableReader sstable = SSTableReader.internalOpen(desc.asType(Descriptor.Type.FINAL),
components,
- metadata,
+ this.metadata,
partitioner,
ifile,
dfile,
iwriter.summary.build(partitioner),
iwriter.bf,
maxDataAge,
- sstableMetadata,
- SSTableReader.OpenReason.NORMAL);
+ metadata,
+ finishType.openReason);
sstable.first = getMinimalKey(first);
sstable.last = getMinimalKey(last);
- // try to save the summaries to disk
- sstable.saveSummary(iwriter.builder, dbuilder);
- iwriter = null;
- dbuilder = null;
+
+ switch (finishType)
+ {
+ case NORMAL: case FINISH_EARLY:
+ // try to save the summaries to disk
+ sstable.saveSummary(iwriter.builder, dbuilder);
+ iwriter = null;
+ dbuilder = null;
+ }
return sstable;
}
// Close the writer and return the descriptor to the new sstable and it's metadata
public Pair<Descriptor, StatsMetadata> close()
{
- return close(this.repairedAt);
+ return close(FinishType.NORMAL, this.repairedAt);
}
- private Pair<Descriptor, StatsMetadata> close(long repairedAt)
+ private Pair<Descriptor, StatsMetadata> close(FinishType type, long repairedAt)
{
+ switch (type)
+ {
+ case EARLY: case NORMAL:
+ iwriter.close();
+ dataFile.close();
+ }
- // index and filter
- iwriter.close();
- // main data, close will truncate if necessary
- dataFile.close();
- dataFile.writeFullChecksum(descriptor);
// write sstable statistics
- Map<MetadataType, MetadataComponent> metadataComponents = sstableMetadataCollector.finalizeMetadata(
- partitioner.getClass().getCanonicalName(),
- metadata.getBloomFilterFpChance(),
- repairedAt);
- writeMetadata(descriptor, metadataComponents);
-
- // save the table of components
- SSTable.appendTOC(descriptor, components);
+ Map<MetadataType, MetadataComponent> metadataComponents ;
+ metadataComponents = sstableMetadataCollector
+ .finalizeMetadata(partitioner.getClass().getCanonicalName(),
+ metadata.getBloomFilterFpChance(),repairedAt);
// remove the 'tmp' marker from all components
- return Pair.create(rename(descriptor, components), (StatsMetadata) metadataComponents.get(MetadataType.STATS));
+ Descriptor descriptor = this.descriptor;
+ switch (type)
+ {
+ case NORMAL: case FINISH_EARLY:
+ dataFile.writeFullChecksum(descriptor);
+ writeMetadata(descriptor, metadataComponents);
+ // save the table of components
+ SSTable.appendTOC(descriptor, components);
+ descriptor = rename(descriptor, components);
+ }
+ return Pair.create(descriptor, (StatsMetadata) metadataComponents.get(MetadataType.STATS));
}
-
private static void writeMetadata(Descriptor desc, Map<MetadataType, MetadataComponent> components)
{
SequentialWriter out = SequentialWriter.open(new File(desc.filenameFor(Component.STATS)));
http://git-wip-us.apache.org/repos/asf/cassandra/blob/871f0039/src/java/org/apache/cassandra/io/util/BufferedPoolingSegmentedFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/BufferedPoolingSegmentedFile.java b/src/java/org/apache/cassandra/io/util/BufferedPoolingSegmentedFile.java
index b284f61..57f465f 100644
--- a/src/java/org/apache/cassandra/io/util/BufferedPoolingSegmentedFile.java
+++ b/src/java/org/apache/cassandra/io/util/BufferedPoolingSegmentedFile.java
@@ -19,6 +19,8 @@ package org.apache.cassandra.io.util;
import java.io.File;
+import org.apache.cassandra.io.sstable.SSTableWriter;
+
public class BufferedPoolingSegmentedFile extends PoolingSegmentedFile
{
public BufferedPoolingSegmentedFile(String path, long length)
@@ -33,16 +35,11 @@ public class BufferedPoolingSegmentedFile extends PoolingSegmentedFile
// only one segment in a standard-io file
}
- public SegmentedFile complete(String path)
+ public SegmentedFile complete(String path, SSTableWriter.FinishType finishType)
{
long length = new File(path).length();
return new BufferedPoolingSegmentedFile(path, length);
}
-
- public SegmentedFile openEarly(String path)
- {
- return complete(path);
- }
}
protected RandomAccessReader createReader(String path)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/871f0039/src/java/org/apache/cassandra/io/util/BufferedSegmentedFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/BufferedSegmentedFile.java b/src/java/org/apache/cassandra/io/util/BufferedSegmentedFile.java
index aa031e3..2f715da 100644
--- a/src/java/org/apache/cassandra/io/util/BufferedSegmentedFile.java
+++ b/src/java/org/apache/cassandra/io/util/BufferedSegmentedFile.java
@@ -19,6 +19,8 @@ package org.apache.cassandra.io.util;
import java.io.File;
+import org.apache.cassandra.io.sstable.SSTableWriter;
+
public class BufferedSegmentedFile extends SegmentedFile
{
public BufferedSegmentedFile(String path, long length)
@@ -33,16 +35,11 @@ public class BufferedSegmentedFile extends SegmentedFile
// only one segment in a standard-io file
}
- public SegmentedFile complete(String path)
+ public SegmentedFile complete(String path, SSTableWriter.FinishType finishType)
{
long length = new File(path).length();
return new BufferedSegmentedFile(path, length);
}
-
- public SegmentedFile openEarly(String path)
- {
- return complete(path);
- }
}
public FileDataInput getSegment(long position)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/871f0039/src/java/org/apache/cassandra/io/util/CompressedPoolingSegmentedFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/CompressedPoolingSegmentedFile.java b/src/java/org/apache/cassandra/io/util/CompressedPoolingSegmentedFile.java
index 1803e69..11d091a 100644
--- a/src/java/org/apache/cassandra/io/util/CompressedPoolingSegmentedFile.java
+++ b/src/java/org/apache/cassandra/io/util/CompressedPoolingSegmentedFile.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.io.util;
import org.apache.cassandra.io.compress.CompressedRandomAccessReader;
import org.apache.cassandra.io.compress.CompressedSequentialWriter;
import org.apache.cassandra.io.compress.CompressionMetadata;
+import org.apache.cassandra.io.sstable.SSTableWriter;
public class CompressedPoolingSegmentedFile extends PoolingSegmentedFile implements ICompressedFile
{
@@ -43,17 +44,11 @@ public class CompressedPoolingSegmentedFile extends PoolingSegmentedFile impleme
// only one segment in a standard-io file
}
- public SegmentedFile complete(String path)
+ public SegmentedFile complete(String path, SSTableWriter.FinishType finishType)
{
- return new CompressedPoolingSegmentedFile(path, metadata(path, false));
- }
-
- public SegmentedFile openEarly(String path)
- {
- return new CompressedPoolingSegmentedFile(path, metadata(path, true));
+ return new CompressedPoolingSegmentedFile(path, metadata(path, finishType));
}
}
-
protected RandomAccessReader createReader(String path)
{
return CompressedRandomAccessReader.open(path, metadata, this);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/871f0039/src/java/org/apache/cassandra/io/util/CompressedSegmentedFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/CompressedSegmentedFile.java b/src/java/org/apache/cassandra/io/util/CompressedSegmentedFile.java
index 4afe0a0..b788715 100644
--- a/src/java/org/apache/cassandra/io/util/CompressedSegmentedFile.java
+++ b/src/java/org/apache/cassandra/io/util/CompressedSegmentedFile.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.io.util;
import org.apache.cassandra.io.compress.CompressedRandomAccessReader;
import org.apache.cassandra.io.compress.CompressedSequentialWriter;
import org.apache.cassandra.io.compress.CompressionMetadata;
+import org.apache.cassandra.io.sstable.SSTableWriter;
public class CompressedSegmentedFile extends SegmentedFile implements ICompressedFile
{
@@ -44,24 +45,17 @@ public class CompressedSegmentedFile extends SegmentedFile implements ICompresse
// only one segment in a standard-io file
}
- protected CompressionMetadata metadata(String path, boolean early)
+ protected CompressionMetadata metadata(String path, SSTableWriter.FinishType finishType)
{
if (writer == null)
return CompressionMetadata.create(path);
- else if (early)
- return writer.openEarly();
- else
- return writer.openAfterClose();
- }
- public SegmentedFile complete(String path)
- {
- return new CompressedSegmentedFile(path, metadata(path, false));
+ return writer.open(finishType);
}
- public SegmentedFile openEarly(String path)
+ public SegmentedFile complete(String path, SSTableWriter.FinishType finishType)
{
- return new CompressedSegmentedFile(path, metadata(path, true));
+ return new CompressedSegmentedFile(path, metadata(path, finishType));
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/871f0039/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java b/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java
index ccc03fc..3b2cc98 100644
--- a/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java
+++ b/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java
@@ -28,6 +28,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.io.FSReadError;
+import org.apache.cassandra.io.sstable.SSTableWriter;
import org.apache.cassandra.utils.JVMStabilityInspector;
public class MmappedSegmentedFile extends SegmentedFile
@@ -160,18 +161,13 @@ public class MmappedSegmentedFile extends SegmentedFile
}
}
- public SegmentedFile complete(String path)
+ public SegmentedFile complete(String path, SSTableWriter.FinishType finishType)
{
long length = new File(path).length();
// create the segments
return new MmappedSegmentedFile(path, length, createSegments(path));
}
- public SegmentedFile openEarly(String path)
- {
- return complete(path);
- }
-
private Segment[] createSegments(String path)
{
RandomAccessFile raf;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/871f0039/src/java/org/apache/cassandra/io/util/SegmentedFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/SegmentedFile.java b/src/java/org/apache/cassandra/io/util/SegmentedFile.java
index be549a6..badae56 100644
--- a/src/java/org/apache/cassandra/io/util/SegmentedFile.java
+++ b/src/java/org/apache/cassandra/io/util/SegmentedFile.java
@@ -29,6 +29,7 @@ import org.apache.cassandra.config.Config;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.io.FSReadError;
import org.apache.cassandra.io.compress.CompressedSequentialWriter;
+import org.apache.cassandra.io.sstable.SSTableWriter;
import org.apache.cassandra.utils.Pair;
/**
@@ -115,13 +116,12 @@ public abstract class SegmentedFile
* Called after all potential boundaries have been added to apply this Builder to a concrete file on disk.
* @param path The file on disk.
*/
- public abstract SegmentedFile complete(String path);
+ public abstract SegmentedFile complete(String path, SSTableWriter.FinishType openType);
- /**
- * Called after all potential boundaries have been added to apply this Builder to a concrete file on disk.
- * @param path The file on disk.
- */
- public abstract SegmentedFile openEarly(String path);
+ public SegmentedFile complete(String path)
+ {
+ return complete(path, SSTableWriter.FinishType.NORMAL);
+ }
public void serializeBounds(DataOutput out) throws IOException
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/871f0039/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
index 6f9acea..392936d 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
@@ -483,7 +483,6 @@ public class SSTableRewriterTest extends SchemaLoader
cfs.disableAutoCompaction();
SSTableReader s = writeFile(cfs, 400);
- DecoratedKey origFirst = s.first;
cfs.addSSTable(s);
Set<SSTableReader> compacting = Sets.newHashSet(s);
SSTableRewriter.overrideOpenInterval(1000000);
@@ -499,8 +498,7 @@ public class SSTableRewriterTest extends SchemaLoader
rewriter.append(new LazilyCompactedRow(controller, Arrays.asList(scanner.next())));
if (rewriter.currentWriter().getOnDiskFilePointer() > 2500000)
{
- assertEquals(1, cfs.getSSTables().size()); // we dont open small files early ...
- assertEquals(origFirst, cfs.getSSTables().iterator().next().first); // ... and the first key should stay the same
+ assertEquals(files, cfs.getSSTables().size()); // all files are now opened early
rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
files++;
}
@@ -616,6 +614,73 @@ public class SSTableRewriterTest extends SchemaLoader
}
+ @Test
+ public void testAllKeysReadable() throws Exception
+ {
+ Keyspace keyspace = Keyspace.open(KEYSPACE);
+ ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
+ cfs.truncateBlocking();
+ for (int i = 0; i < 100; i++)
+ {
+ DecoratedKey key = Util.dk(Integer.toString(i));
+ Mutation rm = new Mutation(KEYSPACE, key.getKey());
+ for (int j = 0; j < 10; j++)
+ rm.add(CF, Util.cellname(Integer.toString(j)), ByteBufferUtil.EMPTY_BYTE_BUFFER, 100);
+ rm.apply();
+ }
+ cfs.forceBlockingFlush();
+ cfs.forceMajorCompaction();
+ validateKeys(keyspace);
+
+ assertEquals(1, cfs.getSSTables().size());
+ SSTableReader s = cfs.getSSTables().iterator().next();
+ Set<SSTableReader> compacting = new HashSet<>();
+ compacting.add(s);
+ cfs.getDataTracker().markCompacting(compacting);
+
+ SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false);
+ SSTableRewriter.overrideOpenInterval(1);
+ SSTableWriter w = getWriter(cfs, s.descriptor.directory);
+ rewriter.switchWriter(w);
+ int keyCount = 0;
+ try (ICompactionScanner scanner = compacting.iterator().next().getScanner();
+ CompactionController controller = new CompactionController(cfs, compacting, 0))
+ {
+ while (scanner.hasNext())
+ {
+ rewriter.append(new LazilyCompactedRow(controller, Arrays.asList(scanner.next())));
+ if (keyCount % 10 == 0)
+ {
+ rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
+ }
+ keyCount++;
+ validateKeys(keyspace);
+ }
+ try
+ {
+ cfs.getDataTracker().markCompactedSSTablesReplaced(compacting, rewriter.finish(), OperationType.COMPACTION);
+ cfs.getDataTracker().unmarkCompacting(compacting);
+ }
+ catch (Throwable t)
+ {
+ rewriter.abort();
+ }
+ }
+ validateKeys(keyspace);
+ Thread.sleep(1000);
+ validateCFS(cfs);
+ }
+
+ private void validateKeys(Keyspace ks)
+ {
+ for (int i = 0; i < 100; i++)
+ {
+ DecoratedKey key = Util.dk(Integer.toString(i));
+ ColumnFamily cf = Util.getColumnFamily(ks, key, CF);
+ assertTrue(cf != null);
+ }
+ }
+
private SSTableReader writeFile(ColumnFamilyStore cfs, int count)
{
ArrayBackedSortedColumns cf = ArrayBackedSortedColumns.factory.create(cfs.metadata);
[3/3] cassandra git commit: Merge branch 'cassandra-2.1' into trunk
Posted by be...@apache.org.
Merge branch 'cassandra-2.1' into trunk
Conflicts:
src/java/org/apache/cassandra/db/compaction/CompactionManager.java
src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/9a51c3c3
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/9a51c3c3
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/9a51c3c3
Branch: refs/heads/trunk
Commit: 9a51c3c3a891f2a22189b53db48b767b9a0400ca
Parents: 7783f13 871f003
Author: Benedict Elliott Smith <be...@apache.org>
Authored: Fri Dec 19 14:31:06 2014 +0000
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Fri Dec 19 14:31:06 2014 +0000
----------------------------------------------------------------------
CHANGES.txt | 2 +
.../io/compress/CompressedSequentialWriter.java | 12 +-
.../io/compress/CompressionMetadata.java | 30 +--
.../cassandra/io/sstable/SSTableRewriter.java | 189 +++++++++----------
.../io/sstable/format/SSTableReader.java | 26 +--
.../io/sstable/format/SSTableWriter.java | 17 +-
.../io/sstable/format/big/BigTableWriter.java | 113 +++++++----
.../io/util/BufferedPoolingSegmentedFile.java | 9 +-
.../io/util/BufferedSegmentedFile.java | 9 +-
.../io/util/CompressedPoolingSegmentedFile.java | 11 +-
.../io/util/CompressedSegmentedFile.java | 16 +-
.../cassandra/io/util/MmappedSegmentedFile.java | 8 +-
.../apache/cassandra/io/util/SegmentedFile.java | 12 +-
.../io/sstable/SSTableRewriterTest.java | 77 +++++++-
14 files changed, 290 insertions(+), 241 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9a51c3c3/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 5ec1bd2,ac28d78..c91f202
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,47 -1,6 +1,49 @@@
+3.0
+ * Support index key/value entries on map collections (CASSANDRA-8473)
+ * Modernize schema tables (CASSANDRA-8261)
+ * Support for user-defined aggregation functions (CASSANDRA-8053)
+ * Fix NPE in SelectStatement with empty IN values (CASSANDRA-8419)
+ * Refactor SelectStatement, return IN results in natural order instead
+ of IN value list order (CASSANDRA-7981)
+ * Support UDTs, tuples, and collections in user-defined
+ functions (CASSANDRA-7563)
+ * Fix aggregate fn results on empty selection, result column name,
+ and cqlsh parsing (CASSANDRA-8229)
+ * Mark sstables as repaired after full repair (CASSANDRA-7586)
+ * Extend Descriptor to include a format value and refactor reader/writer apis (CASSANDRA-7443)
+ * Integrate JMH for microbenchmarks (CASSANDRA-8151)
+ * Keep sstable levels when bootstrapping (CASSANDRA-7460)
+ * Add Sigar library and perform basic OS settings check on startup (CASSANDRA-7838)
+ * Support for aggregation functions (CASSANDRA-4914)
+ * Remove cassandra-cli (CASSANDRA-7920)
+ * Accept dollar quoted strings in CQL (CASSANDRA-7769)
+ * Make assassinate a first class command (CASSANDRA-7935)
+ * Support IN clause on any clustering column (CASSANDRA-4762)
+ * Improve compaction logging (CASSANDRA-7818)
+ * Remove YamlFileNetworkTopologySnitch (CASSANDRA-7917)
+ * Do anticompaction in groups (CASSANDRA-6851)
+ * Support pure user-defined functions (CASSANDRA-7395, 7526, 7562, 7740, 7781, 7929,
+ 7924, 7812, 8063, 7813)
+ * Permit configurable timestamps with cassandra-stress (CASSANDRA-7416)
+ * Move sstable RandomAccessReader to nio2, which allows using the
+ FILE_SHARE_DELETE flag on Windows (CASSANDRA-4050)
+ * Remove CQL2 (CASSANDRA-5918)
+ * Add Thrift get_multi_slice call (CASSANDRA-6757)
+ * Optimize fetching multiple cells by name (CASSANDRA-6933)
+ * Allow compilation in java 8 (CASSANDRA-7028)
+ * Make incremental repair default (CASSANDRA-7250)
+ * Enable code coverage thru JaCoCo (CASSANDRA-7226)
+ * Switch external naming of 'column families' to 'tables' (CASSANDRA-4369)
+ * Shorten SSTable path (CASSANDRA-6962)
+ * Use unsafe mutations for most unit tests (CASSANDRA-6969)
+ * Fix race condition during calculation of pending ranges (CASSANDRA-7390)
+ * Fail on very large batch sizes (CASSANDRA-8011)
+ * Improve concurrency of repair (CASSANDRA-6455, 8208)
+
+
2.1.3
+ * Fix regression in SSTableRewriter causing some rows to become unreadable
+ during compaction (CASSANDRA-8429)
* Run major compactions for repaired/unrepaired in parallel (CASSANDRA-8510)
* (cqlsh) Fix compression options in DESCRIBE TABLE output when compression
is disabled (CASSANDRA-8288)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9a51c3c3/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
index e533b1e,d3c41fa..e875ee3
--- a/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
@@@ -27,6 -27,7 +27,7 @@@ import org.apache.cassandra.io.FSReadEr
import org.apache.cassandra.io.FSWriteError;
import org.apache.cassandra.io.sstable.CorruptSSTableException;
import org.apache.cassandra.io.sstable.Descriptor;
-import org.apache.cassandra.io.sstable.SSTableWriter;
++import org.apache.cassandra.io.sstable.format.SSTableWriter;
import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
import org.apache.cassandra.io.util.DataIntegrityMetadata;
import org.apache.cassandra.io.util.FileMark;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9a51c3c3/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
index dfbc50f,173722f..221b3c1
--- a/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
@@@ -48,7 -47,7 +48,8 @@@ import org.apache.cassandra.io.IVersion
import org.apache.cassandra.io.sstable.Component;
import org.apache.cassandra.io.sstable.CorruptSSTableException;
import org.apache.cassandra.io.sstable.Descriptor;
-import org.apache.cassandra.io.sstable.SSTableWriter;
+import org.apache.cassandra.io.sstable.format.Version;
++import org.apache.cassandra.io.sstable.format.SSTableWriter;
import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.io.util.Memory;
@@@ -322,18 -319,25 +323,25 @@@ public class CompressionMetadat
}
}
- public CompressionMetadata openEarly(long dataLength, long compressedLength)
+ public CompressionMetadata open(long dataLength, long compressedLength, SSTableWriter.FinishType finishType)
{
+ RefCountedMemory offsets;
+ switch (finishType)
+ {
+ case EARLY:
+ offsets = this.offsets;
+ break;
+ case NORMAL:
+ case FINISH_EARLY:
+ offsets = this.offsets.copy(count * 8L);
+ this.offsets.unreference();
+ break;
+ default:
+ throw new AssertionError();
+ }
- return new CompressionMetadata(filePath, parameters, offsets, count * 8L, dataLength, compressedLength, Descriptor.Version.CURRENT.hasPostCompressionAdlerChecksums);
+ return new CompressionMetadata(filePath, parameters, offsets, count * 8L, dataLength, compressedLength, latestVersion.hasPostCompressionAdlerChecksums());
}
- public CompressionMetadata openAfterClose(long dataLength, long compressedLength)
- {
- RefCountedMemory newOffsets = offsets.copy(count * 8L);
- offsets.unreference();
- return new CompressionMetadata(filePath, parameters, newOffsets, count * 8L, dataLength, compressedLength, latestVersion.hasPostCompressionAdlerChecksums());
- }
-
/**
* Get a chunk offset by it's index.
*
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9a51c3c3/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
index 8b25f59,43ac4b6..e7a28d3
--- a/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
@@@ -36,11 -29,8 +29,10 @@@ import org.apache.cassandra.db.DataTrac
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.RowIndexEntry;
import org.apache.cassandra.db.compaction.AbstractCompactedRow;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.sstable.format.SSTableWriter;
import org.apache.cassandra.utils.CLibrary;
import org.apache.cassandra.utils.FBUtilities;
- import org.apache.cassandra.utils.Pair;
/**
* Wraps one or more writers as output for rewriting one or more readers: every sstable_preemptive_open_interval_in_mb
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9a51c3c3/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
index ff85320,0000000..41898c8
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
@@@ -1,1900 -1,0 +1,1878 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.sstable.format;
+
+import java.io.*;
+import java.nio.ByteBuffer;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Predicate;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Ordering;
+import com.google.common.primitives.Longs;
+import com.google.common.util.concurrent.RateLimiter;
+
+import com.clearspring.analytics.stream.cardinality.CardinalityMergeException;
+import com.clearspring.analytics.stream.cardinality.ICardinality;
+import org.apache.cassandra.cache.CachingOptions;
+import org.apache.cassandra.cache.InstrumentingCache;
+import org.apache.cassandra.cache.KeyCacheKey;
+import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
+import org.apache.cassandra.concurrent.ScheduledExecutors;
+import org.apache.cassandra.config.*;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
+import org.apache.cassandra.db.commitlog.ReplayPosition;
+import org.apache.cassandra.db.composites.CellName;
+import org.apache.cassandra.db.filter.ColumnSlice;
+import org.apache.cassandra.db.index.SecondaryIndex;
+import org.apache.cassandra.dht.*;
+import org.apache.cassandra.io.compress.CompressedRandomAccessReader;
+import org.apache.cassandra.io.compress.CompressedThrottledReader;
+import org.apache.cassandra.io.compress.CompressionMetadata;
+import org.apache.cassandra.io.sstable.*;
+import org.apache.cassandra.io.sstable.metadata.*;
+import org.apache.cassandra.io.util.*;
+import org.apache.cassandra.metrics.RestorableMeter;
+import org.apache.cassandra.metrics.StorageMetrics;
+import org.apache.cassandra.service.ActiveRepairService;
+import org.apache.cassandra.service.CacheService;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.*;
+import org.apache.cassandra.utils.concurrent.OpOrder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.cassandra.db.Directories.SECONDARY_INDEX_NAME_SEPARATOR;
+
+/**
+ * SSTableReaders are open()ed by Keyspace.onStart; after that they are created by SSTableWriter.renameAndOpen.
+ * Do not re-call open() on existing SSTable files; use the references kept by ColumnFamilyStore post-start instead.
+ */
+public abstract class SSTableReader extends SSTable
+{
+ private static final Logger logger = LoggerFactory.getLogger(SSTableReader.class);
+
+ private static final ScheduledThreadPoolExecutor syncExecutor = new ScheduledThreadPoolExecutor(1);
+ private static final RateLimiter meterSyncThrottle = RateLimiter.create(100.0);
+
+ public static final Comparator<SSTableReader> maxTimestampComparator = new Comparator<SSTableReader>()
+ {
+ public int compare(SSTableReader o1, SSTableReader o2)
+ {
+ long ts1 = o1.getMaxTimestamp();
+ long ts2 = o2.getMaxTimestamp();
+ return (ts1 > ts2 ? -1 : (ts1 == ts2 ? 0 : 1));
+ }
+ };
+
+ public static final Comparator<SSTableReader> sstableComparator = new Comparator<SSTableReader>()
+ {
+ public int compare(SSTableReader o1, SSTableReader o2)
+ {
+ return o1.first.compareTo(o2.first);
+ }
+ };
+
+ public static final Ordering<SSTableReader> sstableOrdering = Ordering.from(sstableComparator);
+
+ /**
+ * maxDataAge is a timestamp in local server time (e.g. System.currentTimeMilli) which represents an upper bound
+ * to the newest piece of data stored in the sstable. In other words, this sstable does not contain items created
+ * later than maxDataAge.
+ *
+ * The field is not serialized to disk, so relying on it for more than what truncate does is not advised.
+ *
+ * When a new sstable is flushed, maxDataAge is set to the time of creation.
+ * When a sstable is created from compaction, maxDataAge is set to max of all merged sstables.
+ *
+ * The age is in milliseconds since epoc and is local to this host.
+ */
+ public final long maxDataAge;
+
+ public enum OpenReason
+ {
+ NORMAL,
+ EARLY,
+ METADATA_CHANGE
+ }
+
+ public final OpenReason openReason;
+
+ // indexfile and datafile: might be null before a call to load()
+ protected SegmentedFile ifile;
+ protected SegmentedFile dfile;
+
+ protected IndexSummary indexSummary;
+ protected IFilter bf;
+
+ protected final RowIndexEntry.IndexSerializer rowIndexEntrySerializer;
+
+ protected InstrumentingCache<KeyCacheKey, RowIndexEntry> keyCache;
+
+ protected final BloomFilterTracker bloomFilterTracker = new BloomFilterTracker();
+
+ protected final AtomicInteger references = new AtomicInteger(1);
+ // technically isCompacted is not necessary since it should never be unreferenced unless it is also compacted,
+ // but it seems like a good extra layer of protection against reference counting bugs to not delete data based on that alone
+ protected final AtomicBoolean isCompacted = new AtomicBoolean(false);
+ protected final AtomicBoolean isSuspect = new AtomicBoolean(false);
+
+ // not final since we need to be able to change level on a file.
+ protected volatile StatsMetadata sstableMetadata;
+
+ protected final AtomicLong keyCacheHit = new AtomicLong(0);
+ protected final AtomicLong keyCacheRequest = new AtomicLong(0);
+
+ /**
+ * To support replacing this sstablereader with another object that represents that same underlying sstable, but with different associated resources,
+ * we build a linked-list chain of replacement, which we synchronise using a shared object to make maintenance of the list across multiple threads simple.
+ * On close we check if any of the closeable resources differ between any chains either side of us; any that are in neither of the adjacent links (if any) are closed.
+ * Once we've made this decision we remove ourselves from the linked list, so that anybody behind/ahead will compare against only other still opened resources.
+ */
+ protected Object replaceLock = new Object();
+ protected SSTableReader replacedBy;
+ private SSTableReader replaces;
- private SSTableReader sharesBfWith;
+ private SSTableDeletingTask deletingTask;
+ private Runnable runOnClose;
+
+ @VisibleForTesting
+ public RestorableMeter readMeter;
+ protected ScheduledFuture readMeterSyncFuture;
+
+ /**
+ * Calculate approximate key count.
+ * If cardinality estimator is available on all given sstables, then this method use them to estimate
+ * key count.
+ * If not, then this uses index summaries.
+ *
+ * @param sstables SSTables to calculate key count
+ * @return estimated key count
+ */
+ public static long getApproximateKeyCount(Collection<SSTableReader> sstables)
+ {
+ long count = -1;
+
+ // check if cardinality estimator is available for all SSTables
+ boolean cardinalityAvailable = !sstables.isEmpty() && Iterators.all(sstables.iterator(), new Predicate<SSTableReader>()
+ {
+ public boolean apply(SSTableReader sstable)
+ {
+ return sstable.descriptor.version.hasNewStatsFile();
+ }
+ });
+
+ // if it is, load them to estimate key count
+ if (cardinalityAvailable)
+ {
+ boolean failed = false;
+ ICardinality cardinality = null;
+ for (SSTableReader sstable : sstables)
+ {
+ try
+ {
+ CompactionMetadata metadata = (CompactionMetadata) sstable.descriptor.getMetadataSerializer().deserialize(sstable.descriptor, MetadataType.COMPACTION);
+ assert metadata != null : sstable.getFilename();
+ if (cardinality == null)
+ cardinality = metadata.cardinalityEstimator;
+ else
+ cardinality = cardinality.merge(metadata.cardinalityEstimator);
+ }
+ catch (IOException e)
+ {
+ logger.warn("Reading cardinality from Statistics.db failed.", e);
+ failed = true;
+ break;
+ }
+ catch (CardinalityMergeException e)
+ {
+ logger.warn("Cardinality merge failed.", e);
+ failed = true;
+ break;
+ }
+ }
+ if (cardinality != null && !failed)
+ count = cardinality.cardinality();
+ }
+
+ // if something went wrong above or cardinality is not available, calculate using index summary
+ if (count < 0)
+ {
+ for (SSTableReader sstable : sstables)
+ count += sstable.estimatedKeys();
+ }
+ return count;
+ }
+
+ public static SSTableReader open(Descriptor descriptor) throws IOException
+ {
+ CFMetaData metadata;
+ if (descriptor.cfname.contains(SECONDARY_INDEX_NAME_SEPARATOR))
+ {
+ int i = descriptor.cfname.indexOf(SECONDARY_INDEX_NAME_SEPARATOR);
+ String parentName = descriptor.cfname.substring(0, i);
+ CFMetaData parent = Schema.instance.getCFMetaData(descriptor.ksname, parentName);
+ ColumnDefinition def = parent.getColumnDefinitionForIndex(descriptor.cfname.substring(i + 1));
+ metadata = CFMetaData.newIndexMetadata(parent, def, SecondaryIndex.getIndexComparator(parent, def));
+ }
+ else
+ {
+ metadata = Schema.instance.getCFMetaData(descriptor.ksname, descriptor.cfname);
+ }
+ return open(descriptor, metadata);
+ }
+
+ public static SSTableReader open(Descriptor desc, CFMetaData metadata) throws IOException
+ {
+ IPartitioner p = desc.cfname.contains(SECONDARY_INDEX_NAME_SEPARATOR)
+ ? new LocalPartitioner(metadata.getKeyValidator())
+ : StorageService.getPartitioner();
+ return open(desc, componentsFor(desc), metadata, p);
+ }
+
+ public static SSTableReader open(Descriptor descriptor, Set<Component> components, CFMetaData metadata, IPartitioner partitioner) throws IOException
+ {
+ return open(descriptor, components, metadata, partitioner, true);
+ }
+
+ public static SSTableReader openNoValidation(Descriptor descriptor, Set<Component> components, CFMetaData metadata) throws IOException
+ {
+ return open(descriptor, components, metadata, StorageService.getPartitioner(), false);
+ }
+
+ /**
+ * Open SSTable reader to be used in batch mode(such as sstableloader).
+ *
+ * @param descriptor
+ * @param components
+ * @param metadata
+ * @param partitioner
+ * @return opened SSTableReader
+ * @throws IOException
+ */
+ public static SSTableReader openForBatch(Descriptor descriptor, Set<Component> components, CFMetaData metadata, IPartitioner partitioner) throws IOException
+ {
+ // Minimum components without which we can't do anything
+ assert components.contains(Component.DATA) : "Data component is missing for sstable" + descriptor;
+ assert components.contains(Component.PRIMARY_INDEX) : "Primary index component is missing for sstable " + descriptor;
+
+ Map<MetadataType, MetadataComponent> sstableMetadata = descriptor.getMetadataSerializer().deserialize(descriptor,
+ EnumSet.of(MetadataType.VALIDATION, MetadataType.STATS));
+ ValidationMetadata validationMetadata = (ValidationMetadata) sstableMetadata.get(MetadataType.VALIDATION);
+ StatsMetadata statsMetadata = (StatsMetadata) sstableMetadata.get(MetadataType.STATS);
+
+ // Check if sstable is created using same partitioner.
+ // Partitioner can be null, which indicates older version of sstable or no stats available.
+ // In that case, we skip the check.
+ String partitionerName = partitioner.getClass().getCanonicalName();
+ if (validationMetadata != null && !partitionerName.equals(validationMetadata.partitioner))
+ {
+ logger.error(String.format("Cannot open %s; partitioner %s does not match system partitioner %s. Note that the default partitioner starting with Cassandra 1.2 is Murmur3Partitioner, so you will need to edit that to match your old partitioner if upgrading.",
+ descriptor, validationMetadata.partitioner, partitionerName));
+ System.exit(1);
+ }
+
+ logger.info("Opening {} ({} bytes)", descriptor, new File(descriptor.filenameFor(Component.DATA)).length());
+ SSTableReader sstable = internalOpen(descriptor, components, metadata, partitioner, System.currentTimeMillis(),
+ statsMetadata, OpenReason.NORMAL);
+
+ // special implementation of load to use non-pooled SegmentedFile builders
+ SegmentedFile.Builder ibuilder = new BufferedSegmentedFile.Builder();
+ SegmentedFile.Builder dbuilder = sstable.compression
+ ? new CompressedSegmentedFile.Builder(null)
+ : new BufferedSegmentedFile.Builder();
+ if (!sstable.loadSummary(ibuilder, dbuilder))
+ sstable.buildSummary(false, ibuilder, dbuilder, false, Downsampling.BASE_SAMPLING_LEVEL);
+ sstable.ifile = ibuilder.complete(sstable.descriptor.filenameFor(Component.PRIMARY_INDEX));
+ sstable.dfile = dbuilder.complete(sstable.descriptor.filenameFor(Component.DATA));
+ sstable.bf = FilterFactory.AlwaysPresent;
+
+ return sstable;
+ }
+
+ private static SSTableReader open(Descriptor descriptor,
+ Set<Component> components,
+ CFMetaData metadata,
+ IPartitioner partitioner,
+ boolean validate) throws IOException
+ {
+ // Minimum components without which we can't do anything
+ assert components.contains(Component.DATA) : "Data component is missing for sstable" + descriptor;
+ assert components.contains(Component.PRIMARY_INDEX) : "Primary index component is missing for sstable " + descriptor;
+
+ Map<MetadataType, MetadataComponent> sstableMetadata = descriptor.getMetadataSerializer().deserialize(descriptor,
+ EnumSet.of(MetadataType.VALIDATION, MetadataType.STATS));
+ ValidationMetadata validationMetadata = (ValidationMetadata) sstableMetadata.get(MetadataType.VALIDATION);
+ StatsMetadata statsMetadata = (StatsMetadata) sstableMetadata.get(MetadataType.STATS);
+
+ // Check if sstable is created using same partitioner.
+ // Partitioner can be null, which indicates older version of sstable or no stats available.
+ // In that case, we skip the check.
+ String partitionerName = partitioner.getClass().getCanonicalName();
+ if (validationMetadata != null && !partitionerName.equals(validationMetadata.partitioner))
+ {
+ logger.error(String.format("Cannot open %s; partitioner %s does not match system partitioner %s. Note that the default partitioner starting with Cassandra 1.2 is Murmur3Partitioner, so you will need to edit that to match your old partitioner if upgrading.",
+ descriptor, validationMetadata.partitioner, partitionerName));
+ System.exit(1);
+ }
+
+ logger.info("Opening {} ({} bytes)", descriptor, new File(descriptor.filenameFor(Component.DATA)).length());
+ SSTableReader sstable = internalOpen(descriptor, components, metadata, partitioner, System.currentTimeMillis(),
+ statsMetadata, OpenReason.NORMAL);
+
+ // load index and filter
+ long start = System.nanoTime();
+ sstable.load(validationMetadata);
+ logger.debug("INDEX LOAD TIME for {}: {} ms.", descriptor, TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start));
+
+ if (validate)
+ sstable.validate();
+
+ if (sstable.getKeyCache() != null)
+ logger.debug("key cache contains {}/{} keys", sstable.getKeyCache().size(), sstable.getKeyCache().getCapacity());
+
+ return sstable;
+ }
+
+ public static void logOpenException(Descriptor descriptor, IOException e)
+ {
+ if (e instanceof FileNotFoundException)
+ logger.error("Missing sstable component in {}; skipped because of {}", descriptor, e.getMessage());
+ else
+ logger.error("Corrupt sstable {}; skipped", descriptor, e);
+ }
+
+ public static Collection<SSTableReader> openAll(Set<Map.Entry<Descriptor, Set<Component>>> entries,
+ final CFMetaData metadata,
+ final IPartitioner partitioner)
+ {
+ final Collection<SSTableReader> sstables = new LinkedBlockingQueue<>();
+
+ ExecutorService executor = DebuggableThreadPoolExecutor.createWithFixedPoolSize("SSTableBatchOpen", FBUtilities.getAvailableProcessors());
+ for (final Map.Entry<Descriptor, Set<Component>> entry : entries)
+ {
+ Runnable runnable = new Runnable()
+ {
+ public void run()
+ {
+ SSTableReader sstable;
+ try
+ {
+ sstable = open(entry.getKey(), entry.getValue(), metadata, partitioner);
+ }
+ catch (IOException ex)
+ {
+ logger.error("Corrupt sstable {}; skipped", entry, ex);
+ return;
+ }
+ sstables.add(sstable);
+ }
+ };
+ executor.submit(runnable);
+ }
+
+ executor.shutdown();
+ try
+ {
+ executor.awaitTermination(7, TimeUnit.DAYS);
+ }
+ catch (InterruptedException e)
+ {
+ throw new AssertionError(e);
+ }
+
+ return sstables;
+
+ }
+
+ /**
+ * Open a RowIndexedReader which already has its state initialized (by SSTableWriter).
+ */
+ public static SSTableReader internalOpen(Descriptor desc,
+ Set<Component> components,
+ CFMetaData metadata,
+ IPartitioner partitioner,
+ SegmentedFile ifile,
+ SegmentedFile dfile,
+ IndexSummary isummary,
+ IFilter bf,
+ long maxDataAge,
+ StatsMetadata sstableMetadata,
+ OpenReason openReason)
+ {
+ assert desc != null && partitioner != null && ifile != null && dfile != null && isummary != null && bf != null && sstableMetadata != null;
+
+ SSTableReader reader = internalOpen(desc, components, metadata, partitioner, maxDataAge, sstableMetadata, openReason);
+
+ reader.bf = bf;
+ reader.ifile = ifile;
+ reader.dfile = dfile;
+ reader.indexSummary = isummary;
+
+ return reader;
+ }
+
+
+ private static SSTableReader internalOpen(final Descriptor descriptor,
+ Set<Component> components,
+ CFMetaData metadata,
+ IPartitioner partitioner,
+ Long maxDataAge,
+ StatsMetadata sstableMetadata,
+ OpenReason openReason)
+ {
+ Factory readerFactory = descriptor.getFormat().getReaderFactory();
+
+ return readerFactory.open(descriptor, components, metadata, partitioner, maxDataAge, sstableMetadata, openReason);
+ }
+
+ protected SSTableReader(final Descriptor desc,
+ Set<Component> components,
+ CFMetaData metadata,
+ IPartitioner partitioner,
+ long maxDataAge,
+ StatsMetadata sstableMetadata,
+ OpenReason openReason)
+ {
+ super(desc, components, metadata, partitioner);
+ this.sstableMetadata = sstableMetadata;
+ this.maxDataAge = maxDataAge;
+ this.openReason = openReason;
+
+ this.rowIndexEntrySerializer = descriptor.version.getSSTableFormat().getIndexSerializer(metadata);
+
+ deletingTask = new SSTableDeletingTask(this);
+
+ // Don't track read rates for tables in the system keyspace. Also don't track reads for special operations (like early open)
+ // this is to avoid overflowing the executor queue (see CASSANDRA-8066)
+ if (SystemKeyspace.NAME.equals(desc.ksname) || openReason != OpenReason.NORMAL)
+ {
+ readMeter = null;
+ readMeterSyncFuture = null;
+ return;
+ }
+
+ readMeter = SystemKeyspace.getSSTableReadMeter(desc.ksname, desc.cfname, desc.generation);
+ // sync the average read rate to system.sstable_activity every five minutes, starting one minute from now
+ readMeterSyncFuture = syncExecutor.scheduleAtFixedRate(new Runnable()
+ {
+ public void run()
+ {
+ if (!isCompacted.get())
+ {
+ meterSyncThrottle.acquire();
+ SystemKeyspace.persistSSTableReadMeter(desc.ksname, desc.cfname, desc.generation, readMeter);
+ }
+ }
+ }, 1, 5, TimeUnit.MINUTES);
+ }
+
+ public static long getTotalBytes(Iterable<SSTableReader> sstables)
+ {
+ long sum = 0;
+ for (SSTableReader sstable : sstables)
+ {
+ sum += sstable.onDiskLength();
+ }
+ return sum;
+ }
+
+ private void tidy(boolean release)
+ {
+ if (readMeterSyncFuture != null)
+ readMeterSyncFuture.cancel(false);
+
+ if (references.get() != 0)
+ {
+ throw new IllegalStateException("SSTable is not fully released (" + references.get() + " references)");
+ }
+
+ synchronized (replaceLock)
+ {
- boolean closeBf = true, closeSummary = true, closeFiles = true, deleteFiles = false;
++ boolean closeBf = true, closeSummary = true, closeFiles = true, deleteFiles = isCompacted.get();
+
+ if (replacedBy != null)
+ {
+ closeBf = replacedBy.bf != bf;
+ closeSummary = replacedBy.indexSummary != indexSummary;
+ closeFiles = replacedBy.dfile != dfile;
+ // if the replacement sstablereader uses a different path, clean up our paths
+ deleteFiles = !dfile.path.equals(replacedBy.dfile.path);
+ }
+
+ if (replaces != null)
+ {
+ closeBf &= replaces.bf != bf;
+ closeSummary &= replaces.indexSummary != indexSummary;
+ closeFiles &= replaces.dfile != dfile;
+ deleteFiles &= !dfile.path.equals(replaces.dfile.path);
+ }
+
- if (sharesBfWith != null)
- {
- closeBf &= sharesBfWith.bf != bf;
- closeSummary &= sharesBfWith.indexSummary != indexSummary;
- closeFiles &= sharesBfWith.dfile != dfile;
- deleteFiles &= !dfile.path.equals(sharesBfWith.dfile.path);
- }
-
+ boolean deleteAll = false;
+ if (release && isCompacted.get())
+ {
+ assert replacedBy == null;
- if (replaces != null)
++ if (replaces != null && !deleteFiles)
+ {
+ replaces.replacedBy = null;
+ replaces.deletingTask = deletingTask;
+ replaces.markObsolete();
+ }
+ else
+ {
+ deleteAll = true;
+ }
+ }
+ else
+ {
+ if (replaces != null)
+ replaces.replacedBy = replacedBy;
+ if (replacedBy != null)
+ replacedBy.replaces = replaces;
+ }
+
+ scheduleTidy(closeBf, closeSummary, closeFiles, deleteFiles, deleteAll);
+ }
+ }
+
+ private void scheduleTidy(final boolean closeBf, final boolean closeSummary, final boolean closeFiles, final boolean deleteFiles, final boolean deleteAll)
+ {
+ if (references.get() != 0)
+ throw new IllegalStateException("SSTable is not fully released (" + references.get() + " references)");
+
+ final ColumnFamilyStore cfs = Schema.instance.getColumnFamilyStoreInstance(metadata.cfId);
+ final OpOrder.Barrier barrier;
+ if (cfs != null)
+ {
+ barrier = cfs.readOrdering.newBarrier();
+ barrier.issue();
+ }
+ else
+ barrier = null;
+
+ ScheduledExecutors.nonPeriodicTasks.execute(new Runnable()
+ {
+ public void run()
+ {
+ if (barrier != null)
+ barrier.await();
+ if (closeBf)
+ bf.close();
+ if (closeSummary)
+ indexSummary.close();
+ if (closeFiles)
+ {
+ ifile.cleanup();
+ dfile.cleanup();
+ }
+ if (runOnClose != null)
+ runOnClose.run();
+ if (deleteAll)
+ {
+ /**
+ * Do the OS a favour and suggest (using fadvice call) that we
+ * don't want to see pages of this SSTable in memory anymore.
+ *
+ * NOTE: We can't use madvice in java because it requires the address of
+ * the mapping, so instead we always open a file and run fadvice(fd, 0, 0) on it
+ */
+ dropPageCache();
+ deletingTask.run();
+ }
+ else if (deleteFiles)
+ {
+ FileUtils.deleteWithConfirm(new File(dfile.path));
+ FileUtils.deleteWithConfirm(new File(ifile.path));
+ }
+ }
+ });
+ }
+
+ public boolean equals(Object that)
+ {
+ return that instanceof SSTableReader && ((SSTableReader) that).descriptor.equals(this.descriptor);
+ }
+
+ public int hashCode()
+ {
+ return this.descriptor.hashCode();
+ }
+
+ public String getFilename()
+ {
+ return dfile.path;
+ }
+
+ public String getIndexFilename()
+ {
+ return ifile.path;
+ }
+
+ public void setTrackedBy(DataTracker tracker)
+ {
+ deletingTask.setTracker(tracker);
+ // under normal operation we can do this at any time, but SSTR is also used outside C* proper,
+ // e.g. by BulkLoader, which does not initialize the cache. As a kludge, we set up the cache
+ // here when we know we're being wired into the rest of the server infrastructure.
+ keyCache = CacheService.instance.keyCache;
+ }
+
+ private void load(ValidationMetadata validation) throws IOException
+ {
+ if (metadata.getBloomFilterFpChance() == 1.0)
+ {
+ // bf is disabled.
+ load(false, true);
+ bf = FilterFactory.AlwaysPresent;
+ }
+ else if (!components.contains(Component.FILTER) || validation == null)
+ {
+ // bf is enabled, but filter component is missing.
+ load(true, true);
+ }
+ else if (validation.bloomFilterFPChance != metadata.getBloomFilterFpChance())
+ {
+ // bf fp chance in sstable metadata and it has changed since compaction.
+ load(true, true);
+ }
+ else
+ {
+ // bf is enabled and fp chance matches the currently configured value.
+ load(false, true);
+ loadBloomFilter();
+ }
+ }
+
+ /**
+ * Load bloom filter from Filter.db file.
+ *
+ * @throws IOException
+ */
+ private void loadBloomFilter() throws IOException
+ {
+ DataInputStream stream = null;
+ try
+ {
+ stream = new DataInputStream(new BufferedInputStream(new FileInputStream(descriptor.filenameFor(Component.FILTER))));
+ bf = FilterFactory.deserialize(stream, true);
+ }
+ finally
+ {
+ FileUtils.closeQuietly(stream);
+ }
+ }
+
+ /**
+ * Loads ifile, dfile and indexSummary, and optionally recreates the bloom filter.
+ * @param saveSummaryIfCreated for bulk loading purposes, if the summary was absent and needed to be built, you can
+ * avoid persisting it to disk by setting this to false
+ */
+ private void load(boolean recreateBloomFilter, boolean saveSummaryIfCreated) throws IOException
+ {
+ SegmentedFile.Builder ibuilder = SegmentedFile.getBuilder(DatabaseDescriptor.getIndexAccessMode());
+ SegmentedFile.Builder dbuilder = compression
+ ? SegmentedFile.getCompressedBuilder()
+ : SegmentedFile.getBuilder(DatabaseDescriptor.getDiskAccessMode());
+
+ boolean summaryLoaded = loadSummary(ibuilder, dbuilder);
+ if (recreateBloomFilter || !summaryLoaded)
+ buildSummary(recreateBloomFilter, ibuilder, dbuilder, summaryLoaded, Downsampling.BASE_SAMPLING_LEVEL);
+
+ ifile = ibuilder.complete(descriptor.filenameFor(Component.PRIMARY_INDEX));
+ dfile = dbuilder.complete(descriptor.filenameFor(Component.DATA));
+ if (saveSummaryIfCreated && (recreateBloomFilter || !summaryLoaded)) // save summary information to disk
+ saveSummary(ibuilder, dbuilder);
+ }
+
+ /**
+ * Build index summary(and optionally bloom filter) by reading through Index.db file.
+ *
+ * @param recreateBloomFilter true if recreate bloom filter
+ * @param ibuilder
+ * @param dbuilder
+ * @param summaryLoaded true if index summary is already loaded and not need to build again
+ * @throws IOException
+ */
+ private void buildSummary(boolean recreateBloomFilter, SegmentedFile.Builder ibuilder, SegmentedFile.Builder dbuilder, boolean summaryLoaded, int samplingLevel) throws IOException
+ {
+ // we read the positions in a BRAF so we don't have to worry about an entry spanning a mmap boundary.
+ RandomAccessReader primaryIndex = RandomAccessReader.open(new File(descriptor.filenameFor(Component.PRIMARY_INDEX)));
+
+ try
+ {
+ long indexSize = primaryIndex.length();
+ long histogramCount = sstableMetadata.estimatedRowSize.count();
+ long estimatedKeys = histogramCount > 0 && !sstableMetadata.estimatedRowSize.isOverflowed()
+ ? histogramCount
+ : estimateRowsFromIndex(primaryIndex); // statistics is supposed to be optional
+
+ if (recreateBloomFilter)
+ bf = FilterFactory.getFilter(estimatedKeys, metadata.getBloomFilterFpChance(), true);
+
+ IndexSummaryBuilder summaryBuilder = null;
+ if (!summaryLoaded)
+ summaryBuilder = new IndexSummaryBuilder(estimatedKeys, metadata.getMinIndexInterval(), samplingLevel);
+
+ long indexPosition;
+ RowIndexEntry.IndexSerializer rowIndexSerializer = descriptor.getFormat().getIndexSerializer(metadata);
+
+ while ((indexPosition = primaryIndex.getFilePointer()) != indexSize)
+ {
+ ByteBuffer key = ByteBufferUtil.readWithShortLength(primaryIndex);
+ RowIndexEntry indexEntry = rowIndexSerializer.deserialize(primaryIndex, descriptor.version);
+ DecoratedKey decoratedKey = partitioner.decorateKey(key);
+ if (first == null)
+ first = decoratedKey;
+ last = decoratedKey;
+
+ if (recreateBloomFilter)
+ bf.add(decoratedKey.getKey());
+
+ // if summary was already read from disk we don't want to re-populate it using primary index
+ if (!summaryLoaded)
+ {
+ summaryBuilder.maybeAddEntry(decoratedKey, indexPosition);
+ ibuilder.addPotentialBoundary(indexPosition);
+ dbuilder.addPotentialBoundary(indexEntry.position);
+ }
+ }
+
+ if (!summaryLoaded)
+ indexSummary = summaryBuilder.build(partitioner);
+ }
+ finally
+ {
+ FileUtils.closeQuietly(primaryIndex);
+ }
+
+ first = getMinimalKey(first);
+ last = getMinimalKey(last);
+ }
+
+ /**
+ * Load index summary from Summary.db file if it exists.
+ *
+ * if loaded index summary has different index interval from current value stored in schema,
+ * then Summary.db file will be deleted and this returns false to rebuild summary.
+ *
+ * @param ibuilder
+ * @param dbuilder
+ * @return true if index summary is loaded successfully from Summary.db file.
+ */
+ public boolean loadSummary(SegmentedFile.Builder ibuilder, SegmentedFile.Builder dbuilder)
+ {
+ File summariesFile = new File(descriptor.filenameFor(Component.SUMMARY));
+ if (!summariesFile.exists())
+ return false;
+
+ DataInputStream iStream = null;
+ try
+ {
+ iStream = new DataInputStream(new FileInputStream(summariesFile));
+ indexSummary = IndexSummary.serializer.deserialize(iStream, partitioner, descriptor.version.hasSamplingLevel(), metadata.getMinIndexInterval(), metadata.getMaxIndexInterval());
+ first = partitioner.decorateKey(ByteBufferUtil.readWithLength(iStream));
+ last = partitioner.decorateKey(ByteBufferUtil.readWithLength(iStream));
+ ibuilder.deserializeBounds(iStream);
+ dbuilder.deserializeBounds(iStream);
+ }
+ catch (IOException e)
+ {
+ logger.debug("Cannot deserialize SSTable Summary File {}: {}", summariesFile.getPath(), e.getMessage());
+ // corrupted; delete it and fall back to creating a new summary
+ FileUtils.closeQuietly(iStream);
+ // delete it and fall back to creating a new summary
+ FileUtils.deleteWithConfirm(summariesFile);
+ return false;
+ }
+ finally
+ {
+ FileUtils.closeQuietly(iStream);
+ }
+
+ return true;
+ }
+
+ /**
+ * Save index summary to Summary.db file.
+ *
+ * @param ibuilder
+ * @param dbuilder
+ */
+ public void saveSummary(SegmentedFile.Builder ibuilder, SegmentedFile.Builder dbuilder)
+ {
+ saveSummary(ibuilder, dbuilder, indexSummary);
+ }
+
+ private void saveSummary(SegmentedFile.Builder ibuilder, SegmentedFile.Builder dbuilder, IndexSummary summary)
+ {
+ File summariesFile = new File(descriptor.filenameFor(Component.SUMMARY));
+ if (summariesFile.exists())
+ FileUtils.deleteWithConfirm(summariesFile);
+
+ DataOutputStreamAndChannel oStream = null;
+ try
+ {
+ oStream = new DataOutputStreamAndChannel(new FileOutputStream(summariesFile));
+ IndexSummary.serializer.serialize(summary, oStream, descriptor.version.hasSamplingLevel());
+ ByteBufferUtil.writeWithLength(first.getKey(), oStream);
+ ByteBufferUtil.writeWithLength(last.getKey(), oStream);
+ ibuilder.serializeBounds(oStream);
+ dbuilder.serializeBounds(oStream);
+ }
+ catch (IOException e)
+ {
+ logger.debug("Cannot save SSTable Summary: ", e);
+
+ // corrupted hence delete it and let it load it now.
+ if (summariesFile.exists())
+ FileUtils.deleteWithConfirm(summariesFile);
+ }
+ finally
+ {
+ FileUtils.closeQuietly(oStream);
+ }
+ }
+
+ public void setReplacedBy(SSTableReader replacement)
+ {
+ synchronized (replaceLock)
+ {
+ assert replacedBy == null;
+ replacedBy = replacement;
+ replacement.replaces = this;
+ replacement.replaceLock = replaceLock;
+ }
+ }
+
- /**
- * this is used to avoid closing the bloom filter multiple times when finishing an SSTableRewriter
- *
- * note that the reason we don't use replacedBy is that we are not yet actually replaced
- *
- * @param newReader
- */
- public void sharesBfWith(SSTableReader newReader)
- {
- assert openReason.equals(OpenReason.EARLY);
- this.sharesBfWith = newReader;
- }
-
+ public SSTableReader cloneWithNewStart(DecoratedKey newStart, final Runnable runOnClose)
+ {
+ synchronized (replaceLock)
+ {
+ assert replacedBy == null;
+
+ if (newStart.compareTo(this.first) > 0)
+ {
+ if (newStart.compareTo(this.last) > 0)
+ {
+ this.runOnClose = new Runnable()
+ {
+ public void run()
+ {
+ CLibrary.trySkipCache(dfile.path, 0, 0);
+ CLibrary.trySkipCache(ifile.path, 0, 0);
+ runOnClose.run();
+ }
+ };
+ }
+ else
+ {
+ final long dataStart = getPosition(newStart, Operator.GE).position;
+ final long indexStart = getIndexScanPosition(newStart);
+ this.runOnClose = new Runnable()
+ {
+ public void run()
+ {
+ CLibrary.trySkipCache(dfile.path, 0, dataStart);
+ CLibrary.trySkipCache(ifile.path, 0, indexStart);
+ runOnClose.run();
+ }
+ };
+ }
+ }
+
+ SSTableReader replacement = internalOpen(descriptor, components, metadata, partitioner, ifile, dfile, indexSummary.readOnlyClone(), bf, maxDataAge, sstableMetadata,
+ openReason == OpenReason.EARLY ? openReason : OpenReason.METADATA_CHANGE);
+ replacement.readMeterSyncFuture = this.readMeterSyncFuture;
+ replacement.readMeter = this.readMeter;
+ replacement.first = this.last.compareTo(newStart) > 0 ? newStart : this.last;
+ replacement.last = this.last;
+ setReplacedBy(replacement);
+ return replacement;
+ }
+ }
+
+ /**
+ * Returns a new SSTableReader with the same properties as this SSTableReader except that a new IndexSummary will
+ * be built at the target samplingLevel. This (original) SSTableReader instance will be marked as replaced, have
+ * its DeletingTask removed, and have its periodic read-meter sync task cancelled.
+ * @param samplingLevel the desired sampling level for the index summary on the new SSTableReader
+ * @return a new SSTableReader
+ * @throws IOException
+ */
+ public SSTableReader cloneWithNewSummarySamplingLevel(ColumnFamilyStore parent, int samplingLevel) throws IOException
+ {
+ synchronized (replaceLock)
+ {
+ assert replacedBy == null;
+
+ int minIndexInterval = metadata.getMinIndexInterval();
+ int maxIndexInterval = metadata.getMaxIndexInterval();
+ double effectiveInterval = indexSummary.getEffectiveIndexInterval();
+
+ IndexSummary newSummary;
+ long oldSize = bytesOnDisk();
+
+ // We have to rebuild the summary from the on-disk primary index in three cases:
+ // 1. The sampling level went up, so we need to read more entries off disk
+ // 2. The min_index_interval changed (in either direction); this changes what entries would be in the summary
+ // at full sampling (and consequently at any other sampling level)
+ // 3. The max_index_interval was lowered, forcing us to raise the sampling level
+ if (samplingLevel > indexSummary.getSamplingLevel() || indexSummary.getMinIndexInterval() != minIndexInterval || effectiveInterval > maxIndexInterval)
+ {
+ newSummary = buildSummaryAtLevel(samplingLevel);
+ }
+ else if (samplingLevel < indexSummary.getSamplingLevel())
+ {
+ // we can use the existing index summary to make a smaller one
+ newSummary = IndexSummaryBuilder.downsample(indexSummary, samplingLevel, minIndexInterval, partitioner);
+
+ SegmentedFile.Builder ibuilder = SegmentedFile.getBuilder(DatabaseDescriptor.getIndexAccessMode());
+ SegmentedFile.Builder dbuilder = compression
+ ? SegmentedFile.getCompressedBuilder()
+ : SegmentedFile.getBuilder(DatabaseDescriptor.getDiskAccessMode());
+ saveSummary(ibuilder, dbuilder, newSummary);
+ }
+ else
+ {
+ throw new AssertionError("Attempted to clone SSTableReader with the same index summary sampling level and " +
+ "no adjustments to min/max_index_interval");
+ }
+
+ long newSize = bytesOnDisk();
+ StorageMetrics.load.inc(newSize - oldSize);
+ parent.metric.liveDiskSpaceUsed.inc(newSize - oldSize);
+
+ SSTableReader replacement = internalOpen(descriptor, components, metadata, partitioner, ifile, dfile, newSummary, bf, maxDataAge, sstableMetadata,
+ openReason == OpenReason.EARLY ? openReason : OpenReason.METADATA_CHANGE);
+ replacement.readMeterSyncFuture = this.readMeterSyncFuture;
+ replacement.readMeter = this.readMeter;
+ replacement.first = this.first;
+ replacement.last = this.last;
+ setReplacedBy(replacement);
+ return replacement;
+ }
+ }
+
+ private IndexSummary buildSummaryAtLevel(int newSamplingLevel) throws IOException
+ {
+ // we read the positions in a BRAF so we don't have to worry about an entry spanning a mmap boundary.
+ RandomAccessReader primaryIndex = RandomAccessReader.open(new File(descriptor.filenameFor(Component.PRIMARY_INDEX)));
+ try
+ {
+ long indexSize = primaryIndex.length();
+ IndexSummaryBuilder summaryBuilder = new IndexSummaryBuilder(estimatedKeys(), metadata.getMinIndexInterval(), newSamplingLevel);
+
+ long indexPosition;
+ while ((indexPosition = primaryIndex.getFilePointer()) != indexSize)
+ {
+ summaryBuilder.maybeAddEntry(partitioner.decorateKey(ByteBufferUtil.readWithShortLength(primaryIndex)), indexPosition);
+ RowIndexEntry.Serializer.skip(primaryIndex);
+ }
+
+ return summaryBuilder.build(partitioner);
+ }
+ finally
+ {
+ FileUtils.closeQuietly(primaryIndex);
+ }
+ }
+
+ public int getIndexSummarySamplingLevel()
+ {
+ return indexSummary.getSamplingLevel();
+ }
+
+ public long getIndexSummaryOffHeapSize()
+ {
+ return indexSummary.getOffHeapSize();
+ }
+
+ public int getMinIndexInterval()
+ {
+ return indexSummary.getMinIndexInterval();
+ }
+
+ public double getEffectiveIndexInterval()
+ {
+ return indexSummary.getEffectiveIndexInterval();
+ }
+
+ public void releaseSummary() throws IOException
+ {
+ indexSummary.close();
+ indexSummary = null;
+ }
+
+ private void validate()
+ {
+ if (this.first.compareTo(this.last) > 0)
+ throw new IllegalStateException(String.format("SSTable first key %s > last key %s", this.first, this.last));
+ }
+
+ /**
+ * Gets the position in the index file to start scanning to find the given key (at most indexInterval keys away,
+ * modulo downsampling of the index summary).
+ */
+ public long getIndexScanPosition(RowPosition key)
+ {
+ return getIndexScanPositionFromBinarySearchResult(indexSummary.binarySearch(key), indexSummary);
+ }
+
+ protected static long getIndexScanPositionFromBinarySearchResult(int binarySearchResult, IndexSummary referencedIndexSummary)
+ {
+ if (binarySearchResult == -1)
+ return -1;
+ else
+ return referencedIndexSummary.getPosition(getIndexSummaryIndexFromBinarySearchResult(binarySearchResult));
+ }
+
+ protected static int getIndexSummaryIndexFromBinarySearchResult(int binarySearchResult)
+ {
+ if (binarySearchResult < 0)
+ {
+ // binary search gives us the first index _greater_ than the key searched for,
+ // i.e., its insertion position
+ int greaterThan = (binarySearchResult + 1) * -1;
+ if (greaterThan == 0)
+ return -1;
+ return greaterThan - 1;
+ }
+ else
+ {
+ return binarySearchResult;
+ }
+ }
+
+ /**
+ * Returns the compression metadata for this sstable.
+ * @throws IllegalStateException if the sstable is not compressed
+ */
+ public CompressionMetadata getCompressionMetadata()
+ {
+ if (!compression)
+ throw new IllegalStateException(this + " is not compressed");
+
+ CompressionMetadata cmd = ((ICompressedFile) dfile).getMetadata();
+
+ //We need the parent cf metadata
+ String cfName = metadata.isSecondaryIndex() ? metadata.getParentColumnFamilyName() : metadata.cfName;
+ cmd.parameters.setLiveMetadata(Schema.instance.getCFMetaData(metadata.ksName, cfName));
+
+ return cmd;
+ }
+
+ /**
+ * Returns the amount of memory in bytes used off heap by the compression meta-data.
+ * @return the amount of memory in bytes used off heap by the compression meta-data
+ */
+ public long getCompressionMetadataOffHeapSize()
+ {
+ if (!compression)
+ return 0;
+
+ return getCompressionMetadata().offHeapSize();
+ }
+
+ /**
+ * For testing purposes only.
+ */
+ public void forceFilterFailures()
+ {
+ bf = FilterFactory.AlwaysPresent;
+ }
+
+ public IFilter getBloomFilter()
+ {
+ return bf;
+ }
+
+ public long getBloomFilterSerializedSize()
+ {
+ return bf.serializedSize();
+ }
+
+ /**
+ * Returns the amount of memory in bytes used off heap by the bloom filter.
+ * @return the amount of memory in bytes used off heap by the bloom filter
+ */
+ public long getBloomFilterOffHeapSize()
+ {
+ return bf.offHeapSize();
+ }
+
+ /**
+ * @return An estimate of the number of keys in this SSTable based on the index summary.
+ */
+ public long estimatedKeys()
+ {
+ return indexSummary.getEstimatedKeyCount();
+ }
+
+ /**
+ * @param ranges
+ * @return An estimate of the number of keys for given ranges in this SSTable.
+ */
+ public long estimatedKeysForRanges(Collection<Range<Token>> ranges)
+ {
+ long sampleKeyCount = 0;
+ List<Pair<Integer, Integer>> sampleIndexes = getSampleIndexesForRanges(indexSummary, ranges);
+ for (Pair<Integer, Integer> sampleIndexRange : sampleIndexes)
+ sampleKeyCount += (sampleIndexRange.right - sampleIndexRange.left + 1);
+
+ // adjust for the current sampling level: (BSL / SL) * index_interval_at_full_sampling
+ long estimatedKeys = sampleKeyCount * (Downsampling.BASE_SAMPLING_LEVEL * indexSummary.getMinIndexInterval()) / indexSummary.getSamplingLevel();
+ return Math.max(1, estimatedKeys);
+ }
+
+ /**
+ * Returns the number of entries in the IndexSummary. At full sampling, this is approximately 1/INDEX_INTERVALth of
+ * the keys in this SSTable.
+ */
+ public int getIndexSummarySize()
+ {
+ return indexSummary.size();
+ }
+
+ /**
+ * Returns the approximate number of entries the IndexSummary would contain if it were at full sampling.
+ */
+ public int getMaxIndexSummarySize()
+ {
+ return indexSummary.getMaxNumberOfEntries();
+ }
+
+ /**
+ * Returns the key for the index summary entry at `index`.
+ */
+ public byte[] getIndexSummaryKey(int index)
+ {
+ return indexSummary.getKey(index);
+ }
+
+ private static List<Pair<Integer,Integer>> getSampleIndexesForRanges(IndexSummary summary, Collection<Range<Token>> ranges)
+ {
+ // use the index to determine a minimal section for each range
+ List<Pair<Integer,Integer>> positions = new ArrayList<>();
+
+ for (Range<Token> range : Range.normalize(ranges))
+ {
+ RowPosition leftPosition = range.left.maxKeyBound();
+ RowPosition rightPosition = range.right.maxKeyBound();
+
+ int left = summary.binarySearch(leftPosition);
+ if (left < 0)
+ left = (left + 1) * -1;
+ else
+ // left range are start exclusive
+ left = left + 1;
+ if (left == summary.size())
+ // left is past the end of the sampling
+ continue;
+
+ int right = Range.isWrapAround(range.left, range.right)
+ ? summary.size() - 1
+ : summary.binarySearch(rightPosition);
+ if (right < 0)
+ {
+ // range are end inclusive so we use the previous index from what binarySearch give us
+ // since that will be the last index we will return
+ right = (right + 1) * -1;
+ if (right == 0)
+ // Means the first key is already stricly greater that the right bound
+ continue;
+ right--;
+ }
+
+ if (left > right)
+ // empty range
+ continue;
+ positions.add(Pair.create(left, right));
+ }
+ return positions;
+ }
+
+ public Iterable<DecoratedKey> getKeySamples(final Range<Token> range)
+ {
+ final List<Pair<Integer, Integer>> indexRanges = getSampleIndexesForRanges(indexSummary, Collections.singletonList(range));
+
+ if (indexRanges.isEmpty())
+ return Collections.emptyList();
+
+ return new Iterable<DecoratedKey>()
+ {
+ public Iterator<DecoratedKey> iterator()
+ {
+ return new Iterator<DecoratedKey>()
+ {
+ private Iterator<Pair<Integer, Integer>> rangeIter = indexRanges.iterator();
+ private Pair<Integer, Integer> current;
+ private int idx;
+
+ public boolean hasNext()
+ {
+ if (current == null || idx > current.right)
+ {
+ if (rangeIter.hasNext())
+ {
+ current = rangeIter.next();
+ idx = current.left;
+ return true;
+ }
+ return false;
+ }
+
+ return true;
+ }
+
+ public DecoratedKey next()
+ {
+ byte[] bytes = indexSummary.getKey(idx++);
+ return partitioner.decorateKey(ByteBuffer.wrap(bytes));
+ }
+
+ public void remove()
+ {
+ throw new UnsupportedOperationException();
+ }
+ };
+ }
+ };
+ }
+
+ /**
+ * Determine the minimal set of sections that can be extracted from this SSTable to cover the given ranges.
+ * @return A sorted list of (offset,end) pairs that cover the given ranges in the datafile for this SSTable.
+ */
+ public List<Pair<Long,Long>> getPositionsForRanges(Collection<Range<Token>> ranges)
+ {
+ // use the index to determine a minimal section for each range
+ List<Pair<Long,Long>> positions = new ArrayList<>();
+ for (Range<Token> range : Range.normalize(ranges))
+ {
+ assert !range.isWrapAround() || range.right.isMinimum();
+ // truncate the range so it at most covers the sstable
+ AbstractBounds<RowPosition> bounds = range.toRowBounds();
+ RowPosition leftBound = bounds.left.compareTo(first) > 0 ? bounds.left : first.getToken().minKeyBound();
+ RowPosition rightBound = bounds.right.isMinimum() ? last.getToken().maxKeyBound() : bounds.right;
+
+ if (leftBound.compareTo(last) > 0 || rightBound.compareTo(first) < 0)
+ continue;
+
+ long left = getPosition(leftBound, Operator.GT).position;
+ long right = (rightBound.compareTo(last) > 0)
+ ? (openReason == OpenReason.EARLY
+ // if opened early, we overlap with the old sstables by one key, so we know that the last
+ // (and further) key(s) will be streamed from these if necessary
+ ? getPosition(last.getToken().maxKeyBound(), Operator.GT).position
+ : uncompressedLength())
+ : getPosition(rightBound, Operator.GT).position;
+
+ if (left == right)
+ // empty range
+ continue;
+
+ assert left < right : String.format("Range=%s openReason=%s first=%s last=%s left=%d right=%d", range, openReason, first, last, left, right);
+ positions.add(Pair.create(left, right));
+ }
+ return positions;
+ }
+
+ public void invalidateCacheKey(DecoratedKey key)
+ {
+ KeyCacheKey cacheKey = new KeyCacheKey(metadata.cfId, descriptor, key.getKey());
+ keyCache.remove(cacheKey);
+ }
+
+ public void cacheKey(DecoratedKey key, RowIndexEntry info)
+ {
+ CachingOptions caching = metadata.getCaching();
+
+ if (!caching.keyCache.isEnabled()
+ || keyCache == null
+ || keyCache.getCapacity() == 0)
+ {
+ return;
+ }
+
+ KeyCacheKey cacheKey = new KeyCacheKey(metadata.cfId, descriptor, key.getKey());
+ logger.trace("Adding cache entry for {} -> {}", cacheKey, info);
+ keyCache.put(cacheKey, info);
+ }
+
+ public RowIndexEntry getCachedPosition(DecoratedKey key, boolean updateStats)
+ {
+ return getCachedPosition(new KeyCacheKey(metadata.cfId, descriptor, key.getKey()), updateStats);
+ }
+
+ protected RowIndexEntry getCachedPosition(KeyCacheKey unifiedKey, boolean updateStats)
+ {
+ if (keyCache != null && keyCache.getCapacity() > 0) {
+ if (updateStats)
+ {
+ RowIndexEntry cachedEntry = keyCache.get(unifiedKey);
+ keyCacheRequest.incrementAndGet();
+ if (cachedEntry != null)
+ keyCacheHit.incrementAndGet();
+ return cachedEntry;
+ }
+ else
+ {
+ return keyCache.getInternal(unifiedKey);
+ }
+ }
+ return null;
+ }
+
+ /**
+ * Get position updating key cache and stats.
+ * @see #getPosition(org.apache.cassandra.db.RowPosition, SSTableReader.Operator, boolean)
+ */
+ public RowIndexEntry getPosition(RowPosition key, Operator op)
+ {
+ return getPosition(key, op, true);
+ }
+
+ /**
+ * @param key The key to apply as the rhs to the given Operator. A 'fake' key is allowed to
+ * allow key selection by token bounds but only if op != * EQ
+ * @param op The Operator defining matching keys: the nearest key to the target matching the operator wins.
+ * @param updateCacheAndStats true if updating stats and cache
+ * @return The index entry corresponding to the key, or null if the key is not present
+ */
+ public abstract RowIndexEntry getPosition(RowPosition key, Operator op, boolean updateCacheAndStats);
+
+ //Corresponds to a name column
+ public abstract OnDiskAtomIterator iterator(DecoratedKey key, SortedSet<CellName> columns);
+ public abstract OnDiskAtomIterator iterator(FileDataInput file, DecoratedKey key, SortedSet<CellName> columns, RowIndexEntry indexEntry);
+
+ //Corresponds to a slice query
+ public abstract OnDiskAtomIterator iterator(DecoratedKey key, ColumnSlice[] slices, boolean reverse);
+ public abstract OnDiskAtomIterator iterator(FileDataInput file, DecoratedKey key, ColumnSlice[] slices, boolean reversed, RowIndexEntry indexEntry);
+
+ /**
+ * Finds and returns the first key beyond a given token in this SSTable or null if no such key exists.
+ */
+ public DecoratedKey firstKeyBeyond(RowPosition token)
+ {
+ long sampledPosition = getIndexScanPosition(token);
+ if (sampledPosition == -1)
+ sampledPosition = 0;
+
+ Iterator<FileDataInput> segments = ifile.iterator(sampledPosition);
+ while (segments.hasNext())
+ {
+ FileDataInput in = segments.next();
+ try
+ {
+ while (!in.isEOF())
+ {
+ ByteBuffer indexKey = ByteBufferUtil.readWithShortLength(in);
+ DecoratedKey indexDecoratedKey = partitioner.decorateKey(indexKey);
+ if (indexDecoratedKey.compareTo(token) > 0)
+ return indexDecoratedKey;
+
+ RowIndexEntry.Serializer.skip(in);
+ }
+ }
+ catch (IOException e)
+ {
+ markSuspect();
+ throw new CorruptSSTableException(e, in.getPath());
+ }
+ finally
+ {
+ FileUtils.closeQuietly(in);
+ }
+ }
+
+ return null;
+ }
+
+ /**
+ * @return The length in bytes of the data for this SSTable. For
+ * compressed files, this is not the same thing as the on disk size (see
+ * onDiskLength())
+ */
+ public long uncompressedLength()
+ {
+ return dfile.length;
+ }
+
+ /**
+ * @return The length in bytes of the on disk size for this SSTable. For
+ * compressed files, this is not the same thing as the data length (see
+ * length())
+ */
+ public long onDiskLength()
+ {
+ return dfile.onDiskLength;
+ }
+
+ public boolean acquireReference()
+ {
+ while (true)
+ {
+ int n = references.get();
+ if (n <= 0)
+ return false;
+ if (references.compareAndSet(n, n + 1))
+ return true;
+ }
+ }
+
+ @VisibleForTesting
+ public int referenceCount()
+ {
+ return references.get();
+ }
+
+ /**
+ * Release reference to this SSTableReader.
+ * If there is no one referring to this SSTable, and is marked as compacted,
+ * all resources are cleaned up and files are deleted eventually.
+ */
+ public void releaseReference()
+ {
+ if (references.decrementAndGet() == 0)
+ tidy(true);
+ assert references.get() >= 0 : "Reference counter " + references.get() + " for " + dfile.path;
+ }
+
+ /**
+ * Mark the sstable as obsolete, i.e., compacted into newer sstables.
+ *
+ * When calling this function, the caller must ensure that the SSTableReader is not referenced anywhere
+ * except for threads holding a reference.
+ *
+ * @return true if the this is the first time the file was marked obsolete. Calling this
+ * multiple times is usually buggy (see exceptions in DataTracker.unmarkCompacting and removeOldSSTablesSize).
+ */
+ public boolean markObsolete()
+ {
+ if (logger.isDebugEnabled())
+ logger.debug("Marking {} compacted", getFilename());
+
+ synchronized (replaceLock)
+ {
+ assert replacedBy == null : getFilename();
+ }
+ return !isCompacted.getAndSet(true);
+ }
+
+ public boolean isMarkedCompacted()
+ {
+ return isCompacted.get();
+ }
+
+ public void markSuspect()
+ {
+ if (logger.isDebugEnabled())
+ logger.debug("Marking {} as a suspect for blacklisting.", getFilename());
+
+ isSuspect.getAndSet(true);
+ }
+
+ public boolean isMarkedSuspect()
+ {
+ return isSuspect.get();
+ }
+
+
+ /**
+ * I/O SSTableScanner
+ * @return A Scanner for seeking over the rows of the SSTable.
+ */
+ public ISSTableScanner getScanner()
+ {
+ return getScanner((RateLimiter) null);
+ }
+
+ public ISSTableScanner getScanner(RateLimiter limiter)
+ {
+ return getScanner(DataRange.allData(partitioner), limiter);
+ }
+
+ /**
+ *
+ * @param dataRange filter to use when reading the columns
+ * @return A Scanner for seeking over the rows of the SSTable.
+ */
+ public ISSTableScanner getScanner(DataRange dataRange)
+ {
+ return getScanner(dataRange, null);
+ }
+
+ /**
+ * Direct I/O SSTableScanner over a defined range of tokens.
+ *
+ * @param range the range of keys to cover
+ * @return A Scanner for seeking over the rows of the SSTable.
+ */
+ public ISSTableScanner getScanner(Range<Token> range, RateLimiter limiter)
+ {
+ if (range == null)
+ return getScanner(limiter);
+ return getScanner(Collections.singletonList(range), limiter);
+ }
+
+ /**
+ * Direct I/O SSTableScanner over a defined collection of ranges of tokens.
+ *
+ * @param ranges the range of keys to cover
+ * @return A Scanner for seeking over the rows of the SSTable.
+ */
+ public abstract ISSTableScanner getScanner(Collection<Range<Token>> ranges, RateLimiter limiter);
+
+ /**
+ *
+ * @param dataRange filter to use when reading the columns
+ * @return A Scanner for seeking over the rows of the SSTable.
+ */
+ public abstract ISSTableScanner getScanner(DataRange dataRange, RateLimiter limiter);
+
+
+
+ public FileDataInput getFileDataInput(long position)
+ {
+ return dfile.getSegment(position);
+ }
+
+ /**
+ * Tests if the sstable contains data newer than the given age param (in localhost currentMilli time).
+ * This works in conjunction with maxDataAge which is an upper bound on the create of data in this sstable.
+ * @param age The age to compare the maxDataAre of this sstable. Measured in millisec since epoc on this host
+ * @return True iff this sstable contains data that's newer than the given age parameter.
+ */
+ public boolean newSince(long age)
+ {
+ return maxDataAge > age;
+ }
+
+ public void createLinks(String snapshotDirectoryPath)
+ {
+ for (Component component : components)
+ {
+ File sourceFile = new File(descriptor.filenameFor(component));
+ File targetLink = new File(snapshotDirectoryPath, sourceFile.getName());
+ FileUtils.createHardLink(sourceFile, targetLink);
+ }
+ }
+
+ public boolean isRepaired()
+ {
+ return sstableMetadata.repairedAt != ActiveRepairService.UNREPAIRED_SSTABLE;
+ }
+
+ public SSTableReader getCurrentReplacement()
+ {
+ synchronized (replaceLock)
+ {
+ SSTableReader cur = this, next = replacedBy;
+ while (next != null)
+ {
+ cur = next;
+ next = next.replacedBy;
+ }
+ return cur;
+ }
+ }
+
+ /**
+ * TODO: Move someplace reusable
+ */
+ public abstract static class Operator
+ {
+ public static final Operator EQ = new Equals();
+ public static final Operator GE = new GreaterThanOrEqualTo();
+ public static final Operator GT = new GreaterThan();
+
+ /**
+ * @param comparison The result of a call to compare/compareTo, with the desired field on the rhs.
+ * @return less than 0 if the operator cannot match forward, 0 if it matches, greater than 0 if it might match forward.
+ */
+ public abstract int apply(int comparison);
+
+ final static class Equals extends Operator
+ {
+ public int apply(int comparison) { return -comparison; }
+ }
+
+ final static class GreaterThanOrEqualTo extends Operator
+ {
+ public int apply(int comparison) { return comparison >= 0 ? 0 : -comparison; }
+ }
+
+ final static class GreaterThan extends Operator
+ {
+ public int apply(int comparison) { return comparison > 0 ? 0 : 1; }
+ }
+ }
+
+ public long getBloomFilterFalsePositiveCount()
+ {
+ return bloomFilterTracker.getFalsePositiveCount();
+ }
+
+ public long getRecentBloomFilterFalsePositiveCount()
+ {
+ return bloomFilterTracker.getRecentFalsePositiveCount();
+ }
+
+ public long getBloomFilterTruePositiveCount()
+ {
+ return bloomFilterTracker.getTruePositiveCount();
+ }
+
+ public long getRecentBloomFilterTruePositiveCount()
+ {
+ return bloomFilterTracker.getRecentTruePositiveCount();
+ }
+
+ public InstrumentingCache<KeyCacheKey, RowIndexEntry> getKeyCache()
+ {
+ return keyCache;
+ }
+
+ public EstimatedHistogram getEstimatedRowSize()
+ {
+ return sstableMetadata.estimatedRowSize;
+ }
+
+ public EstimatedHistogram getEstimatedColumnCount()
+ {
+ return sstableMetadata.estimatedColumnCount;
+ }
+
+ public double getEstimatedDroppableTombstoneRatio(int gcBefore)
+ {
+ return sstableMetadata.getEstimatedDroppableTombstoneRatio(gcBefore);
+ }
+
+ public double getDroppableTombstonesBefore(int gcBefore)
+ {
+ return sstableMetadata.getDroppableTombstonesBefore(gcBefore);
+ }
+
+ public double getCompressionRatio()
+ {
+ return sstableMetadata.compressionRatio;
+ }
+
+ public ReplayPosition getReplayPosition()
+ {
+ return sstableMetadata.replayPosition;
+ }
+
+ public long getMinTimestamp()
+ {
+ return sstableMetadata.minTimestamp;
+ }
+
+ public long getMaxTimestamp()
+ {
+ return sstableMetadata.maxTimestamp;
+ }
+
+ public Set<Integer> getAncestors()
+ {
+ try
+ {
+ CompactionMetadata compactionMetadata = (CompactionMetadata) descriptor.getMetadataSerializer().deserialize(descriptor, MetadataType.COMPACTION);
+ return compactionMetadata.ancestors;
+ }
+ catch (IOException e)
+ {
+ SSTableReader.logOpenException(descriptor, e);
+ return Collections.emptySet();
+ }
+ }
+
+ public int getSSTableLevel()
+ {
+ return sstableMetadata.sstableLevel;
+ }
+
+ /**
+ * Reloads the sstable metadata from disk.
+ *
+ * Called after level is changed on sstable, for example if the sstable is dropped to L0
+ *
+ * Might be possible to remove in future versions
+ *
+ * @throws IOException
+ */
+ public void reloadSSTableMetadata() throws IOException
+ {
+ this.sstableMetadata = (StatsMetadata) descriptor.getMetadataSerializer().deserialize(descriptor, MetadataType.STATS);
+ }
+
+ public StatsMetadata getSSTableMetadata()
+ {
+ return sstableMetadata;
+ }
+
+ public RandomAccessReader openDataReader(RateLimiter limiter)
+ {
+ assert limiter != null;
+ return compression
+ ? CompressedThrottledReader.open(getFilename(), getCompressionMetadata(), limiter)
+ : ThrottledReader.open(new File(getFilename()), limiter);
+ }
+
+ public RandomAccessReader openDataReader()
+ {
+ return compression
+ ? CompressedRandomAccessReader.open(getFilename(), getCompressionMetadata())
+ : RandomAccessReader.open(new File(getFilename()));
+ }
+
+ public RandomAccessReader openIndexReader()
+ {
+ return RandomAccessReader.open(new File(getIndexFilename()));
+ }
+
+ /**
+ * @param component component to get timestamp.
+ * @return last modified time for given component. 0 if given component does not exist or IO error occurs.
+ */
+ public long getCreationTimeFor(Component component)
+ {
+ return new File(descriptor.filenameFor(component)).lastModified();
+ }
+
+ /**
+ * @return Number of key cache hit
+ */
+ public long getKeyCacheHit()
+ {
+ return keyCacheHit.get();
+ }
+
+ /**
+ * @return Number of key cache request
+ */
+ public long getKeyCacheRequest()
+ {
+ return keyCacheRequest.get();
+ }
+
+ /**
+ * @param sstables
+ * @return true if all desired references were acquired. Otherwise, it will unreference any partial acquisition, and return false.
+ */
+ public static boolean acquireReferences(Iterable<SSTableReader> sstables)
+ {
+ SSTableReader failed = null;
+ for (SSTableReader sstable : sstables)
+ {
+ if (!sstable.acquireReference())
+ {
+ failed = sstable;
+ break;
+ }
+ }
+
+ if (failed == null)
+ return true;
+
+ for (SSTableReader sstable : sstables)
+ {
+ if (sstable == failed)
+ break;
+ sstable.releaseReference();
+ }
+ return false;
+ }
+
+ public static void releaseReferences(Iterable<SSTableReader> sstables)
+ {
+ for (SSTableReader sstable : sstables)
+ {
+ sstable.releaseReference();
+ }
+ }
+
+ private void dropPageCache()
+ {
+ dropPageCache(dfile.path);
+ dropPageCache(ifile.path);
+ }
+
+ private void dropPageCache(String filePath)
+ {
+ RandomAccessFile file = null;
+
+ try
+ {
+ file = new RandomAccessFile(filePath, "r");
+
+ int fd = CLibrary.getfd(file.getFD());
+
+ if (fd > 0)
+ {
+ if (logger.isDebugEnabled())
+ logger.debug(String.format("Dropping page cache of file %s.", filePath));
+
+ CLibrary.trySkipCache(fd, 0, 0);
+ }
+ }
+ catch (IOException e)
+ {
+ // we don't care if cache cleanup fails
+ }
+ finally
+ {
+ FileUtils.closeQuietly(file);
+ }
+ }
+
+ /**
+ * Increment the total row read count and read rate for this SSTable. This should not be incremented for range
+ * slice queries, row cache hits, or non-query reads, like compaction.
+ */
+ public void incrementReadCount()
+ {
+ if (readMeter != null)
+ readMeter.mark();
+ }
+
+ public static class SizeComparator implements Comparator<SSTableReader>
+ {
+ public int compare(SSTableReader o1, SSTableReader o2)
+ {
+ return Longs.compare(o1.onDiskLength(), o2.onDiskLength());
+ }
+ }
+
+ public static abstract class Factory
+ {
+ public abstract SSTableReader open(final Descriptor descriptor,
+ Set<Component> components,
+ CFMetaData metadata,
+ IPartitioner partitioner,
+ Long maxDataAge,
+ StatsMetadata sstableMetadata,
+ OpenReason openReason);
+
+ }
+}
[2/3] cassandra git commit: Merge branch 'cassandra-2.1' into trunk
Posted by be...@apache.org.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9a51c3c3/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
index c017530,0000000..32b5520
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
@@@ -1,202 -1,0 +1,215 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.sstable.format;
+
+import com.google.common.collect.Sets;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.db.ColumnFamily;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.RowIndexEntry;
+import org.apache.cassandra.db.compaction.AbstractCompactedRow;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.io.sstable.Component;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.SSTable;
+import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
+import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.utils.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.DataInput;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * This is the API all table writers must implement.
+ *
+ * TableWriter.create() is the primary way to create a writer for a particular format.
+ * The format information is part of the Descriptor.
+ */
+public abstract class SSTableWriter extends SSTable
+{
+ private static final Logger logger = LoggerFactory.getLogger(SSTableWriter.class);
+
++ public static enum FinishType
++ {
++ NORMAL(SSTableReader.OpenReason.NORMAL),
++ EARLY(SSTableReader.OpenReason.EARLY), // no renaming
++ FINISH_EARLY(SSTableReader.OpenReason.NORMAL); // tidy up an EARLY finish
++ public final SSTableReader.OpenReason openReason;
++
++ FinishType(SSTableReader.OpenReason openReason)
++ {
++ this.openReason = openReason;
++ }
++ }
++
+ protected final long repairedAt;
+ protected final long keyCount;
+ protected final MetadataCollector metadataCollector;
+ protected final RowIndexEntry.IndexSerializer rowIndexEntrySerializer;
+
+ protected SSTableWriter(Descriptor descriptor, long keyCount, long repairedAt, CFMetaData metadata, IPartitioner partitioner, MetadataCollector metadataCollector)
+ {
+ super(descriptor, components(metadata), metadata, partitioner);
+ this.keyCount = keyCount;
+ this.repairedAt = repairedAt;
+ this.metadataCollector = metadataCollector;
+ this.rowIndexEntrySerializer = descriptor.version.getSSTableFormat().getIndexSerializer(metadata);
+ }
+
+ public static SSTableWriter create(Descriptor descriptor, Long keyCount, Long repairedAt, CFMetaData metadata, IPartitioner partitioner, MetadataCollector metadataCollector)
+ {
+ Factory writerFactory = descriptor.getFormat().getWriterFactory();
+ return writerFactory.open(descriptor, keyCount, repairedAt, metadata, partitioner, metadataCollector);
+ }
+
+ public static SSTableWriter create(Descriptor descriptor, long keyCount, long repairedAt)
+ {
+ return create(descriptor, keyCount, repairedAt, 0);
+ }
+
+ public static SSTableWriter create(Descriptor descriptor, long keyCount, long repairedAt, int sstableLevel)
+ {
+ CFMetaData metadata = Schema.instance.getCFMetaData(descriptor);
+ MetadataCollector collector = new MetadataCollector(metadata.comparator).sstableLevel(sstableLevel);
+
+ return create(descriptor, keyCount, repairedAt, metadata, DatabaseDescriptor.getPartitioner(), collector);
+ }
+
+ public static SSTableWriter create(String filename, long keyCount, long repairedAt, int sstableLevel)
+ {
+ return create(Descriptor.fromFilename(filename), keyCount, repairedAt, sstableLevel);
+ }
+
+ public static SSTableWriter create(String filename, long keyCount, long repairedAt)
+ {
+ return create(Descriptor.fromFilename(filename), keyCount, repairedAt, 0);
+ }
+
+ private static Set<Component> components(CFMetaData metadata)
+ {
+ Set<Component> components = new HashSet<Component>(Arrays.asList(Component.DATA,
+ Component.PRIMARY_INDEX,
+ Component.STATS,
+ Component.SUMMARY,
+ Component.TOC,
+ Component.DIGEST));
+
+ if (metadata.getBloomFilterFpChance() < 1.0)
+ components.add(Component.FILTER);
+
+ if (metadata.compressionParameters().sstableCompressor != null)
+ {
+ components.add(Component.COMPRESSION_INFO);
+ }
+ else
+ {
+ // it would feel safer to actually add this component later in maybeWriteDigest(),
+ // but the components are unmodifiable after construction
+ components.add(Component.CRC);
+ }
+ return components;
+ }
+
+
+ public abstract void mark();
+
+
+ /**
+ * @param row
+ * @return null if the row was compacted away entirely; otherwise, the PK index entry for this row
+ */
+ public abstract RowIndexEntry append(AbstractCompactedRow row);
+
+ public abstract void append(DecoratedKey decoratedKey, ColumnFamily cf);
+
+ public abstract long appendFromStream(DecoratedKey key, CFMetaData metadata, DataInput in, Version version) throws IOException;
+
+ public abstract long getFilePointer();
+
+ public abstract long getOnDiskFilePointer();
+
+ public abstract void isolateReferences();
+
+ public abstract void resetAndTruncate();
+
+ public SSTableReader closeAndOpenReader()
+ {
+ return closeAndOpenReader(System.currentTimeMillis());
+ }
+
+ public SSTableReader closeAndOpenReader(long maxDataAge)
+ {
- return closeAndOpenReader(maxDataAge, repairedAt);
++ return finish(FinishType.NORMAL, maxDataAge, repairedAt);
+ }
+
- public abstract SSTableReader closeAndOpenReader(long maxDataAge, long repairedAt);
++ public abstract SSTableReader finish(FinishType finishType, long maxDataAge, long repairedAt);
+
+ public abstract SSTableReader openEarly(long maxDataAge);
+
+ // Close the writer and return the descriptor to the new sstable and it's metadata
+ public abstract Pair<Descriptor, StatsMetadata> close();
+
+
+
+ public static Descriptor rename(Descriptor tmpdesc, Set<Component> components)
+ {
+ Descriptor newdesc = tmpdesc.asType(Descriptor.Type.FINAL);
+ rename(tmpdesc, newdesc, components);
+ return newdesc;
+ }
+
+ public static void rename(Descriptor tmpdesc, Descriptor newdesc, Set<Component> components)
+ {
+ for (Component component : Sets.difference(components, Sets.newHashSet(Component.DATA, Component.SUMMARY)))
+ {
+ FileUtils.renameWithConfirm(tmpdesc.filenameFor(component), newdesc.filenameFor(component));
+ }
+
+ // do -Data last because -Data present should mean the sstable was completely renamed before crash
+ FileUtils.renameWithConfirm(tmpdesc.filenameFor(Component.DATA), newdesc.filenameFor(Component.DATA));
+
+ // rename it without confirmation because summary can be available for loadNewSSTables but not for closeAndOpenReader
+ FileUtils.renameWithOutConfirm(tmpdesc.filenameFor(Component.SUMMARY), newdesc.filenameFor(Component.SUMMARY));
+ }
+
+
+ /**
+ * After failure, attempt to close the index writer and data file before deleting all temp components for the sstable
+ */
+ public void abort()
+ {
+ abort(true);
+ }
+
+ public abstract void abort(boolean closeBf);
+
+
+ public static abstract class Factory
+ {
+ public abstract SSTableWriter open(Descriptor descriptor, long keyCount, long repairedAt, CFMetaData metadata, IPartitioner partitioner, MetadataCollector metadataCollector);
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9a51c3c3/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
index 7c68c8a,0000000..c52184b
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
@@@ -1,557 -1,0 +1,588 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.sstable.format.big;
+
+import java.io.Closeable;
+import java.io.DataInput;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.io.sstable.*;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.sstable.format.SSTableWriter;
+import org.apache.cassandra.io.sstable.format.Version;
+import org.apache.cassandra.io.util.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.compaction.AbstractCompactedRow;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.io.FSWriteError;
+import org.apache.cassandra.io.compress.CompressedSequentialWriter;
+import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
+import org.apache.cassandra.io.sstable.metadata.MetadataComponent;
+import org.apache.cassandra.io.sstable.metadata.MetadataType;
+import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.io.util.DataOutputStreamAndChannel;
+import org.apache.cassandra.io.util.FileMark;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.io.util.SegmentedFile;
+import org.apache.cassandra.io.util.SequentialWriter;
+import org.apache.cassandra.service.ActiveRepairService;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.FilterFactory;
+import org.apache.cassandra.utils.IFilter;
+import org.apache.cassandra.utils.Pair;
+import org.apache.cassandra.utils.StreamingHistogram;
+
+public class BigTableWriter extends SSTableWriter
+{
+ private static final Logger logger = LoggerFactory.getLogger(BigTableWriter.class);
+
+ // not very random, but the only value that can't be mistaken for a legal column-name length
+ public static final int END_OF_ROW = 0x0000;
+
+ private IndexWriter iwriter;
+ private SegmentedFile.Builder dbuilder;
+ private final SequentialWriter dataFile;
+ private DecoratedKey lastWrittenKey;
+ private FileMark dataMark;
+
+ BigTableWriter(Descriptor descriptor, Long keyCount, Long repairedAt, CFMetaData metadata, IPartitioner partitioner, MetadataCollector metadataCollector)
+ {
+ super(descriptor, keyCount, repairedAt, metadata, partitioner, metadataCollector);
+
+ iwriter = new IndexWriter(keyCount);
+
+ if (compression)
+ {
+ dataFile = SequentialWriter.open(getFilename(),
+ descriptor.filenameFor(Component.COMPRESSION_INFO),
+ metadata.compressionParameters(),
+ metadataCollector);
+ dbuilder = SegmentedFile.getCompressedBuilder((CompressedSequentialWriter) dataFile);
+ }
+ else
+ {
+ dataFile = SequentialWriter.open(new File(getFilename()), new File(descriptor.filenameFor(Component.CRC)));
+ dbuilder = SegmentedFile.getBuilder(DatabaseDescriptor.getDiskAccessMode());
+ }
+ }
+
+ public void mark()
+ {
+ dataMark = dataFile.mark();
+ iwriter.mark();
+ }
+
+ public void resetAndTruncate()
+ {
+ dataFile.resetAndTruncate(dataMark);
+ iwriter.resetAndTruncate();
+ }
+
+ /**
+ * Perform sanity checks on @param decoratedKey and @return the position in the data file before any data is written
+ */
+ private long beforeAppend(DecoratedKey decoratedKey)
+ {
+ assert decoratedKey != null : "Keys must not be null"; // empty keys ARE allowed b/c of indexed column values
+ if (lastWrittenKey != null && lastWrittenKey.compareTo(decoratedKey) >= 0)
+ throw new RuntimeException("Last written key " + lastWrittenKey + " >= current key " + decoratedKey + " writing into " + getFilename());
+ return (lastWrittenKey == null) ? 0 : dataFile.getFilePointer();
+ }
+
+ private void afterAppend(DecoratedKey decoratedKey, long dataPosition, RowIndexEntry index)
+ {
+ metadataCollector.addKey(decoratedKey.getKey());
+ lastWrittenKey = decoratedKey;
+ last = lastWrittenKey;
+ if (first == null)
+ first = lastWrittenKey;
+
+ if (logger.isTraceEnabled())
+ logger.trace("wrote {} at {}", decoratedKey, dataPosition);
+ iwriter.append(decoratedKey, index);
+ dbuilder.addPotentialBoundary(dataPosition);
+ }
+
+ /**
+ * @param row
+ * @return null if the row was compacted away entirely; otherwise, the PK index entry for this row
+ */
+ public RowIndexEntry append(AbstractCompactedRow row)
+ {
+ long currentPosition = beforeAppend(row.key);
+ RowIndexEntry entry;
+ try
+ {
+ entry = row.write(currentPosition, dataFile);
+ if (entry == null)
+ return null;
+ }
+ catch (IOException e)
+ {
+ throw new FSWriteError(e, dataFile.getPath());
+ }
+ metadataCollector.update(dataFile.getFilePointer() - currentPosition, row.columnStats());
+ afterAppend(row.key, currentPosition, entry);
+ return entry;
+ }
+
+ public void append(DecoratedKey decoratedKey, ColumnFamily cf)
+ {
+ if (decoratedKey.getKey().remaining() > FBUtilities.MAX_UNSIGNED_SHORT)
+ {
+ logger.error("Key size {} exceeds maximum of {}, skipping row",
+ decoratedKey.getKey().remaining(),
+ FBUtilities.MAX_UNSIGNED_SHORT);
+ return;
+ }
+
+ long startPosition = beforeAppend(decoratedKey);
+ try
+ {
+ RowIndexEntry entry = rawAppend(cf, startPosition, decoratedKey, dataFile.stream);
+ afterAppend(decoratedKey, startPosition, entry);
+ }
+ catch (IOException e)
+ {
+ throw new FSWriteError(e, dataFile.getPath());
+ }
+ metadataCollector.update(dataFile.getFilePointer() - startPosition, cf.getColumnStats());
+ }
+
+ private static RowIndexEntry rawAppend(ColumnFamily cf, long startPosition, DecoratedKey key, DataOutputPlus out) throws IOException
+ {
+ assert cf.hasColumns() || cf.isMarkedForDelete();
+
+ ColumnIndex.Builder builder = new ColumnIndex.Builder(cf, key.getKey(), out);
+ ColumnIndex index = builder.build(cf);
+
+ out.writeShort(END_OF_ROW);
+ return RowIndexEntry.create(startPosition, cf.deletionInfo().getTopLevelDeletion(), index);
+ }
+
+ /**
+ * @throws IOException if a read from the DataInput fails
+ * @throws FSWriteError if a write to the dataFile fails
+ */
+ public long appendFromStream(DecoratedKey key, CFMetaData metadata, DataInput in, Version version) throws IOException
+ {
+ long currentPosition = beforeAppend(key);
+
+ ColumnStats.MaxLongTracker maxTimestampTracker = new ColumnStats.MaxLongTracker(Long.MAX_VALUE);
+ ColumnStats.MinLongTracker minTimestampTracker = new ColumnStats.MinLongTracker(Long.MIN_VALUE);
+ ColumnStats.MaxIntTracker maxDeletionTimeTracker = new ColumnStats.MaxIntTracker(Integer.MAX_VALUE);
+ List<ByteBuffer> minColumnNames = Collections.emptyList();
+ List<ByteBuffer> maxColumnNames = Collections.emptyList();
+ StreamingHistogram tombstones = new StreamingHistogram(TOMBSTONE_HISTOGRAM_BIN_SIZE);
+ boolean hasLegacyCounterShards = false;
+
+ ColumnFamily cf = ArrayBackedSortedColumns.factory.create(metadata);
+ cf.delete(DeletionTime.serializer.deserialize(in));
+
+ ColumnIndex.Builder columnIndexer = new ColumnIndex.Builder(cf, key.getKey(), dataFile.stream);
+
+ if (cf.deletionInfo().getTopLevelDeletion().localDeletionTime < Integer.MAX_VALUE)
+ {
+ tombstones.update(cf.deletionInfo().getTopLevelDeletion().localDeletionTime);
+ maxDeletionTimeTracker.update(cf.deletionInfo().getTopLevelDeletion().localDeletionTime);
+ minTimestampTracker.update(cf.deletionInfo().getTopLevelDeletion().markedForDeleteAt);
+ maxTimestampTracker.update(cf.deletionInfo().getTopLevelDeletion().markedForDeleteAt);
+ }
+
+ Iterator<RangeTombstone> rangeTombstoneIterator = cf.deletionInfo().rangeIterator();
+ while (rangeTombstoneIterator.hasNext())
+ {
+ RangeTombstone rangeTombstone = rangeTombstoneIterator.next();
+ tombstones.update(rangeTombstone.getLocalDeletionTime());
+ minTimestampTracker.update(rangeTombstone.timestamp());
+ maxTimestampTracker.update(rangeTombstone.timestamp());
+ maxDeletionTimeTracker.update(rangeTombstone.getLocalDeletionTime());
+ minColumnNames = ColumnNameHelper.minComponents(minColumnNames, rangeTombstone.min, metadata.comparator);
+ maxColumnNames = ColumnNameHelper.maxComponents(maxColumnNames, rangeTombstone.max, metadata.comparator);
+ }
+
+ Iterator<OnDiskAtom> iter = AbstractCell.onDiskIterator(in, ColumnSerializer.Flag.PRESERVE_SIZE, Integer.MIN_VALUE, version, metadata.comparator);
+ try
+ {
+ while (iter.hasNext())
+ {
+ OnDiskAtom atom = iter.next();
+ if (atom == null)
+ break;
+
+ if (atom instanceof CounterCell)
+ {
+ atom = ((CounterCell) atom).markLocalToBeCleared();
+ hasLegacyCounterShards = hasLegacyCounterShards || ((CounterCell) atom).hasLegacyShards();
+ }
+
+ int deletionTime = atom.getLocalDeletionTime();
+ if (deletionTime < Integer.MAX_VALUE)
+ tombstones.update(deletionTime);
+ minTimestampTracker.update(atom.timestamp());
+ maxTimestampTracker.update(atom.timestamp());
+ minColumnNames = ColumnNameHelper.minComponents(minColumnNames, atom.name(), metadata.comparator);
+ maxColumnNames = ColumnNameHelper.maxComponents(maxColumnNames, atom.name(), metadata.comparator);
+ maxDeletionTimeTracker.update(atom.getLocalDeletionTime());
+
+ columnIndexer.add(atom); // This write the atom on disk too
+ }
+
+ columnIndexer.maybeWriteEmptyRowHeader();
+ dataFile.stream.writeShort(END_OF_ROW);
+ }
+ catch (IOException e)
+ {
+ throw new FSWriteError(e, dataFile.getPath());
+ }
+
+ metadataCollector.updateMinTimestamp(minTimestampTracker.get())
+ .updateMaxTimestamp(maxTimestampTracker.get())
+ .updateMaxLocalDeletionTime(maxDeletionTimeTracker.get())
+ .addRowSize(dataFile.getFilePointer() - currentPosition)
+ .addColumnCount(columnIndexer.writtenAtomCount())
+ .mergeTombstoneHistogram(tombstones)
+ .updateMinColumnNames(minColumnNames)
+ .updateMaxColumnNames(maxColumnNames)
+ .updateHasLegacyCounterShards(hasLegacyCounterShards);
+
+ afterAppend(key, currentPosition, RowIndexEntry.create(currentPosition, cf.deletionInfo().getTopLevelDeletion(), columnIndexer.build()));
+ return currentPosition;
+ }
+
+ /**
+ * After failure, attempt to close the index writer and data file before deleting all temp components for the sstable
+ */
+ public void abort(boolean closeBf)
+ {
+ assert descriptor.type.isTemporary;
+ if (iwriter == null && dataFile == null)
+ return;
+ if (iwriter != null)
+ {
+ FileUtils.closeQuietly(iwriter.indexFile);
+ if (closeBf)
+ {
+ iwriter.bf.close();
+ }
+ }
+ if (dataFile!= null)
+ FileUtils.closeQuietly(dataFile);
+
+ Set<Component> components = SSTable.componentsFor(descriptor);
+ try
+ {
+ if (!components.isEmpty())
+ SSTable.delete(descriptor, components);
+ }
+ catch (FSWriteError e)
+ {
+ logger.error(String.format("Failed deleting temp components for %s", descriptor), e);
+ throw e;
+ }
+ }
+
+ // we use this method to ensure any managed data we may have retained references to during the write are no
+ // longer referenced, so that we do not need to enclose the expensive call to closeAndOpenReader() in a transaction
+ public void isolateReferences()
+ {
+ // currently we only maintain references to first/last/lastWrittenKey from the data provided; all other
+ // data retention is done through copying
+ first = getMinimalKey(first);
+ last = lastWrittenKey = getMinimalKey(last);
+ }
+
++ private Descriptor makeTmpLinks()
++ {
++ // create temp links if they don't already exist
++ Descriptor link = descriptor.asType(Descriptor.Type.TEMPLINK);
++ if (!new File(link.filenameFor(Component.PRIMARY_INDEX)).exists())
++ {
++ FileUtils.createHardLink(new File(descriptor.filenameFor(Component.PRIMARY_INDEX)), new File(link.filenameFor(Component.PRIMARY_INDEX)));
++ FileUtils.createHardLink(new File(descriptor.filenameFor(Component.DATA)), new File(link.filenameFor(Component.DATA)));
++ }
++ return link;
++ }
++
+ public SSTableReader openEarly(long maxDataAge)
+ {
+ StatsMetadata sstableMetadata = (StatsMetadata) metadataCollector.finalizeMetadata(partitioner.getClass().getCanonicalName(),
+ metadata.getBloomFilterFpChance(),
+ repairedAt).get(MetadataType.STATS);
+
+ // find the max (exclusive) readable key
+ DecoratedKey exclusiveUpperBoundOfReadableIndex = iwriter.getMaxReadableKey(0);
+ if (exclusiveUpperBoundOfReadableIndex == null)
+ return null;
+
- // create temp links if they don't already exist
- Descriptor link = descriptor.asType(Descriptor.Type.TEMPLINK);
- if (!new File(link.filenameFor(Component.PRIMARY_INDEX)).exists())
- {
- FileUtils.createHardLink(new File(descriptor.filenameFor(Component.PRIMARY_INDEX)), new File(link.filenameFor(Component.PRIMARY_INDEX)));
- FileUtils.createHardLink(new File(descriptor.filenameFor(Component.DATA)), new File(link.filenameFor(Component.DATA)));
- }
-
++ Descriptor link = makeTmpLinks();
+ // open the reader early, giving it a FINAL descriptor type so that it is indistinguishable for other consumers
- SegmentedFile ifile = iwriter.builder.openEarly(link.filenameFor(Component.PRIMARY_INDEX));
- SegmentedFile dfile = dbuilder.openEarly(link.filenameFor(Component.DATA));
++ SegmentedFile ifile = iwriter.builder.complete(link.filenameFor(Component.PRIMARY_INDEX), FinishType.EARLY);
++ SegmentedFile dfile = dbuilder.complete(link.filenameFor(Component.DATA), FinishType.EARLY);
+ SSTableReader sstable = SSTableReader.internalOpen(descriptor.asType(Descriptor.Type.FINAL),
+ components, metadata,
+ partitioner, ifile,
+ dfile, iwriter.summary.build(partitioner, exclusiveUpperBoundOfReadableIndex),
+ iwriter.bf, maxDataAge, sstableMetadata, SSTableReader.OpenReason.EARLY);
+
+ // now it's open, find the ACTUAL last readable key (i.e. for which the data file has also been flushed)
+ sstable.first = getMinimalKey(first);
+ sstable.last = getMinimalKey(exclusiveUpperBoundOfReadableIndex);
+ DecoratedKey inclusiveUpperBoundOfReadableData = iwriter.getMaxReadableKey(1);
+ if (inclusiveUpperBoundOfReadableData == null)
+ {
+ // Prevent leaving tmplink files on disk
+ sstable.releaseReference();
+ return null;
+ }
+ int offset = 2;
+ while (true)
+ {
+ RowIndexEntry indexEntry = sstable.getPosition(inclusiveUpperBoundOfReadableData, SSTableReader.Operator.GT);
+ if (indexEntry != null && indexEntry.position <= dataFile.getLastFlushOffset())
+ break;
+ inclusiveUpperBoundOfReadableData = iwriter.getMaxReadableKey(offset++);
+ if (inclusiveUpperBoundOfReadableData == null)
+ {
+ sstable.releaseReference();
+ return null;
+ }
+ }
+ sstable.last = getMinimalKey(inclusiveUpperBoundOfReadableData);
+ return sstable;
+ }
+
- public SSTableReader closeAndOpenReader(long maxDataAge, long repairedAt)
++ public SSTableReader closeAndOpenReader()
+ {
- Pair<Descriptor, StatsMetadata> p = close(repairedAt);
- Descriptor newdesc = p.left;
- StatsMetadata sstableMetadata = p.right;
++ return closeAndOpenReader(System.currentTimeMillis());
++ }
++
++ public SSTableReader closeAndOpenReader(long maxDataAge)
++ {
++ return finish(FinishType.NORMAL, maxDataAge, this.repairedAt);
++ }
++
++ public SSTableReader finish(FinishType finishType, long maxDataAge, long repairedAt)
++ {
++ Pair<Descriptor, StatsMetadata> p;
++
++ p = close(finishType, repairedAt);
++ Descriptor desc = p.left;
++ StatsMetadata metadata = p.right;
++
++ if (finishType == FinishType.EARLY)
++ desc = makeTmpLinks();
+
+ // finalize in-memory state for the reader
- SegmentedFile ifile = iwriter.builder.complete(newdesc.filenameFor(Component.PRIMARY_INDEX));
- SegmentedFile dfile = dbuilder.complete(newdesc.filenameFor(Component.DATA));
- SSTableReader sstable = SSTableReader.internalOpen(newdesc,
++ SegmentedFile ifile = iwriter.builder.complete(desc.filenameFor(Component.PRIMARY_INDEX), finishType);
++ SegmentedFile dfile = dbuilder.complete(desc.filenameFor(Component.DATA), finishType);
++ SSTableReader sstable = SSTableReader.internalOpen(desc.asType(Descriptor.Type.FINAL),
+ components,
- metadata,
++ this.metadata,
+ partitioner,
+ ifile,
+ dfile,
+ iwriter.summary.build(partitioner),
+ iwriter.bf,
+ maxDataAge,
- sstableMetadata,
- SSTableReader.OpenReason.NORMAL);
++ metadata,
++ finishType.openReason);
+ sstable.first = getMinimalKey(first);
+ sstable.last = getMinimalKey(last);
- // try to save the summaries to disk
- sstable.saveSummary(iwriter.builder, dbuilder);
- iwriter = null;
- dbuilder = null;
++
++ switch (finishType)
++ {
++ case NORMAL: case FINISH_EARLY:
++ // try to save the summaries to disk
++ sstable.saveSummary(iwriter.builder, dbuilder);
++ iwriter = null;
++ dbuilder = null;
++ }
+ return sstable;
+ }
+
+ // Close the writer and return the descriptor to the new sstable and it's metadata
+ public Pair<Descriptor, StatsMetadata> close()
+ {
- return close(this.repairedAt);
++ return close(FinishType.NORMAL, this.repairedAt);
+ }
+
- private Pair<Descriptor, StatsMetadata> close(long repairedAt)
++ private Pair<Descriptor, StatsMetadata> close(FinishType type, long repairedAt)
+ {
++ switch (type)
++ {
++ case EARLY: case NORMAL:
++ iwriter.close();
++ dataFile.close();
++ }
+
- // index and filter
- iwriter.close();
- // main data, close will truncate if necessary
- dataFile.close();
- dataFile.writeFullChecksum(descriptor);
+ // write sstable statistics
- Map<MetadataType, MetadataComponent> metadataComponents = metadataCollector.finalizeMetadata(
- partitioner.getClass().getCanonicalName(),
- metadata.getBloomFilterFpChance(),
- repairedAt);
- writeMetadata(descriptor, metadataComponents);
-
- // save the table of components
- SSTable.appendTOC(descriptor, components);
++ Map<MetadataType, MetadataComponent> metadataComponents;
++ metadataComponents = metadataCollector
++ .finalizeMetadata(partitioner.getClass().getCanonicalName(),
++ metadata.getBloomFilterFpChance(),repairedAt);
+
+ // remove the 'tmp' marker from all components
- return Pair.create(SSTableWriter.rename(descriptor, components), (StatsMetadata) metadataComponents.get(MetadataType.STATS));
++ Descriptor descriptor = this.descriptor;
++ switch (type)
++ {
++ case NORMAL: case FINISH_EARLY:
++ dataFile.writeFullChecksum(descriptor);
++ writeMetadata(descriptor, metadataComponents);
++ // save the table of components
++ SSTable.appendTOC(descriptor, components);
++ descriptor = rename(descriptor, components);
++ }
+
++ return Pair.create(descriptor, (StatsMetadata) metadataComponents.get(MetadataType.STATS));
+ }
+
-
+ private static void writeMetadata(Descriptor desc, Map<MetadataType, MetadataComponent> components)
+ {
+ SequentialWriter out = SequentialWriter.open(new File(desc.filenameFor(Component.STATS)));
+ try
+ {
+ desc.getMetadataSerializer().serialize(components, out.stream);
+ }
+ catch (IOException e)
+ {
+ throw new FSWriteError(e, out.getPath());
+ }
+ finally
+ {
+ out.close();
+ }
+ }
+
+ public long getFilePointer()
+ {
+ return dataFile.getFilePointer();
+ }
+
+ public long getOnDiskFilePointer()
+ {
+ return dataFile.getOnDiskFilePointer();
+ }
+
+ /**
+ * Encapsulates writing the index and filter for an SSTable. The state of this object is not valid until it has been closed.
+ */
+ class IndexWriter implements Closeable
+ {
+ private final SequentialWriter indexFile;
+ public final SegmentedFile.Builder builder;
+ public final IndexSummaryBuilder summary;
+ public final IFilter bf;
+ private FileMark mark;
+
+ IndexWriter(long keyCount)
+ {
+ indexFile = SequentialWriter.open(new File(descriptor.filenameFor(Component.PRIMARY_INDEX)));
+ builder = SegmentedFile.getBuilder(DatabaseDescriptor.getIndexAccessMode());
+ summary = new IndexSummaryBuilder(keyCount, metadata.getMinIndexInterval(), Downsampling.BASE_SAMPLING_LEVEL);
+ bf = FilterFactory.getFilter(keyCount, metadata.getBloomFilterFpChance(), true);
+ }
+
+ // finds the last (-offset) decorated key that can be guaranteed to occur fully in the flushed portion of the index file
+ DecoratedKey getMaxReadableKey(int offset)
+ {
+ long maxIndexLength = indexFile.getLastFlushOffset();
+ return summary.getMaxReadableKey(maxIndexLength, offset);
+ }
+
+ public void append(DecoratedKey key, RowIndexEntry indexEntry)
+ {
+ bf.add(key.getKey());
+ long indexPosition = indexFile.getFilePointer();
+ try
+ {
+ ByteBufferUtil.writeWithShortLength(key.getKey(), indexFile.stream);
+ rowIndexEntrySerializer.serialize(indexEntry, indexFile.stream);
+ }
+ catch (IOException e)
+ {
+ throw new FSWriteError(e, indexFile.getPath());
+ }
+
+ if (logger.isTraceEnabled())
+ logger.trace("wrote index entry: {} at {}", indexEntry, indexPosition);
+
+ summary.maybeAddEntry(key, indexPosition);
+ builder.addPotentialBoundary(indexPosition);
+ }
+
+ /**
+ * Closes the index and bloomfilter, making the public state of this writer valid for consumption.
+ */
+ public void close()
+ {
+ if (components.contains(Component.FILTER))
+ {
+ String path = descriptor.filenameFor(Component.FILTER);
+ try
+ {
+ // bloom filter
+ FileOutputStream fos = new FileOutputStream(path);
+ DataOutputStreamAndChannel stream = new DataOutputStreamAndChannel(fos);
+ FilterFactory.serialize(bf, stream);
+ stream.flush();
+ fos.getFD().sync();
+ stream.close();
+ }
+ catch (IOException e)
+ {
+ throw new FSWriteError(e, path);
+ }
+ }
+
+ // index
+ long position = indexFile.getFilePointer();
+ indexFile.close(); // calls force
+ FileUtils.truncate(indexFile.getPath(), position);
+ }
+
+ public void mark()
+ {
+ mark = indexFile.mark();
+ }
+
+ public void resetAndTruncate()
+ {
+ // we can't un-set the bloom filter addition, but extra keys in there are harmless.
+ // we can't reset dbuilder either, but that is the last thing called in afterappend so
+ // we assume that if that worked then we won't be trying to reset.
+ indexFile.resetAndTruncate(mark);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9a51c3c3/src/java/org/apache/cassandra/io/util/BufferedPoolingSegmentedFile.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/util/BufferedPoolingSegmentedFile.java
index b284f61,57f465f..7679e30
--- a/src/java/org/apache/cassandra/io/util/BufferedPoolingSegmentedFile.java
+++ b/src/java/org/apache/cassandra/io/util/BufferedPoolingSegmentedFile.java
@@@ -19,6 -19,8 +19,8 @@@ package org.apache.cassandra.io.util
import java.io.File;
-import org.apache.cassandra.io.sstable.SSTableWriter;
++import org.apache.cassandra.io.sstable.format.SSTableWriter;
+
public class BufferedPoolingSegmentedFile extends PoolingSegmentedFile
{
public BufferedPoolingSegmentedFile(String path, long length)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9a51c3c3/src/java/org/apache/cassandra/io/util/BufferedSegmentedFile.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/util/BufferedSegmentedFile.java
index aa031e3,2f715da..325dc27
--- a/src/java/org/apache/cassandra/io/util/BufferedSegmentedFile.java
+++ b/src/java/org/apache/cassandra/io/util/BufferedSegmentedFile.java
@@@ -19,6 -19,8 +19,8 @@@ package org.apache.cassandra.io.util
import java.io.File;
-import org.apache.cassandra.io.sstable.SSTableWriter;
++import org.apache.cassandra.io.sstable.format.SSTableWriter;
+
public class BufferedSegmentedFile extends SegmentedFile
{
public BufferedSegmentedFile(String path, long length)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9a51c3c3/src/java/org/apache/cassandra/io/util/CompressedPoolingSegmentedFile.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/util/CompressedPoolingSegmentedFile.java
index 1803e69,11d091a..489a2b5
--- a/src/java/org/apache/cassandra/io/util/CompressedPoolingSegmentedFile.java
+++ b/src/java/org/apache/cassandra/io/util/CompressedPoolingSegmentedFile.java
@@@ -20,6 -20,7 +20,7 @@@ package org.apache.cassandra.io.util
import org.apache.cassandra.io.compress.CompressedRandomAccessReader;
import org.apache.cassandra.io.compress.CompressedSequentialWriter;
import org.apache.cassandra.io.compress.CompressionMetadata;
-import org.apache.cassandra.io.sstable.SSTableWriter;
++import org.apache.cassandra.io.sstable.format.SSTableWriter;
public class CompressedPoolingSegmentedFile extends PoolingSegmentedFile implements ICompressedFile
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9a51c3c3/src/java/org/apache/cassandra/io/util/CompressedSegmentedFile.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/util/CompressedSegmentedFile.java
index 4afe0a0,b788715..e0f50c2
--- a/src/java/org/apache/cassandra/io/util/CompressedSegmentedFile.java
+++ b/src/java/org/apache/cassandra/io/util/CompressedSegmentedFile.java
@@@ -20,6 -20,7 +20,7 @@@ package org.apache.cassandra.io.util
import org.apache.cassandra.io.compress.CompressedRandomAccessReader;
import org.apache.cassandra.io.compress.CompressedSequentialWriter;
import org.apache.cassandra.io.compress.CompressionMetadata;
-import org.apache.cassandra.io.sstable.SSTableWriter;
++import org.apache.cassandra.io.sstable.format.SSTableWriter;
public class CompressedSegmentedFile extends SegmentedFile implements ICompressedFile
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9a51c3c3/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java
index ccc03fc,3b2cc98..bf120a3
--- a/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java
+++ b/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java
@@@ -28,6 -28,7 +28,7 @@@ import org.slf4j.Logger
import org.slf4j.LoggerFactory;
import org.apache.cassandra.io.FSReadError;
-import org.apache.cassandra.io.sstable.SSTableWriter;
++import org.apache.cassandra.io.sstable.format.SSTableWriter;
import org.apache.cassandra.utils.JVMStabilityInspector;
public class MmappedSegmentedFile extends SegmentedFile
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9a51c3c3/src/java/org/apache/cassandra/io/util/SegmentedFile.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/util/SegmentedFile.java
index be549a6,badae56..d3b5bf2
--- a/src/java/org/apache/cassandra/io/util/SegmentedFile.java
+++ b/src/java/org/apache/cassandra/io/util/SegmentedFile.java
@@@ -29,6 -29,7 +29,7 @@@ import org.apache.cassandra.config.Conf
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.io.FSReadError;
import org.apache.cassandra.io.compress.CompressedSequentialWriter;
-import org.apache.cassandra.io.sstable.SSTableWriter;
++import org.apache.cassandra.io.sstable.format.SSTableWriter;
import org.apache.cassandra.utils.Pair;
/**
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9a51c3c3/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
index d5e92dd,392936d..a4ab04f
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
@@@ -32,12 -30,11 +32,8 @@@ import org.junit.Test
import org.apache.cassandra.SchemaLoader;
import org.apache.cassandra.Util;
-import org.apache.cassandra.db.ArrayBackedSortedColumns;
-import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.db.Keyspace;
-import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.config.KSMetaData;
- import org.apache.cassandra.db.ArrayBackedSortedColumns;
- import org.apache.cassandra.db.ColumnFamilyStore;
- import org.apache.cassandra.db.DecoratedKey;
- import org.apache.cassandra.db.Keyspace;
- import org.apache.cassandra.db.Mutation;
++import org.apache.cassandra.db.*;
import org.apache.cassandra.db.compaction.AbstractCompactedRow;
import org.apache.cassandra.db.compaction.AbstractCompactionStrategy;
import org.apache.cassandra.db.compaction.CompactionController;
@@@ -641,6 -614,73 +635,73 @@@ public class SSTableRewriterTest extend
}
+ @Test
+ public void testAllKeysReadable() throws Exception
+ {
+ Keyspace keyspace = Keyspace.open(KEYSPACE);
+ ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
+ cfs.truncateBlocking();
+ for (int i = 0; i < 100; i++)
+ {
+ DecoratedKey key = Util.dk(Integer.toString(i));
+ Mutation rm = new Mutation(KEYSPACE, key.getKey());
+ for (int j = 0; j < 10; j++)
+ rm.add(CF, Util.cellname(Integer.toString(j)), ByteBufferUtil.EMPTY_BYTE_BUFFER, 100);
+ rm.apply();
+ }
+ cfs.forceBlockingFlush();
+ cfs.forceMajorCompaction();
+ validateKeys(keyspace);
+
+ assertEquals(1, cfs.getSSTables().size());
+ SSTableReader s = cfs.getSSTables().iterator().next();
+ Set<SSTableReader> compacting = new HashSet<>();
+ compacting.add(s);
+ cfs.getDataTracker().markCompacting(compacting);
+
+ SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false);
+ SSTableRewriter.overrideOpenInterval(1);
+ SSTableWriter w = getWriter(cfs, s.descriptor.directory);
+ rewriter.switchWriter(w);
+ int keyCount = 0;
- try (ICompactionScanner scanner = compacting.iterator().next().getScanner();
++ try (ISSTableScanner scanner = compacting.iterator().next().getScanner();
+ CompactionController controller = new CompactionController(cfs, compacting, 0))
+ {
+ while (scanner.hasNext())
+ {
+ rewriter.append(new LazilyCompactedRow(controller, Arrays.asList(scanner.next())));
+ if (keyCount % 10 == 0)
+ {
+ rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
+ }
+ keyCount++;
+ validateKeys(keyspace);
+ }
+ try
+ {
+ cfs.getDataTracker().markCompactedSSTablesReplaced(compacting, rewriter.finish(), OperationType.COMPACTION);
+ cfs.getDataTracker().unmarkCompacting(compacting);
+ }
+ catch (Throwable t)
+ {
+ rewriter.abort();
+ }
+ }
+ validateKeys(keyspace);
+ Thread.sleep(1000);
+ validateCFS(cfs);
+ }
+
+ private void validateKeys(Keyspace ks)
+ {
+ for (int i = 0; i < 100; i++)
+ {
+ DecoratedKey key = Util.dk(Integer.toString(i));
+ ColumnFamily cf = Util.getColumnFamily(ks, key, CF);
+ assertTrue(cf != null);
+ }
+ }
+
private SSTableReader writeFile(ColumnFamilyStore cfs, int count)
{
ArrayBackedSortedColumns cf = ArrayBackedSortedColumns.factory.create(cfs.metadata);