Posted to commits@cassandra.apache.org by be...@apache.org on 2015/02/12 14:39:53 UTC
[2/3] cassandra git commit: Make SSTableWriter.openEarly more robust and obvious
Make SSTableWriter.openEarly more robust and obvious
patch by benedict; reviewed by marcus for CASSANDRA-8747
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/4eb9fa78
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/4eb9fa78
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/4eb9fa78
Branch: refs/heads/trunk
Commit: 4eb9fa78bd233f5f9b901dd677636842b351330b
Parents: f57ec8c
Author: Benedict Elliott Smith <be...@apache.org>
Authored: Thu Feb 12 13:35:23 2015 +0000
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Thu Feb 12 13:35:23 2015 +0000
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../io/compress/CompressedSequentialWriter.java | 2 +
.../io/sstable/IndexSummaryBuilder.java | 162 ++++++++++++++-----
.../cassandra/io/sstable/SSTableWriter.java | 76 +++++----
.../cassandra/io/util/SequentialWriter.java | 10 ++
5 files changed, 168 insertions(+), 83 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4eb9fa78/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 2466b19..cbb4334 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.1.4
+ * Make SSTableWriter.openEarly more robust and obvious (CASSANDRA-8747)
* Enforce SSTableReader.first/last (CASSANDRA-8744)
* Cleanup SegmentedFile API (CASSANDRA-8749)
* Avoid overlap with early compaction replacement (CASSANDRA-8683)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4eb9fa78/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java b/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
index 81bb3e9..87eb2fb 100644
--- a/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
@@ -138,6 +138,8 @@ public class CompressedSequentialWriter extends SequentialWriter
// next chunk should be written right after current + length of the checksum (int)
chunkOffset += compressedLength + 4;
+ if (runPostFlush != null)
+ runPostFlush.run();
}
public CompressionMetadata open(SSTableWriter.FinishType finishType)
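
The two added lines thread the new post-flush hook (introduced in SequentialWriter below) through the compressed writer as well: the listener fires only once a chunk and its trailing 4-byte checksum are fully written, so the offset a listener observes always marks a safely readable point. A minimal sketch of the idea, assuming a simplified chunk-at-a-time writer (the class and its constructor are illustrative, not Cassandra's API):

    import java.io.ByteArrayOutputStream;
    import java.util.zip.CRC32;

    class ChunkedWriterSketch
    {
        private final ByteArrayOutputStream out = new ByteArrayOutputStream();
        private final Runnable runPostFlush; // inherited from the base writer in the real code
        private long chunkOffset;

        ChunkedWriterSketch(Runnable runPostFlush) { this.runPostFlush = runPostFlush; }

        void flushChunk(byte[] compressed)
        {
            out.write(compressed, 0, compressed.length);
            CRC32 crc = new CRC32();
            crc.update(compressed);
            byte[] checksum = { (byte) (crc.getValue() >>> 24), (byte) (crc.getValue() >>> 16),
                                (byte) (crc.getValue() >>> 8), (byte) crc.getValue() };
            out.write(checksum, 0, 4);
            // next chunk starts right after the data plus the 4-byte checksum
            chunkOffset += compressed.length + 4;
            if (runPostFlush != null) // notify only once the chunk is complete
                runPostFlush.run();
        }
    }
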
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4eb9fa78/src/java/org/apache/cassandra/io/sstable/IndexSummaryBuilder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/IndexSummaryBuilder.java b/src/java/org/apache/cassandra/io/sstable/IndexSummaryBuilder.java
index df326d7..3b93b31 100644
--- a/src/java/org/apache/cassandra/io/sstable/IndexSummaryBuilder.java
+++ b/src/java/org/apache/cassandra/io/sstable/IndexSummaryBuilder.java
@@ -20,6 +20,8 @@ package org.apache.cassandra.io.sstable;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.Map;
+import java.util.TreeMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -45,6 +47,41 @@ public class IndexSummaryBuilder
private long keysWritten = 0;
private long indexIntervalMatches = 0;
private long offheapSize = 0;
+ private long nextSamplePosition;
+
+ // for each ReadableBoundary, we map its dataLength property to itself, permitting us to look up the
+ // last readable boundary from the perspective of the data file
+ // [data file position limit] => [ReadableBoundary]
+ private TreeMap<Long, ReadableBoundary> lastReadableByData = new TreeMap<>();
+ // for each ReadableBoundary, we map its indexLength property to itself, permitting us to look up the
+ // last readable boundary from the perspective of the index file
+ // [index file position limit] => [ReadableBoundary]
+ private TreeMap<Long, ReadableBoundary> lastReadableByIndex = new TreeMap<>();
+ // the last synced data file position
+ private long dataSyncPosition;
+ // the last synced index file position
+ private long indexSyncPosition;
+
+ // the last summary interval boundary that is fully readable in both data and index files
+ private ReadableBoundary lastReadableBoundary;
+
+ /**
+ * Represents a boundary that is guaranteed fully readable in the summary, index file and data file.
+ * The key contained is the last key readable if the index and data files have been flushed to the
+ * stored lengths.
+ */
+ public static class ReadableBoundary
+ {
+ final DecoratedKey lastKey;
+ final long indexLength;
+ final long dataLength;
+ public ReadableBoundary(DecoratedKey lastKey, long indexLength, long dataLength)
+ {
+ this.lastKey = lastKey;
+ this.indexLength = indexLength;
+ this.dataLength = dataLength;
+ }
+ }
public IndexSummaryBuilder(long expectedKeys, int minIndexInterval, int samplingLevel)
{
@@ -71,74 +108,113 @@ public class IndexSummaryBuilder
maxExpectedEntries = (maxExpectedEntries * samplingLevel) / BASE_SAMPLING_LEVEL;
positions = new ArrayList<>((int)maxExpectedEntries);
keys = new ArrayList<>((int)maxExpectedEntries);
+ // if we're downsampling we may not use index 0
+ setNextSamplePosition(-minIndexInterval);
}
- // finds the last (-offset) decorated key that can be guaranteed to occur fully in the index file before the provided file position
- public DecoratedKey getMaxReadableKey(long position, int offset)
+ // the index file has been flushed to the provided position; stash it and use that to recalculate our max readable boundary
+ public void markIndexSynced(long upToPosition)
{
- int i = Collections.binarySearch(positions, position);
- if (i < 0)
- {
- i = -1 - i;
- if (i == positions.size())
- i -= 2;
- else
- i -= 1;
- }
- else
- i -= 1;
- i -= offset;
- // we don't want to return any key if there's only 1 item in the summary, to make sure the sstable range is non-empty
- if (i <= 0)
- return null;
- return keys.get(i);
+ indexSyncPosition = upToPosition;
+ refreshReadableBoundary();
}
- public IndexSummaryBuilder maybeAddEntry(DecoratedKey decoratedKey, long indexPosition)
+ // the data file has been flushed to the provided position; stash it and use that to recalculate our max readable boundary
+ public void markDataSynced(long upToPosition)
{
- if (keysWritten % minIndexInterval == 0)
- {
- // see if we should skip this key based on our sampling level
- boolean shouldSkip = false;
- for (int start : startPoints)
- {
- if ((indexIntervalMatches - start) % BASE_SAMPLING_LEVEL == 0)
- {
- shouldSkip = true;
- break;
- }
- }
+ dataSyncPosition = upToPosition;
+ refreshReadableBoundary();
+ }
- if (!shouldSkip)
- {
- keys.add(getMinimalKey(decoratedKey));
- offheapSize += decoratedKey.getKey().remaining();
- positions.add(indexPosition);
- offheapSize += TypeSizes.NATIVE.sizeof(indexPosition);
- }
+ private void refreshReadableBoundary()
+ {
+ // grab the readable boundary prior to the given position in either the data or index file
+ Map.Entry<?, ReadableBoundary> byData = lastReadableByData.floorEntry(dataSyncPosition);
+ Map.Entry<?, ReadableBoundary> byIndex = lastReadableByIndex.floorEntry(indexSyncPosition);
+ if (byData == null || byIndex == null)
+ return;
+
+ // take the lowest of the two, and stash it
+ lastReadableBoundary = byIndex.getValue().indexLength < byData.getValue().indexLength
+ ? byIndex.getValue() : byData.getValue();
+
+ // clear our data prior to this, since we no longer need it
+ lastReadableByData.headMap(lastReadableBoundary.dataLength, false).clear();
+ lastReadableByIndex.headMap(lastReadableBoundary.indexLength, false).clear();
+ }
- indexIntervalMatches++;
+ public ReadableBoundary getLastReadableBoundary()
+ {
+ return lastReadableBoundary;
+ }
+
+ public IndexSummaryBuilder maybeAddEntry(DecoratedKey decoratedKey, long indexStart)
+ {
+ return maybeAddEntry(decoratedKey, indexStart, 0, 0);
+ }
+
+ /**
+ *
+ * @param decoratedKey the key for this record
+ * @param indexStart the position in the index file this record begins
+ * @param indexEnd the position in the index file we need to be able to read to (exclusive) to read this record
+ * @param dataEnd the position in the data file we need to be able to read to (exclusive) to read this record;
+ * a value of 0 indicates we are not tracking readable boundaries
+ */
+ public IndexSummaryBuilder maybeAddEntry(DecoratedKey decoratedKey, long indexStart, long indexEnd, long dataEnd)
+ {
+ if (keysWritten == nextSamplePosition)
+ {
+ keys.add(getMinimalKey(decoratedKey));
+ offheapSize += decoratedKey.getKey().remaining();
+ positions.add(indexStart);
+ offheapSize += TypeSizes.NATIVE.sizeof(indexStart);
+ setNextSamplePosition(keysWritten);
+ }
+ else if (dataEnd != 0 && keysWritten + 1 == nextSamplePosition)
+ {
+ // this is the last key in this summary interval, so stash it
+ ReadableBoundary boundary = new ReadableBoundary(decoratedKey, indexEnd, dataEnd);
+ lastReadableByData.put(dataEnd, boundary);
+ lastReadableByIndex.put(indexEnd, boundary);
}
keysWritten++;
return this;
}
+ // calculate the next key we will store to our summary
+ private void setNextSamplePosition(long position)
+ {
+ tryAgain: while (true)
+ {
+ position += minIndexInterval;
+ long test = indexIntervalMatches++;
+ for (int start : startPoints)
+ if ((test - start) % BASE_SAMPLING_LEVEL == 0)
+ continue tryAgain;
+
+ nextSamplePosition = position;
+ return;
+ }
+ }
+
public IndexSummary build(IPartitioner partitioner)
{
return build(partitioner, null);
}
- public IndexSummary build(IPartitioner partitioner, DecoratedKey exclusiveUpperBound)
+ // lastIntervalKey should come from getLastReadableBoundary().lastKey
+ public IndexSummary build(IPartitioner partitioner, DecoratedKey lastIntervalKey)
{
assert keys.size() > 0;
assert keys.size() == positions.size();
int length;
- if (exclusiveUpperBound == null)
+ if (lastIntervalKey == null)
length = keys.size();
- else
- length = Collections.binarySearch(keys, exclusiveUpperBound);
+ else // since it's an inclusive upper bound, this should never match exactly
+ length = -1 - Collections.binarySearch(keys, lastIntervalKey);
assert length > 0;
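
The heart of the rewrite is the pair of TreeMaps above: each summary-interval boundary is keyed by the file length required to read it, so floorEntry(syncedLength) yields the last boundary fully covered by what has actually been flushed, and refreshReadableBoundary keeps the more conservative of the index-file and data-file answers. A self-contained sketch of that lookup (Boundary and the sample lengths are hypothetical, chosen only to show the mechanism):

    import java.util.Map;
    import java.util.TreeMap;

    public class FloorEntryDemo
    {
        static class Boundary
        {
            final String lastKey;
            final long indexLength, dataLength;
            Boundary(String lastKey, long indexLength, long dataLength)
            {
                this.lastKey = lastKey;
                this.indexLength = indexLength;
                this.dataLength = dataLength;
            }
        }

        public static void main(String[] args)
        {
            TreeMap<Long, Boundary> byIndex = new TreeMap<>();
            TreeMap<Long, Boundary> byData = new TreeMap<>();
            // two boundaries, recorded the way maybeAddEntry stashes them
            Boundary b1 = new Boundary("key100", 1000, 40000);
            Boundary b2 = new Boundary("key200", 2000, 90000);
            byIndex.put(b1.indexLength, b1); byData.put(b1.dataLength, b1);
            byIndex.put(b2.indexLength, b2); byData.put(b2.dataLength, b2);

            long indexSynced = 2500; // index flushed past both boundaries
            long dataSynced = 50000; // data flushed past b1 only
            Map.Entry<Long, Boundary> byI = byIndex.floorEntry(indexSynced);
            Map.Entry<Long, Boundary> byD = byData.floorEntry(dataSynced);
            // keep the more conservative answer, as refreshReadableBoundary does
            Boundary readable = byI.getValue().indexLength < byD.getValue().indexLength
                              ? byI.getValue() : byD.getValue();
            System.out.println(readable.lastKey); // prints key100: data is the limiting file
        }
    }
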
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4eb9fa78/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
index d430314..2c1cf0e 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
@@ -131,7 +131,6 @@ public class SSTableWriter extends SSTable
metadata,
partitioner);
this.repairedAt = repairedAt;
- iwriter = new IndexWriter(keyCount);
if (compression)
{
@@ -146,6 +145,7 @@ public class SSTableWriter extends SSTable
dataFile = SequentialWriter.open(new File(getFilename()), new File(descriptor.filenameFor(Component.CRC)));
dbuilder = SegmentedFile.getBuilder(DatabaseDescriptor.getDiskAccessMode());
}
+ iwriter = new IndexWriter(keyCount, dataFile);
this.sstableMetadataCollector = sstableMetadataCollector;
}
@@ -183,7 +183,7 @@ public class SSTableWriter extends SSTable
if (logger.isTraceEnabled())
logger.trace("wrote " + decoratedKey + " at " + dataPosition);
- iwriter.append(decoratedKey, index);
+ iwriter.append(decoratedKey, index, dataPosition);
dbuilder.addPotentialBoundary(dataPosition);
}
@@ -193,11 +193,11 @@ public class SSTableWriter extends SSTable
*/
public RowIndexEntry append(AbstractCompactedRow row)
{
- long currentPosition = beforeAppend(row.key);
+ long startPosition = beforeAppend(row.key);
RowIndexEntry entry;
try
{
- entry = row.write(currentPosition, dataFile.stream);
+ entry = row.write(startPosition, dataFile.stream);
if (entry == null)
return null;
}
@@ -205,8 +205,9 @@ public class SSTableWriter extends SSTable
{
throw new FSWriteError(e, dataFile.getPath());
}
- sstableMetadataCollector.update(dataFile.getFilePointer() - currentPosition, row.columnStats());
- afterAppend(row.key, currentPosition, entry);
+ long endPosition = dataFile.getFilePointer();
+ sstableMetadataCollector.update(endPosition - startPosition, row.columnStats());
+ afterAppend(row.key, endPosition, entry);
return entry;
}
@@ -390,10 +391,11 @@ public class SSTableWriter extends SSTable
repairedAt).get(MetadataType.STATS);
// find the max (exclusive) readable key
- DecoratedKey exclusiveUpperBoundOfReadableIndex = iwriter.getMaxReadableKey(0);
- if (exclusiveUpperBoundOfReadableIndex == null)
+ IndexSummaryBuilder.ReadableBoundary boundary = iwriter.getMaxReadable();
+ if (boundary == null)
return null;
+ assert boundary.indexLength > 0 && boundary.dataLength > 0;
Descriptor link = makeTmpLinks();
// open the reader early, giving it a FINAL descriptor type so that it is indistinguishable for other consumers
SegmentedFile ifile = iwriter.builder.complete(link.filenameFor(Component.PRIMARY_INDEX), FinishType.EARLY);
@@ -401,33 +403,12 @@ public class SSTableWriter extends SSTable
SSTableReader sstable = SSTableReader.internalOpen(descriptor.asType(Descriptor.Type.FINAL),
components, metadata,
partitioner, ifile,
- dfile, iwriter.summary.build(partitioner, exclusiveUpperBoundOfReadableIndex),
+ dfile, iwriter.summary.build(partitioner, boundary.lastKey),
iwriter.bf.sharedCopy(), maxDataAge, sstableMetadata, SSTableReader.OpenReason.EARLY);
// now it's open, find the ACTUAL last readable key (i.e. for which the data file has also been flushed)
sstable.first = getMinimalKey(first);
- sstable.last = getMinimalKey(exclusiveUpperBoundOfReadableIndex);
- DecoratedKey inclusiveUpperBoundOfReadableData = iwriter.getMaxReadableKey(1);
- if (inclusiveUpperBoundOfReadableData == null)
- {
- // Prevent leaving tmplink files on disk
- sstable.selfRef().release();
- return null;
- }
- int offset = 2;
- while (true)
- {
- RowIndexEntry indexEntry = sstable.getPosition(inclusiveUpperBoundOfReadableData, SSTableReader.Operator.GT);
- if (indexEntry != null && indexEntry.position <= dataFile.getLastFlushOffset())
- break;
- inclusiveUpperBoundOfReadableData = iwriter.getMaxReadableKey(offset++);
- if (inclusiveUpperBoundOfReadableData == null)
- {
- sstable.selfRef().release();
- return null;
- }
- }
- sstable.last = getMinimalKey(inclusiveUpperBoundOfReadableData);
+ sstable.last = getMinimalKey(boundary.lastKey);
return sstable;
}
@@ -593,25 +574,39 @@ public class SSTableWriter extends SSTable
public final IFilter bf;
private FileMark mark;
- IndexWriter(long keyCount)
+ IndexWriter(long keyCount, final SequentialWriter dataFile)
{
indexFile = SequentialWriter.open(new File(descriptor.filenameFor(Component.PRIMARY_INDEX)));
builder = SegmentedFile.getBuilder(DatabaseDescriptor.getIndexAccessMode());
summary = new IndexSummaryBuilder(keyCount, metadata.getMinIndexInterval(), Downsampling.BASE_SAMPLING_LEVEL);
bf = FilterFactory.getFilter(keyCount, metadata.getBloomFilterFpChance(), true);
+ // register listeners to be alerted when the index and data files are flushed
+ indexFile.setPostFlushListener(new Runnable()
+ {
+ public void run()
+ {
+ summary.markIndexSynced(indexFile.getLastFlushOffset());
+ }
+ });
+ dataFile.setPostFlushListener(new Runnable()
+ {
+ public void run()
+ {
+ summary.markDataSynced(dataFile.getLastFlushOffset());
+ }
+ });
}
// finds the last (-offset) decorated key that can be guaranteed to occur fully in the flushed portion of the index file
- DecoratedKey getMaxReadableKey(int offset)
+ IndexSummaryBuilder.ReadableBoundary getMaxReadable()
{
- long maxIndexLength = indexFile.getLastFlushOffset();
- return summary.getMaxReadableKey(maxIndexLength, offset);
+ return summary.getLastReadableBoundary();
}
- public void append(DecoratedKey key, RowIndexEntry indexEntry)
+ public void append(DecoratedKey key, RowIndexEntry indexEntry, long dataEnd)
{
bf.add(key.getKey());
- long indexPosition = indexFile.getFilePointer();
+ long indexStart = indexFile.getFilePointer();
try
{
ByteBufferUtil.writeWithShortLength(key.getKey(), indexFile.stream);
@@ -621,12 +616,13 @@ public class SSTableWriter extends SSTable
{
throw new FSWriteError(e, indexFile.getPath());
}
+ long indexEnd = indexFile.getFilePointer();
if (logger.isTraceEnabled())
- logger.trace("wrote index entry: " + indexEntry + " at " + indexPosition);
+ logger.trace("wrote index entry: " + indexEntry + " at " + indexStart);
- summary.maybeAddEntry(key, indexPosition);
- builder.addPotentialBoundary(indexPosition);
+ summary.maybeAddEntry(key, indexStart, indexEnd, dataEnd);
+ builder.addPotentialBoundary(indexStart);
}
public void abort()
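
With both writers pushing their flush positions into the summary builder, openEarly no longer probes backwards key by key (the removed getMaxReadableKey loop); it simply reads the precomputed boundary. A stripped-down sketch of the listener wiring in the IndexWriter constructor, where Writer and Summary are stand-ins for SequentialWriter and IndexSummaryBuilder (and lambdas replace the anonymous Runnables for brevity):

    class Summary
    {
        long indexSynced, dataSynced; // last flushed position of each file
        void markIndexSynced(long pos) { indexSynced = pos; }
        void markDataSynced(long pos) { dataSynced = pos; }
    }

    class Writer
    {
        private Runnable postFlush;
        private long lastFlushOffset;
        void setPostFlushListener(Runnable r) { postFlush = r; }
        long getLastFlushOffset() { return lastFlushOffset; }
        void flushTo(long pos) // pretend we flushed up to pos
        {
            lastFlushOffset = pos;
            if (postFlush != null)
                postFlush.run();
        }
    }

    class WiringDemo
    {
        public static void main(String[] args)
        {
            Summary summary = new Summary();
            Writer indexFile = new Writer();
            Writer dataFile = new Writer();
            // same wiring as the IndexWriter constructor above
            indexFile.setPostFlushListener(() -> summary.markIndexSynced(indexFile.getLastFlushOffset()));
            dataFile.setPostFlushListener(() -> summary.markDataSynced(dataFile.getLastFlushOffset()));
            indexFile.flushTo(2000);
            dataFile.flushTo(40000);
            System.out.println(summary.indexSynced + " / " + summary.dataSynced); // 2000 / 40000
        }
    }
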
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4eb9fa78/src/java/org/apache/cassandra/io/util/SequentialWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/SequentialWriter.java b/src/java/org/apache/cassandra/io/util/SequentialWriter.java
index 227c79d..40f3e9d 100644
--- a/src/java/org/apache/cassandra/io/util/SequentialWriter.java
+++ b/src/java/org/apache/cassandra/io/util/SequentialWriter.java
@@ -69,6 +69,8 @@ public class SequentialWriter extends OutputStream implements WritableByteChanne
public final DataOutputPlus stream;
protected long lastFlushOffset;
+ protected Runnable runPostFlush;
+
public SequentialWriter(File file, int bufferSize)
{
try
@@ -304,6 +306,12 @@ public class SequentialWriter extends OutputStream implements WritableByteChanne
}
}
+ public void setPostFlushListener(Runnable runPostFlush)
+ {
+ assert this.runPostFlush == null;
+ this.runPostFlush = runPostFlush;
+ }
+
/**
* Override this method instead of overriding flush()
* @throws FSWriteError on any I/O error.
@@ -319,6 +327,8 @@ public class SequentialWriter extends OutputStream implements WritableByteChanne
{
throw new FSWriteError(e, getPath());
}
+ if (runPostFlush != null)
+ runPostFlush.run();
}
public long getFilePointer()
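
Finally, the hook itself: a listener may be set at most once (the assert), and it runs only after the flush has succeeded, since the FSWriteError path above throws before the hook is reached. A usage sketch under those assumptions (FileWriterStub is a made-up stand-in for a SequentialWriter opened on a real file):

    class FileWriterStub
    {
        private Runnable runPostFlush;
        private long lastFlushOffset;

        public void setPostFlushListener(Runnable runPostFlush)
        {
            assert this.runPostFlush == null; // set-once contract, as in the patch
            this.runPostFlush = runPostFlush;
        }

        void flushInternal(long newOffset)
        {
            // ... buffered bytes would be written here; an I/O failure would
            // throw FSWriteError and the listener would never run ...
            lastFlushOffset = newOffset;
            if (runPostFlush != null)
                runPostFlush.run();
        }

        public static void main(String[] args)
        {
            FileWriterStub w = new FileWriterStub();
            w.setPostFlushListener(() -> System.out.println("synced to " + w.lastFlushOffset));
            w.flushInternal(4096); // prints "synced to 4096"
        }
    }
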