Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/08/02 00:30:41 UTC

svn commit: r799944 - in /incubator/cassandra/trunk: src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/db/filter/ src/java/org/apache/cassandra/io/ src/java/org/apache/cassandra/utils/ test/unit/org/apache/cassandra/db/ test/unit/org/apac...

Author: jbellis
Date: Sat Aug  1 22:30:40 2009
New Revision: 799944

URL: http://svn.apache.org/viewvc?rev=799944&view=rev
Log:
Remove SequenceFile.next(DataOutputBuffer) in favor of the more efficient IteratingRow.
patch by jbellis; reviewed by Stu Hood for CASSANDRA-330
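
The gist of the change, as a rough sketch (not code from this patch): instead of SequenceFile.next(DataOutputBuffer) copying a whole serialized row into a buffer for callers to re-parse, readers now open the data file directly and walk rows and columns with IteratingRow. The loop below is illustrative only; `path` is an assumed variable naming an SSTable data file, and IOException handling is omitted.

    BufferedRandomAccessFile file = new BufferedRandomAccessFile(path, "r", 1024 * 1024);
    while (!file.isEOF())
    {
        IteratingRow row = new IteratingRow(file);     // reads key, data size, index, CF header
        ColumnFamily cf = row.getEmptyColumnFamily();  // row metadata only; no columns yet
        while (row.hasNext())                          // stream columns one at a time
        {
            cf.addColumn(row.next());
        }
        // or call row.skipRemaining() to jump to the next row without materializing columns
    }
    file.close();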

Added:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/io/ICompactSerializer2.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/io/IteratingRow.java
Modified:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnSerializer.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CompactSerializerInvocationHandler.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/SuperColumn.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/SSTableNamesIterator.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/SSTableSliceIterator.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/io/BufferedRandomAccessFile.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/io/FileStruct.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/io/IFileReader.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableReader.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SequenceFile.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java
    incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/OneCompactionTest.java
    incubator/cassandra/trunk/test/unit/org/apache/cassandra/io/SSTableTest.java

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java?rev=799944&r1=799943&r2=799944&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java Sat Aug  1 22:30:40 2009
@@ -18,9 +18,7 @@
 
 package org.apache.cassandra.db;
 
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
+import java.io.*;
 import java.lang.reflect.Proxy;
 import java.util.Collection;
 import java.util.HashMap;
@@ -37,6 +35,8 @@
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.io.ICompactSerializer;
+import org.apache.cassandra.io.ICompactSerializer2;
+import org.apache.cassandra.io.BufferedRandomAccessFile;
 import org.apache.cassandra.db.filter.QueryPath;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.marshal.MarshalException;
@@ -48,7 +48,7 @@
 public final class ColumnFamily
 {
     /* The column serializer for this Column Family. Create based on config. */
-    private static ICompactSerializer<ColumnFamily> serializer_;
+    private static ColumnFamilySerializer serializer_ = new ColumnFamilySerializer();
     public static final short utfPrefix_ = 2;   
 
     private static Logger logger_ = Logger.getLogger( ColumnFamily.class );
@@ -58,13 +58,12 @@
 
     static
     {
-        serializer_ = new ColumnFamilySerializer();
         /* TODO: These are the various column types. Hard coded for now. */
         columnTypes_.put("Standard", "Standard");
         columnTypes_.put("Super", "Super");
     }
 
-    public static ICompactSerializer<ColumnFamily> serializer()
+    public static ColumnFamilySerializer serializer()
     {
         return serializer_;
     }
@@ -73,9 +72,9 @@
      * This method returns the serializer whose methods are
      * preprocessed by a dynamic proxy.
     */
-    public static ICompactSerializer<ColumnFamily> serializerWithIndexes()
+    public static ICompactSerializer2<ColumnFamily> serializerWithIndexes()
     {
-        return (ICompactSerializer<ColumnFamily>)Proxy.newProxyInstance( ColumnFamily.class.getClassLoader(), new Class[]{ICompactSerializer.class}, new CompactSerializerInvocationHandler<ColumnFamily>(serializer_) );
+        return (ICompactSerializer2<ColumnFamily>)Proxy.newProxyInstance( ColumnFamily.class.getClassLoader(), new Class[]{ICompactSerializer2.class}, new CompactSerializerInvocationHandler<ColumnFamily>(serializer_) );
     }
 
     public static String getColumnType(String key)
@@ -95,7 +94,7 @@
 
     private String name_;
 
-    private transient ICompactSerializer<IColumn> columnSerializer_;
+    private transient ICompactSerializer2<IColumn> columnSerializer_;
     private long markedForDeleteAt = Long.MIN_VALUE;
     private int localDeletionTime = Integer.MIN_VALUE;
     private AtomicInteger size_ = new AtomicInteger(0);
@@ -146,7 +145,7 @@
         }
     }
 
-    public ICompactSerializer<IColumn> getColumnSerializer()
+    public ICompactSerializer2<IColumn> getColumnSerializer()
     {
     	return columnSerializer_;
     }
@@ -433,7 +432,7 @@
         return cf;
     }
 
-    public static class ColumnFamilySerializer implements ICompactSerializer<ColumnFamily>
+    public static class ColumnFamilySerializer implements ICompactSerializer2<ColumnFamily>
     {
         /*
          * We are going to create indexes, and write out that information as well. The format
@@ -459,7 +458,7 @@
          * 	<total number of columns>
          * 	<columns data>
         */
-        public void serialize(ColumnFamily columnFamily, DataOutputStream dos) throws IOException
+        public void serialize(ColumnFamily columnFamily, DataOutput dos) throws IOException
         {
             // TODO whenever we change this we need to change the code in SequenceFile to match in two places.
             // This SUCKS and is inefficient to boot.  let's fix this ASAP. 
@@ -480,13 +479,9 @@
             }
         }
 
-        public ColumnFamily deserialize(DataInputStream dis) throws IOException
+        public ColumnFamily deserialize(DataInput dis) throws IOException
         {
-            ColumnFamily cf = new ColumnFamily(dis.readUTF(),
-                                               dis.readUTF(),
-                                               readComparator(dis),
-                                               readComparator(dis));
-            cf.delete(dis.readInt(), dis.readLong());
+            ColumnFamily cf = deserializeEmpty(dis);
             int size = dis.readInt();
             IColumn column;
             for (int i = 0; i < size; ++i)
@@ -497,7 +492,7 @@
             return cf;
         }
 
-        private AbstractType readComparator(DataInputStream dis) throws IOException
+        private AbstractType readComparator(DataInput dis) throws IOException
         {
             String className = dis.readUTF();
             if (className.equals(""))
@@ -518,6 +513,16 @@
                 throw new RuntimeException(e);
             }
         }
+
+        public ColumnFamily deserializeEmpty(DataInput input) throws IOException
+        {
+            ColumnFamily cf = new ColumnFamily(input.readUTF(),
+                                               input.readUTF(),
+                                               readComparator(input),
+                                               readComparator(input));
+            cf.delete(input.readInt(), input.readLong());
+            return cf;
+        }
     }
 }
 

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=799944&r1=799943&r2=799944&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Sat Aug  1 22:30:40 2009
@@ -650,7 +650,7 @@
         }
     }
 
-    private PriorityQueue<FileStruct> initializePriorityQueue(List<String> files, List<Range> ranges, int minBufferSize)
+    private PriorityQueue<FileStruct> initializePriorityQueue(List<String> files, List<Range> ranges, int minBufferSize) throws IOException
     {
         PriorityQueue<FileStruct> pq = new PriorityQueue<FileStruct>();
         if (files.size() > 1 || (ranges != null && files.size() > 0))
@@ -659,31 +659,13 @@
             FileStruct fs = null;
             for (String file : files)
             {
-                try
-                {
-                    fs = SSTableReader.get(file).getFileStruct();
-                    fs.advance();
-                    if (fs.isExhausted())
-                    {
-                        continue;
-                    }
-                    pq.add(fs);
-                }
-                catch (Exception ex)
+                fs = SSTableReader.get(file).getFileStruct();
+                fs.advance(true);
+                if (fs.isExhausted())
                 {
-                    logger_.warn("corrupt file?  or are you just blowing away data files manually out from under me?", ex);
-                    try
-                    {
-                        if (fs != null)
-                        {
-                            fs.close();
-                        }
-                    }
-                    catch (Exception e)
-                    {
-                        logger_.error("Unable to close file :" + file);
-                    }
+                    continue;
                 }
+                pq.add(fs);
             }
         }
         return pq;
@@ -930,8 +912,8 @@
         // If the compaction file path is null that means we have no space left for this compaction.
         if (rangeFileLocation == null)
         {
-            logger_.warn("Total bytes to be written for range compaction  ..."
-                         + expectedRangeFileSize + "   is greater than the safe limit of the disk space available.");
+            logger_.error("Total bytes to be written for range compaction  ..."
+                          + expectedRangeFileSize + "   is greater than the safe limit of the disk space available.");
             return result;
         }
         PriorityQueue<FileStruct> pq = initializePriorityQueue(files, ranges, ColumnFamilyStore.BUFSIZE);
@@ -975,41 +957,28 @@
                 {
                     for (FileStruct filestruct : lfs)
                     {
-                        try
-                        {
-                            /* read the length although we don't need it */
-                            filestruct.getBufIn().readInt();
-                            // Skip the Index
-                            IndexHelper.skipBloomFilterAndIndex(filestruct.getBufIn());
-                            // We want to add only 2 and resolve them right there in order to save on memory footprint
-                            if (columnFamilies.size() > 1)
-                            {
-                                // Now merge the 2 column families
-                                merge(columnFamilies);
-                            }
-                            // deserialize into column families
-                            columnFamilies.add(ColumnFamily.serializer().deserialize(filestruct.getBufIn()));
-                        }
-                        catch (Exception ex)
+                        // We want to add only 2 and resolve them right there in order to save on memory footprint
+                        if (columnFamilies.size() > 1)
                         {
-                            logger_.warn(LogUtil.throwableToString(ex));
+                            // Now merge the 2 column families
+                            merge(columnFamilies);
                         }
+                        // deserialize into column families
+                        columnFamilies.add(filestruct.getColumnFamily());
                     }
                     // Now after merging all crap append to the sstable
                     columnFamily = resolveAndRemoveDeleted(columnFamilies);
                     columnFamilies.clear();
                     if (columnFamily != null)
                     {
-                        /* serialize the cf with column indexes */
                         ColumnFamily.serializerWithIndexes().serialize(columnFamily, bufOut);
                     }
                 }
                 else
                 {
+                    // TODO deserializing only to reserialize is dumb
                     FileStruct filestruct = lfs.get(0);
-                    /* read the length although we don't need it */
-                    int size = filestruct.getBufIn().readInt();
-                    bufOut.write(filestruct.getBufIn(), size);
+                    ColumnFamily.serializerWithIndexes().serialize(filestruct.getColumnFamily(), bufOut);
                 }
                 if (Range.isTokenInRanges(StorageService.getPartitioner().getInitialToken(lastkey), ranges))
                 {
@@ -1023,48 +992,30 @@
                         String fname = new File(rangeFileLocation, mergedFileName).getAbsolutePath();
                         rangeWriter = new SSTableWriter(fname, expectedBloomFilterSize, StorageService.getPartitioner());
                     }
-                    try
-                    {
-                        rangeWriter.append(lastkey, bufOut);
-                    }
-                    catch (Exception ex)
-                    {
-                        logger_.warn(LogUtil.throwableToString(ex));
-                    }
+                    rangeWriter.append(lastkey, bufOut);
                 }
                 totalkeysWritten++;
                 for (FileStruct filestruct : lfs)
                 {
-                    try
+                    filestruct.advance(true);
+                    if (filestruct.isExhausted())
                     {
-                        filestruct.advance();
+                        continue;
+                    }
+                    /* keep on looping until we find a key in the range */
+                    while (!Range.isTokenInRanges(StorageService.getPartitioner().getInitialToken(filestruct.getKey()), ranges))
+                    {
+                        filestruct.advance(true);
                         if (filestruct.isExhausted())
                         {
-                            continue;
-                        }
-                        /* keep on looping until we find a key in the range */
-                        while (!Range.isTokenInRanges(StorageService.getPartitioner().getInitialToken(filestruct.getKey()), ranges))
-                        {
-                            filestruct.advance();
-                            if (filestruct.isExhausted())
-                            {
-                                break;
-                            }
-                        }
-                        if (!filestruct.isExhausted())
-                        {
-                            pq.add(filestruct);
+                            break;
                         }
-                        totalkeysRead++;
                     }
-                    catch (Exception ex)
+                    if (!filestruct.isExhausted())
                     {
-                        // Ignore the exception as it might be a corrupted file
-                        // in any case we have read as far as possible from it
-                        // and it will be deleted after compaction.
-                        logger_.warn("corrupt sstable?", ex);
-                        filestruct.close();
+                        pq.add(filestruct);
                     }
+                    totalkeysRead++;
                 }
                 lfs.clear();
                 lastkey = null;
@@ -1096,11 +1047,6 @@
         return result;
     }
 
-    private void doFill(BloomFilter bf, String decoratedKey)
-    {
-        bf.add(StorageService.getPartitioner().undecorateKey(decoratedKey));
-    }
-
     /*
     * This function does the actual compaction for files.
     * It maintains a priority queue of with the first key from each file
@@ -1176,40 +1122,27 @@
                 {
                     for (FileStruct filestruct : lfs)
                     {
-                        try
+                        // We want to add only 2 and resolve them right there in order to save on memory footprint
+                        if (columnFamilies.size() > 1)
                         {
-                            /* read the length although we don't need it */
-                            filestruct.getBufIn().readInt();
-                            // Skip the Index
-                            IndexHelper.skipBloomFilterAndIndex(filestruct.getBufIn());
-                            // We want to add only 2 and resolve them right there in order to save on memory footprint
-                            if (columnFamilies.size() > 1)
-                            {
-                                merge(columnFamilies);
-                            }
-                            // deserialize into column families
-                            columnFamilies.add(ColumnFamily.serializer().deserialize(filestruct.getBufIn()));
-                        }
-                        catch (Exception ex)
-                        {
-                            logger_.warn("error in filecompaction", ex);
+                            merge(columnFamilies);
                         }
+                        // deserialize into column families
+                        columnFamilies.add(filestruct.getColumnFamily());
                     }
                     // Now after merging all crap append to the sstable
                     columnFamily = resolveAndRemoveDeleted(columnFamilies);
                     columnFamilies.clear();
                     if (columnFamily != null)
                     {
-                        /* serialize the cf with column indexes */
                         ColumnFamily.serializerWithIndexes().serialize(columnFamily, bufOut);
                     }
                 }
                 else
                 {
+                    // TODO deserializing only to reserialize is dumb
                     FileStruct filestruct = lfs.get(0);
-                    /* read the length although we don't need it */
-                    int size = filestruct.getBufIn().readInt();
-                    bufOut.write(filestruct.getBufIn(), size);
+                    ColumnFamily.serializerWithIndexes().serialize(filestruct.getColumnFamily(), bufOut);
                 }
 
                 if (writer == null)
@@ -1222,24 +1155,13 @@
 
                 for (FileStruct filestruct : lfs)
                 {
-                    try
+                    filestruct.advance(true);
+                    if (filestruct.isExhausted())
                     {
-                        filestruct.advance();
-                        if (filestruct.isExhausted())
-                        {
-                            continue;
-                        }
-                        pq.add(filestruct);
-                        totalkeysRead++;
-                    }
-                    catch (Throwable ex)
-                    {
-                        // Ignore the exception as it might be a corrupted file
-                        // in any case we have read as far as possible from it
-                        // and it will be deleted after compaction.
-                        logger_.warn("corrupt sstable?", ex);
-                        filestruct.close();
+                        continue;
                     }
+                    pq.add(filestruct);
+                    totalkeysRead++;
                 }
                 lfs.clear();
                 lastkey = null;

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnSerializer.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnSerializer.java?rev=799944&r1=799943&r2=799944&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnSerializer.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnSerializer.java Sat Aug  1 22:30:40 2009
@@ -3,9 +3,10 @@
 import java.io.*;
 
 import org.apache.cassandra.io.ICompactSerializer;
+import org.apache.cassandra.io.ICompactSerializer2;
 import org.apache.cassandra.utils.FBUtilities;
 
-public class ColumnSerializer implements ICompactSerializer<IColumn>
+public class ColumnSerializer implements ICompactSerializer2<IColumn>
 {
     public static void writeName(byte[] name, DataOutput out) throws IOException
     {
@@ -26,7 +27,7 @@
         return bytes;
     }
 
-    public void serialize(IColumn column, DataOutputStream dos) throws IOException
+    public void serialize(IColumn column, DataOutput dos) throws IOException
     {
         ColumnSerializer.writeName(column.name(), dos);
         dos.writeBoolean(column.isMarkedForDelete());
@@ -34,7 +35,7 @@
         FBUtilities.writeByteArray(column.value(), dos);
     }
 
-    public Column deserialize(DataInputStream dis) throws IOException
+    public Column deserialize(DataInput dis) throws IOException
     {
         byte[] name = ColumnSerializer.readName(dis);
         boolean delete = dis.readBoolean();

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java?rev=799944&r1=799943&r2=799944&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java Sat Aug  1 22:30:40 2009
@@ -272,7 +272,7 @@
             Set<Table> tablesRecovered = new HashSet<Table>();
 
             /* read the logs populate RowMutation and apply */
-            while (reader.getFilePointer() < reader.length())
+            while (!reader.isEOF())
             {
                 byte[] bytes;
                 try

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CompactSerializerInvocationHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CompactSerializerInvocationHandler.java?rev=799944&r1=799943&r2=799944&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CompactSerializerInvocationHandler.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CompactSerializerInvocationHandler.java Sat Aug  1 22:30:40 2009
@@ -23,6 +23,7 @@
 
 import org.apache.cassandra.io.DataOutputBuffer;
 import org.apache.cassandra.io.ICompactSerializer;
+import org.apache.cassandra.io.ICompactSerializer2;
 
 
 /*
@@ -33,9 +34,9 @@
 
 public class CompactSerializerInvocationHandler<T> implements InvocationHandler
 {
-    private ICompactSerializer<T> serializer_;
+    private ICompactSerializer2<T> serializer_;
 
-    public CompactSerializerInvocationHandler(ICompactSerializer<T> serializer)
+    public CompactSerializerInvocationHandler(ICompactSerializer2<T> serializer)
     {
         serializer_ = serializer;
     }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/SuperColumn.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/SuperColumn.java?rev=799944&r1=799943&r2=799944&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/SuperColumn.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/SuperColumn.java Sat Aug  1 22:30:40 2009
@@ -18,10 +18,7 @@
 
 package org.apache.cassandra.db;
 
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.Serializable;
+import java.io.*;
 import java.util.Collection;
 import java.util.Arrays;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -32,6 +29,7 @@
 
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.io.ICompactSerializer;
+import org.apache.cassandra.io.ICompactSerializer2;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.marshal.MarshalException;
 
@@ -326,7 +324,7 @@
     }
 }
 
-class SuperColumnSerializer implements ICompactSerializer<IColumn>
+class SuperColumnSerializer implements ICompactSerializer2<IColumn>
 {
     private AbstractType comparator;
 
@@ -340,7 +338,7 @@
         return comparator;
     }
 
-    public void serialize(IColumn column, DataOutputStream dos) throws IOException
+    public void serialize(IColumn column, DataOutput dos) throws IOException
     {
     	SuperColumn superColumn = (SuperColumn)column;
         ColumnSerializer.writeName(column.name(), dos);
@@ -358,12 +356,11 @@
         }
     }
 
-    public IColumn deserialize(DataInputStream dis) throws IOException
+    public IColumn deserialize(DataInput dis) throws IOException
     {
         byte[] name = ColumnSerializer.readName(dis);
         SuperColumn superColumn = new SuperColumn(name, comparator);
         superColumn.markForDeleteAt(dis.readInt(), dis.readLong());
-        assert dis.available() > 0;
 
         /* read the number of columns */
         int size = dis.readInt();

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/SSTableNamesIterator.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/SSTableNamesIterator.java?rev=799944&r1=799943&r2=799944&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/SSTableNamesIterator.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/SSTableNamesIterator.java Sat Aug  1 22:30:40 2009
@@ -28,7 +28,7 @@
         {
             dataReader = SequenceFile.bufferedReader(ssTable.getFilename(), 64 * 1024);
             String decoratedKey = ssTable.getPartitioner().decorateKey(key);
-            long position = ssTable.getPosition(decoratedKey, ssTable.getPartitioner());
+            long position = ssTable.getPosition(decoratedKey);
 
             long bytesRead = dataReader.next(decoratedKey, bufOut, cfName, columns, position);
             if (bytesRead != -1L)

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/SSTableSliceIterator.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/SSTableSliceIterator.java?rev=799944&r1=799943&r2=799944&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/SSTableSliceIterator.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/SSTableSliceIterator.java Sat Aug  1 22:30:40 2009
@@ -36,7 +36,7 @@
 
         /* Morph key into actual key based on the partition type. */
         String decoratedKey = ssTable.getPartitioner().decorateKey(key);
-        long position = ssTable.getPosition(decoratedKey, ssTable.getPartitioner());
+        long position = ssTable.getPosition(decoratedKey);
         AbstractType comparator1 = DatabaseDescriptor.getComparator(ssTable.getTableName(), cfName);
         reader = new SequenceFile.ColumnGroupReader(ssTable.getFilename(), decoratedKey, cfName, comparator1, startColumn, isAscending, position);
         this.comparator = comparator;

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/BufferedRandomAccessFile.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/BufferedRandomAccessFile.java?rev=799944&r1=799943&r2=799944&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/BufferedRandomAccessFile.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/BufferedRandomAccessFile.java Sat Aug  1 22:30:40 2009
@@ -165,6 +165,12 @@
         getChannel().force(true);
     }
 
+    public boolean isEOF() throws IOException
+    {
+        assert getFilePointer() <= length();
+        return getFilePointer() == length();
+    }
+
     public void close() throws IOException
     {
         this.flush();

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/FileStruct.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/FileStruct.java?rev=799944&r1=799943&r2=799944&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/FileStruct.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/FileStruct.java Sat Aug  1 22:30:40 2009
@@ -19,12 +19,13 @@
 package org.apache.cassandra.io;
 
 import java.io.IOException;
+import java.io.DataInput;
 import java.util.Iterator;
 
-import org.apache.cassandra.io.DataInputBuffer;
-import org.apache.cassandra.io.DataOutputBuffer;
 import org.apache.cassandra.io.IFileReader;
 import org.apache.cassandra.io.SSTableReader;
+import org.apache.cassandra.db.IColumn;
+import org.apache.cassandra.db.ColumnFamily;
 
 import org.apache.log4j.Logger;
 import com.google.common.collect.AbstractIterator;
@@ -34,30 +35,26 @@
 {
     private static Logger logger = Logger.getLogger(FileStruct.class);
 
-    private String key = null; // decorated!
+    private IteratingRow row;
     private boolean exhausted = false;
-    private IFileReader reader;
-    private DataInputBuffer bufIn;
-    private DataOutputBuffer bufOut;
+    private BufferedRandomAccessFile file;
     private SSTableReader sstable;
     private FileStructIterator iterator;
 
     FileStruct(SSTableReader sstable) throws IOException
     {
-        this.reader = SequenceFile.bufferedReader(sstable.getFilename(), 1024 * 1024);
+        this.file = new BufferedRandomAccessFile(sstable.getFilename(), "r", 1024 * 1024);
         this.sstable = sstable;
-        bufIn = new DataInputBuffer();
-        bufOut = new DataOutputBuffer();
     }
 
     public String getFileName()
     {
-        return reader.getFileName();
+        return file.getPath();
     }
 
     public void close() throws IOException
     {
-        reader.close();
+        file.close();
     }
 
     public boolean isExhausted()
@@ -65,20 +62,20 @@
         return exhausted;
     }
 
-    public DataInputBuffer getBufIn()
+    public String getKey()
     {
-        return bufIn;
+        return row.getKey();
     }
 
-    public String getKey()
+    public ColumnFamily getColumnFamily()
     {
-        return key;
+        return row.getEmptyColumnFamily();
     }
 
     public int compareTo(FileStruct f)
     {
-        return sstable.getPartitioner().getDecoratedKeyComparator().compare(key, f.key);
-    }    
+        return sstable.getPartitioner().getDecoratedKeyComparator().compare(getKey(), f.getKey());
+    }
 
     public void seekTo(String seekKey)
     {
@@ -90,8 +87,8 @@
                 exhausted = true;
                 return;
             }
-            reader.seek(position);
-            advance();
+            file.seek(position);
+            advance(false);
         }
         catch (IOException e)
         {
@@ -100,37 +97,40 @@
     }
 
     /*
-     * Read the next key from the data file, skipping block indexes.
+     * Read the next key from the data file.
      * Caller must check isExhausted after each call to see if further
      * reads are valid.
      * Do not mix with calls to the iterator interface (next/hasnext).
      * @deprecated -- prefer the iterator interface.
      */
-    public void advance() throws IOException
+    public void advance(boolean materialize) throws IOException
     {
+        // TODO r/m materialize option -- use iterableness!
         if (exhausted)
         {
             throw new IndexOutOfBoundsException();
         }
 
-        bufOut.reset();
-        if (reader.isEOF())
+        if (file.isEOF())
         {
-            reader.close();
+            file.close();
             exhausted = true;
             return;
         }
 
-        long bytesread = reader.next(bufOut);
-        if (bytesread == -1)
+        row = new IteratingRow(file);
+        if (materialize)
         {
-            reader.close();
-            exhausted = true;
-            return;
+            while (row.hasNext())
+            {
+                IColumn column = row.next();
+                row.getEmptyColumnFamily().addColumn(column);
+            }
+        }
+        else
+        {
+            row.skipRemaining();
         }
-
-        bufIn.reset(bufOut.getData(), bufOut.getLength());
-        key = bufIn.readUTF();
     }
 
     public boolean hasNext()
@@ -157,7 +157,7 @@
     {
         public FileStructIterator()
         {
-            if (key == null)
+            if (row == null)
             {
                 if (!isExhausted())
                 {
@@ -170,7 +170,7 @@
         {
             try
             {
-                advance();
+                advance(false);
             }
             catch (IOException e)
             {
@@ -184,7 +184,7 @@
             {
                 return endOfData();
             }
-            String oldKey = key;
+            String oldKey = getKey();
             forward();
             return oldKey;
         }

Added: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/ICompactSerializer2.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/ICompactSerializer2.java?rev=799944&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/ICompactSerializer2.java (added)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/ICompactSerializer2.java Sat Aug  1 22:30:40 2009
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io;
+
+import java.io.*;
+
+public interface ICompactSerializer2<T>
+{
+	/**
+     * Serialize the specified type into the specified DataOutput instance.
+     * @param t type that needs to be serialized
+     * @param dos DataOutput into which serialization needs to happen.
+     * @throws IOException
+     */
+    public void serialize(T t, DataOutput dos) throws IOException;
+
+    /**
+     * Deserialize from the specified DataInput instance.
+     * @param dis DataInput from which deserialization needs to happen.
+     * @throws IOException
+     * @return the type that was deserialized
+     */
+    public T deserialize(DataInput dis) throws IOException;    
+}
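
ICompactSerializer2 differs from ICompactSerializer only in accepting the DataInput/DataOutput interfaces rather than the concrete stream classes, so one serializer can write to a DataOutputBuffer or read straight from a BufferedRandomAccessFile. A hypothetical implementation for illustration (the Pair type below is invented, not a Cassandra class):

    import java.io.*;
    import org.apache.cassandra.io.ICompactSerializer2;

    class Pair
    {
        final String name;
        final int value;
        Pair(String name, int value) { this.name = name; this.value = value; }
    }

    class PairSerializer implements ICompactSerializer2<Pair>
    {
        public void serialize(Pair p, DataOutput dos) throws IOException
        {
            dos.writeUTF(p.name);   // works for any DataOutput: stream, buffer, or file
            dos.writeInt(p.value);
        }

        public Pair deserialize(DataInput dis) throws IOException
        {
            return new Pair(dis.readUTF(), dis.readInt());
        }
    }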

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/IFileReader.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/IFileReader.java?rev=799944&r1=799943&r2=799944&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/IFileReader.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/IFileReader.java Sat Aug  1 22:30:40 2009
@@ -37,16 +37,6 @@
 
     /**
      * This method dumps the next key/value into the DataOuputStream
-     * passed in.
-     *
-     * @param bufOut DataOutputStream that needs to be filled.
-     * @return number of bytes read.
-     * @throws IOException 
-    */
-    public long next(DataOutputBuffer bufOut) throws IOException;
-
-    /**
-     * This method dumps the next key/value into the DataOuputStream
      * passed in. Always use this method to query for application
      * specific data as it will have indexes.
      *

Added: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/IteratingRow.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/IteratingRow.java?rev=799944&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/IteratingRow.java (added)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/IteratingRow.java Sat Aug  1 22:30:40 2009
@@ -0,0 +1,61 @@
+package org.apache.cassandra.io;
+
+import java.io.IOException;
+
+import org.apache.cassandra.db.ColumnFamily;
+import org.apache.cassandra.db.IColumn;
+import com.google.common.collect.AbstractIterator;
+
+public class IteratingRow extends AbstractIterator<IColumn>
+{
+    private final String key;
+    private final long finishedAt;
+    private final ColumnFamily emptyColumnFamily;
+    private final BufferedRandomAccessFile file;
+
+    public IteratingRow(BufferedRandomAccessFile file) throws IOException
+    {
+        this.file = file;
+
+        key = file.readUTF();
+        long dataSize = file.readInt();
+        long dataStart = file.getFilePointer();
+        finishedAt = dataStart + dataSize;
+        IndexHelper.skipBloomFilterAndIndex(file);
+        emptyColumnFamily = ColumnFamily.serializer().deserializeEmpty(file);
+        file.readInt(); // column count. breaking serializer encapsulation is less fugly than adding a wrapper class to allow deserializeEmpty to return both values
+    }
+
+    public String getKey()
+    {
+        return key;
+    }
+
+    public ColumnFamily getEmptyColumnFamily()
+    {
+        return emptyColumnFamily;
+    }
+
+    public void skipRemaining() throws IOException
+    {
+        file.seek(finishedAt);
+    }
+
+    protected IColumn computeNext()
+    {
+        try
+        {
+            assert file.getFilePointer() <= finishedAt;
+            if (file.getFilePointer() == finishedAt)
+            {
+                return endOfData();
+            }
+
+            return emptyColumnFamily.getColumnSerializer().deserialize(file);
+        }
+        catch (IOException e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+}
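
A sketch of how a caller can use the iterator to inspect columns one at a time instead of deserializing the whole row up front; this mirrors FileStruct.advance(true) above, and the tombstone filter is illustrative only, not code from this commit. `file` is assumed to be positioned at the start of a row.

    IteratingRow row = new IteratingRow(file);
    ColumnFamily cf = row.getEmptyColumnFamily();
    while (row.hasNext())
    {
        IColumn column = row.next();
        if (!column.isMarkedForDelete())    // keep only live columns
            cf.addColumn(column);
    }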

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableReader.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableReader.java?rev=799944&r1=799943&r2=799944&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableReader.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableReader.java Sat Aug  1 22:30:40 2009
@@ -198,7 +198,7 @@
     /**
      * returns the position in the data file to find the given key, or -1 if the key is not present
      */
-    public long getPosition(String decoratedKey, IPartitioner partitioner) throws IOException
+    public long getPosition(String decoratedKey) throws IOException
     {
         if (!bf.isPresent(decoratedKey))
             return -1;

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SequenceFile.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SequenceFile.java?rev=799944&r1=799943&r2=799944&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SequenceFile.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SequenceFile.java Sat Aug  1 22:30:40 2009
@@ -417,37 +417,6 @@
                 }
             }
         }
-
-        /**
-         * This method dumps the next key/value into the DataOuputStream
-         * passed in.
-         *
-         * @param bufOut DataOutputStream that needs to be filled.
-         * @return total number of bytes read/considered
-         */
-        public long next(DataOutputBuffer bufOut) throws IOException
-        {
-            long bytesRead = -1L;
-            if (isEOF())
-                return bytesRead;
-
-            long startPosition = file_.getFilePointer();
-            String key = file_.readUTF();
-            if (key != null)
-            {
-                /* write the key into buffer */
-                bufOut.writeUTF(key);
-                int dataSize = file_.readInt();
-                /* write data size into buffer */
-                bufOut.writeInt(dataSize);
-                /* write the data into buffer */
-                bufOut.write(file_, dataSize);
-                long endPosition = file_.getFilePointer();
-                bytesRead = endPosition - startPosition;
-            }
-
-            return bytesRead;
-        }
     }
 
     public static class Reader extends AbstractReader

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java?rev=799944&r1=799943&r2=799944&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java Sat Aug  1 22:30:40 2009
@@ -385,7 +385,10 @@
     {
         int length = in.readInt();
         byte[] bytes = new byte[length];
-        in.readFully(bytes);
+        if (length > 0)
+        {
+            in.readFully(bytes);
+        }
         return bytes;
     }
 }

Modified: incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/OneCompactionTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/OneCompactionTest.java?rev=799944&r1=799943&r2=799944&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/OneCompactionTest.java (original)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/OneCompactionTest.java Sat Aug  1 22:30:40 2009
@@ -39,13 +39,13 @@
 
         Set<String> inserted = new HashSet<String>();
         for (int j = 0; j < insertsPerTable; j++) {
-            String key = "0";
+            String key = String.valueOf(j);
             RowMutation rm = new RowMutation("Keyspace1", key);
             rm.add(new QueryPath(columnFamilyName, null, "0".getBytes()), new byte[0], j);
             rm.apply();
             inserted.add(key);
             store.forceBlockingFlush();
-            assertEquals(table.getKeyRange(columnFamilyName, "", "", 10000).keys.size(), inserted.size());
+            assertEquals(inserted.size(), table.getKeyRange(columnFamilyName, "", "", 10000).keys.size());
         }
         Future<Integer> ft = MinorCompactionManager.instance().submit(store, 2);
         ft.get();
@@ -62,6 +62,6 @@
     @Test
     public void testCompaction2() throws IOException, ExecutionException, InterruptedException
     {
-        testCompaction("Standard2", 5);
+        testCompaction("Standard2", 2);
     }
 }

Modified: incubator/cassandra/trunk/test/unit/org/apache/cassandra/io/SSTableTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/io/SSTableTest.java?rev=799944&r1=799943&r2=799944&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/io/SSTableTest.java (original)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/io/SSTableTest.java Sat Aug  1 22:30:40 2009
@@ -57,11 +57,12 @@
 
     private void verifySingle(SSTableReader sstable, byte[] bytes, String key) throws IOException
     {
-        FileStruct fs = sstable.getFileStruct();
-        fs.seekTo(key);
-        int size = fs.getBufIn().readInt();
+        BufferedRandomAccessFile file = new BufferedRandomAccessFile(sstable.dataFile, "r");
+        file.seek(sstable.getPosition(key));
+        assert key.equals(file.readUTF());
+        int size = file.readInt();
         byte[] bytes2 = new byte[size];
-        fs.getBufIn().readFully(bytes2);
+        file.readFully(bytes2);
         assert Arrays.equals(bytes2, bytes);
     }
 
@@ -93,13 +94,14 @@
     {
         List<String> keys = new ArrayList<String>(map.keySet());
         Collections.shuffle(keys);
-        FileStruct fs = sstable.getFileStruct();
+        BufferedRandomAccessFile file = new BufferedRandomAccessFile(sstable.dataFile, "r");
         for (String key : keys)
         {
-            fs.seekTo(key);
-            int size = fs.getBufIn().readInt();
+            file.seek(sstable.getPosition(key));
+            assert key.equals(file.readUTF());
+            int size = file.readInt();
             byte[] bytes2 = new byte[size];
-            fs.getBufIn().readFully(bytes2);
+            file.readFully(bytes2);
             assert Arrays.equals(bytes2, map.get(key));
         }
     }