You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2011/07/13 01:33:37 UTC
svn commit: r1145818 - in /cassandra/trunk: ./
src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/io/sstable/
src/java/org/apache/cassandra/utils/
Author: jbellis
Date: Tue Jul 12 23:33:37 2011
New Revision: 1145818
URL: http://svn.apache.org/viewvc?rev=1145818&view=rev
Log:
optimize away seek when compacting wide rows
patch by Pavel Yaskevich and jbellis for CASSANDRA-2879
Modified:
cassandra/trunk/CHANGES.txt
cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java
cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java
cassandra/trunk/src/java/org/apache/cassandra/db/ColumnIndexer.java
cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
cassandra/trunk/src/java/org/apache/cassandra/utils/BloomFilter.java
cassandra/trunk/src/java/org/apache/cassandra/utils/BloomFilterSerializer.java
Modified: cassandra/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1145818&r1=1145817&r2=1145818&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Tue Jul 12 23:33:37 2011
@@ -11,6 +11,7 @@
* restrict repair streaming to specific columnfamilies (CASSANDRA-2280)
* don't bother persisting columns shadowed by a row tombstone (CASSANDRA-2589)
* reset CF and SC deletion times after gc_grace (CASSANDRA-2317)
+ * optimize away seek when compacting wide rows (CASSANDRA-2879)
0.8.2
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java?rev=1145818&r1=1145817&r2=1145818&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java Tue Jul 12 23:33:37 2011
@@ -306,10 +306,15 @@ public class ColumnFamily extends Abstra
public long serializedSize()
{
- int size = boolSize_ // bool
- + intSize_ // id
- + intSize_ // local deletion time
- + longSize_ // client deltion time
+ return boolSize_ // nullness bool
+ + intSize_ // id
+ + serializedSizeForSSTable();
+ }
+
+ public long serializedSizeForSSTable()
+ {
+ int size = intSize_ // local deletion time
+ + longSize_ // client deletion time
+ intSize_; // column count
for (IColumn column : columns.values())
size += column.serializedSize();
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java?rev=1145818&r1=1145817&r2=1145818&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java Tue Jul 12 23:33:37 2011
@@ -102,9 +102,9 @@ public class ColumnFamilySerializer impl
dos.writeLong(columnFamily.getMarkedForDeleteAt());
}
- public int serializeWithIndexes(ColumnFamily columnFamily, DataOutput dos)
+ public int serializeWithIndexes(ColumnFamily columnFamily, ColumnIndexer.RowHeader index, DataOutput dos)
{
- ColumnIndexer.serialize(columnFamily, dos);
+ ColumnIndexer.serialize(index, dos);
return serializeForSSTable(columnFamily, dos);
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ColumnIndexer.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnIndexer.java?rev=1145818&r1=1145817&r2=1145818&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnIndexer.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnIndexer.java Tue Jul 12 23:33:37 2011
@@ -22,6 +22,7 @@ import java.io.DataOutput;
import java.io.IOError;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import org.apache.cassandra.config.DatabaseDescriptor;
@@ -39,15 +40,15 @@ public class ColumnIndexer
/**
* Given a column family, this function creates an in-memory structure that represents the
* column index for the column family, and subsequently writes it to disk.
+ *
* @param columns Column family to create index for
* @param dos data output stream
- * @throws IOException
*/
public static void serialize(IIterableColumns columns, DataOutput dos)
{
try
{
- serializeInternal(columns, dos);
+ writeIndex(serialize(columns), dos);
}
catch (IOException e)
{
@@ -55,24 +56,41 @@ public class ColumnIndexer
}
}
- public static void serializeInternal(IIterableColumns columns, DataOutput dos) throws IOException
+ public static void serialize(RowHeader indexInfo, DataOutput dos)
+ {
+ try
+ {
+ writeIndex(indexInfo, dos);
+ }
+ catch (IOException e)
+ {
+ throw new IOError(e);
+ }
+ }
+
+ /**
+ * Serializes the index into an in-memory structure with all required components
+ * such as Bloom Filter, index block size, IndexInfo list
+ *
+ * @param columns Column family to create index for
+ *
+ * @return information about the index - its Bloom Filter, block size and IndexInfo list
+ */
+ public static RowHeader serialize(IIterableColumns columns)
{
int columnCount = columns.getEstimatedColumnCount();
BloomFilter bf = BloomFilter.getFilter(columnCount, 4);
if (columnCount == 0)
- {
- writeEmptyHeader(dos, bf);
- return;
- }
+ return new RowHeader(bf, Collections.<IndexHelper.IndexInfo>emptyList());
// update bloom filter and create a list of IndexInfo objects marking the first and last column
// in each block of ColumnIndexSize
List<IndexHelper.IndexInfo> indexList = new ArrayList<IndexHelper.IndexInfo>();
int endPosition = 0, startPosition = -1;
- int indexSizeInBytes = 0;
IColumn lastColumn = null, firstColumn = null;
+
for (IColumn column : columns)
{
bf.add(column.name());
@@ -82,13 +100,14 @@ public class ColumnIndexer
firstColumn = column;
startPosition = endPosition;
}
+
endPosition += column.serializedSize();
- /* if we hit the column index size that we have to index after, go ahead and index it. */
+
+ // if we hit the column index size that we have to index after, go ahead and index it.
if (endPosition - startPosition >= DatabaseDescriptor.getColumnIndexSize())
{
IndexHelper.IndexInfo cIndexInfo = new IndexHelper.IndexInfo(firstColumn.name(), column.name(), startPosition, endPosition - startPosition);
indexList.add(cIndexInfo);
- indexSizeInBytes += cIndexInfo.serializedSize();
firstColumn = null;
}
@@ -97,45 +116,43 @@ public class ColumnIndexer
// all columns were GC'd after all
if (lastColumn == null)
- {
- writeEmptyHeader(dos, bf);
- return;
- }
+ return new RowHeader(bf, Collections.<IndexHelper.IndexInfo>emptyList());
// the last column may have fallen on an index boundary already. if not, index it explicitly.
if (indexList.isEmpty() || columns.getComparator().compare(indexList.get(indexList.size() - 1).lastName, lastColumn.name()) != 0)
{
IndexHelper.IndexInfo cIndexInfo = new IndexHelper.IndexInfo(firstColumn.name(), lastColumn.name(), startPosition, endPosition - startPosition);
indexList.add(cIndexInfo);
- indexSizeInBytes += cIndexInfo.serializedSize();
}
+ // we should always have at least one computed index block, but we only write it out if there is more than that.
+ assert indexList.size() > 0;
+ return new RowHeader(bf, indexList);
+ }
+
+ private static void writeIndex(RowHeader indexInfo, DataOutput dos) throws IOException
+ {
+ assert indexInfo != null;
+
/* Write out the bloom filter. */
- writeBloomFilter(dos, bf);
+ writeBloomFilter(dos, indexInfo.bloomFilter);
- // write the index. we should always have at least one computed index block, but we only write it out if there is more than that.
- assert indexSizeInBytes > 0;
- if (indexList.size() > 1)
+ dos.writeInt(indexInfo.entriesSize);
+ if (indexInfo.indexEntries.size() > 1)
{
- dos.writeInt(indexSizeInBytes);
- for (IndexHelper.IndexInfo cIndexInfo : indexList)
- {
+ for (IndexHelper.IndexInfo cIndexInfo : indexInfo.indexEntries)
cIndexInfo.serialize(dos);
- }
}
- else
- {
- dos.writeInt(0);
- }
- }
-
- private static void writeEmptyHeader(DataOutput dos, BloomFilter bf)
- throws IOException
- {
- writeBloomFilter(dos, bf);
- dos.writeInt(0);
}
+ /**
+ * Write a Bloom filter into a file
+ *
+ * @param dos file to serialize Bloom Filter
+ * @param bf Bloom Filter
+ *
+ * @throws IOException on any I/O error.
+ */
private static void writeBloomFilter(DataOutput dos, BloomFilter bf) throws IOException
{
DataOutputBuffer bufOut = new DataOutputBuffer();
@@ -145,4 +162,36 @@ public class ColumnIndexer
bufOut.flush();
}
+ /**
+ * Holds information about serialized index and bloom filter
+ */
+ public static class RowHeader
+ {
+ public final BloomFilter bloomFilter;
+ public final List<IndexHelper.IndexInfo> indexEntries;
+ public final int entriesSize;
+
+ public RowHeader(BloomFilter bf, List<IndexHelper.IndexInfo> indexes)
+ {
+ assert bf != null;
+ assert indexes != null;
+ bloomFilter = bf;
+ indexEntries = indexes;
+ int entriesSize = 0;
+ if (indexEntries.size() > 1)
+ {
+ for (IndexHelper.IndexInfo info : indexEntries)
+ entriesSize += info.serializedSize();
+ }
+ this.entriesSize = entriesSize;
+ }
+
+ public long serializedSize()
+ {
+ return DBConstants.intSize_ // length of Bloom Filter
+ + bloomFilter.serializedSize() // BF data
+ + DBConstants.intSize_ // length of index block
+ + entriesSize; // index block
+ }
+ }
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java?rev=1145818&r1=1145817&r2=1145818&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java Tue Jul 12 23:33:37 2011
@@ -27,19 +27,15 @@ import java.util.HashSet;
import java.util.Set;
import com.google.common.collect.Sets;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.*;
import org.apache.cassandra.db.compaction.*;
-import org.apache.cassandra.db.ColumnFamily;
-import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.db.Table;
import org.apache.cassandra.dht.IPartitioner;
-import org.apache.cassandra.io.*;
-import org.apache.cassandra.io.sstable.SSTableMetadata;
import org.apache.cassandra.io.util.BufferedRandomAccessFile;
import org.apache.cassandra.io.util.FileMark;
import org.apache.cassandra.io.util.FileUtils;
@@ -148,23 +144,20 @@ public class SSTableWriter extends SSTab
{
long startPosition = beforeAppend(decoratedKey);
ByteBufferUtil.writeWithShortLength(decoratedKey.key, dataFile);
- // write placeholder for the row size, since we don't know it yet
- long sizePosition = dataFile.getFilePointer();
- dataFile.writeLong(-1);
- // write out row data
- int columnCount = ColumnFamily.serializer().serializeWithIndexes(cf, dataFile);
- // seek back and write the row size (not including the size Long itself)
- long endPosition = dataFile.getFilePointer();
- dataFile.seek(sizePosition);
- long dataSize = endPosition - (sizePosition + 8);
- assert dataSize > 0;
- dataFile.writeLong(dataSize);
- // finally, reset for next row
- dataFile.seek(endPosition);
+
+ // serialize index and bloom filter into in-memory structure
+ ColumnIndexer.RowHeader header = ColumnIndexer.serialize(cf);
+
+ // write out row size
+ dataFile.writeLong(header.serializedSize() + cf.serializedSizeForSSTable());
+
+ // write out row header and data
+ int columnCount = ColumnFamily.serializer().serializeWithIndexes(cf, header, dataFile);
afterAppend(decoratedKey, startPosition);
+
// track max column timestamp
sstableMetadataCollector.updateMaxTimestamp(cf.maxTimestamp());
- sstableMetadataCollector.addRowSize(endPosition - startPosition);
+ sstableMetadataCollector.addRowSize(dataFile.getFilePointer() - startPosition);
sstableMetadataCollector.addColumnCount(columnCount);
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/utils/BloomFilter.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/utils/BloomFilter.java?rev=1145818&r1=1145817&r2=1145818&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/utils/BloomFilter.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/utils/BloomFilter.java Tue Jul 12 23:33:37 2011
@@ -138,4 +138,9 @@ public class BloomFilter extends Filter
{
bitset.clear(0, bitset.size());
}
+
+ public int serializedSize()
+ {
+ return BloomFilterSerializer.serializedSize(this);
+ }
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/utils/BloomFilterSerializer.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/utils/BloomFilterSerializer.java?rev=1145818&r1=1145817&r2=1145818&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/utils/BloomFilterSerializer.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/utils/BloomFilterSerializer.java Tue Jul 12 23:33:37 2011
@@ -25,6 +25,7 @@ import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
+import org.apache.cassandra.db.DBConstants;
import org.apache.cassandra.io.ICompactSerializer2;
import org.apache.cassandra.utils.obs.OpenBitSet;
@@ -52,6 +53,19 @@ class BloomFilterSerializer implements I
OpenBitSet bs = new OpenBitSet(bits, bitLength);
return new BloomFilter(hashes, bs);
}
-}
-
+ /**
+ * Calculates the serialized size of the given Bloom Filter
+ * @see this.serialize(BloomFilter, DataOutput)
+ *
+ * @param bf Bloom filter to calculate serialized size
+ *
+ * @return serialized size of the given bloom filter
+ */
+ public static int serializedSize(BloomFilter bf)
+ {
+ return DBConstants.intSize_ // hash count
+ + DBConstants.intSize_ // length
+ + bf.bitset.getBits().length * DBConstants.longSize_; // buckets
+ }
+}