You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/08/02 00:31:08 UTC

svn commit: r799948 - in /incubator/cassandra/trunk/src/java/org/apache/cassandra: db/ColumnFamily.java db/filter/SSTableNamesIterator.java io/IFileReader.java io/IndexHelper.java io/SequenceFile.java

Author: jbellis
Date: Sat Aug  1 22:31:08 2009
New Revision: 799948

URL: http://svn.apache.org/viewvc?rev=799948&view=rev
Log:
replace SequenceFile.AbstractReader.next with code in SSTableNamesIterator that reads columns directly, instead of copying the data through an extra DataOutputBuffer/DataInputBuffer layer
patch by jbellis; reviewed by Stu Hood for CASSANDRA-330

Modified:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/SSTableNamesIterator.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/io/IFileReader.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/io/IndexHelper.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SequenceFile.java

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java?rev=799948&r1=799947&r2=799948&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java Sat Aug  1 22:31:08 2009
@@ -411,6 +411,27 @@
         return type_;
     }
 
+    private String getComparatorName() // canonical class name of getComparator(); written by the serializer and counted by serializedSize()
+    {
+        return getComparator().getClass().getCanonicalName();
+    }
+
+    private String getSubComparatorName() // "" stands in for an absent subcolumn comparator
+    {
+        AbstractType subComparator = getSubComparator(); // may be null
+        return subComparator != null ? subComparator.getClass().getCanonicalName() : "";
+    }
+
+    public int serializedSize() // bytes of the serialized CF header (four writeUTF strings, int, long, column count) plus each column's serializedSize()
+    {
+        int subtotal = 4 * IColumn.UtfPrefix_ + 4 + 8 + 4; // four writeUTF length prefixes, localDeletionTime, markedForDeleteAt, column count
+        for (char c : (name_ + type_ + getComparatorName() + getSubComparatorName()).toCharArray())
+            subtotal += (c >= 0x0001 && c <= 0x007F) ? 1 : (c <= 0x07FF ? 2 : 3); // writeUTF emits modified UTF-8, not one byte per char
+        for (IColumn column : columns_.values())
+            subtotal += column.serializedSize();
+        return subtotal;
+    }
+
     /** merge all columnFamilies into a single instance, with only the newest versions of columns preserved. */
     static ColumnFamily resolve(List<ColumnFamily> columnFamilies)
     {
@@ -466,9 +487,8 @@
 
             dos.writeUTF(columnFamily.name());
             dos.writeUTF(columnFamily.type_);
-            dos.writeUTF(columnFamily.getComparator().getClass().getCanonicalName());
-            AbstractType subcolumnComparator = columnFamily.getSubComparator();
-            dos.writeUTF(subcolumnComparator == null ? "" : subcolumnComparator.getClass().getCanonicalName());
+            dos.writeUTF(columnFamily.getComparatorName());
+            dos.writeUTF(columnFamily.getSubComparatorName());
             dos.writeInt(columnFamily.localDeletionTime);
             dos.writeLong(columnFamily.markedForDeleteAt);
 

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/SSTableNamesIterator.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/SSTableNamesIterator.java?rev=799948&r1=799947&r2=799948&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/SSTableNamesIterator.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/SSTableNamesIterator.java Sat Aug  1 22:31:08 2009
@@ -3,6 +3,8 @@
 import java.io.IOException;
 import java.util.SortedSet;
 import java.util.Iterator;
+import java.util.List;
+import java.util.ArrayList;
 
 import org.apache.cassandra.db.ColumnFamily;
 import org.apache.cassandra.db.IColumn;
@@ -14,45 +16,71 @@
     private Iterator<IColumn> iter;
     public final SortedSet<byte[]> columns;
 
-    // TODO make this actually iterate so we don't have to read + deserialize + filter data that we don't need, only to skip it later in computeNext
     public SSTableNamesIterator(String filename, String key, String cfName, SortedSet<byte[]> columns) throws IOException
     {
+        assert columns != null; // only the named columns are kept; if the key is absent from this sstable, cf and iter stay null
         this.columns = columns;
         SSTableReader ssTable = SSTableReader.open(filename);
 
-        IFileReader dataReader = null;
-        DataOutputBuffer bufOut = new DataOutputBuffer();
-        DataInputBuffer bufIn = new DataInputBuffer();
+        String decoratedKey = ssTable.getPartitioner().decorateKey(key);
+        long position = ssTable.getPosition(decoratedKey);
+        if (position < 0)
+            return;
 
+        BufferedRandomAccessFile file = new BufferedRandomAccessFile(filename, "r");
         try
         {
-            dataReader = SequenceFile.bufferedReader(ssTable.getFilename(), 64 * 1024);
-            String decoratedKey = ssTable.getPartitioner().decorateKey(key);
-            long position = ssTable.getPosition(decoratedKey);
-            if (position >= 0)
+            file.seek(position);
+
+            /* row layout as read below: key, data size, column bloom filter, column index, CF header, column count, columns */
+            String keyInDisk = file.readUTF();
+            assert keyInDisk.equals(decoratedKey) : keyInDisk;
+            int dataSize = file.readInt();
+
+            /* Read the bloom filter summarizing the columns */
+            long preBfPos = file.getFilePointer();
+            IndexHelper.defreezeBloomFilter(file);
+            long postBfPos = file.getFilePointer();
+            dataSize -= (postBfPos - preBfPos);
+
+            List<IndexHelper.ColumnIndexInfo> columnIndexList = new ArrayList<IndexHelper.ColumnIndexInfo>();
+            dataSize -= IndexHelper.readColumnIndexes(file, ssTable.getTableName(), cfName, columnIndexList);
+
+            cf = ColumnFamily.serializer().deserializeEmpty(file);
+            int totalColumns = file.readInt();
+            dataSize -= cf.serializedSize(); // presumably cf holds no columns yet, so this is the header plus the column-count int
+
+            /* get the various column ranges we have to read */
+            List<IndexHelper.ColumnRange> columnRanges = IndexHelper.getMultiColumnRangesFromNameIndex(columns, columnIndexList, dataSize, totalColumns);
+
+            long prevPosition = 0; // range offsets are relative to the first byte after the column count
+            /* now read all the columns from the ranges */
+            for (IndexHelper.ColumnRange columnRange : columnRanges)
             {
-                long bytesRead = dataReader.next(decoratedKey, bufOut, cfName, columns, position);
-                assert bytesRead > 0;
-                assert bufOut.getLength() > 0;
-                bufIn.reset(bufOut.getData(), bufOut.getLength());
-                /* read the key even though we do not use it */
-                bufIn.readUTF();
-                bufIn.readInt();
+                Coordinate coordinate = columnRange.coordinate();
+                file.skipBytes((int)(coordinate.start_ - prevPosition));
+                /* measure the range from its first byte, i.e. AFTER skipping the gap before it */
+                long columnBegin = file.getFilePointer();
+                // read the columns in this range
+                // TODO only completely deserialize columns we are interested in
+                while (file.getFilePointer() - columnBegin < coordinate.end_ - coordinate.start_)
+                {
+                    final IColumn column = cf.getColumnSerializer().deserialize(file);
+                    if (columns.contains(column.name()))
+                    {
+                        cf.addColumn(column);
+                    }
+                }
+
+                prevPosition = coordinate.end_;
             }
         }
         finally
         {
-            if (dataReader != null)
-            {
-                dataReader.close();
-            }
+            file.close();
         }
 
-        if (bufIn.getLength() > 0)
-        {
-            cf = ColumnFamily.serializer().deserialize(bufIn);
-            iter = cf.getSortedColumns().iterator();
-        }
+        iter = cf.getSortedColumns().iterator();
     }
 
     public ColumnFamily getColumnFamily()
@@ -62,14 +90,8 @@
 
     protected IColumn computeNext()
     {
-        if (iter == null)
+        if (iter == null || !iter.hasNext()) // iter is null when the constructor returned early because the key was not found
             return endOfData();
-        while (iter.hasNext())
-        {
-            IColumn c = iter.next();
-            if (columns.contains(c.name()))
-                return c;
-        }
-        return endOfData();
+        return iter.next(); // the constructor already added only columns whose names are in `columns`
     }
 }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/IFileReader.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/IFileReader.java?rev=799948&r1=799947&r2=799948&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/IFileReader.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/IFileReader.java Sat Aug  1 22:31:08 2009
@@ -36,19 +36,6 @@
     public boolean isEOF() throws IOException;
 
     /**
-     * This method dumps the next key/value into the DataOuputStream
-     * passed in. Always use this method to query for application
-     * specific data as it will have indexes.
-     *
-     * @param key - key we are interested in.
-     * @param bufOut - DataOutputStream that needs to be filled.
-     * @param columnFamilyName The name of the column family only without the ":"
-     * @param columnNames - The list of columns in the cfName column family
-     * 					     that we want to return
-    */
-    public long next(String key, DataOutputBuffer bufOut, String columnFamilyName, SortedSet<byte[]> columnNames, long position) throws IOException;
-
-    /**
      * Close the file after reading.
      * @throws IOException
      */

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/IndexHelper.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/IndexHelper.java?rev=799948&r1=799947&r2=799948&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/IndexHelper.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/IndexHelper.java Sat Aug  1 22:31:08 2009
@@ -218,7 +218,7 @@
 	 * @param totalNumCols the total number of columns
 	 * @return a list of subranges which contain all the columns in columnNames
 	 */
-	static List<ColumnRange> getMultiColumnRangesFromNameIndex(SortedSet<byte[]> columnNames, List<IndexHelper.ColumnIndexInfo> columnIndexList, int dataSize, int totalNumCols)
+	public static List<ColumnRange> getMultiColumnRangesFromNameIndex(SortedSet<byte[]> columnNames, List<IndexHelper.ColumnIndexInfo> columnIndexList, int dataSize, int totalNumCols)
 	{
 		List<ColumnRange> columnRanges = new ArrayList<ColumnRange>();
 
@@ -248,7 +248,7 @@
          * Reads the column name indexes if present. If the
      * indexes are based on time then skip over them.
      */
-    static int readColumnIndexes(RandomAccessFile file, String tableName, String cfName, List<ColumnIndexInfo> columnIndexList) throws IOException
+    public static int readColumnIndexes(RandomAccessFile file, String tableName, String cfName, List<ColumnIndexInfo> columnIndexList) throws IOException
     {
         /* check if we have an index */
         boolean hasColumnIndexes = file.readBoolean();
@@ -268,7 +268,7 @@
      * @return bloom filter summarizing the column information
      * @throws java.io.IOException
      */
-    static BloomFilter defreezeBloomFilter(RandomAccessFile file) throws IOException
+    public static BloomFilter defreezeBloomFilter(RandomAccessFile file) throws IOException
     {
         int size = file.readInt();
         byte[] bytes = new byte[size];
@@ -297,12 +297,12 @@
             columnCount_ = columnCount;
         }
         
-        Coordinate coordinate()
+        public Coordinate coordinate() // byte range [start_, end_) of this run of columns, relative to the start of the row's column data
         {
             return coordinate_;
         }
         
-        int count()
+        public int count() // number of columns in this range
         {
             return columnCount_;
         }                

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SequenceFile.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SequenceFile.java?rev=799948&r1=799947&r2=799948&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SequenceFile.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SequenceFile.java Sat Aug  1 22:31:08 2009
@@ -191,7 +191,6 @@
 
     public static abstract class AbstractReader implements IFileReader
     {
-        private static final short utfPrefix_ = 2;
         protected RandomAccessFile file_;
         protected String filename_;
 
@@ -209,89 +208,6 @@
         {
             return filename_;
         }
-
-        /**
-         * This method dumps the next key/value into the DataOuputStream
-         * passed in. Always use this method to query for application
-         * specific data as it will have indexes.
-         *
-         * @param key       key we are interested in.
-         * @param bufOut    DataOutputStream that needs to be filled.
-         * @param columnFamilyName name of the columnFamily
-         * @param columnNames columnNames we are interested in
-         */
-        public long next(String key, DataOutputBuffer bufOut, String columnFamilyName, SortedSet<byte[]> columnNames, long position) throws IOException
-        {
-            assert columnNames != null;
-            seek(position);
-
-            /* note the position where the key starts */
-            long startPosition = file_.getFilePointer();
-            String keyInDisk = file_.readUTF();
-            assert keyInDisk.equals(key);
-            int dataSize = file_.readInt();
-
-            /* write the key into buffer */
-            bufOut.writeUTF(key);
-
-            /* Read the bloom filter summarizing the columns */
-            long preBfPos = file_.getFilePointer();
-            IndexHelper.defreezeBloomFilter(file_);
-            long postBfPos = file_.getFilePointer();
-            dataSize -= (postBfPos - preBfPos);
-
-            List<IndexHelper.ColumnIndexInfo> columnIndexList = new ArrayList<IndexHelper.ColumnIndexInfo>();
-            dataSize -= IndexHelper.readColumnIndexes(file_, getTableName(), columnFamilyName, columnIndexList);
-
-            // read CF data so we can echo it back to the outstream
-            String cfName = file_.readUTF();
-            String cfType = file_.readUTF();
-            String comparatorName = file_.readUTF();
-            String subComparatorName = file_.readUTF();
-            int localDeletionTime = file_.readInt();
-            long markedForDeleteAt = file_.readLong();
-            int totalColumns = file_.readInt();
-            dataSize -= (4 * utfPrefix_ + cfName.length() + cfType.length() + comparatorName.length() + subComparatorName.length() + 4 + 8 + 4);
-
-            /* get the various column ranges we have to read */
-            List<IndexHelper.ColumnRange> columnRanges = IndexHelper.getMultiColumnRangesFromNameIndex(columnNames, columnIndexList, dataSize, totalColumns);
-
-            /* calculate the data size */
-            int numColsReturned = 0;
-            int dataSizeReturned = 0;
-            for (IndexHelper.ColumnRange columnRange : columnRanges)
-            {
-                numColsReturned += columnRange.count();
-                Coordinate coordinate = columnRange.coordinate();
-                dataSizeReturned += coordinate.end_ - coordinate.start_;
-            }
-
-            // returned data size
-            bufOut.writeInt(dataSizeReturned + utfPrefix_ * 4 + cfName.length() + cfType.length() + comparatorName.length() + subComparatorName.length() + 4 + 8 + 4);
-            // echo back the CF data we read
-            bufOut.writeUTF(cfName);
-            bufOut.writeUTF(cfType);
-            bufOut.writeUTF(comparatorName);
-            bufOut.writeUTF(subComparatorName);
-            bufOut.writeInt(localDeletionTime);
-            bufOut.writeLong(markedForDeleteAt);
-            /* write number of columns */
-            bufOut.writeInt(numColsReturned);
-            int prevPosition = 0;
-            /* now write all the columns we are required to write */
-            for (IndexHelper.ColumnRange columnRange : columnRanges)
-            {
-                /* seek to the correct offset to the data */
-                Coordinate coordinate = columnRange.coordinate();
-                file_.skipBytes((int) (coordinate.start_ - prevPosition));
-                bufOut.write(file_, (int) (coordinate.end_ - coordinate.start_));
-                prevPosition = (int) coordinate.end_;
-            }
-
-            long endPosition = file_.getFilePointer();
-            return endPosition - startPosition;
-        }
-
     }
 
     public static class Reader extends AbstractReader