You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/04/06 17:10:30 UTC
svn commit: r762381 -
/incubator/cassandra/trunk/src/org/apache/cassandra/io/SequenceFile.java
Author: jbellis
Date: Mon Apr 6 15:10:30 2009
New Revision: 762381
URL: http://svn.apache.org/viewvc?rev=762381&view=rev
Log:
merge common parts of next(timeRange) and next(columnNames). patch by jbellis; reviewed by Jun Rau. see #52
Modified:
incubator/cassandra/trunk/src/org/apache/cassandra/io/SequenceFile.java
Modified: incubator/cassandra/trunk/src/org/apache/cassandra/io/SequenceFile.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/io/SequenceFile.java?rev=762381&r1=762380&r2=762381&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/io/SequenceFile.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/io/SequenceFile.java Mon Apr 6 15:10:30 2009
@@ -856,6 +856,11 @@
return next(key, bufOut, columnFamilyName, columnNames, section);
}
+ public long next(String key, DataOutputBuffer bufOut, String cf, IndexHelper.TimeRange timeRange, Coordinate section) throws IOException
+ {
+ return next(key, bufOut, cf, null, timeRange, section);
+ }
+
/**
* This method dumps the next key/value into the DataOuputStream
* passed in. Always use this method to query for application
@@ -863,15 +868,18 @@
*
* @param key key we are interested in.
* @param bufOut DataOutputStream that needs to be filled.
- * @param cf name of the column in our format.
- * @param timeRange time range we are interested in.
+ * @param columnFamilyName name of the columnFamily
+ * @param columnNames columnNames we are interested in
+ * OR
+ * @param timeRange time range we are interested in
* @param section region of the file that needs to be read
* @return number of bytes that were read.
* @throws IOException
*/
- public long next(String key, DataOutputBuffer bufOut, String columnFamilyName, IndexHelper.TimeRange timeRange, Coordinate section) throws IOException
+ public long next(String key, DataOutputBuffer bufOut, String columnFamilyName, List<String> columnNames, IndexHelper.TimeRange timeRange, Coordinate section) throws IOException
{
assert !columnFamilyName.contains(":");
+ assert timeRange == null || columnNames == null; // at most one may be non-null
long bytesRead = -1L;
if (isEOF())
@@ -895,64 +903,18 @@
* is passed in. If not then we skip over this key and
* position ourselves to read the next one.
*/
- int dataSize = file_.readInt();
if (keyInDisk.equals(key))
{
- /* write the key into buffer */
- bufOut.writeUTF(keyInDisk);
-
- int bytesSkipped = IndexHelper.skipBloomFilter(file_);
- /*
- * read the correct number of bytes for the column family and
- * write data into buffer. Substract from dataSize the bloom
- * filter size.
- */
- dataSize -= bytesSkipped;
- List<IndexHelper.ColumnIndexInfo> columnIndexList = new ArrayList<IndexHelper.ColumnIndexInfo>();
- /* Read the times indexes if present */
- int totalBytesRead = handleColumnTimeIndexes(columnFamilyName, columnIndexList);
- dataSize -= totalBytesRead;
-
- /* read the column family name */
- String cfName = file_.readUTF();
- dataSize -= (utfPrefix_ + cfName.length());
-
- /* read if this cf is marked for delete */
- long markedForDeleteAt = file_.readLong();
- dataSize -= 8;
-
- /* read the total number of columns */
- int totalNumCols = file_.readInt();
- dataSize -= 4;
-
- /* get the column range we have to read */
- IndexHelper.ColumnRange columnRange = IndexHelper.getColumnRangeFromTimeIndex(timeRange, columnIndexList, dataSize, totalNumCols);
-
- Coordinate coordinate = columnRange.coordinate();
- /* seek to the correct offset to the data, and calculate the data size */
- file_.skipBytes((int) coordinate.start_);
- dataSize = (int) (coordinate.end_ - coordinate.start_);
-
- /*
- * write the number of columns in the column family we are returning:
- * dataSize that we are reading +
- * length of column family name +
- * one booleanfor deleted or not +
- * one int for number of columns
- */
- bufOut.writeInt(dataSize + utfPrefix_ + cfName.length() + 4 + 1);
- /* write the column family name */
- bufOut.writeUTF(cfName);
- /* write if this cf is marked for delete */
- bufOut.writeLong(markedForDeleteAt);
- /* write number of columns */
- bufOut.writeInt(columnRange.count());
- /* now write the columns */
- bufOut.write(file_, dataSize);
+ if (timeRange == null) {
+ readColumns(key, bufOut, columnFamilyName, columnNames);
+ } else {
+ readTimeRange(key, bufOut, columnFamilyName, timeRange);
+ }
}
else
{
/* skip over data portion */
+ int dataSize = file_.readInt();
file_.seek(dataSize + file_.getFilePointer());
}
@@ -963,153 +925,158 @@
return bytesRead;
}
- /**
- * This method dumps the next key/value into the DataOuputStream
- * passed in. Always use this method to query for application
- * specific data as it will have indexes.
- *
- * @param key key we are interested in.
- * @param bufOut DataOutputStream that needs to be filled.
- * @param cf The name of the column family only without the ":"
- * @param columnNames The list of columns in the cfName column family that we want to return
- * @param section region of the file that needs to be read
- * @return total number of bytes read/considered
- */
+ private void readTimeRange(String key, DataOutputBuffer bufOut, String columnFamilyName, IndexHelper.TimeRange timeRange)
+ throws IOException
+ {
+ int dataSize = file_.readInt();
+
+ /* write the key into buffer */
+ bufOut.writeUTF(key);
+
+ int bytesSkipped = IndexHelper.skipBloomFilter(file_);
+ /*
+ * read the correct number of bytes for the column family and
+ * write data into buffer. Subtract from dataSize the bloom
+ * filter size.
+ */
+ dataSize -= bytesSkipped;
+ List<IndexHelper.ColumnIndexInfo> columnIndexList = new ArrayList<IndexHelper.ColumnIndexInfo>();
+ /* Read the times indexes if present */
+ int totalBytesRead = handleColumnTimeIndexes(columnFamilyName, columnIndexList);
+ dataSize -= totalBytesRead;
+
+ /* read the column family name */
+ String cfName = file_.readUTF();
+ dataSize -= (utfPrefix_ + cfName.length());
+
+ /* read if this cf is marked for delete */
+ long markedForDeleteAt = file_.readLong();
+ dataSize -= 8;
+
+ /* read the total number of columns */
+ int totalNumCols = file_.readInt();
+ dataSize -= 4;
+
+ /* get the column range we have to read */
+ IndexHelper.ColumnRange columnRange = IndexHelper.getColumnRangeFromTimeIndex(timeRange, columnIndexList, dataSize, totalNumCols);
+
+ Coordinate coordinate = columnRange.coordinate();
+ /* seek to the correct offset to the data, and calculate the data size */
+ file_.skipBytes((int) coordinate.start_);
+ dataSize = (int) (coordinate.end_ - coordinate.start_);
+
+ /*
+ * write the number of columns in the column family we are returning:
+ * dataSize that we are reading +
+ * length of column family name +
+ * one boolean for deleted or not +
+ * one int for number of columns
+ */
+ bufOut.writeInt(dataSize + utfPrefix_ + cfName.length() + 4 + 1);
+ /* write the column family name */
+ bufOut.writeUTF(cfName);
+ /* write if this cf is marked for delete */
+ bufOut.writeLong(markedForDeleteAt);
+ /* write number of columns */
+ bufOut.writeInt(columnRange.count());
+ /* now write the columns */
+ bufOut.write(file_, dataSize);
+ }
+
public long next(String key, DataOutputBuffer bufOut, String cf, List<String> columnNames, Coordinate section) throws IOException
{
- String[] values = RowMutation.getColumnAndColumnFamily(cf);
- String columnFamilyName = values[0];
- List<String> cNames = new ArrayList<String>(columnNames);
+ return next(key, bufOut, cf, columnNames, null, section);
+ }
- long bytesRead = -1L;
- if (isEOF())
- return bytesRead;
+ private void readColumns(String key, DataOutputBuffer bufOut, String columnFamilyName, List<String> cNames)
+ throws IOException
+ {
+ int dataSize = file_.readInt();
- seekTo(key, section);
- /* note the position where the key starts */
- long startPosition = file_.getFilePointer();
- String keyInDisk = readKeyFromDisk(file_);
- if (keyInDisk != null)
+ /* write the key into buffer */
+ bufOut.writeUTF(key);
+
+ /* if we need to read all the columns do not read the column indexes */
+ if (cNames == null || cNames.size() == 0)
{
+ int bytesSkipped = IndexHelper.skipBloomFilterAndIndex(file_);
/*
- * If key on disk is greater than requested key
- * we can bail out since we exploit the property
- * of the SSTable format.
- */
- if (keyInDisk.compareTo(key) > 0)
- return bytesRead;
+ * read the correct number of bytes for the column family and
+ * write data into buffer
+ */
+ dataSize -= bytesSkipped;
+ /* write the data size */
+ bufOut.writeInt(dataSize);
+ /* write the data into buffer, except the boolean we have read */
+ bufOut.write(file_, dataSize);
+ }
+ else
+ {
+ /* Read the bloom filter summarizing the columns */
+ long preBfPos = file_.getFilePointer();
+ BloomFilter bf = defreezeBloomFilter();
+ long postBfPos = file_.getFilePointer();
+ dataSize -= (postBfPos - preBfPos);
+
+ List<IndexHelper.ColumnIndexInfo> columnIndexList = new ArrayList<IndexHelper.ColumnIndexInfo>();
+ /* read the column name indexes if present */
+ int totalBytesRead = handleColumnNameIndexes(columnFamilyName, columnIndexList);
+ dataSize -= totalBytesRead;
+
+ /* read the column family name */
+ String cfName = file_.readUTF();
+ dataSize -= (utfPrefix_ + cfName.length());
+
+ /* read if this cf is marked for delete */
+ long markedForDeleteAt = file_.readLong();
+ dataSize -= 8;
+
+ /* read the total number of columns */
+ int totalNumCols = file_.readInt();
+ dataSize -= 4;
+
+ // TODO: this is name sorted - but eventually this should be sorted by the same criteria as the col index
+ /* sort the required list of columns */
+ cNames = new ArrayList<String>(cNames);
+ Collections.sort(cNames);
+ /* get the various column ranges we have to read */
+ List<IndexHelper.ColumnRange> columnRanges = IndexHelper.getMultiColumnRangesFromNameIndex(cNames, columnIndexList, dataSize, totalNumCols);
+
+ /* calculate the data size */
+ int numColsReturned = 0;
+ int dataSizeReturned = 0;
+ for (IndexHelper.ColumnRange columnRange : columnRanges)
+ {
+ numColsReturned += columnRange.count();
+ Coordinate coordinate = columnRange.coordinate();
+ dataSizeReturned += coordinate.end_ - coordinate.start_;
+ }
/*
- * If we found the key then we populate the buffer that
- * is passed in. If not then we skip over this key and
- * position ourselves to read the next one.
+ * write the number of columns in the column family we are returning:
+ * dataSize that we are reading +
+ * length of column family name +
+ * one boolean for deleted or not +
+ * one int for number of columns
*/
- int dataSize = file_.readInt();
- if (keyInDisk.equals(key))
- {
- /* write the key into buffer */
- bufOut.writeUTF(keyInDisk);
-
- /* if we need to read the all the columns do not read the column indexes */
- if (cNames == null || cNames.size() == 0)
- {
- int bytesSkipped = IndexHelper.skipBloomFilterAndIndex(file_);
- /*
- * read the correct number of bytes for the column family and
- * write data into buffer
- */
- dataSize -= bytesSkipped;
- /* write the data size */
- bufOut.writeInt(dataSize);
- /* write the data into buffer, except the boolean we have read */
- bufOut.write(file_, dataSize);
- }
- else
- {
- /* Read the bloom filter summarizing the columns */
- long preBfPos = file_.getFilePointer();
- BloomFilter bf = defreezeBloomFilter();
- long postBfPos = file_.getFilePointer();
- dataSize -= (postBfPos - preBfPos);
- /*
- // remove the columns that the bloom filter says do not exist.
- for ( String cName : columnNames )
- {
- if ( !bf.isPresent(cName) )
- cNames.remove(cName);
- }
- */
-
- List<IndexHelper.ColumnIndexInfo> columnIndexList = new ArrayList<IndexHelper.ColumnIndexInfo>();
- /* read the column name indexes if present */
- int totalBytesRead = handleColumnNameIndexes(columnFamilyName, columnIndexList);
- dataSize -= totalBytesRead;
-
- /* read the column family name */
- String cfName = file_.readUTF();
- dataSize -= (utfPrefix_ + cfName.length());
-
- /* read if this cf is marked for delete */
- long markedForDeleteAt = file_.readLong();
- dataSize -= 8;
-
- /* read the total number of columns */
- int totalNumCols = file_.readInt();
- dataSize -= 4;
-
- // TODO: this is name sorted - but eventually this should be sorted by the same criteria as the col index
- /* sort the required list of columns */
- Collections.sort(cNames);
- /* get the various column ranges we have to read */
- List<IndexHelper.ColumnRange> columnRanges = IndexHelper.getMultiColumnRangesFromNameIndex(cNames, columnIndexList, dataSize, totalNumCols);
-
- /* calculate the data size */
- int numColsReturned = 0;
- int dataSizeReturned = 0;
- for (IndexHelper.ColumnRange columnRange : columnRanges)
- {
- numColsReturned += columnRange.count();
- Coordinate coordinate = columnRange.coordinate();
- dataSizeReturned += coordinate.end_ - coordinate.start_;
- }
-
- /*
- * write the number of columns in the column family we are returning:
- * dataSize that we are reading +
- * length of column family name +
- * one booleanfor deleted or not +
- * one int for number of columns
- */
- bufOut.writeInt(dataSizeReturned + utfPrefix_ + cfName.length() + 4 + 1);
- /* write the column family name */
- bufOut.writeUTF(cfName);
- /* write if this cf is marked for delete */
- bufOut.writeLong(markedForDeleteAt);
- /* write number of columns */
- bufOut.writeInt(numColsReturned);
- int prevPosition = 0;
- /* now write all the columns we are required to write */
- for (IndexHelper.ColumnRange columnRange : columnRanges)
- {
- /* seek to the correct offset to the data */
- Coordinate coordinate = columnRange.coordinate();
- file_.skipBytes((int) (coordinate.start_ - prevPosition));
- bufOut.write(file_, (int) (coordinate.end_ - coordinate.start_));
- prevPosition = (int) coordinate.end_;
- }
- }
- }
- else
+ bufOut.writeInt(dataSizeReturned + utfPrefix_ + cfName.length() + 4 + 1);
+ /* write the column family name */
+ bufOut.writeUTF(cfName);
+ /* write if this cf is marked for delete */
+ bufOut.writeLong(markedForDeleteAt);
+ /* write number of columns */
+ bufOut.writeInt(numColsReturned);
+ int prevPosition = 0;
+ /* now write all the columns we are required to write */
+ for (IndexHelper.ColumnRange columnRange : columnRanges)
{
- /* skip over data portion */
- file_.seek(dataSize + file_.getFilePointer());
+ /* seek to the correct offset to the data */
+ Coordinate coordinate = columnRange.coordinate();
+ file_.skipBytes((int) (coordinate.start_ - prevPosition));
+ bufOut.write(file_, (int) (coordinate.end_ - coordinate.start_));
+ prevPosition = (int) coordinate.end_;
}
-
- long endPosition = file_.getFilePointer();
- bytesRead = endPosition - startPosition;
}
-
- return bytesRead;
}
/**