You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/08/02 00:31:16 UTC

svn commit: r799949 - in /incubator/cassandra/trunk/src/java/org/apache/cassandra: db/ db/filter/ io/

Author: jbellis
Date: Sat Aug  1 22:31:15 2009
New Revision: 799949

URL: http://svn.apache.org/viewvc?rev=799949&view=rev
Log:
Remove SequenceFile. ColumnGroupReader is moved into SSTableSliceIterator, mostly unchanged for now (cleanup to be finished in #332).
patch by jbellis; reviewed by Stu Hood for CASSANDRA-330

Removed:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/io/IFileReader.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SequenceFile.java
Modified:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/SSTableSliceIterator.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/io/FileStruct.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/io/IndexHelper.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTable.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableReader.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableWriter.java

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java?rev=799949&r1=799948&r2=799949&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java Sat Aug  1 22:31:15 2009
@@ -481,7 +481,7 @@
         */
         public void serialize(ColumnFamily columnFamily, DataOutput dos) throws IOException
         {
-            // TODO whenever we change this we need to change the code in SequenceFile to match in two places.
+            // TODO whenever we change this we need to change the code in SSTableSliceIterator to match.
             // This SUCKS and is inefficient to boot.  let's fix this ASAP. 
             Collection<IColumn> columns = columnFamily.getSortedColumns();
 

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/SSTableSliceIterator.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/SSTableSliceIterator.java?rev=799949&r1=799948&r2=799949&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/SSTableSliceIterator.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/SSTableSliceIterator.java Sat Aug  1 22:31:15 2009
@@ -1,15 +1,17 @@
 package org.apache.cassandra.db.filter;
 
 import java.util.ArrayList;
+import java.util.List;
+import java.util.Arrays;
+import java.util.Collections;
 import java.io.IOException;
 
+import org.apache.commons.lang.ArrayUtils;
+
 import org.apache.cassandra.db.IColumn;
 import org.apache.cassandra.db.ColumnFamily;
 import org.apache.cassandra.db.marshal.AbstractType;
-import org.apache.cassandra.io.DataOutputBuffer;
-import org.apache.cassandra.io.DataInputBuffer;
-import org.apache.cassandra.io.SequenceFile;
-import org.apache.cassandra.io.SSTableReader;
+import org.apache.cassandra.io.*;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import com.google.common.collect.AbstractIterator;
 
@@ -25,7 +27,7 @@
     private int curColumnIndex;
     private ColumnFamily curCF = null;
     private ArrayList<IColumn> curColumns = new ArrayList<IColumn>();
-    private SequenceFile.ColumnGroupReader reader;
+    private ColumnGroupReader reader;
     private AbstractType comparator;
 
     public SSTableSliceIterator(String filename, String key, String cfName, AbstractType comparator, byte[] startColumn, boolean isAscending)
@@ -39,7 +41,7 @@
         AbstractType comparator1 = DatabaseDescriptor.getComparator(ssTable.getTableName(), cfName);
         long position = ssTable.getPosition(decoratedKey);
         if (position >= 0)
-            reader = new SequenceFile.ColumnGroupReader(ssTable.getFilename(), decoratedKey, cfName, comparator1, startColumn, isAscending, position);
+            reader = new ColumnGroupReader(ssTable.getFilename(), decoratedKey, cfName, comparator1, startColumn, isAscending, position);
         this.comparator = comparator;
         this.startColumn = startColumn;
         curColumnIndex = isAscending ? 0 : -1;
@@ -127,4 +129,162 @@
     {
         reader.close();
     }
+
+    /**
+     *  This is a reader that finds the block for a starting column and returns
+     *  blocks before/after it for each next call. This function assumes that
+     *  the CF is sorted by name and exploits the name index.
+     */
+    public static class ColumnGroupReader
+    {
+        private String key_;
+        private String cfName_;
+        private String cfType_;
+        private AbstractType comparator_;
+        private String subComparatorName_;
+        private boolean isAscending_;
+
+        private List<IndexHelper.ColumnIndexInfo> columnIndexList_;
+        private long columnStartPosition_;
+        private int curRangeIndex_;
+        private int allColumnsSize_;
+        private int localDeletionTime_;
+        private long markedForDeleteAt_;
+        private BufferedRandomAccessFile file_;
+
+        public ColumnGroupReader(String filename, String key, String cfName, AbstractType comparator, byte[] startColumn, boolean isAscending, long position) throws IOException
+        {
+            this.file_ = new BufferedRandomAccessFile(filename, "r");
+            this.cfName_ = cfName;
+            this.comparator_ = comparator;
+            this.subComparatorName_ = DatabaseDescriptor.getSubComparator(SSTableReader.parseTableName(filename), cfName).getClass().getCanonicalName();
+            this.key_ = key;
+            this.isAscending_ = isAscending;
+            init(startColumn, position);
+        }
+
+        /**
+         *   Build a list of index entries ready for search.
+         */
+        private List<IndexHelper.ColumnIndexInfo> getFullColumnIndexList(List<IndexHelper.ColumnIndexInfo> columnIndexList, int totalNumCols)
+        {
+            if (columnIndexList.size() == 0)
+            {
+                /* if there is no column index, add an index entry that covers the full space. */
+                return Arrays.asList(new IndexHelper.ColumnIndexInfo(ArrayUtils.EMPTY_BYTE_ARRAY, 0, totalNumCols, comparator_));
+            }
+
+            List<IndexHelper.ColumnIndexInfo> fullColIndexList = new ArrayList<IndexHelper.ColumnIndexInfo>();
+            int accumulatededCols = 0;
+            for (IndexHelper.ColumnIndexInfo colPosInfo : columnIndexList)
+                accumulatededCols += colPosInfo.count();
+            int remainingCols = totalNumCols - accumulatededCols;
+
+            fullColIndexList.add(new IndexHelper.ColumnIndexInfo(ArrayUtils.EMPTY_BYTE_ARRAY, 0, columnIndexList.get(0).count(), comparator_));
+            for (int i = 0; i < columnIndexList.size() - 1; i++)
+            {
+                IndexHelper.ColumnIndexInfo colPosInfo = columnIndexList.get(i);
+                fullColIndexList.add(new IndexHelper.ColumnIndexInfo(colPosInfo.name(),
+                                                                     colPosInfo.position(),
+                                                                     columnIndexList.get(i + 1).count(),
+                                                                     comparator_));
+            }
+            byte[] columnName = columnIndexList.get(columnIndexList.size() - 1).name();
+            fullColIndexList.add(new IndexHelper.ColumnIndexInfo(columnName,
+                                                                 columnIndexList.get(columnIndexList.size() - 1).position(),
+                                                                 remainingCols,
+                                                                 comparator_));
+            return fullColIndexList;
+        }
+
+        private void init(byte[] startColumn, long position) throws IOException
+        {
+            file_.seek(position);
+            String keyInDisk = file_.readUTF();
+            assert keyInDisk.equals(key_);
+
+            /* read off the size of this row */
+            int dataSize = file_.readInt();
+            /* skip the bloomfilter */
+            int totalBytesRead = IndexHelper.skipBloomFilter(file_);
+            /* read off the index flag, it has to be true */
+            boolean hasColumnIndexes = file_.readBoolean();
+            totalBytesRead += 1;
+
+            /* read the index */
+            List<IndexHelper.ColumnIndexInfo> colIndexList = new ArrayList<IndexHelper.ColumnIndexInfo>();
+            if (hasColumnIndexes)
+                totalBytesRead += IndexHelper.deserializeIndex(SSTableReader.parseTableName(file_.getPath()), cfName_, file_, colIndexList);
+
+            /* need to do two things here.
+             * 1. move the file pointer to the beginning of the list of stored columns
+             * 2. calculate the size of all columns */
+            String cfName = file_.readUTF();
+            cfType_ = file_.readUTF();
+            String comparatorName = file_.readUTF();
+            assert comparatorName.equals(comparator_.getClass().getCanonicalName());
+            String subComparatorName = file_.readUTF(); // subcomparator
+            localDeletionTime_ = file_.readInt();
+            markedForDeleteAt_ = file_.readLong();
+            int totalNumCols = file_.readInt();
+            allColumnsSize_ = dataSize - (totalBytesRead + 4 * 2 + cfName.length() + cfType_.length() + comparatorName.length() + subComparatorName.length() + 4 + 8 + 4);
+
+            columnStartPosition_ = file_.getFilePointer();
+            columnIndexList_ = getFullColumnIndexList(colIndexList, totalNumCols);
+
+            if (startColumn.length == 0 && !isAscending_)
+            {
+                /* in this case, we assume that we want to scan from the largest column in descending order. */
+                curRangeIndex_ = columnIndexList_.size() - 1;
+            }
+            else
+            {
+                int index = Collections.binarySearch(columnIndexList_, new IndexHelper.ColumnIndexInfo(startColumn, 0, 0, comparator_));
+                curRangeIndex_ = index < 0 ? (++index) * (-1) - 1 : index;
+            }
+        }
+
+        private boolean getBlockFromCurIndex(DataOutputBuffer bufOut) throws IOException
+        {
+            if (curRangeIndex_ < 0 || curRangeIndex_ >= columnIndexList_.size())
+                return false;
+            IndexHelper.ColumnIndexInfo curColPostion = columnIndexList_.get(curRangeIndex_);
+            long start = curColPostion.position();
+            long end = curRangeIndex_ < columnIndexList_.size() - 1
+                       ? columnIndexList_.get(curRangeIndex_+1).position()
+                       : allColumnsSize_;
+
+            /* seek to the correct offset to the data, and calculate the data size */
+            file_.seek(columnStartPosition_ + start);
+            long dataSize = end - start;
+
+            bufOut.reset();
+            // write CF info
+            bufOut.writeUTF(cfName_);
+            bufOut.writeUTF(cfType_);
+            bufOut.writeUTF(comparator_.getClass().getCanonicalName());
+            bufOut.writeUTF(subComparatorName_);
+            bufOut.writeInt(localDeletionTime_);
+            bufOut.writeLong(markedForDeleteAt_);
+            // now write the columns
+            bufOut.writeInt(curColPostion.count());
+            bufOut.write(file_, (int)dataSize);
+            return true;
+        }
+
+        public boolean getNextBlock(DataOutputBuffer outBuf) throws IOException
+        {
+            boolean result = getBlockFromCurIndex(outBuf);
+            if (isAscending_)
+                curRangeIndex_++;
+            else
+                curRangeIndex_--;
+            return result;
+        }
+
+        public void close() throws IOException
+        {
+            file_.close();
+        }
+    }
 }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/FileStruct.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/FileStruct.java?rev=799949&r1=799948&r2=799949&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/FileStruct.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/FileStruct.java Sat Aug  1 22:31:15 2009
@@ -19,11 +19,8 @@
 package org.apache.cassandra.io;
 
 import java.io.IOException;
-import java.io.DataInput;
 import java.util.Iterator;
 
-import org.apache.cassandra.io.IFileReader;
-import org.apache.cassandra.io.SSTableReader;
 import org.apache.cassandra.db.IColumn;
 import org.apache.cassandra.db.ColumnFamily;
 

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/IndexHelper.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/IndexHelper.java?rev=799949&r1=799948&r2=799949&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/IndexHelper.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/IndexHelper.java Sat Aug  1 22:31:15 2009
@@ -133,7 +133,7 @@
      * @param columnIndexList the structure which is filled in with the deserialized index   @return number of bytes read from the input
      * @throws IOException
      */
-	static int deserializeIndex(String tableName, String cfName, DataInput in, List<ColumnIndexInfo> columnIndexList) throws IOException
+	public static int deserializeIndex(String tableName, String cfName, DataInput in, List<ColumnIndexInfo> columnIndexList) throws IOException
 	{
 		/* read only the column index list */
 		int columnIndexSize = in.readInt();
@@ -343,7 +343,7 @@
             position_ = position;
         }
         
-        int count()
+        public int count()
         {
             return columnCount_;
         }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTable.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTable.java?rev=799949&r1=799948&r2=799949&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTable.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTable.java Sat Aug  1 22:31:15 2009
@@ -67,7 +67,7 @@
         return dataFile;
     }
 
-    static String parseTableName(String filename)
+    public static String parseTableName(String filename)
     {
         return new File(filename).getParentFile().getName();        
     }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableReader.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableReader.java?rev=799949&r1=799948&r2=799949&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableReader.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableReader.java Sat Aug  1 22:31:15 2009
@@ -23,9 +23,7 @@
 
 import org.apache.log4j.Logger;
 
-import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.dht.IPartitioner;
-import org.apache.cassandra.io.SequenceFile.ColumnGroupReader;
 import org.apache.cassandra.utils.BloomFilter;
 import org.apache.cassandra.utils.FileUtils;
 import org.apache.cassandra.service.StorageService;

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableWriter.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableWriter.java?rev=799949&r1=799948&r2=799949&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableWriter.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableWriter.java Sat Aug  1 22:31:15 2009
@@ -26,7 +26,7 @@
     public SSTableWriter(String filename, int keyCount, IPartitioner partitioner) throws IOException
     {
         super(filename, partitioner);
-        dataWriter = SequenceFile.bufferedWriter(dataFile, 4 * 1024 * 1024);
+        dataWriter = new AbstractWriter.BufferWriter(dataFile, 4 * 1024 * 1024);
         indexRAF = new BufferedRandomAccessFile(indexFilename(), "rw", 1024 * 1024);
         bf = new BloomFilter(keyCount, 15);
     }