You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2011/07/13 01:33:37 UTC

svn commit: r1145818 - in /cassandra/trunk: ./ src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/io/sstable/ src/java/org/apache/cassandra/utils/

Author: jbellis
Date: Tue Jul 12 23:33:37 2011
New Revision: 1145818

URL: http://svn.apache.org/viewvc?rev=1145818&view=rev
Log:
optimize away seek when compacting wide rows
patch by Pavel Yaskevich and jbellis for CASSANDRA-2879

Modified:
    cassandra/trunk/CHANGES.txt
    cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java
    cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java
    cassandra/trunk/src/java/org/apache/cassandra/db/ColumnIndexer.java
    cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
    cassandra/trunk/src/java/org/apache/cassandra/utils/BloomFilter.java
    cassandra/trunk/src/java/org/apache/cassandra/utils/BloomFilterSerializer.java

Modified: cassandra/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1145818&r1=1145817&r2=1145818&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Tue Jul 12 23:33:37 2011
@@ -11,6 +11,7 @@
  * restrict repair streaming to specific columnfamilies (CASSANDRA-2280)
  * don't bother persisting columns shadowed by a row tombstone (CASSANDRA-2589)
  * reset CF and SC deletion times after gc_grace (CASSANDRA-2317)
+ * optimize away seek when compacting wide rows (CASSANDRA-2879)
 
 
 0.8.2

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java?rev=1145818&r1=1145817&r2=1145818&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java Tue Jul 12 23:33:37 2011
@@ -306,10 +306,15 @@ public class ColumnFamily extends Abstra
 
     public long serializedSize()
     {
-        int size = boolSize_ // bool
-                 + intSize_ // id
-                 + intSize_ // local deletion time
-                 + longSize_ // client deltion time
+        return boolSize_ // nullness bool
+               + intSize_ // id
+               + serializedSizeForSSTable();
+    }
+
+    public long serializedSizeForSSTable()
+    {
+        int size = intSize_ // local deletion time
+                 + longSize_ // client deletion time
                  + intSize_; // column count
         for (IColumn column : columns.values())
             size += column.serializedSize();

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java?rev=1145818&r1=1145817&r2=1145818&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java Tue Jul 12 23:33:37 2011
@@ -102,9 +102,9 @@ public class ColumnFamilySerializer impl
         dos.writeLong(columnFamily.getMarkedForDeleteAt());
     }
 
-    public int serializeWithIndexes(ColumnFamily columnFamily, DataOutput dos)
+    public int serializeWithIndexes(ColumnFamily columnFamily, ColumnIndexer.RowHeader index, DataOutput dos)
     {
-        ColumnIndexer.serialize(columnFamily, dos);
+        ColumnIndexer.serialize(index, dos);
         return serializeForSSTable(columnFamily, dos);
     }
 

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ColumnIndexer.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnIndexer.java?rev=1145818&r1=1145817&r2=1145818&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnIndexer.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnIndexer.java Tue Jul 12 23:33:37 2011
@@ -22,6 +22,7 @@ import java.io.DataOutput;
 import java.io.IOError;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
@@ -39,15 +40,15 @@ public class ColumnIndexer
 	/**
 	 * Given a column family this, function creates an in-memory structure that represents the
 	 * column index for the column family, and subsequently writes it to disk.
+     *
 	 * @param columns Column family to create index for
 	 * @param dos data output stream
-	 * @throws IOException
 	 */
     public static void serialize(IIterableColumns columns, DataOutput dos)
     {
         try
         {
-            serializeInternal(columns, dos);
+            writeIndex(serialize(columns), dos);
         }
         catch (IOException e)
         {
@@ -55,24 +56,41 @@ public class ColumnIndexer
         }
     }
 
-    public static void serializeInternal(IIterableColumns columns, DataOutput dos) throws IOException
+    public static void serialize(RowHeader indexInfo, DataOutput dos)
+    {
+        try
+        {
+            writeIndex(indexInfo, dos);
+        }
+        catch (IOException e)
+        {
+            throw new IOError(e);
+        }
+    }
+
+    /**
+     * Serializes the index into an in-memory structure with all required components,
+     * such as the Bloom Filter, index block size and IndexInfo list
+     *
+     * @param columns Column family to create index for
+     *
+     * @return information about the index - its Bloom Filter, block size and IndexInfo list
+     */
+    public static RowHeader serialize(IIterableColumns columns)
     {
         int columnCount = columns.getEstimatedColumnCount();
 
         BloomFilter bf = BloomFilter.getFilter(columnCount, 4);
 
         if (columnCount == 0)
-        {
-            writeEmptyHeader(dos, bf);
-            return;
-        }
+            return new RowHeader(bf, Collections.<IndexHelper.IndexInfo>emptyList());
 
         // update bloom filter and create a list of IndexInfo objects marking the first and last column
         // in each block of ColumnIndexSize
         List<IndexHelper.IndexInfo> indexList = new ArrayList<IndexHelper.IndexInfo>();
         int endPosition = 0, startPosition = -1;
-        int indexSizeInBytes = 0;
         IColumn lastColumn = null, firstColumn = null;
+
         for (IColumn column : columns)
         {
             bf.add(column.name());
@@ -82,13 +100,14 @@ public class ColumnIndexer
                 firstColumn = column;
                 startPosition = endPosition;
             }
+
             endPosition += column.serializedSize();
-            /* if we hit the column index size that we have to index after, go ahead and index it. */
+
+            // if we hit the column index size that we have to index after, go ahead and index it.
             if (endPosition - startPosition >= DatabaseDescriptor.getColumnIndexSize())
             {
                 IndexHelper.IndexInfo cIndexInfo = new IndexHelper.IndexInfo(firstColumn.name(), column.name(), startPosition, endPosition - startPosition);
                 indexList.add(cIndexInfo);
-                indexSizeInBytes += cIndexInfo.serializedSize();
                 firstColumn = null;
             }
 
@@ -97,45 +116,43 @@ public class ColumnIndexer
 
         // all columns were GC'd after all
         if (lastColumn == null)
-        {
-            writeEmptyHeader(dos, bf);
-            return;
-        }
+            return new RowHeader(bf, Collections.<IndexHelper.IndexInfo>emptyList());
 
         // the last column may have fallen on an index boundary already.  if not, index it explicitly.
         if (indexList.isEmpty() || columns.getComparator().compare(indexList.get(indexList.size() - 1).lastName, lastColumn.name()) != 0)
         {
             IndexHelper.IndexInfo cIndexInfo = new IndexHelper.IndexInfo(firstColumn.name(), lastColumn.name(), startPosition, endPosition - startPosition);
             indexList.add(cIndexInfo);
-            indexSizeInBytes += cIndexInfo.serializedSize();
         }
 
+        // we should always have at least one computed index block, but we only write it out if there is more than that.
+        assert indexList.size() > 0;
+        return new RowHeader(bf, indexList);
+    }
+
+    private static void writeIndex(RowHeader indexInfo, DataOutput dos) throws IOException
+    {
+        assert indexInfo != null;
+
         /* Write out the bloom filter. */
-        writeBloomFilter(dos, bf);
+        writeBloomFilter(dos, indexInfo.bloomFilter);
 
-        // write the index.  we should always have at least one computed index block, but we only write it out if there is more than that.
-        assert indexSizeInBytes > 0;
-        if (indexList.size() > 1)
+        dos.writeInt(indexInfo.entriesSize);
+        if (indexInfo.indexEntries.size() > 1)
         {
-            dos.writeInt(indexSizeInBytes);
-            for (IndexHelper.IndexInfo cIndexInfo : indexList)
-            {
+            for (IndexHelper.IndexInfo cIndexInfo : indexInfo.indexEntries)
                 cIndexInfo.serialize(dos);
-            }
         }
-        else
-        {
-            dos.writeInt(0);
-        }
-	}
-
-    private static void writeEmptyHeader(DataOutput dos, BloomFilter bf)
-            throws IOException
-    {
-        writeBloomFilter(dos, bf);
-        dos.writeInt(0);
     }
 
+    /**
+     * Writes a Bloom filter to the given output
+     *
+     * @param dos output to serialize the Bloom Filter to
+     * @param bf Bloom Filter
+     *
+     * @throws IOException on any I/O error.
+     */
     private static void writeBloomFilter(DataOutput dos, BloomFilter bf) throws IOException
     {
         DataOutputBuffer bufOut = new DataOutputBuffer();
@@ -145,4 +162,36 @@ public class ColumnIndexer
         bufOut.flush();
     }
 
+    /**
+     * Holds information about serialized index and bloom filter
+     */
+    public static class RowHeader
+    {
+        public final BloomFilter bloomFilter;
+        public final List<IndexHelper.IndexInfo> indexEntries;
+        public final int entriesSize;
+
+        public RowHeader(BloomFilter bf, List<IndexHelper.IndexInfo> indexes)
+        {
+            assert bf != null;
+            assert indexes != null;
+            bloomFilter = bf;
+            indexEntries = indexes;
+            int entriesSize = 0;
+            if (indexEntries.size() > 1)
+            {
+                for (IndexHelper.IndexInfo info : indexEntries)
+                    entriesSize += info.serializedSize();
+            }
+            this.entriesSize = entriesSize;
+        }
+
+        public long serializedSize()
+        {
+            return DBConstants.intSize_  // length of Bloom Filter
+                   + bloomFilter.serializedSize() // BF data
+                   + DBConstants.intSize_ // length of index block
+                   + entriesSize; // index block
+        }
+    }
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java?rev=1145818&r1=1145817&r2=1145818&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java Tue Jul 12 23:33:37 2011
@@ -27,19 +27,15 @@ import java.util.HashSet;
 import java.util.Set;
 
 import com.google.common.collect.Sets;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.compaction.*;
-import org.apache.cassandra.db.ColumnFamily;
-import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.db.Table;
 import org.apache.cassandra.dht.IPartitioner;
-import org.apache.cassandra.io.*;
-import org.apache.cassandra.io.sstable.SSTableMetadata;
 import org.apache.cassandra.io.util.BufferedRandomAccessFile;
 import org.apache.cassandra.io.util.FileMark;
 import org.apache.cassandra.io.util.FileUtils;
@@ -148,23 +144,20 @@ public class SSTableWriter extends SSTab
     {
         long startPosition = beforeAppend(decoratedKey);
         ByteBufferUtil.writeWithShortLength(decoratedKey.key, dataFile);
-        // write placeholder for the row size, since we don't know it yet
-        long sizePosition = dataFile.getFilePointer();
-        dataFile.writeLong(-1);
-        // write out row data
-        int columnCount = ColumnFamily.serializer().serializeWithIndexes(cf, dataFile);
-        // seek back and write the row size (not including the size Long itself)
-        long endPosition = dataFile.getFilePointer();
-        dataFile.seek(sizePosition);
-        long dataSize = endPosition - (sizePosition + 8);
-        assert dataSize > 0;
-        dataFile.writeLong(dataSize);
-        // finally, reset for next row
-        dataFile.seek(endPosition);
+
+        // serialize index and bloom filter into in-memory structure
+        ColumnIndexer.RowHeader header = ColumnIndexer.serialize(cf);
+
+        // write out row size
+        dataFile.writeLong(header.serializedSize() + cf.serializedSizeForSSTable());
+
+        // write out row header and data
+        int columnCount = ColumnFamily.serializer().serializeWithIndexes(cf, header, dataFile);
         afterAppend(decoratedKey, startPosition);
+
         // track max column timestamp
         sstableMetadataCollector.updateMaxTimestamp(cf.maxTimestamp());
-        sstableMetadataCollector.addRowSize(endPosition - startPosition);
+        sstableMetadataCollector.addRowSize(dataFile.getFilePointer() - startPosition);
         sstableMetadataCollector.addColumnCount(columnCount);
     }
 

Modified: cassandra/trunk/src/java/org/apache/cassandra/utils/BloomFilter.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/utils/BloomFilter.java?rev=1145818&r1=1145817&r2=1145818&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/utils/BloomFilter.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/utils/BloomFilter.java Tue Jul 12 23:33:37 2011
@@ -138,4 +138,9 @@ public class BloomFilter extends Filter
     {
         bitset.clear(0, bitset.size());
     }
+
+    public int serializedSize()
+    {
+        return BloomFilterSerializer.serializedSize(this);
+    }
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/utils/BloomFilterSerializer.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/utils/BloomFilterSerializer.java?rev=1145818&r1=1145817&r2=1145818&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/utils/BloomFilterSerializer.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/utils/BloomFilterSerializer.java Tue Jul 12 23:33:37 2011
@@ -25,6 +25,7 @@ import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 
+import org.apache.cassandra.db.DBConstants;
 import org.apache.cassandra.io.ICompactSerializer2;
 import org.apache.cassandra.utils.obs.OpenBitSet;
 
@@ -52,6 +53,19 @@ class BloomFilterSerializer implements I
         OpenBitSet bs = new OpenBitSet(bits, bitLength);
         return new BloomFilter(hashes, bs);
     }
-}
-
 
+    /**
+     * Calculates a serialized size of the given Bloom Filter
+     * @see #serialize(BloomFilter, DataOutput)
+     *
+     * @param bf Bloom filter to calculate serialized size
+     *
+     * @return serialized size of the given bloom filter
+     */
+    public static int serializedSize(BloomFilter bf)
+    {
+        return DBConstants.intSize_ // hash count
+               + DBConstants.intSize_ // length
+               + bf.bitset.getBits().length * DBConstants.longSize_; // buckets
+    }
+}