You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/08/02 00:31:01 UTC
svn commit: r799947 - in /incubator/cassandra/trunk/src/java/org/apache/cassandra: db/filter/SSTableSliceIterator.java io/IndexHelper.java io/SequenceFile.java
Author: jbellis
Date: Sat Aug 1 22:31:01 2009
New Revision: 799947
URL: http://svn.apache.org/viewvc?rev=799947&view=rev
Log:
SequenceFile (SF) shouldn't duplicate the position checking that was already done by SSTable. Move utility methods to IndexHelper. Fix off-by-4 in dataSizeReturned.
patch by jbellis; reviewed by Stu Hood for CASSANDRA-330
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/SSTableSliceIterator.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/IndexHelper.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SequenceFile.java
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/SSTableSliceIterator.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/SSTableSliceIterator.java?rev=799947&r1=799946&r2=799947&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/SSTableSliceIterator.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/SSTableSliceIterator.java Sat Aug 1 22:31:01 2009
@@ -36,9 +36,10 @@
/* Morph key into actual key based on the partition type. */
String decoratedKey = ssTable.getPartitioner().decorateKey(key);
- long position = ssTable.getPosition(decoratedKey);
AbstractType comparator1 = DatabaseDescriptor.getComparator(ssTable.getTableName(), cfName);
- reader = new SequenceFile.ColumnGroupReader(ssTable.getFilename(), decoratedKey, cfName, comparator1, startColumn, isAscending, position);
+ long position = ssTable.getPosition(decoratedKey);
+ if (position >= 0)
+ reader = new SequenceFile.ColumnGroupReader(ssTable.getFilename(), decoratedKey, cfName, comparator1, startColumn, isAscending, position);
this.comparator = comparator;
this.startColumn = startColumn;
curColumnIndex = isAscending ? 0 : -1;
@@ -89,6 +90,9 @@
protected IColumn computeNext()
{
+ if (reader == null)
+ return endOfData();
+
while (true)
{
if (isAscending)
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/IndexHelper.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/IndexHelper.java?rev=799947&r1=799946&r2=799947&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/IndexHelper.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/IndexHelper.java Sat Aug 1 22:31:01 2009
@@ -18,15 +18,13 @@
package org.apache.cassandra.io;
-import java.io.DataInput;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
+import java.io.*;
import java.util.*;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ColumnSerializer;
import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.utils.BloomFilter;
/**
@@ -246,6 +244,40 @@
return columnRanges;
}
+ /**
+ * Reads the column name indexes if present. If the
+ * indexes are based on time then skip over them.
+ */
+ static int readColumnIndexes(RandomAccessFile file, String tableName, String cfName, List<ColumnIndexInfo> columnIndexList) throws IOException
+ {
+ /* check if we have an index */
+ boolean hasColumnIndexes = file.readBoolean();
+ int totalBytesRead = 1;
+ /* if we do then deserialize the index */
+ if (hasColumnIndexes)
+ {
+ /* read the index */
+ totalBytesRead += deserializeIndex(tableName, cfName, file, columnIndexList);
+ }
+ return totalBytesRead;
+ }
+
+ /**
+ * Defreeze the bloom filter.
+ *
+ * @return bloom filter summarizing the column information
+ * @throws java.io.IOException
+ */
+ static BloomFilter defreezeBloomFilter(RandomAccessFile file) throws IOException
+ {
+ int size = file.readInt();
+ byte[] bytes = new byte[size];
+ file.readFully(bytes);
+ DataInputBuffer bufIn = new DataInputBuffer();
+ bufIn.reset(bytes, bytes.length);
+ return BloomFilter.serializer().deserialize(bufIn);
+ }
+
/**
* A column range containing the start and end
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SequenceFile.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SequenceFile.java?rev=799947&r1=799946&r2=799947&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SequenceFile.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SequenceFile.java Sat Aug 1 22:31:01 2009
@@ -22,7 +22,6 @@
import java.util.*;
import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.utils.BloomFilter;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.log4j.Logger;
@@ -106,57 +105,48 @@
private void init(byte[] startColumn, long position) throws IOException
{
- String keyInDisk = null;
- if (seekTo(position) >= 0)
- keyInDisk = file_.readUTF();
+ seek(position);
+ String keyInDisk = file_.readUTF();
+ assert keyInDisk.equals(key_);
+
+ /* read off the size of this row */
+ int dataSize = file_.readInt();
+ /* skip the bloomfilter */
+ int totalBytesRead = IndexHelper.skipBloomFilter(file_);
+ /* read off the index flag, it has to be true */
+ boolean hasColumnIndexes = file_.readBoolean();
+ totalBytesRead += 1;
+
+ /* read the index */
+ List<IndexHelper.ColumnIndexInfo> colIndexList = new ArrayList<IndexHelper.ColumnIndexInfo>();
+ if (hasColumnIndexes)
+ totalBytesRead += IndexHelper.deserializeIndex(getTableName(), cfName_, file_, colIndexList);
- if ( keyInDisk != null && keyInDisk.equals(key_))
+ /* need to do two things here.
+ * 1. move the file pointer to the beginning of the list of stored columns
+ * 2. calculate the size of all columns */
+ String cfName = file_.readUTF();
+ cfType_ = file_.readUTF();
+ String comparatorName = file_.readUTF();
+ assert comparatorName.equals(comparator_.getClass().getCanonicalName());
+ String subComparatorName = file_.readUTF(); // subcomparator
+ localDeletionTime_ = file_.readInt();
+ markedForDeleteAt_ = file_.readLong();
+ int totalNumCols = file_.readInt();
+ allColumnsSize_ = dataSize - (totalBytesRead + 4 * utfPrefix_ + cfName.length() + cfType_.length() + comparatorName.length() + subComparatorName.length() + 4 + 8 + 4);
+
+ columnStartPosition_ = file_.getFilePointer();
+ columnIndexList_ = getFullColumnIndexList(colIndexList, totalNumCols);
+
+ if (startColumn.length == 0 && !isAscending_)
{
- /* read off the size of this row */
- int dataSize = file_.readInt();
- /* skip the bloomfilter */
- int totalBytesRead = IndexHelper.skipBloomFilter(file_);
- /* read off the index flag, it has to be true */
- boolean hasColumnIndexes = file_.readBoolean();
- totalBytesRead += 1;
-
- /* read the index */
- List<IndexHelper.ColumnIndexInfo> colIndexList = new ArrayList<IndexHelper.ColumnIndexInfo>();
- if (hasColumnIndexes)
- totalBytesRead += IndexHelper.deserializeIndex(getTableName(), cfName_, file_, colIndexList);
-
- /* need to do two things here.
- * 1. move the file pointer to the beginning of the list of stored columns
- * 2. calculate the size of all columns */
- String cfName = file_.readUTF();
- cfType_ = file_.readUTF();
- String comparatorName = file_.readUTF();
- assert comparatorName.equals(comparator_.getClass().getCanonicalName());
- String subComparatorName = file_.readUTF(); // subcomparator
- localDeletionTime_ = file_.readInt();
- markedForDeleteAt_ = file_.readLong();
- int totalNumCols = file_.readInt();
- allColumnsSize_ = dataSize - (totalBytesRead + 4 * utfPrefix_ + cfName.length() + cfType_.length() + comparatorName.length() + subComparatorName.length() + 4 + 8 + 4);
-
- columnStartPosition_ = file_.getFilePointer();
- columnIndexList_ = getFullColumnIndexList(colIndexList, totalNumCols);
-
- if (startColumn.length == 0 && !isAscending_)
- {
- /* in this case, we assume that we want to scan from the largest column in descending order. */
- curRangeIndex_ = columnIndexList_.size() - 1;
- }
- else
- {
- int index = Collections.binarySearch(columnIndexList_, new IndexHelper.ColumnIndexInfo(startColumn, 0, 0, comparator_));
- curRangeIndex_ = index < 0 ? (++index) * (-1) - 1 : index;
- }
+ /* in this case, we assume that we want to scan from the largest column in descending order. */
+ curRangeIndex_ = columnIndexList_.size() - 1;
}
else
{
- /* no keys found in this file because of a false positive in BF */
- curRangeIndex_ = -1;
- columnIndexList_ = new ArrayList<IndexHelper.ColumnIndexInfo>();
+ int index = Collections.binarySearch(columnIndexList_, new IndexHelper.ColumnIndexInfo(startColumn, 0, 0, comparator_));
+ curRangeIndex_ = index < 0 ? (++index) * (-1) - 1 : index;
}
}
@@ -220,52 +210,6 @@
return filename_;
}
- long seekTo(long position) throws IOException
- {
- if (position >= 0)
- seek(position);
- return position;
- }
-
- /**
- * Defreeze the bloom filter.
- *
- * @return bloom filter summarizing the column information
- * @throws IOException
- */
- private BloomFilter defreezeBloomFilter() throws IOException
- {
- int size = file_.readInt();
- byte[] bytes = new byte[size];
- file_.readFully(bytes);
- DataInputBuffer bufIn = new DataInputBuffer();
- bufIn.reset(bytes, bytes.length);
- BloomFilter bf = BloomFilter.serializer().deserialize(bufIn);
- return bf;
- }
-
- /**
- * Reads the column name indexes if present. If the
- * indexes are based on time then skip over them.
- *
- * @param cfName
- * @return
- */
- private int handleColumnNameIndexes(String cfName, List<IndexHelper.ColumnIndexInfo> columnIndexList) throws IOException
- {
- /* check if we have an index */
- boolean hasColumnIndexes = file_.readBoolean();
- int totalBytesRead = 1;
- /* if we do then deserialize the index */
- if (hasColumnIndexes)
- {
- String tableName = getTableName();
- /* read the index */
- totalBytesRead += IndexHelper.deserializeIndex(tableName, cfName, file_, columnIndexList);
- }
- return totalBytesRead;
- }
-
/**
* This method dumps the next key/value into the DataOuputStream
* passed in. Always use this method to query for application
@@ -279,26 +223,12 @@
public long next(String key, DataOutputBuffer bufOut, String columnFamilyName, SortedSet<byte[]> columnNames, long position) throws IOException
{
assert columnNames != null;
-
- long bytesRead = -1L;
- if (isEOF() || seekTo(position) < 0)
- return bytesRead;
+ seek(position);
/* note the position where the key starts */
long startPosition = file_.getFilePointer();
String keyInDisk = file_.readUTF();
assert keyInDisk.equals(key);
- readColumns(key, bufOut, columnFamilyName, columnNames);
-
- long endPosition = file_.getFilePointer();
- bytesRead = endPosition - startPosition;
-
- return bytesRead;
- }
-
- private void readColumns(String key, DataOutputBuffer bufOut, String columnFamilyName, SortedSet<byte[]> cNames)
- throws IOException
- {
int dataSize = file_.readInt();
/* write the key into buffer */
@@ -306,42 +236,25 @@
/* Read the bloom filter summarizing the columns */
long preBfPos = file_.getFilePointer();
- BloomFilter bf = defreezeBloomFilter();
+ IndexHelper.defreezeBloomFilter(file_);
long postBfPos = file_.getFilePointer();
dataSize -= (postBfPos - preBfPos);
List<IndexHelper.ColumnIndexInfo> columnIndexList = new ArrayList<IndexHelper.ColumnIndexInfo>();
- /* read the column name indexes if present */
- int totalBytesRead = handleColumnNameIndexes(columnFamilyName, columnIndexList);
- dataSize -= totalBytesRead;
+ dataSize -= IndexHelper.readColumnIndexes(file_, getTableName(), columnFamilyName, columnIndexList);
- /* read the column family name */
+ // read CF data so we can echo it back to the outstream
String cfName = file_.readUTF();
- dataSize -= (utfPrefix_ + cfName.length());
-
String cfType = file_.readUTF();
- dataSize -= (utfPrefix_ + cfType.length());
-
String comparatorName = file_.readUTF();
- dataSize -= (utfPrefix_ + comparatorName.length());
-
String subComparatorName = file_.readUTF();
- dataSize -= (utfPrefix_ + subComparatorName.length());
-
- /* read local deletion time */
int localDeletionTime = file_.readInt();
- dataSize -=4;
-
- /* read if this cf is marked for delete */
long markedForDeleteAt = file_.readLong();
- dataSize -= 8;
-
- /* read the total number of columns */
- int totalNumCols = file_.readInt();
- dataSize -= 4;
+ int totalColumns = file_.readInt();
+ dataSize -= (4 * utfPrefix_ + cfName.length() + cfType.length() + comparatorName.length() + subComparatorName.length() + 4 + 8 + 4);
/* get the various column ranges we have to read */
- List<IndexHelper.ColumnRange> columnRanges = IndexHelper.getMultiColumnRangesFromNameIndex(cNames, columnIndexList, dataSize, totalNumCols);
+ List<IndexHelper.ColumnRange> columnRanges = IndexHelper.getMultiColumnRangesFromNameIndex(columnNames, columnIndexList, dataSize, totalColumns);
/* calculate the data size */
int numColsReturned = 0;
@@ -354,7 +267,7 @@
}
// returned data size
- bufOut.writeInt(dataSizeReturned + utfPrefix_ * 4 + cfName.length() + cfType.length() + comparatorName.length() + subComparatorName.length() + 4 + 4 + 8 + 4);
+ bufOut.writeInt(dataSizeReturned + utfPrefix_ * 4 + cfName.length() + cfType.length() + comparatorName.length() + subComparatorName.length() + 4 + 8 + 4);
// echo back the CF data we read
bufOut.writeUTF(cfName);
bufOut.writeUTF(cfType);
@@ -374,7 +287,11 @@
bufOut.write(file_, (int) (coordinate.end_ - coordinate.start_));
prevPosition = (int) coordinate.end_;
}
+
+ long endPosition = file_.getFilePointer();
+ return endPosition - startPosition;
}
+
}
public static class Reader extends AbstractReader