You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by be...@apache.org on 2015/03/04 16:18:14 UTC
[2/4] cassandra git commit: IndexSummaryBuilder utilises offheap
memory, and shares data between each IndexSummary opened from it
IndexSummaryBuilder utilises offheap memory, and shares data between
each IndexSummary opened from it
patch by benedict; reviewed by ariel for CASSANDRA-8757
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f3c0e11e
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f3c0e11e
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f3c0e11e
Branch: refs/heads/trunk
Commit: f3c0e11e2ddb0b0666e7723a3fca005707b778ea
Parents: 3c3fefa
Author: Benedict Elliott Smith <be...@apache.org>
Authored: Wed Mar 4 15:07:32 2015 +0000
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Wed Mar 4 15:07:32 2015 +0000
----------------------------------------------------------------------
CHANGES.txt | 2 +
.../cassandra/io/sstable/IndexSummary.java | 113 ++++++++++-----
.../io/sstable/IndexSummaryBuilder.java | 144 +++++++++----------
.../cassandra/io/sstable/SSTableReader.java | 69 ++++-----
.../cassandra/io/sstable/SSTableWriter.java | 4 +-
.../cassandra/io/util/AbstractDataOutput.java | 4 +-
.../cassandra/io/util/DataOutputPlus.java | 3 +-
.../org/apache/cassandra/io/util/Memory.java | 40 +++++-
.../cassandra/io/util/SafeMemoryWriter.java | 136 ++++++++++++++++++
.../apache/cassandra/utils/concurrent/Ref.java | 2 +-
.../concurrent/WrappedSharedCloseable.java | 14 +-
.../cassandra/io/sstable/IndexSummaryTest.java | 54 +++----
.../cassandra/io/util/DataOutputTest.java | 11 ++
13 files changed, 411 insertions(+), 185 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f3c0e11e/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 6133536..3b373ae 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -25,6 +25,8 @@
* cqlsh: Fix keys() and full() collection indexes in DESCRIBE output
(CASSANDRA-8154)
* Show progress of streaming in nodetool netstats (CASSANDRA-8886)
+ * IndexSummaryBuilder utilises offheap memory, and shares data between
+ each IndexSummary opened from it (CASSANDRA-8757)
Merged from 2.0:
* Add offline tool to relevel sstables (CASSANDRA-8301)
* Preserve stream ID for more protocol errors (CASSANDRA-8848)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f3c0e11e/src/java/org/apache/cassandra/io/sstable/IndexSummary.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/IndexSummary.java b/src/java/org/apache/cassandra/io/sstable/IndexSummary.java
index 0cde124..bad50b4 100644
--- a/src/java/org/apache/cassandra/io/sstable/IndexSummary.java
+++ b/src/java/org/apache/cassandra/io/sstable/IndexSummary.java
@@ -20,8 +20,8 @@ package org.apache.cassandra.io.sstable;
import java.io.DataInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
-import org.apache.cassandra.cache.RefCountedMemory;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.RowPosition;
import org.apache.cassandra.dht.IPartitioner;
@@ -54,9 +54,16 @@ public class IndexSummary extends WrappedSharedCloseable
private final int minIndexInterval;
private final IPartitioner partitioner;
- private final int summarySize;
private final int sizeAtFullSampling;
- private final Memory bytes;
+ // we permit the memory to span a range larger than we use,
+ // so we have an accompanying count and length for each part
+ // we split our data into two ranges: offsets (indexing into entries),
+ // and entries containing the summary data
+ private final Memory offsets;
+ private final int offsetCount;
+ // entries is a list of (partition key, index file offset) pairs
+ private final Memory entries;
+ private final long entriesLength;
/**
* A value between 1 and BASE_SAMPLING_LEVEL that represents how many of the original
@@ -66,15 +73,18 @@ public class IndexSummary extends WrappedSharedCloseable
*/
private final int samplingLevel;
- public IndexSummary(IPartitioner partitioner, Memory bytes, int summarySize, int sizeAtFullSampling,
- int minIndexInterval, int samplingLevel)
+ public IndexSummary(IPartitioner partitioner, Memory offsets, int offsetCount, Memory entries, long entriesLength,
+ int sizeAtFullSampling, int minIndexInterval, int samplingLevel)
{
- super(bytes);
+ super(new Memory[] { offsets, entries });
+ assert offsets.getInt(0) == 0;
this.partitioner = partitioner;
this.minIndexInterval = minIndexInterval;
- this.summarySize = summarySize;
+ this.offsetCount = offsetCount;
+ this.entriesLength = entriesLength;
this.sizeAtFullSampling = sizeAtFullSampling;
- this.bytes = bytes;
+ this.offsets = offsets;
+ this.entries = entries;
this.samplingLevel = samplingLevel;
}
@@ -83,9 +93,11 @@ public class IndexSummary extends WrappedSharedCloseable
super(copy);
this.partitioner = copy.partitioner;
this.minIndexInterval = copy.minIndexInterval;
- this.summarySize = copy.summarySize;
+ this.offsetCount = copy.offsetCount;
+ this.entriesLength = copy.entriesLength;
this.sizeAtFullSampling = copy.sizeAtFullSampling;
- this.bytes = copy.bytes;
+ this.offsets = copy.offsets;
+ this.entries = copy.entries;
this.samplingLevel = copy.samplingLevel;
}
@@ -93,7 +105,7 @@ public class IndexSummary extends WrappedSharedCloseable
// Harmony's Collections implementation
public int binarySearch(RowPosition key)
{
- int low = 0, mid = summarySize, high = mid - 1, result = -1;
+ int low = 0, mid = offsetCount, high = mid - 1, result = -1;
while (low <= high)
{
mid = (low + high) >> 1;
@@ -123,7 +135,7 @@ public class IndexSummary extends WrappedSharedCloseable
public int getPositionInSummary(int index)
{
// The first section of bytes holds a four-byte position for each entry in the summary, so just multiply by 4.
- return bytes.getInt(index << 2);
+ return offsets.getInt(index << 2);
}
public byte[] getKey(int index)
@@ -131,27 +143,23 @@ public class IndexSummary extends WrappedSharedCloseable
long start = getPositionInSummary(index);
int keySize = (int) (calculateEnd(index) - start - 8L);
byte[] key = new byte[keySize];
- bytes.getBytes(start, key, 0, keySize);
+ entries.getBytes(start, key, 0, keySize);
return key;
}
public long getPosition(int index)
{
- return bytes.getLong(calculateEnd(index) - 8);
+ return entries.getLong(calculateEnd(index) - 8);
}
- public byte[] getEntry(int index)
+ public long getEndInSummary(int index)
{
- long start = getPositionInSummary(index);
- long end = calculateEnd(index);
- byte[] entry = new byte[(int)(end - start)];
- bytes.getBytes(start, entry, 0, (int) (end - start));
- return entry;
+ return calculateEnd(index);
}
private long calculateEnd(int index)
{
- return index == (summarySize - 1) ? bytes.size() : getPositionInSummary(index + 1);
+ return index == (offsetCount - 1) ? entriesLength : getPositionInSummary(index + 1);
}
public int getMinIndexInterval()
@@ -174,7 +182,7 @@ public class IndexSummary extends WrappedSharedCloseable
public int size()
{
- return summarySize;
+ return offsetCount;
}
public int getSamplingLevel()
@@ -192,12 +200,27 @@ public class IndexSummary extends WrappedSharedCloseable
}
/**
- * Returns the amount of off-heap memory used for this summary.
+ * Returns the amount of off-heap memory used for the entries portion of this summary.
* @return size in bytes
*/
- public long getOffHeapSize()
+ long getEntriesLength()
+ {
+ return entriesLength;
+ }
+
+ Memory getOffsets()
+ {
+ return offsets;
+ }
+
+ Memory getEntries()
+ {
+ return entries;
+ }
+
+ long getOffHeapSize()
{
- return bytes.size();
+ return offsetCount * 4 + entriesLength;
}
/**
@@ -224,14 +247,29 @@ public class IndexSummary extends WrappedSharedCloseable
public void serialize(IndexSummary t, DataOutputPlus out, boolean withSamplingLevel) throws IOException
{
out.writeInt(t.minIndexInterval);
- out.writeInt(t.summarySize);
- out.writeLong(t.bytes.size());
+ out.writeInt(t.offsetCount);
+ out.writeLong(t.getOffHeapSize());
if (withSamplingLevel)
{
out.writeInt(t.samplingLevel);
out.writeInt(t.sizeAtFullSampling);
}
- out.write(t.bytes);
+ // our on-disk representation treats the offsets and the summary data as one contiguous structure,
+ // in which the offsets are based from the start of the structure. i.e., if the offsets occupy
+ // X bytes, the value of the first offset will be X. In memory we split the two regions up, so that
+ // the summary values are indexed from zero, so we apply a correction to the offsets when de/serializing.
+ // In this case adding X to each of the offsets.
+ int baseOffset = t.offsetCount * 4;
+ for (int i = 0 ; i < t.offsetCount ; i++)
+ {
+ int offset = t.offsets.getInt(i * 4) + baseOffset;
+ // our serialization format for this file uses native byte order, so if this is different to the
+ // default Java serialization order (BIG_ENDIAN) we have to reverse our bytes
+ if (ByteOrder.nativeOrder() != ByteOrder.BIG_ENDIAN)
+ offset = Integer.reverseBytes(offset);
+ out.writeInt(offset);
+ }
+ out.write(t.entries, 0, t.entriesLength);
}
public IndexSummary deserialize(DataInputStream in, IPartitioner partitioner, boolean haveSamplingLevel, int expectedMinIndexInterval, int maxIndexInterval) throws IOException
@@ -243,7 +281,7 @@ public class IndexSummary extends WrappedSharedCloseable
minIndexInterval, expectedMinIndexInterval));
}
- int summarySize = in.readInt();
+ int offsetCount = in.readInt();
long offheapSize = in.readLong();
int samplingLevel, fullSamplingSummarySize;
if (haveSamplingLevel)
@@ -254,7 +292,7 @@ public class IndexSummary extends WrappedSharedCloseable
else
{
samplingLevel = BASE_SAMPLING_LEVEL;
- fullSamplingSummarySize = summarySize;
+ fullSamplingSummarySize = offsetCount;
}
int effectiveIndexInterval = (int) Math.ceil((BASE_SAMPLING_LEVEL / (double) samplingLevel) * minIndexInterval);
@@ -264,9 +302,18 @@ public class IndexSummary extends WrappedSharedCloseable
" the current max index interval (%d)", effectiveIndexInterval, maxIndexInterval));
}
- RefCountedMemory memory = new RefCountedMemory(offheapSize);
- FBUtilities.copy(in, new MemoryOutputStream(memory), offheapSize);
- return new IndexSummary(partitioner, memory, summarySize, fullSamplingSummarySize, minIndexInterval, samplingLevel);
+ Memory offsets = Memory.allocate(offsetCount * 4);
+ Memory entries = Memory.allocate(offheapSize - offsets.size());
+ FBUtilities.copy(in, new MemoryOutputStream(offsets), offsets.size());
+ FBUtilities.copy(in, new MemoryOutputStream(entries), entries.size());
+ // our on-disk representation treats the offsets and the summary data as one contiguous structure,
+ // in which the offsets are based from the start of the structure. i.e., if the offsets occupy
+ // X bytes, the value of the first offset will be X. In memory we split the two regions up, so that
+ // the summary values are indexed from zero, so we apply a correction to the offsets when de/serializing.
+ // In this case subtracting X from each of the offsets.
+ for (int i = 0 ; i < offsets.size() ; i += 4)
+ offsets.setInt(i, (int) (offsets.getInt(i) - offsets.size()));
+ return new IndexSummary(partitioner, offsets, offsetCount, entries, entries.size(), fullSamplingSummarySize, minIndexInterval, samplingLevel);
}
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f3c0e11e/src/java/org/apache/cassandra/io/sstable/IndexSummaryBuilder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/IndexSummaryBuilder.java b/src/java/org/apache/cassandra/io/sstable/IndexSummaryBuilder.java
index 3b93b31..54e8dd2 100644
--- a/src/java/org/apache/cassandra/io/sstable/IndexSummaryBuilder.java
+++ b/src/java/org/apache/cassandra/io/sstable/IndexSummaryBuilder.java
@@ -17,36 +17,33 @@
*/
package org.apache.cassandra.io.sstable;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collections;
+import java.nio.ByteOrder;
import java.util.Map;
import java.util.TreeMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.cassandra.cache.RefCountedMemory;
import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.db.TypeSizes;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.io.util.Memory;
+import org.apache.cassandra.io.util.SafeMemoryWriter;
import static org.apache.cassandra.io.sstable.Downsampling.BASE_SAMPLING_LEVEL;
-import static org.apache.cassandra.io.sstable.SSTable.getMinimalKey;
-public class IndexSummaryBuilder
+public class IndexSummaryBuilder implements AutoCloseable
{
private static final Logger logger = LoggerFactory.getLogger(IndexSummaryBuilder.class);
- private final ArrayList<Long> positions;
- private final ArrayList<DecoratedKey> keys;
+ // the offset in the keys memory region to look for a given summary boundary
+ private final SafeMemoryWriter offsets;
+ private final SafeMemoryWriter entries;
+
private final int minIndexInterval;
private final int samplingLevel;
private final int[] startPoints;
private long keysWritten = 0;
private long indexIntervalMatches = 0;
- private long offheapSize = 0;
private long nextSamplePosition;
// for each ReadableBoundary, we map its dataLength property to itself, permitting us to lookup the
@@ -75,11 +72,15 @@ public class IndexSummaryBuilder
final DecoratedKey lastKey;
final long indexLength;
final long dataLength;
- public ReadableBoundary(DecoratedKey lastKey, long indexLength, long dataLength)
+ final int summaryCount;
+ final long entriesLength;
+ public ReadableBoundary(DecoratedKey lastKey, long indexLength, long dataLength, int summaryCount, long entriesLength)
{
this.lastKey = lastKey;
this.indexLength = indexLength;
this.dataLength = dataLength;
+ this.summaryCount = summaryCount;
+ this.entriesLength = entriesLength;
}
}
@@ -105,10 +106,9 @@ public class IndexSummaryBuilder
}
// for initializing data structures, adjust our estimates based on the sampling level
- maxExpectedEntries = (maxExpectedEntries * samplingLevel) / BASE_SAMPLING_LEVEL;
- positions = new ArrayList<>((int)maxExpectedEntries);
- keys = new ArrayList<>((int)maxExpectedEntries);
- // if we're downsampling we may not use index 0
+ maxExpectedEntries = Math.max(1, (maxExpectedEntries * samplingLevel) / BASE_SAMPLING_LEVEL);
+ offsets = new SafeMemoryWriter(4 * maxExpectedEntries).withByteOrder(ByteOrder.nativeOrder());
+ entries = new SafeMemoryWriter(40 * maxExpectedEntries).withByteOrder(ByteOrder.nativeOrder());
setNextSamplePosition(-minIndexInterval);
}
@@ -165,16 +165,16 @@ public class IndexSummaryBuilder
{
if (keysWritten == nextSamplePosition)
{
- keys.add(getMinimalKey(decoratedKey));
- offheapSize += decoratedKey.getKey().remaining();
- positions.add(indexStart);
- offheapSize += TypeSizes.NATIVE.sizeof(indexStart);
+ assert entries.length() <= Integer.MAX_VALUE;
+ offsets.writeInt((int) entries.length());
+ entries.write(decoratedKey.getKey());
+ entries.writeLong(indexStart);
setNextSamplePosition(keysWritten);
}
else if (dataEnd != 0 && keysWritten + 1 == nextSamplePosition)
{
// this is the last key in this summary interval, so stash it
- ReadableBoundary boundary = new ReadableBoundary(decoratedKey, indexEnd, dataEnd);
+ ReadableBoundary boundary = new ReadableBoundary(decoratedKey, indexEnd, dataEnd, (int)(offsets.length() / 4), entries.length());
lastReadableByData.put(dataEnd, boundary);
lastReadableByIndex.put(indexEnd, boundary);
}
@@ -201,52 +201,39 @@ public class IndexSummaryBuilder
public IndexSummary build(IPartitioner partitioner)
{
+ // this method should only be called when we've finished appending records, so we truncate the
+ // memory we're using to the exact amount required to represent it before building our summary
+ entries.setCapacity(entries.length());
+ offsets.setCapacity(offsets.length());
return build(partitioner, null);
}
- // lastIntervalKey should come from getLastReadableBoundary().lastKey
- public IndexSummary build(IPartitioner partitioner, DecoratedKey lastIntervalKey)
+ // build the summary up to the provided boundary; this is backed by shared memory between
+ // multiple invocations of this build method
+ public IndexSummary build(IPartitioner partitioner, ReadableBoundary boundary)
{
- assert keys.size() > 0;
- assert keys.size() == positions.size();
-
- int length;
- if (lastIntervalKey == null)
- length = keys.size();
- else // since it's an inclusive upper bound, this should never match exactly
- length = -1 -Collections.binarySearch(keys, lastIntervalKey);
-
- assert length > 0;
-
- long offheapSize = this.offheapSize;
- if (length < keys.size())
- for (int i = length ; i < keys.size() ; i++)
- offheapSize -= keys.get(i).getKey().remaining() + TypeSizes.NATIVE.sizeof(positions.get(i));
-
- // first we write out the position in the *summary* for each key in the summary,
- // then we write out (key, actual index position) pairs
- Memory memory = Memory.allocate(offheapSize + (length * 4));
- int idxPosition = 0;
- int keyPosition = length * 4;
- for (int i = 0; i < length; i++)
+ assert entries.length() > 0;
+
+ int count = (int) (offsets.length() / 4);
+ long entriesLength = entries.length();
+ if (boundary != null)
{
- // write the position of the actual entry in the index summary (4 bytes)
- memory.setInt(idxPosition, keyPosition);
- idxPosition += TypeSizes.NATIVE.sizeof(keyPosition);
-
- // write the key
- ByteBuffer keyBytes = keys.get(i).getKey();
- memory.setBytes(keyPosition, keyBytes);
- keyPosition += keyBytes.remaining();
-
- // write the position in the actual index file
- long actualIndexPosition = positions.get(i);
- memory.setLong(keyPosition, actualIndexPosition);
- keyPosition += TypeSizes.NATIVE.sizeof(actualIndexPosition);
+ count = boundary.summaryCount;
+ entriesLength = boundary.entriesLength;
}
- assert keyPosition == offheapSize + (length * 4);
+
int sizeAtFullSampling = (int) Math.ceil(keysWritten / (double) minIndexInterval);
- return new IndexSummary(partitioner, memory, length, sizeAtFullSampling, minIndexInterval, samplingLevel);
+ assert count > 0;
+ return new IndexSummary(partitioner, offsets.currentBuffer().sharedCopy(),
+ count, entries.currentBuffer().sharedCopy(), entriesLength,
+ sizeAtFullSampling, minIndexInterval, samplingLevel);
+ }
+
+ // close the builder and release any associated memory
+ public void close()
+ {
+ entries.close();
+ offsets.close();
}
public static int entriesAtSamplingLevel(int samplingLevel, int maxSummarySize)
@@ -294,26 +281,25 @@ public class IndexSummaryBuilder
int[] startPoints = Downsampling.getStartPoints(currentSamplingLevel, newSamplingLevel);
// calculate new off-heap size
- int removedKeyCount = 0;
- long newOffHeapSize = existing.getOffHeapSize();
+ int newKeyCount = existing.size();
+ long newEntriesLength = existing.getEntriesLength();
for (int start : startPoints)
{
for (int j = start; j < existing.size(); j += currentSamplingLevel)
{
- removedKeyCount++;
- newOffHeapSize -= existing.getEntry(j).length;
+ newKeyCount--;
+ long length = existing.getEndInSummary(j) - existing.getPositionInSummary(j);
+ newEntriesLength -= length;
}
}
- int newKeyCount = existing.size() - removedKeyCount;
-
- // Subtract (removedKeyCount * 4) from the new size to account for fewer entries in the first section, which
- // stores the position of the actual entries in the summary.
- RefCountedMemory memory = new RefCountedMemory(newOffHeapSize - (removedKeyCount * 4));
+ Memory oldEntries = existing.getEntries();
+ Memory newOffsets = Memory.allocate(newKeyCount * 4);
+ Memory newEntries = Memory.allocate(newEntriesLength);
// Copy old entries to our new Memory.
- int idxPosition = 0;
- int keyPosition = newKeyCount * 4;
+ int i = 0;
+ int newEntriesOffset = 0;
outer:
for (int oldSummaryIndex = 0; oldSummaryIndex < existing.size(); oldSummaryIndex++)
{
@@ -326,15 +312,15 @@ public class IndexSummaryBuilder
}
// write the position of the actual entry in the index summary (4 bytes)
- memory.setInt(idxPosition, keyPosition);
- idxPosition += TypeSizes.NATIVE.sizeof(keyPosition);
-
- // write the entry itself
- byte[] entry = existing.getEntry(oldSummaryIndex);
- memory.setBytes(keyPosition, entry, 0, entry.length);
- keyPosition += entry.length;
+ newOffsets.setInt(i * 4, newEntriesOffset);
+ i++;
+ long start = existing.getPositionInSummary(oldSummaryIndex);
+ long length = existing.getEndInSummary(oldSummaryIndex) - start;
+ newEntries.put(newEntriesOffset, oldEntries, start, length);
+ newEntriesOffset += length;
}
- return new IndexSummary(partitioner, memory, newKeyCount, existing.getMaxNumberOfEntries(),
- minIndexInterval, newSamplingLevel);
+ assert newEntriesOffset == newEntriesLength;
+ return new IndexSummary(partitioner, newOffsets, newKeyCount, newEntries, newEntriesLength,
+ existing.getMaxNumberOfEntries(), minIndexInterval, newSamplingLevel);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f3c0e11e/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
index 973b0c9..41e4adb 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
@@ -736,40 +736,40 @@ public class SSTableReader extends SSTable implements SelfRefCounted<SSTableRead
long indexSize = primaryIndex.length();
long histogramCount = sstableMetadata.estimatedRowSize.count();
long estimatedKeys = histogramCount > 0 && !sstableMetadata.estimatedRowSize.isOverflowed()
- ? histogramCount
- : estimateRowsFromIndex(primaryIndex); // statistics is supposed to be optional
+ ? histogramCount
+ : estimateRowsFromIndex(primaryIndex); // statistics is supposed to be optional
- if (recreateBloomFilter)
- bf = FilterFactory.getFilter(estimatedKeys, metadata.getBloomFilterFpChance(), true);
-
- IndexSummaryBuilder summaryBuilder = null;
- if (!summaryLoaded)
- summaryBuilder = new IndexSummaryBuilder(estimatedKeys, metadata.getMinIndexInterval(), samplingLevel);
-
- long indexPosition;
- while ((indexPosition = primaryIndex.getFilePointer()) != indexSize)
+ try(IndexSummaryBuilder summaryBuilder = summaryLoaded ? null : new IndexSummaryBuilder(estimatedKeys, metadata.getMinIndexInterval(), samplingLevel))
{
- ByteBuffer key = ByteBufferUtil.readWithShortLength(primaryIndex);
- RowIndexEntry indexEntry = metadata.comparator.rowIndexEntrySerializer().deserialize(primaryIndex, descriptor.version);
- DecoratedKey decoratedKey = partitioner.decorateKey(key);
- if (first == null)
- first = decoratedKey;
- last = decoratedKey;
if (recreateBloomFilter)
- bf.add(decoratedKey.getKey());
+ bf = FilterFactory.getFilter(estimatedKeys, metadata.getBloomFilterFpChance(), true);
- // if summary was already read from disk we don't want to re-populate it using primary index
- if (!summaryLoaded)
+ long indexPosition;
+ while ((indexPosition = primaryIndex.getFilePointer()) != indexSize)
{
- summaryBuilder.maybeAddEntry(decoratedKey, indexPosition);
- ibuilder.addPotentialBoundary(indexPosition);
- dbuilder.addPotentialBoundary(indexEntry.position);
+ ByteBuffer key = ByteBufferUtil.readWithShortLength(primaryIndex);
+ RowIndexEntry indexEntry = metadata.comparator.rowIndexEntrySerializer().deserialize(primaryIndex, descriptor.version);
+ DecoratedKey decoratedKey = partitioner.decorateKey(key);
+ if (first == null)
+ first = decoratedKey;
+ last = decoratedKey;
+
+ if (recreateBloomFilter)
+ bf.add(decoratedKey.getKey());
+
+ // if summary was already read from disk we don't want to re-populate it using primary index
+ if (!summaryLoaded)
+ {
+ summaryBuilder.maybeAddEntry(decoratedKey, indexPosition);
+ ibuilder.addPotentialBoundary(indexPosition);
+ dbuilder.addPotentialBoundary(indexEntry.position);
+ }
}
- }
- if (!summaryLoaded)
- indexSummary = summaryBuilder.build(partitioner);
+ if (!summaryLoaded)
+ indexSummary = summaryBuilder.build(partitioner);
+ }
}
finally
{
@@ -1004,16 +1004,17 @@ public class SSTableReader extends SSTable implements SelfRefCounted<SSTableRead
try
{
long indexSize = primaryIndex.length();
- IndexSummaryBuilder summaryBuilder = new IndexSummaryBuilder(estimatedKeys(), metadata.getMinIndexInterval(), newSamplingLevel);
-
- long indexPosition;
- while ((indexPosition = primaryIndex.getFilePointer()) != indexSize)
+ try (IndexSummaryBuilder summaryBuilder = new IndexSummaryBuilder(estimatedKeys(), metadata.getMinIndexInterval(), newSamplingLevel))
{
- summaryBuilder.maybeAddEntry(partitioner.decorateKey(ByteBufferUtil.readWithShortLength(primaryIndex)), indexPosition);
- RowIndexEntry.Serializer.skip(primaryIndex);
- }
+ long indexPosition;
+ while ((indexPosition = primaryIndex.getFilePointer()) != indexSize)
+ {
+ summaryBuilder.maybeAddEntry(partitioner.decorateKey(ByteBufferUtil.readWithShortLength(primaryIndex)), indexPosition);
+ RowIndexEntry.Serializer.skip(primaryIndex);
+ }
- return summaryBuilder.build(partitioner);
+ return summaryBuilder.build(partitioner);
+ }
}
finally
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f3c0e11e/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
index b67685d..b35b652 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
@@ -403,7 +403,7 @@ public class SSTableWriter extends SSTable
SSTableReader sstable = SSTableReader.internalOpen(descriptor.asType(Descriptor.Type.FINAL),
components, metadata,
partitioner, ifile,
- dfile, iwriter.summary.build(partitioner, boundary.lastKey),
+ dfile, iwriter.summary.build(partitioner, boundary),
iwriter.bf.sharedCopy(), maxDataAge, sstableMetadata, SSTableReader.OpenReason.EARLY);
// now it's open, find the ACTUAL last readable key (i.e. for which the data file has also been flushed)
@@ -470,6 +470,7 @@ public class SSTableWriter extends SSTable
if (finishType.isFinal)
{
iwriter.bf.close();
+ iwriter.summary.close();
// try to save the summaries to disk
sstable.saveSummary(iwriter.builder, dbuilder);
iwriter = null;
@@ -627,6 +628,7 @@ public class SSTableWriter extends SSTable
public void abort()
{
+ summary.close();
indexFile.abort();
bf.close();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f3c0e11e/src/java/org/apache/cassandra/io/util/AbstractDataOutput.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/AbstractDataOutput.java b/src/java/org/apache/cassandra/io/util/AbstractDataOutput.java
index 3e38293..8f4bed8 100644
--- a/src/java/org/apache/cassandra/io/util/AbstractDataOutput.java
+++ b/src/java/org/apache/cassandra/io/util/AbstractDataOutput.java
@@ -321,9 +321,9 @@ public abstract class AbstractDataOutput extends OutputStream implements DataOut
}
}
- public void write(Memory memory) throws IOException
+ public void write(Memory memory, long offset, long length) throws IOException
{
- for (ByteBuffer buffer : memory.asByteBuffers())
+ for (ByteBuffer buffer : memory.asByteBuffers(offset, length))
write(buffer);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f3c0e11e/src/java/org/apache/cassandra/io/util/DataOutputPlus.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/DataOutputPlus.java b/src/java/org/apache/cassandra/io/util/DataOutputPlus.java
index 36c25ee..c2901e1 100644
--- a/src/java/org/apache/cassandra/io/util/DataOutputPlus.java
+++ b/src/java/org/apache/cassandra/io/util/DataOutputPlus.java
@@ -27,6 +27,5 @@ public interface DataOutputPlus extends DataOutput
// write the buffer without modifying its position
void write(ByteBuffer buffer) throws IOException;
- void write(Memory memory) throws IOException;
-
+ void write(Memory memory, long offset, long length) throws IOException;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f3c0e11e/src/java/org/apache/cassandra/io/util/Memory.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/Memory.java b/src/java/org/apache/cassandra/io/util/Memory.java
index ea78840..dcb9de6 100644
--- a/src/java/org/apache/cassandra/io/util/Memory.java
+++ b/src/java/org/apache/cassandra/io/util/Memory.java
@@ -25,6 +25,7 @@ import com.sun.jna.Native;
import net.nicoulaj.compilecommand.annotations.Inline;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.utils.FastByteOperations;
+import org.apache.cassandra.utils.concurrent.Ref;
import org.apache.cassandra.utils.memory.MemoryUtil;
import sun.misc.Unsafe;
import sun.nio.ch.DirectBuffer;
@@ -77,6 +78,9 @@ public class Memory implements AutoCloseable
if (bytes < 0)
throw new IllegalArgumentException();
+ if (Ref.DEBUG_ENABLED)
+ return new SafeMemory(bytes);
+
return new Memory(bytes);
}
@@ -163,6 +167,33 @@ public class Memory implements AutoCloseable
}
}
+ /**
+ * Writes a two-byte short at the given offset into this memory region.
+ * A single unsafe write is used when the unaligned flag is set; otherwise
+ * the value is written one byte at a time via putShortByByte.
+ *
+ * @param offset byte offset from the start of this region
+ * @param l the value to store
+ */
+ public void setShort(long offset, short l)
+ {
+ // a short is two bytes wide, so only [offset, offset + 2) has to be in bounds;
+ // checking four bytes would spuriously fail for a write into the last two bytes of the region
+ checkBounds(offset, offset + 2);
+ if (unaligned)
+ {
+ unsafe.putShort(peer + offset, l);
+ }
+ else
+ {
+ putShortByByte(peer + offset, l);
+ }
+ }
+
+ /**
+ * Stores a short one byte at a time starting at the given address, honouring the
+ * bigEndian flag: the most significant byte goes to the lower address when it is set,
+ * and to the higher address otherwise.
+ *
+ * @param address address of the first of the two bytes to write
+ * @param value the value to store
+ */
+ private void putShortByByte(long address, short value)
+ {
+ if (bigEndian)
+ {
+ unsafe.putByte(address, (byte) (value >> 8));
+ unsafe.putByte(address + 1, (byte) (value));
+ }
+ else
+ {
+ unsafe.putByte(address + 1, (byte) (value >> 8));
+ unsafe.putByte(address, (byte) (value));
+ }
+ }
+
public void setBytes(long memoryOffset, ByteBuffer buffer)
{
if (buffer == null)
@@ -340,20 +371,20 @@ public class Memory implements AutoCloseable
return false;
}
- public ByteBuffer[] asByteBuffers()
+ /**
+ * Exposes the range [offset, offset + length) of this memory as one or more ByteBuffers.
+ * More than one buffer is returned when length is too large for a single int-sized buffer.
+ *
+ * @param offset byte offset from the start of this region
+ * @param length number of bytes to expose
+ * @return the buffers covering the range, in order; an empty array if this region has size zero
+ */
+ public ByteBuffer[] asByteBuffers(long offset, long length)
{
if (size() == 0)
return new ByteBuffer[0];
- ByteBuffer[] result = new ByteBuffer[(int) (size() / Integer.MAX_VALUE) + 1];
- long offset = 0;
+ ByteBuffer[] result = new ByteBuffer[(int) (length / Integer.MAX_VALUE) + 1];
- int size = (int) (size() / result.length);
+ // the chunk size must come from the requested length (which also sized the array above), not from
+ // the whole region: the region may be larger than the range, in which case chunks based on size()
+ // would run past the requested range or overflow the int cast
+ int size = (int) (length / result.length);
for (int i = 0 ; i < result.length - 1 ; i++)
{
result[i] = MemoryUtil.getByteBuffer(peer + offset, size);
offset += size;
+ length -= size;
}
- result[result.length - 1] = MemoryUtil.getByteBuffer(peer + offset, (int) (size() - offset));
+ // whatever remains of the requested range goes into the final buffer
+ result[result.length - 1] = MemoryUtil.getByteBuffer(peer + offset, (int) length);
return result;
}
@@ -366,5 +397,4 @@ public class Memory implements AutoCloseable
{
return String.format("Memory@[%x..%x)", peer, peer + size);
}
-
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f3c0e11e/src/java/org/apache/cassandra/io/util/SafeMemoryWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/SafeMemoryWriter.java b/src/java/org/apache/cassandra/io/util/SafeMemoryWriter.java
new file mode 100644
index 0000000..1998cc6
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/util/SafeMemoryWriter.java
@@ -0,0 +1,136 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+package org.apache.cassandra.io.util;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+/**
+ * A {@link DataOutputPlus} that appends everything written to it to a {@link SafeMemory} buffer,
+ * swapping the buffer for a larger copy whenever a write would not fit.
+ *
+ * Multi-byte values are stored through the buffer's setShort/setInt/setLong; when the configured
+ * {@link ByteOrder} (big-endian unless changed with {@link #withByteOrder}) differs from
+ * {@link ByteOrder#nativeOrder()}, the value's bytes are reversed before it is stored.
+ */
+public class SafeMemoryWriter extends AbstractDataOutput implements DataOutputPlus
+{
+    private ByteOrder order = ByteOrder.BIG_ENDIAN;
+    private SafeMemory buffer;
+    // count of bytes written so far, which is also the offset of the next write
+    private long length;
+
+    /**
+     * @param initialCapacity size of the buffer allocated up front
+     */
+    public SafeMemoryWriter(long initialCapacity)
+    {
+        buffer = new SafeMemory(initialCapacity);
+    }
+
+    // Every write below follows the same three steps: reserve room first (which may replace
+    // this.buffer), store at the current length, and only then advance length.
+
+    public void write(byte[] buffer, int offset, int count)
+    {
+        long newLength = ensureCapacity(count);
+        this.buffer.setBytes(this.length, buffer, offset, count);
+        this.length = newLength;
+    }
+
+    public void write(int oneByte)
+    {
+        long newLength = ensureCapacity(1);
+        // length is advanced solely by the assignment below, as in every other write method
+        buffer.setByte(length, (byte) oneByte);
+        length = newLength;
+    }
+
+    public void writeShort(int val) throws IOException
+    {
+        if (mustReverse())
+            val = Short.reverseBytes((short) val);
+        long newLength = ensureCapacity(2);
+        buffer.setShort(length, (short) val);
+        length = newLength;
+    }
+
+    public void writeInt(int val)
+    {
+        if (mustReverse())
+            val = Integer.reverseBytes(val);
+        long newLength = ensureCapacity(4);
+        buffer.setInt(length, val);
+        length = newLength;
+    }
+
+    public void writeLong(long val)
+    {
+        if (mustReverse())
+            val = Long.reverseBytes(val);
+        long newLength = ensureCapacity(8);
+        buffer.setLong(length, val);
+        length = newLength;
+    }
+
+    public void write(ByteBuffer buffer)
+    {
+        long newLength = ensureCapacity(buffer.remaining());
+        this.buffer.setBytes(length, buffer);
+        length = newLength;
+    }
+
+    /**
+     * Appends the whole of {@code memory}, i.e. its bytes from 0 to {@code memory.size()}.
+     */
+    public void write(Memory memory)
+    {
+        long newLength = ensureCapacity(memory.size());
+        buffer.put(length, memory, 0, memory.size());
+        length = newLength;
+    }
+
+    // true when a value's bytes must be reversed to end up in the configured order
+    private boolean mustReverse()
+    {
+        return order != ByteOrder.nativeOrder();
+    }
+
+    /**
+     * Makes sure {@code size} more bytes fit after the current length, growing the buffer if they
+     * do not. Growth is to the required length or to 1.5x the current size, whichever is larger.
+     *
+     * @return the length the writer will have once the pending write has been applied
+     */
+    private long ensureCapacity(long size)
+    {
+        long newLength = this.length + size;
+        if (newLength > buffer.size())
+            setCapacity(Math.max(newLength, buffer.size() + (buffer.size() / 2)));
+        return newLength;
+    }
+
+    /**
+     * @return the buffer currently backing this writer; a later write or {@link #setCapacity}
+     * call may replace it and free this instance
+     */
+    public SafeMemory currentBuffer()
+    {
+        return buffer;
+    }
+
+    /**
+     * Replaces the backing buffer with {@code buffer.copy(newCapacity)} and frees the previous one.
+     * Does nothing when {@code newCapacity} equals the current capacity.
+     *
+     * NOTE(review): length is not adjusted here, so a capacity below length() would leave the
+     * write offset beyond the end of the buffer - confirm callers never shrink that far.
+     */
+    public void setCapacity(long newCapacity)
+    {
+        if (newCapacity != capacity())
+        {
+            SafeMemory oldBuffer = buffer;
+            buffer = this.buffer.copy(newCapacity);
+            oldBuffer.free();
+        }
+    }
+
+    /** Closes the backing buffer. */
+    public void close()
+    {
+        buffer.close();
+    }
+
+    /** @return the number of bytes written so far */
+    public long length()
+    {
+        return length;
+    }
+
+    /** @return the size of the backing buffer */
+    public long capacity()
+    {
+        return buffer.size();
+    }
+
+    // TODO: consider hoisting this into DataOutputPlus, since most implementations can cope with this gracefully
+    // this would simplify IndexSummary.IndexSummarySerializer.serialize()
+    /**
+     * Sets the byte order used by writeShort/writeInt/writeLong from now on.
+     *
+     * @return this writer, to allow chaining
+     */
+    public SafeMemoryWriter withByteOrder(ByteOrder order)
+    {
+        this.order = order;
+        return this;
+    }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f3c0e11e/src/java/org/apache/cassandra/utils/concurrent/Ref.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/concurrent/Ref.java b/src/java/org/apache/cassandra/utils/concurrent/Ref.java
index 8213c46..4e6cef7 100644
--- a/src/java/org/apache/cassandra/utils/concurrent/Ref.java
+++ b/src/java/org/apache/cassandra/utils/concurrent/Ref.java
@@ -50,7 +50,7 @@ import org.apache.cassandra.concurrent.NamedThreadFactory;
public final class Ref<T> implements RefCounted<T>, AutoCloseable
{
static final Logger logger = LoggerFactory.getLogger(Ref.class);
- static final boolean DEBUG_ENABLED = System.getProperty("cassandra.debugrefcount", "false").equalsIgnoreCase("true");
+ public static final boolean DEBUG_ENABLED = System.getProperty("cassandra.debugrefcount", "false").equalsIgnoreCase("true");
final State state;
final T referent;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f3c0e11e/src/java/org/apache/cassandra/utils/concurrent/WrappedSharedCloseable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/concurrent/WrappedSharedCloseable.java b/src/java/org/apache/cassandra/utils/concurrent/WrappedSharedCloseable.java
index c656f28..96e226c 100644
--- a/src/java/org/apache/cassandra/utils/concurrent/WrappedSharedCloseable.java
+++ b/src/java/org/apache/cassandra/utils/concurrent/WrappedSharedCloseable.java
@@ -18,26 +18,34 @@
*/
package org.apache.cassandra.utils.concurrent;
+import java.util.Arrays;
+
/**
* An implementation of SharedCloseable that wraps a normal AutoCloseable,
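- * ensuring its close method is only called when all instances of SharedCloseable have been
+ * ensuring its close method is only called when all instances of SharedCloseable have been closed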
*/
public abstract class WrappedSharedCloseable extends SharedCloseableImpl
{
- final AutoCloseable wrapped;
+ final AutoCloseable[] wrapped;
public WrappedSharedCloseable(final AutoCloseable closeable)
{
+ this(new AutoCloseable[] { closeable}); // a single closeable is wrapped in a one-element array and handed to the array constructor
+ }
+
+ public WrappedSharedCloseable(final AutoCloseable[] closeable)
+ {
super(new RefCounted.Tidy()
{
public void tidy() throws Exception
{
- closeable.close();
+ // close every wrapped resource even if an earlier one fails, so a single failure cannot leak the rest;
+ // the first failure is rethrown once all have been attempted, with later ones attached as suppressed
+ Exception failure = null;
+ for (AutoCloseable c : closeable)
+ {
+ try
+ {
+ c.close();
+ }
+ catch (Exception e)
+ {
+ if (failure == null)
+ failure = e;
+ else if (failure != e) // addSuppressed rejects self-suppression
+ failure.addSuppressed(e);
+ }
+ }
+ if (failure != null)
+ throw failure;
}
public String name()
{
- return closeable.toString();
+ return Arrays.toString(closeable); // renders each wrapped closeable; toString() on the array itself would not list its elements
}
});
wrapped = closeable;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f3c0e11e/test/unit/org/apache/cassandra/io/sstable/IndexSummaryTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/IndexSummaryTest.java b/test/unit/org/apache/cassandra/io/sstable/IndexSummaryTest.java
index 9aca66d..9c709a3 100644
--- a/test/unit/org/apache/cassandra/io/sstable/IndexSummaryTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/IndexSummaryTest.java
@@ -91,38 +91,42 @@ public class IndexSummaryTest
public void testAddEmptyKey() throws Exception
{
IPartitioner p = new RandomPartitioner();
- IndexSummaryBuilder builder = new IndexSummaryBuilder(1, 1, BASE_SAMPLING_LEVEL);
- builder.maybeAddEntry(p.decorateKey(ByteBufferUtil.EMPTY_BYTE_BUFFER), 0);
- IndexSummary summary = builder.build(p);
- assertEquals(1, summary.size());
- assertEquals(0, summary.getPosition(0));
- assertArrayEquals(new byte[0], summary.getKey(0));
-
- DataOutputBuffer dos = new DataOutputBuffer();
- IndexSummary.serializer.serialize(summary, dos, false);
- DataInputStream dis = new DataInputStream(new ByteArrayInputStream(dos.toByteArray()));
- IndexSummary loaded = IndexSummary.serializer.deserialize(dis, p, false, 1, 1);
-
- assertEquals(1, loaded.size());
- assertEquals(summary.getPosition(0), loaded.getPosition(0));
- assertArrayEquals(summary.getKey(0), summary.getKey(0));
+ try (IndexSummaryBuilder builder = new IndexSummaryBuilder(1, 1, BASE_SAMPLING_LEVEL))
+ {
+ builder.maybeAddEntry(p.decorateKey(ByteBufferUtil.EMPTY_BYTE_BUFFER), 0);
+ IndexSummary summary = builder.build(p);
+ assertEquals(1, summary.size());
+ assertEquals(0, summary.getPosition(0));
+ assertArrayEquals(new byte[0], summary.getKey(0));
+
+ DataOutputBuffer dos = new DataOutputBuffer();
+ IndexSummary.serializer.serialize(summary, dos, false);
+ DataInputStream dis = new DataInputStream(new ByteArrayInputStream(dos.toByteArray()));
+ IndexSummary loaded = IndexSummary.serializer.deserialize(dis, p, false, 1, 1);
+
+ assertEquals(1, loaded.size());
+ assertEquals(summary.getPosition(0), loaded.getPosition(0));
+ assertArrayEquals(summary.getKey(0), loaded.getKey(0)); // check the deserialized key, rather than comparing summary with itself
+ }
}
private Pair<List<DecoratedKey>, IndexSummary> generateRandomIndex(int size, int interval)
{
List<DecoratedKey> list = Lists.newArrayList();
- IndexSummaryBuilder builder = new IndexSummaryBuilder(list.size(), interval, BASE_SAMPLING_LEVEL);
- for (int i = 0; i < size; i++)
+ try (IndexSummaryBuilder builder = new IndexSummaryBuilder(list.size(), interval, BASE_SAMPLING_LEVEL)) // NOTE(review): list is still empty here, so list.size() is always 0 - presumably size was intended; confirm
{
- UUID uuid = UUID.randomUUID();
- DecoratedKey key = DatabaseDescriptor.getPartitioner().decorateKey(ByteBufferUtil.bytes(uuid));
- list.add(key);
+ for (int i = 0; i < size; i++)
+ {
+ UUID uuid = UUID.randomUUID();
+ DecoratedKey key = DatabaseDescriptor.getPartitioner().decorateKey(ByteBufferUtil.bytes(uuid));
+ list.add(key);
+ }
+ Collections.sort(list); // keys are sorted before being handed to the builder in list order
+ for (int i = 0; i < size; i++)
+ builder.maybeAddEntry(list.get(i), i);
+ IndexSummary summary = builder.build(DatabaseDescriptor.getPartitioner());
+ return Pair.create(list, summary); // the builder is closed on leaving the try block; the sorted keys and built summary go to the caller
}
- Collections.sort(list);
- for (int i = 0; i < size; i++)
- builder.maybeAddEntry(list.get(i), i);
- IndexSummary summary = builder.build(DatabaseDescriptor.getPartitioner());
- return Pair.create(list, summary);
}
@Test
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f3c0e11e/test/unit/org/apache/cassandra/io/util/DataOutputTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/util/DataOutputTest.java b/test/unit/org/apache/cassandra/io/util/DataOutputTest.java
index 76f3304..7110d1d 100644
--- a/test/unit/org/apache/cassandra/io/util/DataOutputTest.java
+++ b/test/unit/org/apache/cassandra/io/util/DataOutputTest.java
@@ -92,6 +92,17 @@ public class DataOutputTest
}
@Test
+    public void testSafeMemoryWriter() throws IOException
+    {
+        SafeMemoryWriter write = new SafeMemoryWriter(10);
+        try
+        {
+            DataInput canon = testWrite(write);
+            // copy the writer's contents into a byte[] so they can be replayed through a DataInput;
+            // 345 is presumably the number of bytes testWrite emits (defined outside this view) - confirm
+            byte[] bytes = new byte[345];
+            write.currentBuffer().getBytes(0, bytes, 0, 345);
+            DataInput test = new DataInputStream(new ByteArrayInputStream(bytes));
+            testRead(test, canon);
+        }
+        finally
+        {
+            // release the writer's buffer whether or not the checks above pass
+            write.close();
+        }
+    }
+
+ @Test
public void testFileOutputStream() throws IOException
{
File file = FileUtils.createTempFile("dataoutput", "test");