You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/08/02 00:31:01 UTC

svn commit: r799947 - in /incubator/cassandra/trunk/src/java/org/apache/cassandra: db/filter/SSTableSliceIterator.java io/IndexHelper.java io/SequenceFile.java

Author: jbellis
Date: Sat Aug  1 22:31:01 2009
New Revision: 799947

URL: http://svn.apache.org/viewvc?rev=799947&view=rev
Log:
SF shouldn't duplicate position checking that was already done by SSTable.  move utility methods to IndexHelper.  fix off-by-4 in dataSizeReturned.
patch by jbellis; reviewed by Stu Hood for CASSANDRA-330

Modified:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/SSTableSliceIterator.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/io/IndexHelper.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SequenceFile.java

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/SSTableSliceIterator.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/SSTableSliceIterator.java?rev=799947&r1=799946&r2=799947&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/SSTableSliceIterator.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/SSTableSliceIterator.java Sat Aug  1 22:31:01 2009
@@ -36,9 +36,10 @@
 
         /* Morph key into actual key based on the partition type. */
         String decoratedKey = ssTable.getPartitioner().decorateKey(key);
-        long position = ssTable.getPosition(decoratedKey);
         AbstractType comparator1 = DatabaseDescriptor.getComparator(ssTable.getTableName(), cfName);
-        reader = new SequenceFile.ColumnGroupReader(ssTable.getFilename(), decoratedKey, cfName, comparator1, startColumn, isAscending, position);
+        long position = ssTable.getPosition(decoratedKey);
+        if (position >= 0)
+            reader = new SequenceFile.ColumnGroupReader(ssTable.getFilename(), decoratedKey, cfName, comparator1, startColumn, isAscending, position);
         this.comparator = comparator;
         this.startColumn = startColumn;
         curColumnIndex = isAscending ? 0 : -1;
@@ -89,6 +90,9 @@
 
     protected IColumn computeNext()
     {
+        if (reader == null)
+            return endOfData();
+
         while (true)
         {
             if (isAscending)

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/IndexHelper.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/IndexHelper.java?rev=799947&r1=799946&r2=799947&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/IndexHelper.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/IndexHelper.java Sat Aug  1 22:31:01 2009
@@ -18,15 +18,13 @@
 
 package org.apache.cassandra.io;
 
-import java.io.DataInput;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
+import java.io.*;
 import java.util.*;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.ColumnSerializer;
 import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.utils.BloomFilter;
 
 
 /**
@@ -246,6 +244,40 @@
         return columnRanges;
 	}
 
+    /**
+     * Reads the column name indexes if present. If the
+     * indexes are based on time then skip over them.
+     */
+    static int readColumnIndexes(RandomAccessFile file, String tableName, String cfName, List<ColumnIndexInfo> columnIndexList) throws IOException
+    {
+        /* check if we have an index */
+        boolean hasColumnIndexes = file.readBoolean();
+        int totalBytesRead = 1;
+        /* if we do then deserialize the index */
+        if (hasColumnIndexes)
+        {
+            /* read the index */
+            totalBytesRead += deserializeIndex(tableName, cfName, file, columnIndexList);
+        }
+        return totalBytesRead;
+    }
+
+    /**
+     * Defreeze the bloom filter.
+     *
+     * @return bloom filter summarizing the column information
+     * @throws java.io.IOException
+     */
+    static BloomFilter defreezeBloomFilter(RandomAccessFile file) throws IOException
+    {
+        int size = file.readInt();
+        byte[] bytes = new byte[size];
+        file.readFully(bytes);
+        DataInputBuffer bufIn = new DataInputBuffer();
+        bufIn.reset(bytes, bytes.length);
+        return BloomFilter.serializer().deserialize(bufIn);
+    }
+
 
     /**
      * A column range containing the start and end

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SequenceFile.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SequenceFile.java?rev=799947&r1=799946&r2=799947&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SequenceFile.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SequenceFile.java Sat Aug  1 22:31:01 2009
@@ -22,7 +22,6 @@
 import java.util.*;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.utils.BloomFilter;
 import org.apache.cassandra.db.marshal.AbstractType;
 
 import org.apache.log4j.Logger;
@@ -106,57 +105,48 @@
 
         private void init(byte[] startColumn, long position) throws IOException
         {
-            String keyInDisk = null;
-            if (seekTo(position) >= 0)
-                keyInDisk = file_.readUTF();
+            seek(position);
+            String keyInDisk = file_.readUTF();
+            assert keyInDisk.equals(key_);
+
+            /* read off the size of this row */
+            int dataSize = file_.readInt();
+            /* skip the bloomfilter */
+            int totalBytesRead = IndexHelper.skipBloomFilter(file_);
+            /* read off the index flag, it has to be true */
+            boolean hasColumnIndexes = file_.readBoolean();
+            totalBytesRead += 1;
+
+            /* read the index */
+            List<IndexHelper.ColumnIndexInfo> colIndexList = new ArrayList<IndexHelper.ColumnIndexInfo>();
+            if (hasColumnIndexes)
+                totalBytesRead += IndexHelper.deserializeIndex(getTableName(), cfName_, file_, colIndexList);
 
-            if ( keyInDisk != null && keyInDisk.equals(key_))
+            /* need to do two things here.
+             * 1. move the file pointer to the beginning of the list of stored columns
+             * 2. calculate the size of all columns */
+            String cfName = file_.readUTF();
+            cfType_ = file_.readUTF();
+            String comparatorName = file_.readUTF();
+            assert comparatorName.equals(comparator_.getClass().getCanonicalName());
+            String subComparatorName = file_.readUTF(); // subcomparator
+            localDeletionTime_ = file_.readInt();
+            markedForDeleteAt_ = file_.readLong();
+            int totalNumCols = file_.readInt();
+            allColumnsSize_ = dataSize - (totalBytesRead + 4 * utfPrefix_ + cfName.length() + cfType_.length() + comparatorName.length() + subComparatorName.length() + 4 + 8 + 4);
+
+            columnStartPosition_ = file_.getFilePointer();
+            columnIndexList_ = getFullColumnIndexList(colIndexList, totalNumCols);
+
+            if (startColumn.length == 0 && !isAscending_)
             {
-                /* read off the size of this row */
-                int dataSize = file_.readInt();
-                /* skip the bloomfilter */
-                int totalBytesRead = IndexHelper.skipBloomFilter(file_);
-                /* read off the index flag, it has to be true */
-                boolean hasColumnIndexes = file_.readBoolean();
-                totalBytesRead += 1;
-
-                /* read the index */
-                List<IndexHelper.ColumnIndexInfo> colIndexList = new ArrayList<IndexHelper.ColumnIndexInfo>();
-                if (hasColumnIndexes)
-                    totalBytesRead += IndexHelper.deserializeIndex(getTableName(), cfName_, file_, colIndexList);
-
-                /* need to do two things here.
-                 * 1. move the file pointer to the beginning of the list of stored columns
-                 * 2. calculate the size of all columns */
-                String cfName = file_.readUTF();
-                cfType_ = file_.readUTF();
-                String comparatorName = file_.readUTF();
-                assert comparatorName.equals(comparator_.getClass().getCanonicalName());
-                String subComparatorName = file_.readUTF(); // subcomparator
-                localDeletionTime_ = file_.readInt();
-                markedForDeleteAt_ = file_.readLong();
-                int totalNumCols = file_.readInt();
-                allColumnsSize_ = dataSize - (totalBytesRead + 4 * utfPrefix_ + cfName.length() + cfType_.length() + comparatorName.length() + subComparatorName.length() + 4 + 8 + 4);
-
-                columnStartPosition_ = file_.getFilePointer();
-                columnIndexList_ = getFullColumnIndexList(colIndexList, totalNumCols);
-
-                if (startColumn.length == 0 && !isAscending_)
-                {
-                    /* in this case, we assume that we want to scan from the largest column in descending order. */
-                    curRangeIndex_ = columnIndexList_.size() - 1;
-                }
-                else
-                {
-                    int index = Collections.binarySearch(columnIndexList_, new IndexHelper.ColumnIndexInfo(startColumn, 0, 0, comparator_));
-                    curRangeIndex_ = index < 0 ? (++index) * (-1) - 1 : index;
-                }
+                /* in this case, we assume that we want to scan from the largest column in descending order. */
+                curRangeIndex_ = columnIndexList_.size() - 1;
             }
             else
             {
-                /* no keys found in this file because of a false positive in BF */
-                curRangeIndex_ = -1;
-                columnIndexList_ = new ArrayList<IndexHelper.ColumnIndexInfo>();
+                int index = Collections.binarySearch(columnIndexList_, new IndexHelper.ColumnIndexInfo(startColumn, 0, 0, comparator_));
+                curRangeIndex_ = index < 0 ? (++index) * (-1) - 1 : index;
             }
         }
 
@@ -220,52 +210,6 @@
             return filename_;
         }
 
-        long seekTo(long position) throws IOException
-        {
-            if (position >= 0)
-                seek(position);
-            return position;
-        }
-
-        /**
-         * Defreeze the bloom filter.
-         *
-         * @return bloom filter summarizing the column information
-         * @throws IOException
-         */
-        private BloomFilter defreezeBloomFilter() throws IOException
-        {
-            int size = file_.readInt();
-            byte[] bytes = new byte[size];
-            file_.readFully(bytes);
-            DataInputBuffer bufIn = new DataInputBuffer();
-            bufIn.reset(bytes, bytes.length);
-            BloomFilter bf = BloomFilter.serializer().deserialize(bufIn);
-            return bf;
-        }
-
-        /**
-         * Reads the column name indexes if present. If the
-         * indexes are based on time then skip over them.
-         *
-         * @param cfName
-         * @return
-         */
-        private int handleColumnNameIndexes(String cfName, List<IndexHelper.ColumnIndexInfo> columnIndexList) throws IOException
-        {
-            /* check if we have an index */
-            boolean hasColumnIndexes = file_.readBoolean();
-            int totalBytesRead = 1;
-            /* if we do then deserialize the index */
-            if (hasColumnIndexes)
-            {
-                String tableName = getTableName();
-                /* read the index */
-                totalBytesRead += IndexHelper.deserializeIndex(tableName, cfName, file_, columnIndexList);
-            }
-            return totalBytesRead;
-        }
-
         /**
          * This method dumps the next key/value into the DataOuputStream
          * passed in. Always use this method to query for application
@@ -279,26 +223,12 @@
         public long next(String key, DataOutputBuffer bufOut, String columnFamilyName, SortedSet<byte[]> columnNames, long position) throws IOException
         {
             assert columnNames != null;
-
-            long bytesRead = -1L;
-            if (isEOF() || seekTo(position) < 0)
-                return bytesRead;
+            seek(position);
 
             /* note the position where the key starts */
             long startPosition = file_.getFilePointer();
             String keyInDisk = file_.readUTF();
             assert keyInDisk.equals(key);
-            readColumns(key, bufOut, columnFamilyName, columnNames);
-
-            long endPosition = file_.getFilePointer();
-            bytesRead = endPosition - startPosition;
-
-            return bytesRead;
-        }
-
-        private void readColumns(String key, DataOutputBuffer bufOut, String columnFamilyName, SortedSet<byte[]> cNames)
-        throws IOException
-        {
             int dataSize = file_.readInt();
 
             /* write the key into buffer */
@@ -306,42 +236,25 @@
 
             /* Read the bloom filter summarizing the columns */
             long preBfPos = file_.getFilePointer();
-            BloomFilter bf = defreezeBloomFilter();
+            IndexHelper.defreezeBloomFilter(file_);
             long postBfPos = file_.getFilePointer();
             dataSize -= (postBfPos - preBfPos);
 
             List<IndexHelper.ColumnIndexInfo> columnIndexList = new ArrayList<IndexHelper.ColumnIndexInfo>();
-            /* read the column name indexes if present */
-            int totalBytesRead = handleColumnNameIndexes(columnFamilyName, columnIndexList);
-            dataSize -= totalBytesRead;
+            dataSize -= IndexHelper.readColumnIndexes(file_, getTableName(), columnFamilyName, columnIndexList);
 
-            /* read the column family name */
+            // read CF data so we can echo it back to the outstream
             String cfName = file_.readUTF();
-            dataSize -= (utfPrefix_ + cfName.length());
-
             String cfType = file_.readUTF();
-            dataSize -= (utfPrefix_ + cfType.length());
-
             String comparatorName = file_.readUTF();
-            dataSize -= (utfPrefix_ + comparatorName.length());
-
             String subComparatorName = file_.readUTF();
-            dataSize -= (utfPrefix_ + subComparatorName.length());
-
-            /* read local deletion time */
             int localDeletionTime = file_.readInt();
-            dataSize -=4;
-
-            /* read if this cf is marked for delete */
             long markedForDeleteAt = file_.readLong();
-            dataSize -= 8;
-
-            /* read the total number of columns */
-            int totalNumCols = file_.readInt();
-            dataSize -= 4;
+            int totalColumns = file_.readInt();
+            dataSize -= (4 * utfPrefix_ + cfName.length() + cfType.length() + comparatorName.length() + subComparatorName.length() + 4 + 8 + 4);
 
             /* get the various column ranges we have to read */
-            List<IndexHelper.ColumnRange> columnRanges = IndexHelper.getMultiColumnRangesFromNameIndex(cNames, columnIndexList, dataSize, totalNumCols);
+            List<IndexHelper.ColumnRange> columnRanges = IndexHelper.getMultiColumnRangesFromNameIndex(columnNames, columnIndexList, dataSize, totalColumns);
 
             /* calculate the data size */
             int numColsReturned = 0;
@@ -354,7 +267,7 @@
             }
 
             // returned data size
-            bufOut.writeInt(dataSizeReturned + utfPrefix_ * 4 + cfName.length() + cfType.length() + comparatorName.length() + subComparatorName.length() + 4 + 4 + 8 + 4);
+            bufOut.writeInt(dataSizeReturned + utfPrefix_ * 4 + cfName.length() + cfType.length() + comparatorName.length() + subComparatorName.length() + 4 + 8 + 4);
             // echo back the CF data we read
             bufOut.writeUTF(cfName);
             bufOut.writeUTF(cfType);
@@ -374,7 +287,11 @@
                 bufOut.write(file_, (int) (coordinate.end_ - coordinate.start_));
                 prevPosition = (int) coordinate.end_;
             }
+
+            long endPosition = file_.getFilePointer();
+            return endPosition - startPosition;
         }
+
     }
 
     public static class Reader extends AbstractReader