You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/08/02 00:31:08 UTC
svn commit: r799948 - in /incubator/cassandra/trunk/src/java/org/apache/cassandra: db/ColumnFamily.java db/filter/SSTableNamesIterator.java io/IFileReader.java io/IndexHelper.java io/SequenceFile.java
Author: jbellis
Date: Sat Aug 1 22:31:08 2009
New Revision: 799948
URL: http://svn.apache.org/viewvc?rev=799948&view=rev
Log:
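Replace SequenceFile.AbstractReader.next() with code in SSTableNamesIterator that reads the requested columns directly from the data file, so the data is no longer rewritten through an extra layer of DataOutput/DataInput buffers.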
patch by jbellis; reviewed by Stu Hood for CASSANDRA-330
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/SSTableNamesIterator.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/IFileReader.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/IndexHelper.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SequenceFile.java
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java?rev=799948&r1=799947&r2=799948&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java Sat Aug 1 22:31:08 2009
@@ -411,6 +411,27 @@
return type_;
}
+ private String getComparatorName()
+ {
+ return getComparator().getClass().getCanonicalName();
+ }
+
+ private String getSubComparatorName()
+ {
+ AbstractType subcolumnComparator = getSubComparator();
+ return subcolumnComparator == null ? "" : subcolumnComparator.getClass().getCanonicalName();
+ }
+
+ public int serializedSize()
+ {
+ int subtotal = 4 * IColumn.UtfPrefix_ + name_.length() + type_.length() + getComparatorName().length() + getSubComparatorName().length() + 4 + 8 + 4;
+ for (IColumn column : columns_.values())
+ {
+ subtotal += column.serializedSize();
+ }
+ return subtotal;
+ }
+
/** merge all columnFamilies into a single instance, with only the newest versions of columns preserved. */
static ColumnFamily resolve(List<ColumnFamily> columnFamilies)
{
@@ -466,9 +487,8 @@
dos.writeUTF(columnFamily.name());
dos.writeUTF(columnFamily.type_);
- dos.writeUTF(columnFamily.getComparator().getClass().getCanonicalName());
- AbstractType subcolumnComparator = columnFamily.getSubComparator();
- dos.writeUTF(subcolumnComparator == null ? "" : subcolumnComparator.getClass().getCanonicalName());
+ dos.writeUTF(columnFamily.getComparatorName());
+ dos.writeUTF(columnFamily.getSubComparatorName());
dos.writeInt(columnFamily.localDeletionTime);
dos.writeLong(columnFamily.markedForDeleteAt);
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/SSTableNamesIterator.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/SSTableNamesIterator.java?rev=799948&r1=799947&r2=799948&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/SSTableNamesIterator.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/SSTableNamesIterator.java Sat Aug 1 22:31:08 2009
@@ -3,6 +3,8 @@
import java.io.IOException;
import java.util.SortedSet;
import java.util.Iterator;
+import java.util.List;
+import java.util.ArrayList;
import org.apache.cassandra.db.ColumnFamily;
import org.apache.cassandra.db.IColumn;
@@ -14,45 +16,71 @@
private Iterator<IColumn> iter;
public final SortedSet<byte[]> columns;
- // TODO make this actually iterate so we don't have to read + deserialize + filter data that we don't need, only to skip it later in computeNext
public SSTableNamesIterator(String filename, String key, String cfName, SortedSet<byte[]> columns) throws IOException
{
+ assert columns != null;
this.columns = columns;
SSTableReader ssTable = SSTableReader.open(filename);
- IFileReader dataReader = null;
- DataOutputBuffer bufOut = new DataOutputBuffer();
- DataInputBuffer bufIn = new DataInputBuffer();
+ String decoratedKey = ssTable.getPartitioner().decorateKey(key);
+ long position = ssTable.getPosition(decoratedKey);
+ if (position < 0)
+ return;
+ BufferedRandomAccessFile file = new BufferedRandomAccessFile(filename, "r");
try
{
- dataReader = SequenceFile.bufferedReader(ssTable.getFilename(), 64 * 1024);
- String decoratedKey = ssTable.getPartitioner().decorateKey(key);
- long position = ssTable.getPosition(decoratedKey);
- if (position >= 0)
+ file.seek(position);
+
+ /* note the position where the key starts */
+ String keyInDisk = file.readUTF();
+ assert keyInDisk.equals(decoratedKey) : keyInDisk;
+ int dataSize = file.readInt();
+
+ /* Read the bloom filter summarizing the columns */
+ long preBfPos = file.getFilePointer();
+ IndexHelper.defreezeBloomFilter(file);
+ long postBfPos = file.getFilePointer();
+ dataSize -= (postBfPos - preBfPos);
+
+ List<IndexHelper.ColumnIndexInfo> columnIndexList = new ArrayList<IndexHelper.ColumnIndexInfo>();
+ dataSize -= IndexHelper.readColumnIndexes(file, ssTable.getTableName(), cfName, columnIndexList);
+
+ cf = ColumnFamily.serializer().deserializeEmpty(file);
+ int totalColumns = file.readInt();
+ dataSize -= cf.serializedSize();
+
+ /* get the various column ranges we have to read */
+ List<IndexHelper.ColumnRange> columnRanges = IndexHelper.getMultiColumnRangesFromNameIndex(columns, columnIndexList, dataSize, totalColumns);
+
+ int prevPosition = 0;
+ /* now read all the columns from the ranges */
+ for (IndexHelper.ColumnRange columnRange : columnRanges)
{
- long bytesRead = dataReader.next(decoratedKey, bufOut, cfName, columns, position);
- assert bytesRead > 0;
- assert bufOut.getLength() > 0;
- bufIn.reset(bufOut.getData(), bufOut.getLength());
- /* read the key even though we do not use it */
- bufIn.readUTF();
- bufIn.readInt();
+ /* seek to the correct offset to the data */
+ long columnBegin = file.getFilePointer();
+ Coordinate coordinate = columnRange.coordinate();
+ file.skipBytes((int)(coordinate.start_ - prevPosition));
+ // read the columns in this range
+ // TODO only completely deserialize columns we are interested in
+ while (file.getFilePointer() - columnBegin < coordinate.end_ - coordinate.start_)
+ {
+ final IColumn column = cf.getColumnSerializer().deserialize(file);
+ if (columns.contains(column.name()))
+ {
+ cf.addColumn(column);
+ }
+ }
+
+ prevPosition = (int) coordinate.end_;
}
}
finally
{
- if (dataReader != null)
- {
- dataReader.close();
- }
+ file.close();
}
- if (bufIn.getLength() > 0)
- {
- cf = ColumnFamily.serializer().deserialize(bufIn);
- iter = cf.getSortedColumns().iterator();
- }
+ iter = cf.getSortedColumns().iterator();
}
public ColumnFamily getColumnFamily()
@@ -62,14 +90,8 @@
protected IColumn computeNext()
{
- if (iter == null)
+ if (iter == null || !iter.hasNext())
return endOfData();
- while (iter.hasNext())
- {
- IColumn c = iter.next();
- if (columns.contains(c.name()))
- return c;
- }
- return endOfData();
+ return iter.next();
}
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/IFileReader.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/IFileReader.java?rev=799948&r1=799947&r2=799948&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/IFileReader.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/IFileReader.java Sat Aug 1 22:31:08 2009
@@ -36,19 +36,6 @@
public boolean isEOF() throws IOException;
/**
- * This method dumps the next key/value into the DataOuputStream
- * passed in. Always use this method to query for application
- * specific data as it will have indexes.
- *
- * @param key - key we are interested in.
- * @param bufOut - DataOutputStream that needs to be filled.
- * @param columnFamilyName The name of the column family only without the ":"
- * @param columnNames - The list of columns in the cfName column family
- * that we want to return
- */
- public long next(String key, DataOutputBuffer bufOut, String columnFamilyName, SortedSet<byte[]> columnNames, long position) throws IOException;
-
- /**
* Close the file after reading.
* @throws IOException
*/
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/IndexHelper.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/IndexHelper.java?rev=799948&r1=799947&r2=799948&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/IndexHelper.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/IndexHelper.java Sat Aug 1 22:31:08 2009
@@ -218,7 +218,7 @@
* @param totalNumCols the total number of columns
* @return a list of subranges which contain all the columns in columnNames
*/
- static List<ColumnRange> getMultiColumnRangesFromNameIndex(SortedSet<byte[]> columnNames, List<IndexHelper.ColumnIndexInfo> columnIndexList, int dataSize, int totalNumCols)
+ public static List<ColumnRange> getMultiColumnRangesFromNameIndex(SortedSet<byte[]> columnNames, List<IndexHelper.ColumnIndexInfo> columnIndexList, int dataSize, int totalNumCols)
{
List<ColumnRange> columnRanges = new ArrayList<ColumnRange>();
@@ -248,7 +248,7 @@
* Reads the column name indexes if present. If the
* indexes are based on time then skip over them.
*/
- static int readColumnIndexes(RandomAccessFile file, String tableName, String cfName, List<ColumnIndexInfo> columnIndexList) throws IOException
+ public static int readColumnIndexes(RandomAccessFile file, String tableName, String cfName, List<ColumnIndexInfo> columnIndexList) throws IOException
{
/* check if we have an index */
boolean hasColumnIndexes = file.readBoolean();
@@ -268,7 +268,7 @@
* @return bloom filter summarizing the column information
* @throws java.io.IOException
*/
- static BloomFilter defreezeBloomFilter(RandomAccessFile file) throws IOException
+ public static BloomFilter defreezeBloomFilter(RandomAccessFile file) throws IOException
{
int size = file.readInt();
byte[] bytes = new byte[size];
@@ -297,12 +297,12 @@
columnCount_ = columnCount;
}
- Coordinate coordinate()
+ public Coordinate coordinate()
{
return coordinate_;
}
- int count()
+ public int count()
{
return columnCount_;
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SequenceFile.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SequenceFile.java?rev=799948&r1=799947&r2=799948&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SequenceFile.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SequenceFile.java Sat Aug 1 22:31:08 2009
@@ -191,7 +191,6 @@
public static abstract class AbstractReader implements IFileReader
{
- private static final short utfPrefix_ = 2;
protected RandomAccessFile file_;
protected String filename_;
@@ -209,89 +208,6 @@
{
return filename_;
}
-
- /**
- * This method dumps the next key/value into the DataOuputStream
- * passed in. Always use this method to query for application
- * specific data as it will have indexes.
- *
- * @param key key we are interested in.
- * @param bufOut DataOutputStream that needs to be filled.
- * @param columnFamilyName name of the columnFamily
- * @param columnNames columnNames we are interested in
- */
- public long next(String key, DataOutputBuffer bufOut, String columnFamilyName, SortedSet<byte[]> columnNames, long position) throws IOException
- {
- assert columnNames != null;
- seek(position);
-
- /* note the position where the key starts */
- long startPosition = file_.getFilePointer();
- String keyInDisk = file_.readUTF();
- assert keyInDisk.equals(key);
- int dataSize = file_.readInt();
-
- /* write the key into buffer */
- bufOut.writeUTF(key);
-
- /* Read the bloom filter summarizing the columns */
- long preBfPos = file_.getFilePointer();
- IndexHelper.defreezeBloomFilter(file_);
- long postBfPos = file_.getFilePointer();
- dataSize -= (postBfPos - preBfPos);
-
- List<IndexHelper.ColumnIndexInfo> columnIndexList = new ArrayList<IndexHelper.ColumnIndexInfo>();
- dataSize -= IndexHelper.readColumnIndexes(file_, getTableName(), columnFamilyName, columnIndexList);
-
- // read CF data so we can echo it back to the outstream
- String cfName = file_.readUTF();
- String cfType = file_.readUTF();
- String comparatorName = file_.readUTF();
- String subComparatorName = file_.readUTF();
- int localDeletionTime = file_.readInt();
- long markedForDeleteAt = file_.readLong();
- int totalColumns = file_.readInt();
- dataSize -= (4 * utfPrefix_ + cfName.length() + cfType.length() + comparatorName.length() + subComparatorName.length() + 4 + 8 + 4);
-
- /* get the various column ranges we have to read */
- List<IndexHelper.ColumnRange> columnRanges = IndexHelper.getMultiColumnRangesFromNameIndex(columnNames, columnIndexList, dataSize, totalColumns);
-
- /* calculate the data size */
- int numColsReturned = 0;
- int dataSizeReturned = 0;
- for (IndexHelper.ColumnRange columnRange : columnRanges)
- {
- numColsReturned += columnRange.count();
- Coordinate coordinate = columnRange.coordinate();
- dataSizeReturned += coordinate.end_ - coordinate.start_;
- }
-
- // returned data size
- bufOut.writeInt(dataSizeReturned + utfPrefix_ * 4 + cfName.length() + cfType.length() + comparatorName.length() + subComparatorName.length() + 4 + 8 + 4);
- // echo back the CF data we read
- bufOut.writeUTF(cfName);
- bufOut.writeUTF(cfType);
- bufOut.writeUTF(comparatorName);
- bufOut.writeUTF(subComparatorName);
- bufOut.writeInt(localDeletionTime);
- bufOut.writeLong(markedForDeleteAt);
- /* write number of columns */
- bufOut.writeInt(numColsReturned);
- int prevPosition = 0;
- /* now write all the columns we are required to write */
- for (IndexHelper.ColumnRange columnRange : columnRanges)
- {
- /* seek to the correct offset to the data */
- Coordinate coordinate = columnRange.coordinate();
- file_.skipBytes((int) (coordinate.start_ - prevPosition));
- bufOut.write(file_, (int) (coordinate.end_ - coordinate.start_));
- prevPosition = (int) coordinate.end_;
- }
-
- long endPosition = file_.getFilePointer();
- return endPosition - startPosition;
- }
-
}
public static class Reader extends AbstractReader