You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2010/06/15 15:29:08 UTC
svn commit: r954875 - in
/cassandra/trunk/src/java/org/apache/cassandra/io/sstable:
RowIndexedReader.java RowIndexedScanner.java SSTableReader.java
SSTableScanner.java SSTableWriter.java
Author: jbellis
Date: Tue Jun 15 13:29:07 2010
New Revision: 954875
URL: http://svn.apache.org/viewvc?rev=954875&view=rev
Log:
merge {SSTable,RowIndexed}Reader, {SSTable,RowIndexed}Scanner
patch by jbellis; reviewed by gdusbabek for CASSANDRA-1127
Removed:
cassandra/trunk/src/java/org/apache/cassandra/io/sstable/RowIndexedReader.java
cassandra/trunk/src/java/org/apache/cassandra/io/sstable/RowIndexedScanner.java
Modified:
cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java
cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java?rev=954875&r1=954874&r2=954875&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java Tue Jun 15 13:29:07 2010
@@ -26,6 +26,13 @@ import java.lang.ref.Reference;
import java.nio.channels.FileChannel;
import java.nio.MappedByteBuffer;
+import com.google.common.base.Function;
+import com.google.common.collect.Collections2;
+import org.apache.cassandra.io.util.BufferedRandomAccessFile;
+import org.apache.cassandra.io.util.SegmentedFile;
+import org.apache.cassandra.utils.BloomFilter;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -44,10 +51,13 @@ import org.apache.cassandra.io.util.File
* SSTableReaders are open()ed by Table.onStart; after that they are created by SSTableWriter.renameAndOpen.
* Do not re-call open() on existing SSTable files; use the references kept by ColumnFamilyStore post-start instead.
*/
-public abstract class SSTableReader extends SSTable implements Comparable<SSTableReader>
+public class SSTableReader extends SSTable implements Comparable<SSTableReader>
{
private static final Logger logger = LoggerFactory.getLogger(SSTableReader.class);
+ // guesstimated size of INDEX_INTERVAL index entries
+ private static final int INDEX_FILE_BUFFER_BYTES = 16 * IndexSummary.INDEX_INTERVAL;
+
// `finalizers` is required to keep the PhantomReferences alive after the enclosing SSTR is itself
// unreferenced. otherwise they will never get enqueued.
private static final Set<Reference<SSTableReader>> finalizers = new HashSet<Reference<SSTableReader>>();
@@ -97,6 +107,14 @@ public abstract class SSTableReader exte
*/
public final long maxDataAge;
+ // indexfile and datafile: might be null before a call to load()
+ private SegmentedFile ifile;
+ private SegmentedFile dfile;
+
+ private InstrumentedCache<Pair<Descriptor,DecoratedKey>, Long> keyCache;
+
+ private volatile SSTableDeletingReference phantomReference;
+
public static int indexInterval()
{
return IndexSummary.INDEX_INTERVAL;
@@ -144,7 +162,7 @@ public abstract class SSTableReader exte
// FIXME: version conditional readers here
if (true)
{
- sstable = RowIndexedReader.internalOpen(descriptor, partitioner);
+ sstable = internalOpen(descriptor, partitioner);
}
if (logger.isDebugEnabled())
@@ -153,39 +171,173 @@ public abstract class SSTableReader exte
return sstable;
}
+ /** Open an SSTableReader which needs its state loaded from disk. */
+ static SSTableReader internalOpen(Descriptor desc, IPartitioner partitioner) throws IOException
+ {
+ SSTableReader sstable = new SSTableReader(desc, partitioner, null, null, null, null, System.currentTimeMillis());
+
+ // versions before 'c' encoded keys as utf-16 before hashing to the filter
+ if (desc.versionCompareTo("c") < 0)
+ sstable.load(true);
+ else
+ {
+ sstable.load(false);
+ sstable.loadBloomFilter();
+ }
+
+ return sstable;
+ }
+
+ /**
+ * Open an SSTableReader which already has its state initialized (by SSTableWriter).
+ */
+ static SSTableReader internalOpen(Descriptor desc, IPartitioner partitioner, SegmentedFile ifile, SegmentedFile dfile, IndexSummary isummary, BloomFilter bf, long maxDataAge) throws IOException
+ {
+ assert desc != null && partitioner != null && ifile != null && dfile != null && isummary != null && bf != null;
+ return new SSTableReader(desc, partitioner, ifile, dfile, isummary, bf, maxDataAge);
+ }
+
+ SSTableReader(Descriptor desc,
+ IPartitioner partitioner,
+ SegmentedFile ifile,
+ SegmentedFile dfile,
+ IndexSummary indexSummary,
+ BloomFilter bloomFilter,
+ long maxDataAge)
+ throws IOException
+ {
+ super(desc, partitioner);
+ this.maxDataAge = maxDataAge;
+
+
+ this.ifile = ifile;
+ this.dfile = dfile;
+ this.indexSummary = indexSummary;
+ this.bf = bloomFilter;
+ }
+
public void setTrackedBy(SSTableTracker tracker)
{
phantomReference = new SSTableDeletingReference(tracker, this, finalizerQueue);
finalizers.add(phantomReference);
+ keyCache = tracker.getKeyCache();
}
- protected SSTableReader(Descriptor desc, IPartitioner partitioner, long maxDataAge)
+ void loadBloomFilter() throws IOException
{
- super(desc, partitioner);
- this.maxDataAge = maxDataAge;
+ DataInputStream stream = new DataInputStream(new FileInputStream(filterFilename()));
+ try
+ {
+ bf = BloomFilter.serializer().deserialize(stream);
+ }
+ finally
+ {
+ stream.close();
+ }
}
- private volatile SSTableDeletingReference phantomReference;
+ /**
+ * Loads ifile, dfile and indexSummary, and optionally recreates the bloom filter.
+ */
+ private void load(boolean recreatebloom) throws IOException
+ {
+ SegmentedFile.Builder ibuilder = SegmentedFile.getBuilder();
+ SegmentedFile.Builder dbuilder = SegmentedFile.getBuilder();
+
+ // we read the positions in a BRAF so we don't have to worry about an entry spanning a mmap boundary.
+ indexSummary = new IndexSummary();
+ BufferedRandomAccessFile input = new BufferedRandomAccessFile(indexFilename(), "r");
+ try
+ {
+ long indexSize = input.length();
+ if (recreatebloom)
+ // estimate key count based on index length
+ bf = BloomFilter.getFilter((int)(indexSize / 32), 15);
+ while (true)
+ {
+ long indexPosition = input.getFilePointer();
+ if (indexPosition == indexSize)
+ break;
+
+ DecoratedKey decoratedKey = partitioner.convertFromDiskFormat(FBUtilities.readShortByteArray(input));
+ if (recreatebloom)
+ bf.add(decoratedKey.key);
+ long dataPosition = input.readLong();
+
+ indexSummary.maybeAddEntry(decoratedKey, indexPosition);
+ ibuilder.addPotentialBoundary(indexPosition);
+ dbuilder.addPotentialBoundary(dataPosition);
+ }
+ // the summary is completed exactly once below, together with ifile/dfile, after the index file is closed
+ }
+ finally
+ {
+ input.close();
+ }
+
+ // finalize the state of the reader
+ indexSummary.complete();
+ ifile = ibuilder.complete(indexFilename());
+ dfile = dbuilder.complete(getFilename());
+ }
+
+ /** get the position in the index file to start scanning to find the given key (at most indexInterval keys away) */
+ private IndexSummary.KeyPosition getIndexScanPosition(DecoratedKey decoratedKey)
+ {
+ assert indexSummary.getIndexPositions() != null && indexSummary.getIndexPositions().size() > 0;
+ int index = Collections.binarySearch(indexSummary.getIndexPositions(), new IndexSummary.KeyPosition(decoratedKey, -1));
+ if (index < 0)
+ {
+ // binary search gives us the first index _greater_ than the key searched for,
+ // i.e., its insertion position
+ int greaterThan = (index + 1) * -1;
+ if (greaterThan == 0)
+ return null;
+ return indexSummary.getIndexPositions().get(greaterThan - 1);
+ }
+ else
+ {
+ return indexSummary.getIndexPositions().get(index);
+ }
+ }
/**
* For testing purposes only.
*/
- public abstract void forceFilterFailures();
+ public void forceFilterFailures()
+ {
+ bf = BloomFilter.alwaysMatchingBloomFilter();
+ }
/**
* @return The key cache: for monitoring purposes.
*/
- public abstract InstrumentedCache getKeyCache();
+ public InstrumentedCache getKeyCache()
+ {
+ return keyCache;
+ }
/**
* @return An estimate of the number of keys in this SSTable.
*/
- public abstract long estimatedKeys();
+ public long estimatedKeys()
+ {
+ return (long) indexSummary.getIndexPositions().size() * IndexSummary.INDEX_INTERVAL; // widen first: an int * int product could overflow
+ }
/**
* @return Approximately 1/INDEX_INTERVALth of the keys in this SSTable.
*/
- public abstract Collection<DecoratedKey> getKeySamples();
+ public Collection<DecoratedKey> getKeySamples()
+ {
+ return Collections2.transform(indexSummary.getIndexPositions(),
+ new Function<IndexSummary.KeyPosition, DecoratedKey>(){
+ public DecoratedKey apply(IndexSummary.KeyPosition kp)
+ {
+ return kp.key;
+ }
+ });
+ }
/**
* Returns the position in the data file to find the given key, or -1 if the
@@ -193,7 +345,71 @@ public abstract class SSTableReader exte
* FIXME: should not be public: use Scanner.
*/
@Deprecated
- public abstract long getPosition(DecoratedKey decoratedKey) throws IOException;
+ public long getPosition(DecoratedKey decoratedKey)
+ {
+ // first, check bloom filter
+ if (!bf.isPresent(partitioner.convertToDiskFormat(decoratedKey)))
+ return -1;
+
+ // next, the key cache
+ Pair<Descriptor, DecoratedKey> unifiedKey = new Pair<Descriptor, DecoratedKey>(desc, decoratedKey);
+ if (keyCache != null && keyCache.getCapacity() > 0)
+ {
+ Long cachedPosition = keyCache.get(unifiedKey);
+ if (cachedPosition != null)
+ {
+ return cachedPosition;
+ }
+ }
+
+ // next, see if the sampled index says it's impossible for the key to be present
+ IndexSummary.KeyPosition sampledPosition = getIndexScanPosition(decoratedKey);
+ if (sampledPosition == null)
+ return -1;
+
+ // scan the on-disk index, starting at the nearest sampled position
+ int i = 0;
+ Iterator<FileDataInput> segments = ifile.iterator(sampledPosition.indexPosition, INDEX_FILE_BUFFER_BYTES);
+ while (segments.hasNext())
+ {
+ FileDataInput input = segments.next();
+ try
+ {
+ while (!input.isEOF() && i++ < IndexSummary.INDEX_INTERVAL)
+ {
+ // read key & data position from index entry
+ DecoratedKey indexDecoratedKey = partitioner.convertFromDiskFormat(FBUtilities.readShortByteArray(input));
+ long dataPosition = input.readLong();
+
+ int v = indexDecoratedKey.compareTo(decoratedKey);
+ if (v == 0)
+ {
+ if (keyCache != null && keyCache.getCapacity() > 0)
+ keyCache.put(unifiedKey, Long.valueOf(dataPosition));
+ return dataPosition;
+ }
+ if (v > 0)
+ return -1;
+ }
+ }
+ catch (IOException e)
+ {
+ throw new IOError(e);
+ }
+ finally
+ {
+ try
+ {
+ input.close();
+ }
+ catch (IOException e)
+ {
+ logger.error("error closing file", e);
+ }
+ }
+ }
+ return -1;
+ }
/**
* Like getPosition, but if key is not found will return the location of the
@@ -201,12 +417,54 @@ public abstract class SSTableReader exte
* FIXME: should not be public: use Scanner.
*/
@Deprecated
- public abstract long getNearestPosition(DecoratedKey decoratedKey) throws IOException;
+ public long getNearestPosition(DecoratedKey decoratedKey)
+ {
+ IndexSummary.KeyPosition sampledPosition = getIndexScanPosition(decoratedKey);
+ if (sampledPosition == null)
+ return 0;
+
+ // scan the on-disk index, starting at the nearest sampled position
+ Iterator<FileDataInput> segiter = ifile.iterator(sampledPosition.indexPosition, INDEX_FILE_BUFFER_BYTES);
+ while (segiter.hasNext())
+ {
+ FileDataInput input = segiter.next();
+ try
+ {
+ while (!input.isEOF())
+ {
+ DecoratedKey indexDecoratedKey = partitioner.convertFromDiskFormat(FBUtilities.readShortByteArray(input));
+ long position = input.readLong();
+ int v = indexDecoratedKey.compareTo(decoratedKey);
+ if (v >= 0)
+ return position;
+ }
+ }
+ catch (IOException e)
+ {
+ throw new IOError(e);
+ }
+ finally
+ {
+ try
+ {
+ input.close();
+ }
+ catch (IOException e)
+ {
+ logger.error("error closing file", e);
+ }
+ }
+ }
+ return -1;
+ }
/**
* @return The length in bytes of the data file for this SSTable.
*/
- public abstract long length();
+ public long length()
+ {
+ return dfile.length;
+ }
public void markCompacted()
{
@@ -228,16 +486,35 @@ public abstract class SSTableReader exte
* @param bufferSize Buffer size in bytes for this Scanner.
* @return A Scanner for seeking over the rows of the SSTable.
*/
- public abstract SSTableScanner getScanner(int bufferSize);
+ public SSTableScanner getScanner(int bufferSize)
+ {
+ return new SSTableScanner(this, bufferSize);
+ }
/**
* @param bufferSize Buffer size in bytes for this Scanner.
* @param filter filter to use when reading the columns
* @return A Scanner for seeking over the rows of the SSTable.
*/
- public abstract SSTableScanner getScanner(int bufferSize, QueryFilter filter);
-
- public abstract FileDataInput getFileDataInput(DecoratedKey decoratedKey, int bufferSize);
+ public SSTableScanner getScanner(int bufferSize, QueryFilter filter)
+ {
+ return new SSTableScanner(this, filter, bufferSize);
+ }
+
+ public FileDataInput getFileDataInput(DecoratedKey decoratedKey, int bufferSize)
+ {
+ long position = getPosition(decoratedKey);
+ if (position < 0)
+ return null;
+
+ return dfile.getSegment(position, bufferSize);
+ }
+
+
+ public int compareTo(SSTableReader o)
+ {
+ return desc.generation < o.desc.generation ? -1 : (desc.generation == o.desc.generation ? 0 : 1); // orders by generation; explicit comparison cannot overflow, unlike subtraction
+ }
public AbstractType getColumnComparator()
{
Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java?rev=954875&r1=954874&r2=954875&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java Tue Jun 15 13:29:07 2010
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -20,17 +20,177 @@
package org.apache.cassandra.io.sstable;
import java.io.Closeable;
+import java.io.IOException;
+import java.io.IOError;
import java.util.Iterator;
+import java.util.Arrays;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.filter.IColumnIterator;
+import org.apache.cassandra.db.filter.QueryFilter;
+import org.apache.cassandra.io.util.BufferedRandomAccessFile;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.FBUtilities;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
-public abstract class SSTableScanner implements Iterator<IColumnIterator>, Closeable
-{
- public abstract void seekTo(DecoratedKey seekKey);
- public abstract long getFileLength();
+public class SSTableScanner implements Iterator<IColumnIterator>, Closeable
+{
+ private static Logger logger = LoggerFactory.getLogger(SSTableScanner.class);
- public abstract long getFilePointer();
+ private final BufferedRandomAccessFile file;
+ private final SSTableReader sstable;
+ private IColumnIterator row;
+ private boolean exhausted = false;
+ private Iterator<IColumnIterator> iterator;
+ private QueryFilter filter;
+
+ /**
+ * @param sstable SSTable to scan.
+ */
+ SSTableScanner(SSTableReader sstable, int bufferSize)
+ {
+ try
+ {
+ this.file = new BufferedRandomAccessFile(sstable.getFilename(), "r", bufferSize);
+ }
+ catch (IOException e)
+ {
+ throw new IOError(e);
+ }
+ this.sstable = sstable;
+ }
+
+ /**
+ * @param sstable SSTable to scan.
+ * @param filter filter to use when scanning the columns
+ */
+ SSTableScanner(SSTableReader sstable, QueryFilter filter, int bufferSize)
+ {
+ try
+ {
+ this.file = new BufferedRandomAccessFile(sstable.getFilename(), "r", bufferSize);
+ }
+ catch (IOException e)
+ {
+ throw new IOError(e);
+ }
+ this.sstable = sstable;
+ this.filter = filter;
+ }
+
+ public void close() throws IOException
+ {
+ file.close();
+ }
+
+ public void seekTo(DecoratedKey seekKey)
+ {
+ try
+ {
+ long position = sstable.getNearestPosition(seekKey);
+ if (position < 0)
+ {
+ exhausted = true;
+ return;
+ }
+ file.seek(position);
+ row = null;
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException("corrupt sstable", e);
+ }
+ }
+
+ public long getFileLength()
+ {
+ try
+ {
+ return file.length();
+ }
+ catch (IOException e)
+ {
+ throw new IOError(e);
+ }
+ }
+
+ public long getFilePointer()
+ {
+ return file.getFilePointer();
+ }
+
+ public boolean hasNext()
+ {
+ if (iterator == null)
+ iterator = exhausted ? Arrays.asList(new IColumnIterator[0]).iterator() : new KeyScanningIterator();
+ return iterator.hasNext();
+ }
+
+ public IColumnIterator next()
+ {
+ if (iterator == null)
+ iterator = exhausted ? Arrays.asList(new IColumnIterator[0]).iterator() : new KeyScanningIterator();
+ return iterator.next();
+ }
+
+ public void remove()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ private class KeyScanningIterator implements Iterator<IColumnIterator>
+ {
+ private long dataStart;
+ private long finishedAt;
+
+ public boolean hasNext()
+ {
+ try
+ {
+ if (row == null)
+ return !file.isEOF();
+ return finishedAt < file.length();
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public IColumnIterator next()
+ {
+ try
+ {
+ if (row != null)
+ file.seek(finishedAt);
+ assert !file.isEOF();
+
+ DecoratedKey key = StorageService.getPartitioner().convertFromDiskFormat(FBUtilities.readShortByteArray(file));
+ int dataSize = file.readInt();
+ dataStart = file.getFilePointer();
+ finishedAt = dataStart + dataSize;
+
+ if (filter == null)
+ {
+ return row = new SSTableIdentityIterator(sstable, file, key, dataStart, finishedAt);
+ }
+ else
+ {
+ return row = filter.getSSTableColumnIterator(sstable, file, key, dataStart);
+ }
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public void remove()
+ {
+ throw new UnsupportedOperationException();
+ }
+ }
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java?rev=954875&r1=954874&r2=954875&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java Tue Jun 15 13:29:07 2010
@@ -163,7 +163,7 @@ public class SSTableWriter extends SSTab
SegmentedFile dfile = dbuilder.complete(newdesc.filenameFor(SSTable.COMPONENT_DATA));
ibuilder = null;
dbuilder = null;
- return RowIndexedReader.internalOpen(newdesc, partitioner, ifile, dfile, indexSummary, bf, maxDataAge);
+ return SSTableReader.internalOpen(newdesc, partitioner, ifile, dfile, indexSummary, bf, maxDataAge);
}
static Descriptor rename(Descriptor tmpdesc)