You are viewing a plain-text version of this content. (The hyperlink to the canonical version was not preserved in this plain-text rendering.)
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/08/07 18:23:53 UTC
svn commit: r802071 - in
/incubator/cassandra/trunk/src/java/org/apache/cassandra:
db/ColumnIndexer.java db/filter/SSTableSliceIterator.java io/IndexHelper.java
Author: jbellis
Date: Fri Aug 7 16:23:53 2009
New Revision: 802071
URL: http://svn.apache.org/viewvc?rev=802071&view=rev
Log:
Don't serialize the unused column count into the column index. Remove DataInput/DataOutput round-tripping from ColumnGroupReader.
patch by jbellis; reviewed by Jun Rao for CASSANDRA-332
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnIndexer.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/SSTableSliceIterator.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/IndexHelper.java
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnIndexer.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnIndexer.java?rev=802071&r1=802070&r2=802071&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnIndexer.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnIndexer.java Fri Aug 7 16:23:53 2009
@@ -103,7 +103,6 @@
private static void doIndexing(AbstractType comparator, Collection<IColumn> columns, DataOutputStream dos) throws IOException
{
/* we are going to write column indexes */
- int numColumns = 0;
int position = 0;
int indexSizeInBytes = 0;
int sizeSummarized = 0;
@@ -118,14 +117,13 @@
List<IndexHelper.ColumnIndexInfo> columnIndexList = new ArrayList<IndexHelper.ColumnIndexInfo>();
/* column offsets at the right thresholds into the index map. */
- for ( IColumn column : columns )
+ for (IColumn column : columns)
{
/* if we hit the column index size that we have to index after, go ahead and index it */
- if(position - sizeSummarized >= DatabaseDescriptor.getColumnIndexSize())
- {
- IndexHelper.ColumnIndexInfo cIndexInfo = new IndexHelper.ColumnIndexInfo(column.name(), 0, 0, comparator);
+ if (position - sizeSummarized >= DatabaseDescriptor.getColumnIndexSize())
+ {
+ IndexHelper.ColumnIndexInfo cIndexInfo = new IndexHelper.ColumnIndexInfo(column.name(), 0, comparator);
cIndexInfo.position(position);
- cIndexInfo.count(numColumns);
columnIndexList.add(cIndexInfo);
/*
* we will be writing this object as a UTF8 string and two ints,
@@ -135,10 +133,8 @@
*/
indexSizeInBytes += cIndexInfo.size();
sizeSummarized = position;
- numColumns = 0;
}
position += column.serializedSize();
- ++numColumns;
}
/* write the column index list */
IndexHelper.serialize(indexSizeInBytes, columnIndexList, dos);
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/SSTableSliceIterator.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/SSTableSliceIterator.java?rev=802071&r1=802070&r2=802071&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/SSTableSliceIterator.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/SSTableSliceIterator.java Fri Aug 7 16:23:53 2009
@@ -1,9 +1,6 @@
package org.apache.cassandra.db.filter;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Arrays;
-import java.util.Collections;
+import java.util.*;
import java.io.IOException;
import org.apache.commons.lang.ArrayUtils;
@@ -22,8 +19,6 @@
{
protected boolean isAscending;
private byte[] startColumn;
- private DataOutputBuffer outBuf = new DataOutputBuffer();
- private DataInputBuffer inBuf = new DataInputBuffer();
private int curColumnIndex;
private ColumnFamily curCF = null;
private ArrayList<IColumn> curColumns = new ArrayList<IColumn>();
@@ -69,15 +64,17 @@
private void getColumnsFromBuffer() throws IOException
{
- inBuf.reset(outBuf.getData(), outBuf.getLength());
- ColumnFamily columnFamily = ColumnFamily.serializer().deserialize(inBuf);
-
if (curCF == null)
- curCF = columnFamily.cloneMeShallow();
+ curCF = reader.getEmptyColumnFamily().cloneMeShallow();
curColumns.clear();
- for (IColumn column : columnFamily.getSortedColumns())
+ while (true)
+ {
+ IColumn column = reader.pollColumn();
+ if (column == null)
+ break;
if (isColumnNeeded(column))
curColumns.add(column);
+ }
if (isAscending)
curColumnIndex = 0;
@@ -114,7 +111,7 @@
try
{
- if (!reader.getNextBlock(outBuf))
+ if (!reader.getNextBlock())
return endOfData();
getColumnsFromBuffer();
}
@@ -140,25 +137,22 @@
{
private String key_;
private String cfName_;
- private String cfType_;
private AbstractType comparator_;
- private String subComparatorName_;
private boolean isAscending_;
+ private ColumnFamily emptyColumnFamily;
private List<IndexHelper.ColumnIndexInfo> columnIndexList_;
private long columnStartPosition_;
private int curRangeIndex_;
private int allColumnsSize_;
- private int localDeletionTime_;
- private long markedForDeleteAt_;
private BufferedRandomAccessFile file_;
+ private Queue<IColumn> blockColumns = new ArrayDeque<IColumn>();
public ColumnGroupReader(String filename, String key, String cfName, AbstractType comparator, byte[] startColumn, boolean isAscending, long position) throws IOException
{
this.file_ = new BufferedRandomAccessFile(filename, "r");
this.cfName_ = cfName;
this.comparator_ = comparator;
- this.subComparatorName_ = DatabaseDescriptor.getSubComparator(SSTableReader.parseTableName(filename), cfName).getClass().getCanonicalName();
this.key_ = key;
this.isAscending_ = isAscending;
init(startColumn, position);
@@ -167,34 +161,24 @@
/**
* Build a list of index entries ready for search.
*/
- private List<IndexHelper.ColumnIndexInfo> getFullColumnIndexList(List<IndexHelper.ColumnIndexInfo> columnIndexList, int totalColumns)
+ private List<IndexHelper.ColumnIndexInfo> getFullColumnIndexList(List<IndexHelper.ColumnIndexInfo> columnIndexList)
{
if (columnIndexList.size() == 0)
{
/* if there is no column index, add an index entry that covers the full space. */
- return Arrays.asList(new IndexHelper.ColumnIndexInfo(ArrayUtils.EMPTY_BYTE_ARRAY, 0, totalColumns, comparator_));
+ return Arrays.asList(new IndexHelper.ColumnIndexInfo(ArrayUtils.EMPTY_BYTE_ARRAY, 0, comparator_));
}
List<IndexHelper.ColumnIndexInfo> fullColIndexList = new ArrayList<IndexHelper.ColumnIndexInfo>();
- int accumulatedColumns = 0;
- for (IndexHelper.ColumnIndexInfo colPosInfo : columnIndexList)
- accumulatedColumns += colPosInfo.count();
- int remainingCols = totalColumns - accumulatedColumns;
- fullColIndexList.add(new IndexHelper.ColumnIndexInfo(ArrayUtils.EMPTY_BYTE_ARRAY, 0, columnIndexList.get(0).count(), comparator_));
+ fullColIndexList.add(new IndexHelper.ColumnIndexInfo(ArrayUtils.EMPTY_BYTE_ARRAY, 0, comparator_));
for (int i = 0; i < columnIndexList.size() - 1; i++)
{
IndexHelper.ColumnIndexInfo colPosInfo = columnIndexList.get(i);
- fullColIndexList.add(new IndexHelper.ColumnIndexInfo(colPosInfo.name(),
- colPosInfo.position(),
- columnIndexList.get(i + 1).count(),
- comparator_));
+ fullColIndexList.add(new IndexHelper.ColumnIndexInfo(colPosInfo.name(), colPosInfo.position(), comparator_));
}
byte[] columnName = columnIndexList.get(columnIndexList.size() - 1).name();
- fullColIndexList.add(new IndexHelper.ColumnIndexInfo(columnName,
- columnIndexList.get(columnIndexList.size() - 1).position(),
- remainingCols,
- comparator_));
+ fullColIndexList.add(new IndexHelper.ColumnIndexInfo(columnName, columnIndexList.get(columnIndexList.size() - 1).position(), comparator_));
return fullColIndexList;
}
@@ -220,18 +204,13 @@
/* need to do two things here.
* 1. move the file pointer to the beginning of the list of stored columns
* 2. calculate the size of all columns */
- String cfName = file_.readUTF();
- cfType_ = file_.readUTF();
- String comparatorName = file_.readUTF();
- assert comparatorName.equals(comparator_.getClass().getCanonicalName());
- String subComparatorName = file_.readUTF(); // subcomparator
- localDeletionTime_ = file_.readInt();
- markedForDeleteAt_ = file_.readLong();
+ emptyColumnFamily = ColumnFamily.serializer().deserializeEmpty(file_);
int totalNumCols = file_.readInt();
- allColumnsSize_ = dataSize - (totalBytesRead + 4 * 2 + cfName.length() + cfType_.length() + comparatorName.length() + subComparatorName.length() + 4 + 8 + 4);
+ totalBytesRead += emptyColumnFamily.serializedSize();
+ allColumnsSize_ = dataSize - totalBytesRead;
columnStartPosition_ = file_.getFilePointer();
- columnIndexList_ = getFullColumnIndexList(colIndexList, totalNumCols);
+ columnIndexList_ = getFullColumnIndexList(colIndexList);
if (startColumn.length == 0 && !isAscending_)
{
@@ -240,12 +219,22 @@
}
else
{
- int index = Collections.binarySearch(columnIndexList_, new IndexHelper.ColumnIndexInfo(startColumn, 0, 0, comparator_));
+ int index = Collections.binarySearch(columnIndexList_, new IndexHelper.ColumnIndexInfo(startColumn, 0, comparator_));
curRangeIndex_ = index < 0 ? (++index) * (-1) - 1 : index;
}
}
- private boolean getBlockFromCurIndex(DataOutputBuffer bufOut) throws IOException
+ public ColumnFamily getEmptyColumnFamily()
+ {
+ return emptyColumnFamily;
+ }
+
+ public IColumn pollColumn()
+ {
+ return blockColumns.poll();
+ }
+
+ private boolean getBlockFromCurIndex() throws IOException
{
if (curRangeIndex_ < 0 || curRangeIndex_ >= columnIndexList_.size())
return false;
@@ -257,25 +246,16 @@
/* seek to the correct offset to the data, and calculate the data size */
file_.seek(columnStartPosition_ + start);
- long dataSize = end - start;
-
- bufOut.reset();
- // write CF info
- bufOut.writeUTF(cfName_);
- bufOut.writeUTF(cfType_);
- bufOut.writeUTF(comparator_.getClass().getCanonicalName());
- bufOut.writeUTF(subComparatorName_);
- bufOut.writeInt(localDeletionTime_);
- bufOut.writeLong(markedForDeleteAt_);
- // now write the columns
- bufOut.writeInt(curColPostion.count());
- bufOut.write(file_, (int)dataSize);
+ while (file_.getFilePointer() < columnStartPosition_ + end)
+ {
+ blockColumns.add(emptyColumnFamily.getColumnSerializer().deserialize(file_));
+ }
return true;
}
- public boolean getNextBlock(DataOutputBuffer outBuf) throws IOException
+ public boolean getNextBlock() throws IOException
{
- boolean result = getBlockFromCurIndex(outBuf);
+ boolean result = getBlockFromCurIndex();
if (isAscending_)
curRangeIndex_++;
else
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/IndexHelper.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/IndexHelper.java?rev=802071&r1=802070&r2=802071&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/IndexHelper.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/IndexHelper.java Fri Aug 7 16:23:53 2009
@@ -171,46 +171,21 @@
*/
static ColumnRange getColumnRangeFromNameIndex(IndexHelper.ColumnIndexInfo cIndexInfo, List<IndexHelper.ColumnIndexInfo> columnIndexList, int dataSize, int totalNumCols)
{
- /* find the offset for the column */
- int size = columnIndexList.size();
- long start = 0;
- long end = dataSize;
- int numColumns = 0;
-
- int index = Collections.binarySearch(columnIndexList, cIndexInfo);
- if ( index < 0 )
- {
- /* We are here which means that the requested column is not an index. */
- index = (++index)*(-1);
- }
- else
- {
- ++index;
- }
-
- /* calculate the starting offset from which we have to read */
- start = (index == 0) ? 0 : columnIndexList.get(index - 1).position();
+ // TODO this looks like it can be simplified
+ int rawIndex = Collections.binarySearch(columnIndexList, cIndexInfo);
+ int index = rawIndex < 0
+ ? -1 * (rawIndex + 1)
+ : rawIndex + 1;
+ if (index > 0)
+ index -= 1;
+ assert index < columnIndexList.size();
- if( index < size )
- {
- end = columnIndexList.get(index).position();
- numColumns = columnIndexList.get(index).count();
- }
- else
- {
- end = dataSize;
- int totalColsIndexed = 0;
- for( IndexHelper.ColumnIndexInfo colPosInfo : columnIndexList )
- {
- totalColsIndexed += colPosInfo.count();
- }
- numColumns = totalNumCols - totalColsIndexed;
- }
+ long blockStart = columnIndexList.get(index).position();
- return new ColumnRange(start, end, numColumns);
+ return new ColumnRange(blockStart, blockStart);
}
- /**
+ /**
* Returns the sub-ranges that contain the list of columns in columnNames.
* @param columnNames The list of columns whose subranges need to be found
* @param columnIndexList the deserialized column indexes
@@ -224,14 +199,14 @@
if (columnIndexList.size() == 0)
{
- columnRanges.add(new ColumnRange(0, dataSize, totalNumCols));
+ columnRanges.add(new ColumnRange(0, dataSize));
}
else
{
Map<Long, Boolean> offset = new HashMap<Long, Boolean>();
for (byte[] name : columnNames)
{
- IndexHelper.ColumnIndexInfo cIndexInfo = new IndexHelper.ColumnIndexInfo(name, 0, 0, (AbstractType)columnNames.comparator());
+ IndexHelper.ColumnIndexInfo cIndexInfo = new IndexHelper.ColumnIndexInfo(name, 0, (AbstractType)columnNames.comparator());
ColumnRange columnRange = getColumnRangeFromNameIndex(cIndexInfo, columnIndexList, dataSize, totalNumCols);
if (offset.get(columnRange.coordinate().start_) == null)
{
@@ -289,23 +264,16 @@
public static class ColumnRange
{
private Coordinate coordinate_;
- private int columnCount_;
-
- ColumnRange(long start, long end, int columnCount)
+
+ ColumnRange(long start, long end)
{
coordinate_ = new Coordinate(start, end);
- columnCount_ = columnCount;
}
public Coordinate coordinate()
{
return coordinate_;
}
-
- public int count()
- {
- return columnCount_;
- }
}
/**
@@ -315,7 +283,6 @@
public static class ColumnIndexInfo implements Comparable<ColumnIndexInfo>
{
private long position_;
- private int columnCount_;
private byte[] name_;
private AbstractType comparator_;
@@ -324,13 +291,12 @@
this.comparator_ = comparator_;
}
- public ColumnIndexInfo(byte[] name, long position, int columnCount, AbstractType comparator)
+ public ColumnIndexInfo(byte[] name, long position, AbstractType comparator)
{
this(comparator);
- assert name.length == 0 || !"".equals(comparator.getString(name)); // Todo r/m length == 0 hack
+ assert name.length == 0 || !"".equals(comparator.getString(name));
name_ = name;
position_ = position;
- columnCount_ = columnCount;
}
public long position()
@@ -342,16 +308,6 @@
{
position_ = position;
}
-
- public int count()
- {
- return columnCount_;
- }
-
- public void count(int count)
- {
- columnCount_ = count;
- }
public int compareTo(ColumnIndexInfo rhs)
{
@@ -361,22 +317,20 @@
public void serialize(DataOutputStream dos) throws IOException
{
dos.writeLong(position());
- dos.writeInt(count());
ColumnSerializer.writeName(name_, dos);
}
public ColumnIndexInfo deserialize(DataInputStream dis) throws IOException
{
long position = dis.readLong();
- int columnCount = dis.readInt();
byte[] name = ColumnSerializer.readName(dis);
- return new ColumnIndexInfo(name, position, columnCount, comparator_);
+ return new ColumnIndexInfo(name, position, comparator_);
}
public int size()
{
// serialized size -- CS.writeName includes a 2-byte length prefix
- return 8 + 4 + 2 + name_.length;
+ return 8 + 2 + name_.length;
}
public byte[] name()