You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2010/01/07 03:56:16 UTC
svn commit: r896741 - in /incubator/cassandra/trunk:
src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/io/
test/unit/org/apache/cassandra/io/
Author: jbellis
Date: Thu Jan 7 02:56:16 2010
New Revision: 896741
URL: http://svn.apache.org/viewvc?rev=896741&view=rev
Log:
Store the data-file position and size for any index entry that spans an mmap segment boundary while reading the index (through a BufferedRandomAccessFile), so lookups don't have to handle such entries at read time.
patch by jbellis; tested by Brandon Williams for CASSANDRA-669
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/DecoratedKey.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTable.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableReader.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableWriter.java
incubator/cassandra/trunk/test/unit/org/apache/cassandra/io/SSTableAccessor.java
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/DecoratedKey.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/DecoratedKey.java?rev=896741&r1=896740&r2=896741&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/DecoratedKey.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/DecoratedKey.java Thu Jan 7 02:56:16 2010
@@ -25,6 +25,7 @@
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.io.ICompactSerializer2;
+import org.apache.cassandra.io.util.DataOutputBuffer;
import org.apache.cassandra.utils.FBUtilities;
/**
@@ -98,6 +99,21 @@
return key != null && key.isEmpty();
}
+ /** Byte length of this key as written by {@code serializer}; found by actually serializing it, so call rarely. */
+ public int serializedSize()
+ {
+ DataOutputBuffer scratch = new DataOutputBuffer();
+ try
+ {
+ serializer.serialize(this, scratch);
+ return scratch.getLength();
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
@Override
public String toString()
{
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTable.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTable.java?rev=896741&r1=896740&r2=896741&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTable.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTable.java Thu Jan 7 02:56:16 2010
@@ -25,6 +25,7 @@
import java.io.IOException;
import java.util.List;
import java.util.Arrays;
+import java.util.Map;
import org.apache.log4j.Logger;
import org.apache.commons.lang.StringUtils;
@@ -56,6 +57,7 @@
protected IPartitioner partitioner;
protected BloomFilter bf;
protected List<KeyPosition> indexPositions;
+ protected Map<KeyPosition, PositionSize> spannedIndexDataPositions; // map of index position, to data position, for index entries spanning mmap segments
protected String columnFamilyName;
/* Every 128th index entry is loaded into memory so we know where to start looking for the actual key w/o seeking */
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableReader.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableReader.java?rev=896741&r1=896740&r2=896741&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableReader.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableReader.java Thu Jan 7 02:56:16 2010
@@ -90,7 +90,7 @@
};
new Thread(runnable, "SSTABLE-DELETER").start();
}};
- private final int BUFFER_SIZE = Integer.MAX_VALUE;
+ private static final int BUFFER_SIZE = Integer.MAX_VALUE;
public static int indexInterval()
{
@@ -196,7 +196,11 @@
private ConcurrentLinkedHashMap<DecoratedKey, PositionSize> keyCache;
- SSTableReader(String filename, IPartitioner partitioner, List<KeyPosition> indexPositions, BloomFilter bloomFilter, ConcurrentLinkedHashMap<DecoratedKey, PositionSize> keyCache)
+ SSTableReader(String filename,
+ IPartitioner partitioner,
+ List<KeyPosition> indexPositions, Map<KeyPosition, PositionSize> spannedIndexDataPositions,
+ BloomFilter bloomFilter,
+ ConcurrentLinkedHashMap<DecoratedKey, PositionSize> keyCache)
throws IOException
{
super(filename, partitioner);
@@ -219,6 +223,7 @@
}
this.indexPositions = indexPositions;
+ this.spannedIndexDataPositions = spannedIndexDataPositions;
this.bf = bloomFilter;
phantomReference = new FileDeletingReference(this, finalizerQueue);
finalizers.add(phantomReference);
@@ -261,7 +266,7 @@
private SSTableReader(String filename, IPartitioner partitioner) throws IOException
{
- this(filename, partitioner, null, null, null);
+ this(filename, partitioner, null /* indexPositions */, null /* spannedIndexDataPositions */, null /* bloomFilter */, null /* keyCache */);
}
public List<KeyPosition> getIndexPositions()
@@ -285,28 +290,54 @@
void loadIndexFile() throws IOException
{
indexPositions = new ArrayList<KeyPosition>();
-
- FileDataInput input = new MappedFileDataInput(indexBuffer, indexFilename());
- int i = 0;
- long indexSize = input.length();
- while (true)
+ // read the positions through a BRAF, so an entry spanning an mmap segment boundary is not a problem here.
+ // any entry that does span one is forced into the in-memory sample and has its data position/size recorded
+ // in spannedIndexDataPositions, so key lookup can always bsearch within a single mmapped segment.
+ BufferedRandomAccessFile input = new BufferedRandomAccessFile(indexFilename(), "r");
+ try
{
- long indexPosition = input.getFilePointer();
- if (indexPosition == indexSize)
- {
- break;
- }
- DecoratedKey decoratedKey = partitioner.convertFromDiskFormat(input.readUTF());
- input.readLong();
- if (i++ % INDEX_INTERVAL == 0)
+ int i = 0;
+ long indexSize = input.length();
+ while (true)
{
- indexPositions.add(new KeyPosition(decoratedKey, indexPosition));
+ long indexPosition = input.getFilePointer();
+ if (indexPosition == indexSize)
+ {
+ break;
+ }
+ DecoratedKey decoratedKey = partitioner.convertFromDiskFormat(input.readUTF());
+ long dataPosition = input.readLong();
+ long nextIndexPosition = input.getFilePointer();
+ boolean spannedEntry = bufferIndex(indexPosition) != bufferIndex(nextIndexPosition);
+ if (i++ % INDEX_INTERVAL == 0 || spannedEntry)
+ {
+ // sampled (or boundary-spanning) entry: remember where it starts in the index file
+ KeyPosition info = new KeyPosition(decoratedKey, indexPosition);
+ indexPositions.add(info);
+
+ if (spannedEntry)
+ {
+ if (spannedIndexDataPositions == null)
+ {
+ spannedIndexDataPositions = new HashMap<KeyPosition, PositionSize>();
+ }
+ // the row ends where the next entry's row begins; the last entry has no successor (reading one would hit EOF), so its row runs to the end of the data file
+ long nextDataPosition = new java.io.File(path).length();
+ if (nextIndexPosition < indexSize) { input.readUTF(); nextDataPosition = input.readLong(); }
+ input.seek(nextIndexPosition);
+ spannedIndexDataPositions.put(info, new PositionSize(dataPosition, nextDataPosition - dataPosition));
+ }
+ }
}
}
+ finally
+ {
+ input.close();
+ }
}
/** get the position in the index file to start scanning to find the given key (at most indexInterval keys away) */
- private long getIndexScanPosition(DecoratedKey decoratedKey)
+ private KeyPosition getIndexScanPosition(DecoratedKey decoratedKey)
{
assert indexPositions != null && indexPositions.size() > 0;
int index = Collections.binarySearch(indexPositions, new KeyPosition(decoratedKey, -1));
@@ -316,12 +347,12 @@
// i.e., its insertion position
int greaterThan = (index + 1) * -1;
if (greaterThan == 0)
- return -1;
- return indexPositions.get(greaterThan - 1).position;
+ return null;
+ return indexPositions.get(greaterThan - 1);
}
else
{
- return indexPositions.get(index).position;
+ return indexPositions.get(index);
}
}
@@ -340,14 +371,20 @@
return cachedPosition;
}
}
- long start = getIndexScanPosition(decoratedKey);
- if (start < 0)
+ KeyPosition sampledPosition = getIndexScanPosition(decoratedKey);
+ if (sampledPosition == null)
{
return null;
}
+ if (spannedIndexDataPositions != null)
+ {
+ PositionSize info = spannedIndexDataPositions.get(sampledPosition);
+ if (info != null)
+ return info;
+ }
FileDataInput input = new MappedFileDataInput(indexBuffer, indexFilename());
- input.seek(start);
+ input.seek(sampledPosition.position);
int i = 0;
do
{
@@ -388,11 +425,29 @@
/** like getPosition, but if key is not found will return the location of the first key _greater_ than the desired one, or -1 if no such key exists. */
public long getNearestPosition(DecoratedKey decoratedKey) throws IOException
{
- long start = getIndexScanPosition(decoratedKey);
- if (start < 0)
+ KeyPosition sampledPosition = getIndexScanPosition(decoratedKey);
+ if (sampledPosition == null)
{
return 0;
}
+
+ // by default, we plan to start scanning at the nearest bsearched index entry
+ long start = sampledPosition.position;
+ if (spannedIndexDataPositions != null)
+ {
+ // check if the index entry spans a mmap segment boundary
+ PositionSize info = spannedIndexDataPositions.get(sampledPosition);
+ if (info != null)
+ {
+ // if the key matches the index entry we don't have to scan the index after all
+ if (sampledPosition.key.compareTo(decoratedKey) == 0)
+ return info.position;
+ // otherwise, start scanning at the next entry (which won't span a boundary;
+ // if it did it would have been in the index sample and we would have started with that instead)
+ start = info.position + sampledPosition.key.serializedSize() + (Long.SIZE / 8);
+ }
+ }
+
BufferedRandomAccessFile input = new BufferedRandomAccessFile(indexFilename(path), "r");
input.seek(start);
try
@@ -483,7 +538,7 @@
return new MappedFileDataInput(buffers[bufferIndex(info.position)], path, (int) (info.position % BUFFER_SIZE));
}
- private int bufferIndex(long position)
+ static int bufferIndex(long position) // which BUFFER_SIZE-sized mmap segment holds position; static + package-private so SSTableWriter classifies index entries with the same arithmetic
{
return (int) (position / BUFFER_SIZE);
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableWriter.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableWriter.java?rev=896741&r1=896740&r2=896741&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableWriter.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableWriter.java Thu Jan 7 02:56:16 2010
@@ -23,6 +23,7 @@
import java.io.*;
import java.util.ArrayList;
+import java.util.HashMap;
import org.apache.log4j.Logger;
@@ -70,26 +71,38 @@
return (lastWrittenKey == null) ? 0 : dataFile.getFilePointer();
}
- private void afterAppend(DecoratedKey decoratedKey, long position) throws IOException
+ private void afterAppend(DecoratedKey decoratedKey, long dataPosition, int dataSize) throws IOException // dataPosition: row start in the data file; dataSize: serialized value length only (callers pass it; the recorded row size is taken from dataFile instead)
{
String diskKey = partitioner.convertToDiskFormat(decoratedKey);
bf.add(diskKey);
lastWrittenKey = decoratedKey;
long indexPosition = indexFile.getFilePointer();
indexFile.writeUTF(diskKey);
- indexFile.writeLong(position);
+ indexFile.writeLong(dataPosition);
if (logger.isTraceEnabled())
- logger.trace("wrote " + decoratedKey + " at " + position);
+ logger.trace("wrote " + decoratedKey + " at " + dataPosition);
+ if (logger.isTraceEnabled())
+ logger.trace("wrote index of " + decoratedKey + " at " + indexPosition);
- if (keysWritten++ % INDEX_INTERVAL != 0)
- return;
- if (indexPositions == null)
+ boolean spannedEntry = SSTableReader.bufferIndex(indexPosition) != SSTableReader.bufferIndex(indexFile.getFilePointer());
+ if (keysWritten++ % INDEX_INTERVAL == 0 || spannedEntry)
{
- indexPositions = new ArrayList<KeyPosition>();
+ if (indexPositions == null)
+ {
+ indexPositions = new ArrayList<KeyPosition>();
+ }
+ KeyPosition info = new KeyPosition(decoratedKey, indexPosition);
+ indexPositions.add(info);
+ // a boundary-spanning entry also records its row's whole on-disk extent (row start to current end of data file), the same size SSTableReader.loadIndexFile derives from the next entry
+ if (spannedEntry)
+ {
+ if (spannedIndexDataPositions == null)
+ {
+ spannedIndexDataPositions = new HashMap<KeyPosition, PositionSize>();
+ }
+ spannedIndexDataPositions.put(info, new PositionSize(dataPosition, dataFile.getFilePointer() - dataPosition));
+ }
}
- indexPositions.add(new KeyPosition(decoratedKey, indexPosition));
- if (logger.isTraceEnabled())
- logger.trace("wrote index of " + decoratedKey + " at " + indexPosition);
}
// TODO make this take a DataOutputStream and wrap the byte[] version to combine them
@@ -101,7 +114,7 @@
assert length > 0;
dataFile.writeInt(length);
dataFile.write(buffer.getData(), 0, length);
- afterAppend(decoratedKey, currentPosition);
+ afterAppend(decoratedKey, currentPosition, length);
}
public void append(DecoratedKey decoratedKey, byte[] value) throws IOException
@@ -111,7 +124,7 @@
assert value.length > 0;
dataFile.writeInt(value.length);
dataFile.write(value);
- afterAppend(decoratedKey, currentPosition);
+ afterAppend(decoratedKey, currentPosition, value.length);
}
/**
@@ -141,7 +154,7 @@
ConcurrentLinkedHashMap<DecoratedKey, SSTableReader.PositionSize> keyCache = cacheFraction > 0
? SSTableReader.createKeyCache((int) (cacheFraction * keysWritten))
: null;
- return new SSTableReader(path, partitioner, indexPositions, bf, keyCache);
+ return new SSTableReader(path, partitioner, indexPositions, spannedIndexDataPositions, bf, keyCache);
}
static String rename(String tmpFilename)
Modified: incubator/cassandra/trunk/test/unit/org/apache/cassandra/io/SSTableAccessor.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/io/SSTableAccessor.java?rev=896741&r1=896740&r2=896741&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/io/SSTableAccessor.java (original)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/io/SSTableAccessor.java Thu Jan 7 02:56:16 2010
@@ -27,7 +27,7 @@
public static SSTableReader getSSTableReader(String filename, IPartitioner<?> partitioner)
throws IOException
{
- SSTableReader sstable = new SSTableReader(filename, partitioner, null, null, null);
+ SSTableReader sstable = new SSTableReader(filename, partitioner, null, null, null, null);
sstable.loadBloomFilter();
sstable.loadIndexFile();
return sstable;