You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/08/02 00:31:16 UTC
svn commit: r799949 - in
/incubator/cassandra/trunk/src/java/org/apache/cassandra: db/ db/filter/ io/
Author: jbellis
Date: Sat Aug 1 22:31:15 2009
New Revision: 799949
URL: http://svn.apache.org/viewvc?rev=799949&view=rev
Log:
r/m SequenceFile. ColumnGroupReader moved to SSTableSliceIterator mostly unchanged for now. (finish cleaning this up in #332)
patch by jbellis; reviewed by Stu Hood for CASSANDRA-330
Removed:
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/IFileReader.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SequenceFile.java
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/SSTableSliceIterator.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/FileStruct.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/IndexHelper.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTable.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableReader.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableWriter.java
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java?rev=799949&r1=799948&r2=799949&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java Sat Aug 1 22:31:15 2009
@@ -481,7 +481,7 @@
*/
public void serialize(ColumnFamily columnFamily, DataOutput dos) throws IOException
{
- // TODO whenever we change this we need to change the code in SequenceFile to match in two places.
+ // TODO whenever we change this we need to change the code in SSTableSliceIterator to match.
// This SUCKS and is inefficient to boot. let's fix this ASAP.
Collection<IColumn> columns = columnFamily.getSortedColumns();
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/SSTableSliceIterator.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/SSTableSliceIterator.java?rev=799949&r1=799948&r2=799949&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/SSTableSliceIterator.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/SSTableSliceIterator.java Sat Aug 1 22:31:15 2009
@@ -1,15 +1,17 @@
package org.apache.cassandra.db.filter;
import java.util.ArrayList;
+import java.util.List;
+import java.util.Arrays;
+import java.util.Collections;
import java.io.IOException;
+import org.apache.commons.lang.ArrayUtils;
+
import org.apache.cassandra.db.IColumn;
import org.apache.cassandra.db.ColumnFamily;
import org.apache.cassandra.db.marshal.AbstractType;
-import org.apache.cassandra.io.DataOutputBuffer;
-import org.apache.cassandra.io.DataInputBuffer;
-import org.apache.cassandra.io.SequenceFile;
-import org.apache.cassandra.io.SSTableReader;
+import org.apache.cassandra.io.*;
import org.apache.cassandra.config.DatabaseDescriptor;
import com.google.common.collect.AbstractIterator;
@@ -25,7 +27,7 @@
private int curColumnIndex;
private ColumnFamily curCF = null;
private ArrayList<IColumn> curColumns = new ArrayList<IColumn>();
- private SequenceFile.ColumnGroupReader reader;
+ private ColumnGroupReader reader;
private AbstractType comparator;
public SSTableSliceIterator(String filename, String key, String cfName, AbstractType comparator, byte[] startColumn, boolean isAscending)
@@ -39,7 +41,7 @@
AbstractType comparator1 = DatabaseDescriptor.getComparator(ssTable.getTableName(), cfName);
long position = ssTable.getPosition(decoratedKey);
if (position >= 0)
- reader = new SequenceFile.ColumnGroupReader(ssTable.getFilename(), decoratedKey, cfName, comparator1, startColumn, isAscending, position);
+ reader = new ColumnGroupReader(ssTable.getFilename(), decoratedKey, cfName, comparator1, startColumn, isAscending, position);
this.comparator = comparator;
this.startColumn = startColumn;
curColumnIndex = isAscending ? 0 : -1;
@@ -127,4 +129,162 @@
{
reader.close();
}
+
+ /**
+ * A reader that finds the index block for a starting column and, on each
+ * getNextBlock call, returns that block and then steps to the block after it (ascending) or before it (descending).
+ * This class assumes that the CF is sorted by name and exploits the name index.
+ */
+ public static class ColumnGroupReader
+ {
+ private String key_;
+ private String cfName_;
+ private String cfType_;
+ private AbstractType comparator_;
+ private String subComparatorName_;
+ private boolean isAscending_;
+
+ private List<IndexHelper.ColumnIndexInfo> columnIndexList_;
+ private long columnStartPosition_;
+ private int curRangeIndex_;
+ private int allColumnsSize_;
+ private int localDeletionTime_;
+ private long markedForDeleteAt_;
+ private BufferedRandomAccessFile file_;
+
+ public ColumnGroupReader(String filename, String key, String cfName, AbstractType comparator, byte[] startColumn, boolean isAscending, long position) throws IOException
+ {
+ this.file_ = new BufferedRandomAccessFile(filename, "r");
+ this.cfName_ = cfName;
+ this.comparator_ = comparator;
+ this.subComparatorName_ = DatabaseDescriptor.getSubComparator(SSTableReader.parseTableName(filename), cfName).getClass().getCanonicalName();
+ this.key_ = key;
+ this.isAscending_ = isAscending;
+ init(startColumn, position);
+ }
+
+ /**
+ * Build a list of index entries ready for search.
+ */
+ private List<IndexHelper.ColumnIndexInfo> getFullColumnIndexList(List<IndexHelper.ColumnIndexInfo> columnIndexList, int totalNumCols)
+ {
+ if (columnIndexList.size() == 0)
+ {
+ /* if there is no column index, add an index entry that covers the full space. */
+ return Arrays.asList(new IndexHelper.ColumnIndexInfo(ArrayUtils.EMPTY_BYTE_ARRAY, 0, totalNumCols, comparator_));
+ }
+
+ List<IndexHelper.ColumnIndexInfo> fullColIndexList = new ArrayList<IndexHelper.ColumnIndexInfo>();
+ int accumulatededCols = 0;
+ for (IndexHelper.ColumnIndexInfo colPosInfo : columnIndexList)
+ accumulatededCols += colPosInfo.count();
+ int remainingCols = totalNumCols - accumulatededCols;
+
+ fullColIndexList.add(new IndexHelper.ColumnIndexInfo(ArrayUtils.EMPTY_BYTE_ARRAY, 0, columnIndexList.get(0).count(), comparator_));
+ for (int i = 0; i < columnIndexList.size() - 1; i++)
+ {
+ IndexHelper.ColumnIndexInfo colPosInfo = columnIndexList.get(i);
+ fullColIndexList.add(new IndexHelper.ColumnIndexInfo(colPosInfo.name(),
+ colPosInfo.position(),
+ columnIndexList.get(i + 1).count(),
+ comparator_));
+ }
+ byte[] columnName = columnIndexList.get(columnIndexList.size() - 1).name();
+ fullColIndexList.add(new IndexHelper.ColumnIndexInfo(columnName,
+ columnIndexList.get(columnIndexList.size() - 1).position(),
+ remainingCols,
+ comparator_));
+ return fullColIndexList;
+ }
+
+ private void init(byte[] startColumn, long position) throws IOException
+ {
+ file_.seek(position);
+ String keyInDisk = file_.readUTF();
+ assert keyInDisk.equals(key_);
+
+ /* read off the size of this row */
+ int dataSize = file_.readInt();
+ /* skip the bloomfilter */
+ int totalBytesRead = IndexHelper.skipBloomFilter(file_);
+ /* read off the index flag; the column index is deserialized only when it is set */
+ boolean hasColumnIndexes = file_.readBoolean();
+ totalBytesRead += 1;
+
+ /* read the index */
+ List<IndexHelper.ColumnIndexInfo> colIndexList = new ArrayList<IndexHelper.ColumnIndexInfo>();
+ if (hasColumnIndexes)
+ totalBytesRead += IndexHelper.deserializeIndex(SSTableReader.parseTableName(file_.getPath()), cfName_, file_, colIndexList);
+
+ /* need to do two things here.
+ * 1. move the file pointer to the beginning of the list of stored columns
+ * 2. calculate the size of all columns */
+ String cfName = file_.readUTF();
+ cfType_ = file_.readUTF();
+ String comparatorName = file_.readUTF();
+ assert comparatorName.equals(comparator_.getClass().getCanonicalName());
+ String subComparatorName = file_.readUTF(); // subcomparator
+ localDeletionTime_ = file_.readInt();
+ markedForDeleteAt_ = file_.readLong();
+ int totalNumCols = file_.readInt();
+ allColumnsSize_ = dataSize - (totalBytesRead + 4 * 2 + cfName.length() + cfType_.length() + comparatorName.length() + subComparatorName.length() + 4 + 8 + 4);
+
+ columnStartPosition_ = file_.getFilePointer();
+ columnIndexList_ = getFullColumnIndexList(colIndexList, totalNumCols);
+
+ if (startColumn.length == 0 && !isAscending_)
+ {
+ /* in this case, we assume that we want to scan from the largest column in descending order. */
+ curRangeIndex_ = columnIndexList_.size() - 1;
+ }
+ else
+ {
+ int index = Collections.binarySearch(columnIndexList_, new IndexHelper.ColumnIndexInfo(startColumn, 0, 0, comparator_));
+ curRangeIndex_ = index < 0 ? (++index) * (-1) - 1 : index;
+ }
+ }
+
+ private boolean getBlockFromCurIndex(DataOutputBuffer bufOut) throws IOException
+ {
+ if (curRangeIndex_ < 0 || curRangeIndex_ >= columnIndexList_.size())
+ return false;
+ IndexHelper.ColumnIndexInfo curColPostion = columnIndexList_.get(curRangeIndex_);
+ long start = curColPostion.position();
+ long end = curRangeIndex_ < columnIndexList_.size() - 1
+ ? columnIndexList_.get(curRangeIndex_+1).position()
+ : allColumnsSize_;
+
+ /* seek to the correct offset to the data, and calculate the data size */
+ file_.seek(columnStartPosition_ + start);
+ long dataSize = end - start;
+
+ bufOut.reset();
+ // write CF info
+ bufOut.writeUTF(cfName_);
+ bufOut.writeUTF(cfType_);
+ bufOut.writeUTF(comparator_.getClass().getCanonicalName());
+ bufOut.writeUTF(subComparatorName_);
+ bufOut.writeInt(localDeletionTime_);
+ bufOut.writeLong(markedForDeleteAt_);
+ // now write the columns
+ bufOut.writeInt(curColPostion.count());
+ bufOut.write(file_, (int)dataSize);
+ return true;
+ }
+
+ public boolean getNextBlock(DataOutputBuffer outBuf) throws IOException
+ {
+ boolean result = getBlockFromCurIndex(outBuf);
+ if (isAscending_)
+ curRangeIndex_++;
+ else
+ curRangeIndex_--;
+ return result;
+ }
+
+ public void close() throws IOException
+ {
+ file_.close();
+ }
+ }
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/FileStruct.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/FileStruct.java?rev=799949&r1=799948&r2=799949&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/FileStruct.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/FileStruct.java Sat Aug 1 22:31:15 2009
@@ -19,11 +19,8 @@
package org.apache.cassandra.io;
import java.io.IOException;
-import java.io.DataInput;
import java.util.Iterator;
-import org.apache.cassandra.io.IFileReader;
-import org.apache.cassandra.io.SSTableReader;
import org.apache.cassandra.db.IColumn;
import org.apache.cassandra.db.ColumnFamily;
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/IndexHelper.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/IndexHelper.java?rev=799949&r1=799948&r2=799949&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/IndexHelper.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/IndexHelper.java Sat Aug 1 22:31:15 2009
@@ -133,7 +133,7 @@
* @param columnIndexList the structure which is filled in with the deserialized index @return number of bytes read from the input
* @throws IOException
*/
- static int deserializeIndex(String tableName, String cfName, DataInput in, List<ColumnIndexInfo> columnIndexList) throws IOException
+ public static int deserializeIndex(String tableName, String cfName, DataInput in, List<ColumnIndexInfo> columnIndexList) throws IOException
{
/* read only the column index list */
int columnIndexSize = in.readInt();
@@ -343,7 +343,7 @@
position_ = position;
}
- int count()
+ public int count()
{
return columnCount_;
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTable.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTable.java?rev=799949&r1=799948&r2=799949&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTable.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTable.java Sat Aug 1 22:31:15 2009
@@ -67,7 +67,7 @@
return dataFile;
}
- static String parseTableName(String filename)
+ public static String parseTableName(String filename)
{
return new File(filename).getParentFile().getName();
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableReader.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableReader.java?rev=799949&r1=799948&r2=799949&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableReader.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableReader.java Sat Aug 1 22:31:15 2009
@@ -23,9 +23,7 @@
import org.apache.log4j.Logger;
-import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.dht.IPartitioner;
-import org.apache.cassandra.io.SequenceFile.ColumnGroupReader;
import org.apache.cassandra.utils.BloomFilter;
import org.apache.cassandra.utils.FileUtils;
import org.apache.cassandra.service.StorageService;
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableWriter.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableWriter.java?rev=799949&r1=799948&r2=799949&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableWriter.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableWriter.java Sat Aug 1 22:31:15 2009
@@ -26,7 +26,7 @@
public SSTableWriter(String filename, int keyCount, IPartitioner partitioner) throws IOException
{
super(filename, partitioner);
- dataWriter = SequenceFile.bufferedWriter(dataFile, 4 * 1024 * 1024);
+ dataWriter = new AbstractWriter.BufferWriter(dataFile, 4 * 1024 * 1024);
indexRAF = new BufferedRandomAccessFile(indexFilename(), "rw", 1024 * 1024);
bf = new BloomFilter(keyCount, 15);
}