You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/08/07 18:23:53 UTC

svn commit: r802071 - in /incubator/cassandra/trunk/src/java/org/apache/cassandra: db/ColumnIndexer.java db/filter/SSTableSliceIterator.java io/IndexHelper.java

Author: jbellis
Date: Fri Aug  7 16:23:53 2009
New Revision: 802071

URL: http://svn.apache.org/viewvc?rev=802071&view=rev
Log:
Don't serialize the unused column count into the column index. Remove the DataInputBuffer/DataOutputBuffer round-tripping from ColumnGroupReader.
Patch by jbellis; reviewed by Jun Rao for CASSANDRA-332.

Modified:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnIndexer.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/SSTableSliceIterator.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/io/IndexHelper.java

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnIndexer.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnIndexer.java?rev=802071&r1=802070&r2=802071&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnIndexer.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnIndexer.java Fri Aug  7 16:23:53 2009
@@ -103,7 +103,6 @@
     private static void doIndexing(AbstractType comparator, Collection<IColumn> columns, DataOutputStream dos) throws IOException
     {
         /* we are going to write column indexes */
-        int numColumns = 0;
         int position = 0;
         int indexSizeInBytes = 0;
         int sizeSummarized = 0;
@@ -118,14 +117,13 @@
         List<IndexHelper.ColumnIndexInfo> columnIndexList = new ArrayList<IndexHelper.ColumnIndexInfo>();        
         
         /* column offsets at the right thresholds into the index map. */
-        for ( IColumn column : columns )
+        for (IColumn column : columns)
         {
             /* if we hit the column index size that we have to index after, go ahead and index it */
-            if(position - sizeSummarized >= DatabaseDescriptor.getColumnIndexSize())
-            {      
-                IndexHelper.ColumnIndexInfo cIndexInfo = new IndexHelper.ColumnIndexInfo(column.name(), 0, 0, comparator);
+            if (position - sizeSummarized >= DatabaseDescriptor.getColumnIndexSize())
+            {
+                IndexHelper.ColumnIndexInfo cIndexInfo = new IndexHelper.ColumnIndexInfo(column.name(), 0, comparator);
                 cIndexInfo.position(position);
-                cIndexInfo.count(numColumns);                
                 columnIndexList.add(cIndexInfo);
                 /*
                  * we will be writing this object as a UTF8 string and two ints,
@@ -135,10 +133,8 @@
                  */
                 indexSizeInBytes += cIndexInfo.size();
                 sizeSummarized = position;
-                numColumns = 0;
             }
             position += column.serializedSize();
-            ++numColumns;
         }
         /* write the column index list */
         IndexHelper.serialize(indexSizeInBytes, columnIndexList, dos);

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/SSTableSliceIterator.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/SSTableSliceIterator.java?rev=802071&r1=802070&r2=802071&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/SSTableSliceIterator.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/SSTableSliceIterator.java Fri Aug  7 16:23:53 2009
@@ -1,9 +1,6 @@
 package org.apache.cassandra.db.filter;
 
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Arrays;
-import java.util.Collections;
+import java.util.*;
 import java.io.IOException;
 
 import org.apache.commons.lang.ArrayUtils;
@@ -22,8 +19,6 @@
 {
     protected boolean isAscending;
     private byte[] startColumn;
-    private DataOutputBuffer outBuf = new DataOutputBuffer();
-    private DataInputBuffer inBuf = new DataInputBuffer();
     private int curColumnIndex;
     private ColumnFamily curCF = null;
     private ArrayList<IColumn> curColumns = new ArrayList<IColumn>();
@@ -69,15 +64,17 @@
 
     private void getColumnsFromBuffer() throws IOException
     {
-        inBuf.reset(outBuf.getData(), outBuf.getLength());
-        ColumnFamily columnFamily = ColumnFamily.serializer().deserialize(inBuf);
-
         if (curCF == null)
-            curCF = columnFamily.cloneMeShallow();
+            curCF = reader.getEmptyColumnFamily().cloneMeShallow();
         curColumns.clear();
-        for (IColumn column : columnFamily.getSortedColumns())
+        while (true)
+        {
+            IColumn column = reader.pollColumn();
+            if (column == null)
+                break;
             if (isColumnNeeded(column))
                 curColumns.add(column);
+        }
 
         if (isAscending)
             curColumnIndex = 0;
@@ -114,7 +111,7 @@
 
             try
             {
-                if (!reader.getNextBlock(outBuf))
+                if (!reader.getNextBlock())
                     return endOfData();
                 getColumnsFromBuffer();
             }
@@ -140,25 +137,22 @@
     {
         private String key_;
         private String cfName_;
-        private String cfType_;
         private AbstractType comparator_;
-        private String subComparatorName_;
         private boolean isAscending_;
+        private ColumnFamily emptyColumnFamily;
 
         private List<IndexHelper.ColumnIndexInfo> columnIndexList_;
         private long columnStartPosition_;
         private int curRangeIndex_;
         private int allColumnsSize_;
-        private int localDeletionTime_;
-        private long markedForDeleteAt_;
         private BufferedRandomAccessFile file_;
+        private Queue<IColumn> blockColumns = new ArrayDeque<IColumn>();
 
         public ColumnGroupReader(String filename, String key, String cfName, AbstractType comparator, byte[] startColumn, boolean isAscending, long position) throws IOException
         {
             this.file_ = new BufferedRandomAccessFile(filename, "r");
             this.cfName_ = cfName;
             this.comparator_ = comparator;
-            this.subComparatorName_ = DatabaseDescriptor.getSubComparator(SSTableReader.parseTableName(filename), cfName).getClass().getCanonicalName();
             this.key_ = key;
             this.isAscending_ = isAscending;
             init(startColumn, position);
@@ -167,34 +161,24 @@
         /**
          *   Build a list of index entries ready for search.
          */
-        private List<IndexHelper.ColumnIndexInfo> getFullColumnIndexList(List<IndexHelper.ColumnIndexInfo> columnIndexList, int totalColumns)
+        private List<IndexHelper.ColumnIndexInfo> getFullColumnIndexList(List<IndexHelper.ColumnIndexInfo> columnIndexList)
         {
             if (columnIndexList.size() == 0)
             {
                 /* if there is no column index, add an index entry that covers the full space. */
-                return Arrays.asList(new IndexHelper.ColumnIndexInfo(ArrayUtils.EMPTY_BYTE_ARRAY, 0, totalColumns, comparator_));
+                return Arrays.asList(new IndexHelper.ColumnIndexInfo(ArrayUtils.EMPTY_BYTE_ARRAY, 0, comparator_));
             }
 
             List<IndexHelper.ColumnIndexInfo> fullColIndexList = new ArrayList<IndexHelper.ColumnIndexInfo>();
-            int accumulatedColumns = 0;
-            for (IndexHelper.ColumnIndexInfo colPosInfo : columnIndexList)
-                accumulatedColumns += colPosInfo.count();
-            int remainingCols = totalColumns - accumulatedColumns;
 
-            fullColIndexList.add(new IndexHelper.ColumnIndexInfo(ArrayUtils.EMPTY_BYTE_ARRAY, 0, columnIndexList.get(0).count(), comparator_));
+            fullColIndexList.add(new IndexHelper.ColumnIndexInfo(ArrayUtils.EMPTY_BYTE_ARRAY, 0, comparator_));
             for (int i = 0; i < columnIndexList.size() - 1; i++)
             {
                 IndexHelper.ColumnIndexInfo colPosInfo = columnIndexList.get(i);
-                fullColIndexList.add(new IndexHelper.ColumnIndexInfo(colPosInfo.name(),
-                                                                     colPosInfo.position(),
-                                                                     columnIndexList.get(i + 1).count(),
-                                                                     comparator_));
+                fullColIndexList.add(new IndexHelper.ColumnIndexInfo(colPosInfo.name(), colPosInfo.position(), comparator_));
             }
             byte[] columnName = columnIndexList.get(columnIndexList.size() - 1).name();
-            fullColIndexList.add(new IndexHelper.ColumnIndexInfo(columnName,
-                                                                 columnIndexList.get(columnIndexList.size() - 1).position(),
-                                                                 remainingCols,
-                                                                 comparator_));
+            fullColIndexList.add(new IndexHelper.ColumnIndexInfo(columnName, columnIndexList.get(columnIndexList.size() - 1).position(), comparator_));
             return fullColIndexList;
         }
 
@@ -220,18 +204,13 @@
             /* need to do two things here.
              * 1. move the file pointer to the beginning of the list of stored columns
              * 2. calculate the size of all columns */
-            String cfName = file_.readUTF();
-            cfType_ = file_.readUTF();
-            String comparatorName = file_.readUTF();
-            assert comparatorName.equals(comparator_.getClass().getCanonicalName());
-            String subComparatorName = file_.readUTF(); // subcomparator
-            localDeletionTime_ = file_.readInt();
-            markedForDeleteAt_ = file_.readLong();
+            emptyColumnFamily = ColumnFamily.serializer().deserializeEmpty(file_);
             int totalNumCols = file_.readInt();
-            allColumnsSize_ = dataSize - (totalBytesRead + 4 * 2 + cfName.length() + cfType_.length() + comparatorName.length() + subComparatorName.length() + 4 + 8 + 4);
+            totalBytesRead += emptyColumnFamily.serializedSize();
+            allColumnsSize_ = dataSize - totalBytesRead;
 
             columnStartPosition_ = file_.getFilePointer();
-            columnIndexList_ = getFullColumnIndexList(colIndexList, totalNumCols);
+            columnIndexList_ = getFullColumnIndexList(colIndexList);
 
             if (startColumn.length == 0 && !isAscending_)
             {
@@ -240,12 +219,22 @@
             }
             else
             {
-                int index = Collections.binarySearch(columnIndexList_, new IndexHelper.ColumnIndexInfo(startColumn, 0, 0, comparator_));
+                int index = Collections.binarySearch(columnIndexList_, new IndexHelper.ColumnIndexInfo(startColumn, 0, comparator_));
                 curRangeIndex_ = index < 0 ? (++index) * (-1) - 1 : index;
             }
         }
 
-        private boolean getBlockFromCurIndex(DataOutputBuffer bufOut) throws IOException
+        public ColumnFamily getEmptyColumnFamily()
+        {
+            return emptyColumnFamily;
+        }
+
+        public IColumn pollColumn()
+        {
+            return blockColumns.poll();
+        }
+
+        private boolean getBlockFromCurIndex() throws IOException
         {
             if (curRangeIndex_ < 0 || curRangeIndex_ >= columnIndexList_.size())
                 return false;
@@ -257,25 +246,16 @@
 
             /* seek to the correct offset to the data, and calculate the data size */
             file_.seek(columnStartPosition_ + start);
-            long dataSize = end - start;
-
-            bufOut.reset();
-            // write CF info
-            bufOut.writeUTF(cfName_);
-            bufOut.writeUTF(cfType_);
-            bufOut.writeUTF(comparator_.getClass().getCanonicalName());
-            bufOut.writeUTF(subComparatorName_);
-            bufOut.writeInt(localDeletionTime_);
-            bufOut.writeLong(markedForDeleteAt_);
-            // now write the columns
-            bufOut.writeInt(curColPostion.count());
-            bufOut.write(file_, (int)dataSize);
+            while (file_.getFilePointer() < columnStartPosition_ + end)
+            {
+                blockColumns.add(emptyColumnFamily.getColumnSerializer().deserialize(file_));
+            }
             return true;
         }
 
-        public boolean getNextBlock(DataOutputBuffer outBuf) throws IOException
+        public boolean getNextBlock() throws IOException
         {
-            boolean result = getBlockFromCurIndex(outBuf);
+            boolean result = getBlockFromCurIndex();
             if (isAscending_)
                 curRangeIndex_++;
             else

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/IndexHelper.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/IndexHelper.java?rev=802071&r1=802070&r2=802071&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/IndexHelper.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/IndexHelper.java Fri Aug  7 16:23:53 2009
@@ -171,46 +171,21 @@
      */
 	static ColumnRange getColumnRangeFromNameIndex(IndexHelper.ColumnIndexInfo cIndexInfo, List<IndexHelper.ColumnIndexInfo> columnIndexList, int dataSize, int totalNumCols)
 	{
-		/* find the offset for the column */
-        int size = columnIndexList.size();
-        long start = 0;
-        long end = dataSize;
-        int numColumns = 0;      
-       
-        int index = Collections.binarySearch(columnIndexList, cIndexInfo);
-        if ( index < 0 )
-        {
-            /* We are here which means that the requested column is not an index. */
-            index = (++index)*(-1);
-        }
-        else
-        {
-        	++index;
-        }
-
-        /* calculate the starting offset from which we have to read */
-        start = (index == 0) ? 0 : columnIndexList.get(index - 1).position();
+        // TODO this looks like it can be simplified
+        int rawIndex = Collections.binarySearch(columnIndexList, cIndexInfo);
+        int index = rawIndex < 0
+                  ? -1 * (rawIndex + 1)
+                  : rawIndex + 1;
+        if (index > 0)
+            index -= 1;
+        assert index < columnIndexList.size();
 
-        if( index < size )
-        {
-        	end = columnIndexList.get(index).position();
-            numColumns = columnIndexList.get(index).count();            
-        }
-        else
-        {
-        	end = dataSize;  
-            int totalColsIndexed = 0;
-            for( IndexHelper.ColumnIndexInfo colPosInfo : columnIndexList )
-            {
-                totalColsIndexed += colPosInfo.count();
-            }
-            numColumns = totalNumCols - totalColsIndexed;
-        }
+        long blockStart = columnIndexList.get(index).position();
 
-        return new ColumnRange(start, end, numColumns);
+        return new ColumnRange(blockStart, blockStart);
 	}
 
-	/**
+    /**
 	 * Returns the sub-ranges that contain the list of columns in columnNames.
 	 * @param columnNames The list of columns whose subranges need to be found
 	 * @param columnIndexList the deserialized column indexes
@@ -224,14 +199,14 @@
 
         if (columnIndexList.size() == 0)
         {
-            columnRanges.add(new ColumnRange(0, dataSize, totalNumCols));
+            columnRanges.add(new ColumnRange(0, dataSize));
         }
         else
         {
             Map<Long, Boolean> offset = new HashMap<Long, Boolean>();
             for (byte[] name : columnNames)
             {
-                IndexHelper.ColumnIndexInfo cIndexInfo = new IndexHelper.ColumnIndexInfo(name, 0, 0, (AbstractType)columnNames.comparator());
+                IndexHelper.ColumnIndexInfo cIndexInfo = new IndexHelper.ColumnIndexInfo(name, 0, (AbstractType)columnNames.comparator());
                 ColumnRange columnRange = getColumnRangeFromNameIndex(cIndexInfo, columnIndexList, dataSize, totalNumCols);
                 if (offset.get(columnRange.coordinate().start_) == null)
                 {
@@ -289,23 +264,16 @@
     public static class ColumnRange
     {
         private Coordinate coordinate_;
-        private int columnCount_;
-        
-        ColumnRange(long start, long end, int columnCount)
+
+        ColumnRange(long start, long end)
         {
             coordinate_ = new Coordinate(start, end);
-            columnCount_ = columnCount;
         }
         
         public Coordinate coordinate()
         {
             return coordinate_;
         }
-        
-        public int count()
-        {
-            return columnCount_;
-        }                
     }
 
     /**
@@ -315,7 +283,6 @@
     public static class ColumnIndexInfo implements Comparable<ColumnIndexInfo>
     {
         private long position_;
-        private int columnCount_;
         private byte[] name_;
         private AbstractType comparator_;
 
@@ -324,13 +291,12 @@
             this.comparator_ = comparator_;
         }
 
-        public ColumnIndexInfo(byte[] name, long position, int columnCount, AbstractType comparator)
+        public ColumnIndexInfo(byte[] name, long position, AbstractType comparator)
         {
             this(comparator);
-            assert name.length == 0 || !"".equals(comparator.getString(name)); // Todo r/m length == 0 hack
+            assert name.length == 0 || !"".equals(comparator.getString(name));
             name_ = name;
             position_ = position;
-            columnCount_ = columnCount;
         }
                 
         public long position()
@@ -342,16 +308,6 @@
         {
             position_ = position;
         }
-        
-        public int count()
-        {
-            return columnCount_;
-        }
-        
-        public void count(int count)
-        {
-            columnCount_ = count;
-        }
 
         public int compareTo(ColumnIndexInfo rhs)
         {
@@ -361,22 +317,20 @@
         public void serialize(DataOutputStream dos) throws IOException
         {
             dos.writeLong(position());
-            dos.writeInt(count());
             ColumnSerializer.writeName(name_, dos);
         }
 
         public ColumnIndexInfo deserialize(DataInputStream dis) throws IOException
         {
             long position = dis.readLong();
-            int columnCount = dis.readInt();
             byte[] name = ColumnSerializer.readName(dis);
-            return new ColumnIndexInfo(name, position, columnCount, comparator_);
+            return new ColumnIndexInfo(name, position, comparator_);
         }
 
         public int size()
         {
             // serialized size -- CS.writeName includes a 2-byte length prefix
-            return 8 + 4 + 2 + name_.length;
+            return 8 + 2 + name_.length;
         }
 
         public byte[] name()