You are viewing a plain text version of this content. (The original page linked to the canonical version; that link is not preserved in this text.)
Posted to commits@cassandra.apache.org by jb...@apache.org on 2010/06/16 17:27:05 UTC
svn commit: r955267 - in /cassandra/trunk: src/java/org/apache/cassandra/db/
src/java/org/apache/cassandra/io/ src/java/org/apache/cassandra/io/sstable/
src/java/org/apache/cassandra/io/util/ src/java/org/apache/cassandra/tools/
test/unit/org/apache/cassandra/io/
Author: jbellis
Date: Wed Jun 16 15:27:05 2010
New Revision: 955267
URL: http://svn.apache.org/viewvc?rev=955267&view=rev
Log:
add LazilyCompactedRow
patch by jbellis; reviewed by Stu Hood for CASSANDRA-16
Added:
cassandra/trunk/src/java/org/apache/cassandra/io/LazilyCompactedRow.java
cassandra/trunk/src/java/org/apache/cassandra/io/util/IIterableColumns.java
cassandra/trunk/test/unit/org/apache/cassandra/io/LazilyCompactedRowTest.java
Modified:
cassandra/trunk/src/java/org/apache/cassandra/db/Column.java
cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java
cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java
cassandra/trunk/src/java/org/apache/cassandra/db/ColumnIndexer.java
cassandra/trunk/src/java/org/apache/cassandra/db/TimestampClock.java
cassandra/trunk/src/java/org/apache/cassandra/io/CompactionIterator.java
cassandra/trunk/src/java/org/apache/cassandra/io/PrecompactedRow.java
cassandra/trunk/src/java/org/apache/cassandra/io/sstable/IndexHelper.java
cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java
cassandra/trunk/src/java/org/apache/cassandra/tools/SSTableExport.java
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/Column.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/Column.java?rev=955267&r1=955266&r2=955267&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/Column.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/Column.java Wed Jun 16 15:27:05 2010
@@ -18,6 +18,7 @@
package org.apache.cassandra.db;
+import java.util.Arrays;
import java.util.Collection;
import java.security.MessageDigest;
import java.io.IOException;
@@ -213,6 +214,33 @@ public class Column implements IColumn
return rel;
}
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o)
+ return true;
+ if (o == null || getClass() != o.getClass())
+ return false;
+
+ Column column = (Column)o;
+
+ if (clock != null ? !clock.equals(column.clock) : column.clock != null)
+ return false;
+ if (!Arrays.equals(name, column.name))
+ return false;
+
+ return Arrays.equals(value, column.value);
+ }
+
+    /** Hash over the name and value contents plus the clock, consistent with equals(). */
+    @Override
+    public int hashCode()
+    {
+        // Arrays.hashCode(null) is 0, so null names/values need no special case
+        int h = Arrays.hashCode(name);
+        h = 31 * h + Arrays.hashCode(value);
+        return 31 * h + (clock == null ? 0 : clock.hashCode());
+    }
+
public String getString(AbstractType comparator)
{
StringBuilder sb = new StringBuilder();
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java?rev=955267&r1=955266&r2=955267&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java Wed Jun 16 15:27:05 2010
@@ -33,9 +33,10 @@ import org.apache.cassandra.config.Datab
import org.apache.cassandra.io.ICompactSerializer2;
import org.apache.cassandra.db.filter.QueryPath;
import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.io.util.IIterableColumns;
import org.apache.cassandra.utils.FBUtilities;
-public class ColumnFamily implements IColumnContainer
+public class ColumnFamily implements IColumnContainer, IIterableColumns
{
/* The column serializer for this Column Family. Create based on config. */
private static ColumnFamilySerializer serializer = new ColumnFamilySerializer();
@@ -442,4 +443,14 @@ public class ColumnFamily implements ICo
return;
addAll(cf);
}
+
+    /**
+     * IIterableColumns estimate for an in-memory family: simply its getColumnCount().
+     */
+    public int getEstimatedColumnCount()
+    {
+        final int count = getColumnCount();
+        return count;
+    }
+
+ /**
+ * Iterates over the values of this family's {@code columns} map.
+ * NOTE(review): ColumnIndexer builds the column index from this iteration, so it presumably
+ * relies on the map yielding columns in comparator order -- confirm.
+ */
+ public Iterator<IColumn> iterator()
+ {
+ return columns.values().iterator();
+ }
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java?rev=955267&r1=955266&r2=955267&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java Wed Jun 16 15:27:05 2010
@@ -73,9 +73,7 @@ public class ColumnFamilySerializer impl
{
try
{
- dos.writeInt(columnFamily.localDeletionTime.get());
- IClock _markedForDeleteAt = columnFamily.markedForDeleteAt.get();
- columnFamily.getClockType().serializer().serialize(_markedForDeleteAt, dos);
+ serializeCFInfo(columnFamily, dos);
Collection<IColumn> columns = columnFamily.getSortedColumns();
dos.writeInt(columns.size());
@@ -90,6 +88,13 @@ public class ColumnFamilySerializer impl
}
}
+ public void serializeCFInfo(ColumnFamily columnFamily, DataOutput dos) throws IOException
+ {
+ dos.writeInt(columnFamily.localDeletionTime.get());
+ IClock _markedForDeleteAt = columnFamily.markedForDeleteAt.get();
+ columnFamily.getClockType().serializer().serialize(_markedForDeleteAt, dos);
+ }
+
public void serializeWithIndexes(ColumnFamily columnFamily, DataOutput dos)
{
ColumnIndexer.serialize(columnFamily, dos);
@@ -108,7 +113,7 @@ public class ColumnFamilySerializer impl
return cf;
}
- private void deserializeColumns(DataInput dis, ColumnFamily cf) throws IOException
+ public void deserializeColumns(DataInput dis, ColumnFamily cf) throws IOException
{
int size = dis.readInt();
for (int i = 0; i < size; ++i)
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ColumnIndexer.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnIndexer.java?rev=955267&r1=955266&r2=955267&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnIndexer.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnIndexer.java Wed Jun 16 15:27:05 2010
@@ -22,12 +22,12 @@ import java.io.IOError;
import java.io.IOException;
import java.io.DataOutput;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.List;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.io.util.DataOutputBuffer;
import org.apache.cassandra.io.sstable.IndexHelper;
+import org.apache.cassandra.io.util.IIterableColumns;
import org.apache.cassandra.utils.BloomFilter;
@@ -39,15 +39,15 @@ public class ColumnIndexer
/**
* Given a column family this, function creates an in-memory structure that represents the
* column index for the column family, and subsequently writes it to disk.
- * @param columnFamily Column family to create index for
+ * @param columns Column family to create index for
* @param dos data output stream
* @throws IOException
*/
- public static void serialize(ColumnFamily columnFamily, DataOutput dos)
+ public static void serialize(IIterableColumns columns, DataOutput dos)
{
try
{
- serializeInternal(columnFamily, dos);
+ serializeInternal(columns, dos);
}
catch (IOException e)
{
@@ -55,19 +55,15 @@ public class ColumnIndexer
}
}
- public static void serializeInternal(ColumnFamily columnFamily, DataOutput dos) throws IOException
+ public static void serializeInternal(IIterableColumns columns, DataOutput dos) throws IOException
{
- Collection<IColumn> columns = columnFamily.getSortedColumns();
- int columnCount = columns.size();
+ int columnCount = columns.getEstimatedColumnCount();
BloomFilter bf = BloomFilter.getFilter(columnCount, 4);
- if (columns.isEmpty())
+ if (columnCount == 0)
{
- // write empty bloom filter and index
- writeBloomFilter(dos, bf);
- dos.writeInt(0);
-
+ writeEmptyHeader(dos, bf);
return;
}
@@ -98,8 +94,16 @@ public class ColumnIndexer
lastColumn = column;
}
+
+ // all columns were GC'd after all
+ if (lastColumn == null)
+ {
+ writeEmptyHeader(dos, bf);
+ return;
+ }
+
// the last column may have fallen on an index boundary already. if not, index it explicitly.
- if (indexList.isEmpty() || columnFamily.getComparator().compare(indexList.get(indexList.size() - 1).lastName, lastColumn.name()) != 0)
+ if (indexList.isEmpty() || columns.getComparator().compare(indexList.get(indexList.size() - 1).lastName, lastColumn.name()) != 0)
{
IndexHelper.IndexInfo cIndexInfo = new IndexHelper.IndexInfo(firstColumn.name(), lastColumn.name(), startPosition, endPosition - startPosition);
indexList.add(cIndexInfo);
@@ -118,6 +122,13 @@ public class ColumnIndexer
}
}
+ private static void writeEmptyHeader(DataOutput dos, BloomFilter bf)
+ throws IOException
+ {
+ writeBloomFilter(dos, bf);
+ dos.writeInt(0);
+ }
+
private static void writeBloomFilter(DataOutput dos, BloomFilter bf) throws IOException
{
DataOutputBuffer bufOut = new DataOutputBuffer();
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/TimestampClock.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/TimestampClock.java?rev=955267&r1=955266&r2=955267&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/TimestampClock.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/TimestampClock.java Wed Jun 16 15:27:05 2010
@@ -102,6 +102,18 @@ public class TimestampClock implements I
{
return Long.toString(timestamp);
}
+
+    /** Equal to any TimestampClock whose comparison with this one is ClockRelationship.EQUAL. */
+    @Override
+    public boolean equals(Object o)
+    {
+        if (!(o instanceof TimestampClock))
+            return false;
+        TimestampClock other = (TimestampClock) o;
+        return compare(other) == ClockRelationship.EQUAL;
+    }
+
+ @Override
+ public int hashCode()
+ {
+ return (int)(timestamp ^ (timestamp >>> 32));
+ }
}
class TimestampClockSerializer implements ICompactSerializer2<IClock>
Modified: cassandra/trunk/src/java/org/apache/cassandra/io/CompactionIterator.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/CompactionIterator.java?rev=955267&r1=955266&r2=955267&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/CompactionIterator.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/CompactionIterator.java Wed Jun 16 15:27:05 2010
@@ -48,7 +48,7 @@ public class CompactionIterator extends
protected static final int FILE_BUFFER_SIZE = 1024 * 1024;
- private final List<SSTableIdentityIterator> rows = new ArrayList<SSTableIdentityIterator>();
+ protected final List<SSTableIdentityIterator> rows = new ArrayList<SSTableIdentityIterator>();
private final int gcBefore;
private final boolean major;
@@ -103,7 +103,7 @@ public class CompactionIterator extends
try
{
- PrecompactedRow compactedRow = new PrecompactedRow(rows, major, gcBefore);
+ AbstractCompactedRow compactedRow = getCompactedRow();
return compactedRow.isEmpty() ? null : compactedRow;
}
finally
@@ -120,6 +120,19 @@ public class CompactionIterator extends
}
}
+ protected AbstractCompactedRow getCompactedRow()
+ {
+ long rowSize = 0;
+ for (SSTableIdentityIterator row : rows)
+ {
+ rowSize += row.getDataSize();
+ }
+ if (rowSize > 512 * 1024 * 1024)
+ return new LazilyCompactedRow(rows, major, gcBefore);
+ else
+ return new PrecompactedRow(rows, major, gcBefore);
+ }
+
public void close() throws IOException
{
for (SSTableScanner scanner : getScanners())
Added: cassandra/trunk/src/java/org/apache/cassandra/io/LazilyCompactedRow.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/LazilyCompactedRow.java?rev=955267&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/LazilyCompactedRow.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/LazilyCompactedRow.java Wed Jun 16 15:27:05 2010
@@ -0,0 +1,192 @@
+package org.apache.cassandra.io;
+
+import java.io.DataOutput;
+import java.io.IOError;
+import java.io.IOException;
+import java.security.MessageDigest;
+import java.util.*;
+
+import com.google.common.base.Predicates;
+import com.google.common.collect.Iterators;
+import org.apache.commons.collections.iterators.CollatingIterator;
+
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.io.sstable.SSTableIdentityIterator;
+import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.io.util.IIterableColumns;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.ReducingIterator;
+
+/**
+ * LazilyCompactedRow only computes the row bloom filter and column index in memory
+ * (at construction time); it does this by reading one column at a time from each
+ * of the rows being compacted, and merging them as it does so. So the most we have
+ * in memory at a time is the bloom filter, the index, and one column from each
+ * pre-compaction row.
+ *
+ * When write() or update() is called, a second pass is made over the pre-compaction
+ * rows to write the merged columns or update the hash, again with at most one column
+ * from each row deserialized at a time.
+ */
+public class LazilyCompactedRow extends AbstractCompactedRow implements IIterableColumns
+{
+ private final List<SSTableIdentityIterator> rows;
+ private final boolean major;
+ private final int gcBefore;
+ private final DataOutputBuffer headerBuffer;
+ private ColumnFamily emptyColumnFamily;
+ private LazyColumnIterator iter;
+ private int columnCount;
+ private long columnSerializedSize;
+
+ public LazilyCompactedRow(List<SSTableIdentityIterator> rows, boolean major, int gcBefore)
+ {
+ super(rows.get(0).getKey());
+ this.major = major;
+ this.gcBefore = gcBefore;
+ this.rows = new ArrayList<SSTableIdentityIterator>(rows);
+
+ for (SSTableIdentityIterator row : rows)
+ {
+ ColumnFamily cf = row.getColumnFamily();
+
+ if (emptyColumnFamily == null)
+ emptyColumnFamily = cf;
+ else
+ emptyColumnFamily.delete(cf);
+ }
+
+ // initialize row header so isEmpty can be called
+ headerBuffer = new DataOutputBuffer();
+ ColumnIndexer.serialize(this, headerBuffer);
+ // reach into iterator used by ColumnIndexer to get column count and size
+ columnCount = iter.size;
+ columnSerializedSize = iter.serializedSize;
+ iter = null;
+ }
+
+ public void write(DataOutput out) throws IOException
+ {
+ if (rows.size() == 1 && !major)
+ {
+ SSTableIdentityIterator row = rows.get(0);
+ out.writeInt(row.getDataSize());
+ row.echoData(out);
+ return;
+ }
+
+ DataOutputBuffer clockOut = new DataOutputBuffer();
+ ColumnFamily.serializer().serializeCFInfo(emptyColumnFamily, clockOut);
+
+ out.writeInt((int)(headerBuffer.getLength() + clockOut.getLength() + columnSerializedSize));
+ out.write(headerBuffer.getData(), 0, headerBuffer.getLength());
+ out.write(clockOut.getData(), 0, clockOut.getLength());
+ out.writeInt(columnCount);
+
+ Iterator<IColumn> iter = iterator();
+ while (iter.hasNext())
+ {
+ IColumn column = iter.next();
+ emptyColumnFamily.getColumnSerializer().serialize(column, out);
+ }
+ }
+
+ public void update(MessageDigest digest)
+ {
+ // no special-case for rows.size == 1, we're actually skipping some bytes here so just
+ // blindly updating everything wouldn't be correct
+ digest.update(headerBuffer.getData(), 0, headerBuffer.getLength());
+ DataOutputBuffer out = new DataOutputBuffer();
+ Iterator<IColumn> iter = iterator();
+ while (iter.hasNext())
+ {
+ IColumn column = iter.next();
+ out.reset();
+ try
+ {
+ emptyColumnFamily.getColumnSerializer().serialize(column, out);
+ }
+ catch (IOException e)
+ {
+ throw new IOError(e);
+ }
+ digest.update(out.getData(), 0, out.getLength());
+ }
+ }
+
+ public boolean isEmpty()
+ {
+ boolean cfIrrelevant = ColumnFamilyStore.removeDeleted(emptyColumnFamily, gcBefore) == null;
+ return cfIrrelevant && columnCount == 0;
+ }
+
+ public int getEstimatedColumnCount()
+ {
+ int n = 0;
+ for (SSTableIdentityIterator row : rows)
+ n += row.getColumnCount();
+ return n;
+ }
+
+ public AbstractType getComparator()
+ {
+ return emptyColumnFamily.getComparator();
+ }
+
+ public Iterator<IColumn> iterator()
+ {
+ for (SSTableIdentityIterator row : rows)
+ {
+ row.reset();
+ }
+ Comparator<IColumn> nameComparator = new Comparator<IColumn>()
+ {
+ public int compare(IColumn o1, IColumn o2)
+ {
+ return getComparator().compare(o1.name(), o2.name());
+ }
+ };
+ iter = new LazyColumnIterator(new CollatingIterator(nameComparator, rows));
+ return Iterators.filter(iter, Predicates.notNull());
+ }
+
+ private class LazyColumnIterator extends ReducingIterator<IColumn, IColumn>
+ {
+ ColumnFamily container = emptyColumnFamily.cloneMeShallow();
+ long serializedSize = 4; // int for column count
+ int size = 0;
+
+ public LazyColumnIterator(Iterator<IColumn> source)
+ {
+ super(source);
+ }
+
+ @Override
+ protected boolean isEqual(IColumn o1, IColumn o2)
+ {
+ return Arrays.equals(o1.name(), o2.name());
+ }
+
+ public void reduce(IColumn current)
+ {
+ container.addColumn(current);
+ }
+
+ protected IColumn getReduced()
+ {
+ assert container != null;
+ IColumn reduced = container.iterator().next();
+ ColumnFamily purged = major ? ColumnFamilyStore.removeDeleted(container, gcBefore) : container;
+ if (purged == null || !purged.iterator().hasNext())
+ {
+ container.clear();
+ return null;
+ }
+ container.clear();
+ serializedSize += reduced.serializedSize();
+ size++;
+ return reduced;
+ }
+ }
+}
Modified: cassandra/trunk/src/java/org/apache/cassandra/io/PrecompactedRow.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/PrecompactedRow.java?rev=955267&r1=955266&r2=955267&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/PrecompactedRow.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/PrecompactedRow.java Wed Jun 16 15:27:05 2010
@@ -42,7 +42,7 @@ public class PrecompactedRow extends Abs
ColumnFamily thisCF;
try
{
- thisCF = row.getColumnFamily();
+ thisCF = row.getColumnFamilyWithColumns();
}
catch (IOException e)
{
Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/IndexHelper.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/IndexHelper.java?rev=955267&r1=955266&r2=955267&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/IndexHelper.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/IndexHelper.java Wed Jun 16 15:27:05 2010
@@ -86,7 +86,7 @@ public class IndexHelper
* @return bloom filter summarizing the column information
* @throws java.io.IOException
*/
- public static BloomFilter defreezeBloomFilter(FileDataInput file) throws IOException
+ public static BloomFilter defreezeBloomFilter(DataInput file) throws IOException
{
int size = file.readInt();
byte[] bytes = new byte[size];
Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java?rev=955267&r1=955266&r2=955267&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java Wed Jun 16 15:27:05 2010
@@ -22,6 +22,7 @@ package org.apache.cassandra.io.sstable;
import java.io.*;
+import java.util.ArrayList;
import org.apache.cassandra.db.ColumnFamily;
import org.apache.cassandra.db.DecoratedKey;
@@ -29,15 +30,14 @@ import org.apache.cassandra.db.IColumn;
import org.apache.cassandra.db.filter.IColumnIterator;
import org.apache.cassandra.io.util.BufferedRandomAccessFile;
-import com.google.common.collect.AbstractIterator;
-
-public class SSTableIdentityIterator extends AbstractIterator<IColumn> implements Comparable<SSTableIdentityIterator>, IColumnIterator
+public class SSTableIdentityIterator implements Comparable<SSTableIdentityIterator>, IColumnIterator
{
private final DecoratedKey key;
private final long finishedAt;
private final BufferedRandomAccessFile file;
private SSTableReader sstable;
private long dataStart;
+ private final int dataSize;
/**
* Used to iterate through the columns of a row.
@@ -45,17 +45,18 @@ public class SSTableIdentityIterator ext
* @param file Reading using this file.
* @param key Key of this row.
* @param dataStart Data for this row starts at this pos.
- * @param finishedAt Data for this row ends at this pos.
+ * @param dataSize length of row data
* @throws IOException
*/
- public SSTableIdentityIterator(SSTableReader sstable, BufferedRandomAccessFile file, DecoratedKey key, long dataStart, long finishedAt)
+ public SSTableIdentityIterator(SSTableReader sstable, BufferedRandomAccessFile file, DecoratedKey key, long dataStart, int dataSize)
throws IOException
{
this.sstable = sstable;
this.file = file;
this.key = key;
this.dataStart = dataStart;
- this.finishedAt = finishedAt;
+ this.dataSize = dataSize;
+ finishedAt = dataStart + dataSize;
}
public DecoratedKey getKey()
@@ -68,6 +69,11 @@ public class SSTableIdentityIterator ext
return file.getPath();
}
+    /** @return the length in bytes of this row's serialized data, as passed to the constructor */
+    public int getDataSize()
+    {
+        return this.dataSize;
+    }
+
public void echoData(DataOutput out) throws IOException
{
file.seek(dataStart);
@@ -77,38 +83,77 @@ public class SSTableIdentityIterator ext
}
}
- // TODO r/m this and make compaction merge columns iteratively for CASSSANDRA-16
- public ColumnFamily getColumnFamily() throws IOException
+ public ColumnFamily getColumnFamily()
{
- file.seek(dataStart);
- IndexHelper.skipBloomFilter(file);
- IndexHelper.skipIndex(file);
- return ColumnFamily.serializer().deserializeFromSSTable(sstable, file);
+ ColumnFamily cf;
+ try
+ {
+ file.seek(dataStart);
+ IndexHelper.skipBloomFilter(file);
+ IndexHelper.skipIndex(file);
+ cf = sstable.makeColumnFamily();
+ ColumnFamily.serializer().deserializeFromSSTableNoColumns(cf, file);
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
+
+ return cf;
}
- protected IColumn computeNext()
+ public int getColumnCount()
{
+ getColumnFamily(); // skips to column count
try
{
- assert file.getFilePointer() <= finishedAt;
- if (file.getFilePointer() == finishedAt)
- {
- return endOfData();
- }
+ return file.readInt();
+ }
+ catch (IOException e)
+ {
+ throw new IOError(e);
+ }
+ }
+
+    /**
+     * Reads this row's metadata and then deserializes every one of its columns into the
+     * returned ColumnFamily, so the whole row ends up in memory.
+     */
+    public ColumnFamily getColumnFamilyWithColumns() throws IOException
+    {
+        ColumnFamily withColumns = getColumnFamily();
+        ColumnFamily.serializer().deserializeColumns(file, withColumns);
+        return withColumns;
+    }
+
+ public boolean hasNext()
+ {
+ return file.getFilePointer() < finishedAt;
+ }
+
+ public IColumn next()
+ {
+ try
+ {
return sstable.getColumnSerializer().deserialize(file);
}
catch (IOException e)
{
- throw new RuntimeException(e);
+ throw new IOError(e);
}
}
+ /**
+ * Removal is not supported by this iterator.
+ */
+ public void remove()
+ {
+ throw new UnsupportedOperationException();
+ }
+
public int compareTo(SSTableIdentityIterator o)
{
return key.compareTo(o.key);
}
+ /**
+ * Repositions the file at this row's first column so iteration starts from the beginning.
+ * Works purely through getColumnCount()'s side effects: it seeks to the row start, skips the
+ * bloom filter, index and row metadata, and reads the column count (discarded here).
+ */
+ public void reset()
+ {
+ getColumnCount();
+ }
+
public void close() throws IOException
{
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java?rev=955267&r1=955266&r2=955267&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java Wed Jun 16 15:27:05 2010
@@ -175,7 +175,9 @@ public class SSTableScanner implements I
if (filter == null)
{
- return row = new SSTableIdentityIterator(sstable, file, key, dataStart, finishedAt);
+ row = new SSTableIdentityIterator(sstable, file, key, dataStart, dataSize);
+ ((SSTableIdentityIterator)row).reset();
+ return row;
}
else
{
Added: cassandra/trunk/src/java/org/apache/cassandra/io/util/IIterableColumns.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/util/IIterableColumns.java?rev=955267&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/util/IIterableColumns.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/util/IIterableColumns.java Wed Jun 16 15:27:05 2010
@@ -0,0 +1,23 @@
+package org.apache.cassandra.io.util;
+
+import org.apache.cassandra.db.IColumn;
+import org.apache.cassandra.db.marshal.AbstractType;
+
+/**
+ * Columns that can be iterated, e.g. by ColumnIndexer when building a row's bloom filter and
+ * column index. NOTE(review): implementations are presumably expected to yield columns in
+ * getComparator() order -- confirm before adding a new one.
+ */
+public interface IIterableColumns extends Iterable<IColumn>
+{
+    /**
+     * @return the number of columns iteration will yield, or an over-estimate of it. Must be 0
+     * only when iteration yields nothing: ColumnIndexer does not iterate at all in that case.
+     */
+    public int getEstimatedColumnCount();
+
+    /**
+     * @return the comparator for the column names
+     */
+    public AbstractType getComparator();
+}
Modified: cassandra/trunk/src/java/org/apache/cassandra/tools/SSTableExport.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/tools/SSTableExport.java?rev=955267&r1=955266&r2=955267&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/tools/SSTableExport.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/tools/SSTableExport.java Wed Jun 16 15:27:05 2010
@@ -112,7 +112,7 @@ public class SSTableExport
private static String serializeRow(SSTableIdentityIterator row) throws IOException
{
- ColumnFamily cf = row.getColumnFamily();
+ ColumnFamily cf = row.getColumnFamilyWithColumns();
AbstractType comparator = cf.getComparator();
StringBuilder json = new StringBuilder(asKey(bytesToHex(row.getKey().key)));
@@ -265,8 +265,8 @@ public class SSTableExport
excludeSet = new HashSet<String>(Arrays.asList(excludes));
outs.println("{");
-
- while(scanner.hasNext())
+
+ while (scanner.hasNext())
{
SSTableIdentityIterator row = (SSTableIdentityIterator) scanner.next();
if (excludeSet.contains(bytesToHex(row.getKey().key)))
Added: cassandra/trunk/test/unit/org/apache/cassandra/io/LazilyCompactedRowTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/io/LazilyCompactedRowTest.java?rev=955267&view=auto
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/io/LazilyCompactedRowTest.java (added)
+++ cassandra/trunk/test/unit/org/apache/cassandra/io/LazilyCompactedRowTest.java Wed Jun 16 15:27:05 2010
@@ -0,0 +1,207 @@
+package org.apache.cassandra.io;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.concurrent.ExecutionException;
+
+import org.junit.Test;
+
+import org.apache.cassandra.CleanupHelper;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.filter.QueryPath;
+import org.apache.cassandra.io.sstable.IndexHelper;
+import org.apache.cassandra.io.sstable.SSTableReader;
+import org.apache.cassandra.io.util.DataOutputBuffer;
+
+import static junit.framework.Assert.assertEquals;
+import static junit.framework.Assert.assertFalse;
+import static junit.framework.Assert.assertTrue;
+
+/**
+ * Checks that LazilyCompactedRow serializes rows equivalently to the in-memory compaction path.
+ */
+public class LazilyCompactedRowTest extends CleanupHelper
+{
+    /**
+     * Compacts the sstables with the regular CompactionIterator (which picks PrecompactedRow for
+     * rows as small as these) and with LazilyCompactedRow forced, and checks that every row
+     * comes out equivalent.
+     */
+    private void assertBytes(Collection<SSTableReader> sstables, int gcBefore, boolean major) throws IOException
+    {
+        CompactionIterator ci1 = new CompactionIterator(sstables, gcBefore, major);
+        LazyCompactionIterator ci2 = new LazyCompactionIterator(sstables, gcBefore, major);
+        try
+        {
+            while (ci1.hasNext())
+            {
+                assertTrue(ci2.hasNext());
+                assertRowsEquivalent(ci1.next(), ci2.next());
+            }
+            assertFalse(ci2.hasNext());
+        }
+        finally
+        {
+            ci1.close();
+            ci2.close();
+        }
+    }
+
+    /**
+     * Compares the serialized forms of two compacted versions of the same row field by field.
+     * Bloom filters are skipped rather than compared, since they are sized from different counts.
+     */
+    private static void assertRowsEquivalent(AbstractCompactedRow row1, AbstractCompactedRow row2) throws IOException
+    {
+        DataOutputBuffer out1 = new DataOutputBuffer();
+        DataOutputBuffer out2 = new DataOutputBuffer();
+        row1.write(out1);
+        row2.write(out2);
+        DataInputStream in1 = new DataInputStream(new ByteArrayInputStream(out1.getData(), 0, out1.getLength()));
+        DataInputStream in2 = new DataInputStream(new ByteArrayInputStream(out2.getData(), 0, out2.getLength()));
+
+        // key isn't part of what CompactedRow writes, that's done by SSTW.append
+
+        // row size can differ b/c of bloom filter counts being different
+        int rowSize1 = in1.readInt();
+        int rowSize2 = in2.readInt();
+        assertEquals(out1.getLength(), rowSize1 + 4);
+        assertEquals(out2.getLength(), rowSize2 + 4);
+        // bloom filter
+        IndexHelper.defreezeBloomFilter(in1);
+        IndexHelper.defreezeBloomFilter(in2);
+        // index
+        int indexSize1 = in1.readInt();
+        int indexSize2 = in2.readInt();
+        assertEquals(indexSize1, indexSize2);
+        byte[] bytes1 = new byte[indexSize1];
+        byte[] bytes2 = new byte[indexSize2];
+        in1.readFully(bytes1);
+        in2.readFully(bytes2);
+        assertTrue(Arrays.equals(bytes1, bytes2));
+        // cf metadata
+        ColumnFamily cf1 = ColumnFamily.create("Keyspace1", "Standard1");
+        ColumnFamily cf2 = ColumnFamily.create("Keyspace1", "Standard1");
+        ColumnFamily.serializer().deserializeFromSSTableNoColumns(cf1, in1);
+        ColumnFamily.serializer().deserializeFromSSTableNoColumns(cf2, in2);
+        assertEquals(cf1.getLocalDeletionTime(), cf2.getLocalDeletionTime());
+        assertEquals(cf1.getMarkedForDeleteAt(), cf2.getMarkedForDeleteAt());
+        // columns
+        int columns = in1.readInt();
+        assertEquals(columns, in2.readInt());
+        for (int i = 0; i < columns; i++)
+        {
+            IColumn c1 = cf1.getColumnSerializer().deserialize(in1);
+            IColumn c2 = cf2.getColumnSerializer().deserialize(in2);
+            assertEquals(c1, c2);
+        }
+        // that should be everything
+        assertEquals(0, in1.available());
+        assertEquals(0, in2.available());
+    }
+
+    /** Disables automatic compaction and returns the Keyspace1/Standard1 store. */
+    private static ColumnFamilyStore standard1Store() throws IOException, ExecutionException, InterruptedException
+    {
+        CompactionManager.instance.disableAutoCompaction();
+        return Table.open("Keyspace1").getColumnFamilyStore("Standard1");
+    }
+
+    /**
+     * Writes the named columns (empty values, timestamp 0) to row "k" and flushes, repeating
+     * the apply-and-flush {@code flushes} times so the row ends up in that many sstables.
+     */
+    private static void writeAndFlush(ColumnFamilyStore cfs, int flushes, String... columnNames)
+            throws IOException, ExecutionException, InterruptedException
+    {
+        RowMutation rm = new RowMutation("Keyspace1", "k".getBytes());
+        for (String name : columnNames)
+            rm.add(new QueryPath("Standard1", null, name.getBytes()), new byte[0], new TimestampClock(0));
+        for (int i = 0; i < flushes; i++)
+        {
+            rm.apply();
+            cfs.forceBlockingFlush();
+        }
+    }
+
+    @Test
+    public void testOneRow() throws IOException, ExecutionException, InterruptedException
+    {
+        ColumnFamilyStore cfs = standard1Store();
+        writeAndFlush(cfs, 1, "c");
+        assertBytes(cfs.getSSTables(), Integer.MAX_VALUE, true);
+    }
+
+    @Test
+    public void testOneRowTwoColumns() throws IOException, ExecutionException, InterruptedException
+    {
+        ColumnFamilyStore cfs = standard1Store();
+        writeAndFlush(cfs, 1, "c", "d");
+        assertBytes(cfs.getSSTables(), Integer.MAX_VALUE, true);
+    }
+
+    @Test
+    public void testTwoRows() throws IOException, ExecutionException, InterruptedException
+    {
+        ColumnFamilyStore cfs = standard1Store();
+        writeAndFlush(cfs, 2, "c");
+        assertBytes(cfs.getSSTables(), Integer.MAX_VALUE, true);
+    }
+
+    @Test
+    public void testTwoRowsTwoColumns() throws IOException, ExecutionException, InterruptedException
+    {
+        ColumnFamilyStore cfs = standard1Store();
+        writeAndFlush(cfs, 2, "c", "d");
+        assertBytes(cfs.getSSTables(), Integer.MAX_VALUE, true);
+    }
+
+    @Test
+    public void testManyRows() throws IOException, ExecutionException, InterruptedException
+    {
+        ColumnFamilyStore cfs = standard1Store();
+
+        final int ROWS_PER_SSTABLE = 10;
+        for (int j = 0; j < (SSTableReader.indexInterval() * 3) / ROWS_PER_SSTABLE; j++)
+        {
+            for (int i = 0; i < ROWS_PER_SSTABLE; i++)
+            {
+                byte[] key = String.valueOf(i % 2).getBytes();
+                RowMutation rm = new RowMutation("Keyspace1", key);
+                rm.add(new QueryPath("Standard1", null, String.valueOf(i / 2).getBytes()), new byte[0], new TimestampClock(j * ROWS_PER_SSTABLE + i));
+                rm.apply();
+            }
+            cfs.forceBlockingFlush();
+        }
+
+        assertBytes(cfs.getSSTables(), Integer.MAX_VALUE, true);
+    }
+
+    /**
+     * CompactionIterator that always builds a LazilyCompactedRow, whatever the row size, so the
+     * lazy path can be exercised with small test data.
+     */
+    private static class LazyCompactionIterator extends CompactionIterator
+    {
+        // CompactionIterator keeps its own copies of these private, so remember them here
+        private final int gcBefore;
+        private final boolean major;
+
+        public LazyCompactionIterator(Iterable<SSTableReader> sstables, int gcBefore, boolean major) throws IOException
+        {
+            super(sstables, gcBefore, major);
+            this.gcBefore = gcBefore;
+            this.major = major;
+        }
+
+        @Override
+        protected AbstractCompactedRow getCompactedRow()
+        {
+            // honour the caller's settings instead of hard-coding major=true, gcBefore=Integer.MAX_VALUE
+            return new LazilyCompactedRow(rows, major, gcBefore);
+        }
+    }
+}