You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2011/02/05 21:40:59 UTC
svn commit: r1067518 - in /cassandra/trunk:
src/java/org/apache/cassandra/io/LazilyCompactedRow.java
src/java/org/apache/cassandra/io/PrecompactedRow.java
test/unit/org/apache/cassandra/io/LazilyCompactedRowTest.java
Author: jbellis
Date: Sat Feb 5 20:40:59 2011
New Revision: 1067518
URL: http://svn.apache.org/viewvc?rev=1067518&view=rev
Log:
make PreCompactedRow and LazyCompactedRow digest computations match
patch by Richard Low and jbellis for CASSANDRA-2039
Modified:
cassandra/trunk/src/java/org/apache/cassandra/io/LazilyCompactedRow.java
cassandra/trunk/src/java/org/apache/cassandra/io/PrecompactedRow.java
cassandra/trunk/test/unit/org/apache/cassandra/io/LazilyCompactedRowTest.java
Modified: cassandra/trunk/src/java/org/apache/cassandra/io/LazilyCompactedRow.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/LazilyCompactedRow.java?rev=1067518&r1=1067517&r2=1067518&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/LazilyCompactedRow.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/LazilyCompactedRow.java Sat Feb 5 20:40:59 2011
@@ -122,8 +122,19 @@ public class LazilyCompactedRow extends
{
// no special-case for rows.size == 1, we're actually skipping some bytes here so just
// blindly updating everything wouldn't be correct
- digest.update(headerBuffer.getData(), 0, headerBuffer.getLength());
DataOutputBuffer out = new DataOutputBuffer();
+
+ try
+ {
+ ColumnFamily.serializer().serializeCFInfo(emptyColumnFamily, out);
+ out.writeInt(columnCount);
+ digest.update(out.getData(), 0, out.getLength());
+ }
+ catch (IOException e)
+ {
+ throw new IOError(e);
+ }
+
Iterator<IColumn> iter = iterator();
while (iter.hasNext())
{
Modified: cassandra/trunk/src/java/org/apache/cassandra/io/PrecompactedRow.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/PrecompactedRow.java?rev=1067518&r1=1067517&r2=1067518&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/PrecompactedRow.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/PrecompactedRow.java Sat Feb 5 20:40:59 2011
@@ -34,6 +34,7 @@ import org.slf4j.LoggerFactory;
import org.apache.cassandra.db.ColumnFamily;
import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.ColumnIndexer;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.io.sstable.SSTable;
import org.apache.cassandra.io.sstable.SSTableIdentityIterator;
@@ -47,18 +48,21 @@ public class PrecompactedRow extends Abs
private static Logger logger = LoggerFactory.getLogger(PrecompactedRow.class);
private final DataOutputBuffer buffer;
+ private final DataOutputBuffer headerBuffer;
private int columnCount = 0;
public PrecompactedRow(DecoratedKey key, DataOutputBuffer buffer)
{
super(key);
this.buffer = buffer;
+ this.headerBuffer = new DataOutputBuffer();
}
public PrecompactedRow(ColumnFamilyStore cfStore, List<SSTableIdentityIterator> rows, boolean major, int gcBefore)
{
super(rows.get(0).getKey());
buffer = new DataOutputBuffer();
+ headerBuffer = new DataOutputBuffer();
Set<SSTable> sstables = new HashSet<SSTable>();
for (SSTableIdentityIterator row : rows)
@@ -94,7 +98,9 @@ public class PrecompactedRow extends Abs
ColumnFamily cfPurged = shouldPurge ? ColumnFamilyStore.removeDeleted(cf, gcBefore) : cf;
if (cfPurged == null)
return;
- columnCount = ColumnFamily.serializer().serializeWithIndexes(cfPurged, buffer);
+
+ ColumnIndexer.serialize(cfPurged, headerBuffer);
+ columnCount = ColumnFamily.serializer().serializeForSSTable(cfPurged, buffer);
}
else
{
@@ -113,7 +119,8 @@ public class PrecompactedRow extends Abs
public void write(DataOutput out) throws IOException
{
- out.writeLong(buffer.getLength());
+ out.writeLong(headerBuffer.getLength() + buffer.getLength());
+ out.write(headerBuffer.getData(), 0, headerBuffer.getLength());
out.write(buffer.getData(), 0, buffer.getLength());
}
Modified: cassandra/trunk/test/unit/org/apache/cassandra/io/LazilyCompactedRowTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/io/LazilyCompactedRowTest.java?rev=1067518&r1=1067517&r2=1067518&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/io/LazilyCompactedRowTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/io/LazilyCompactedRowTest.java Sat Feb 5 20:40:59 2011
@@ -25,6 +25,8 @@ import static junit.framework.Assert.ass
import java.io.*;
import java.nio.ByteBuffer;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
import java.util.Collection;
import java.util.concurrent.ExecutionException;
@@ -51,8 +53,8 @@ public class LazilyCompactedRowTest exte
private void assertBytes(ColumnFamilyStore cfs, int gcBefore, boolean major) throws IOException
{
Collection<SSTableReader> sstables = cfs.getSSTables();
- CompactionIterator ci1 = new CompactionIterator(cfs, sstables, gcBefore, major);
- LazyCompactionIterator ci2 = new LazyCompactionIterator(cfs, sstables, gcBefore, major);
+ CompactionIterator ci1 = new PreCompactingIterator(cfs, sstables, gcBefore, major);
+ CompactionIterator ci2 = new LazyCompactionIterator(cfs, sstables, gcBefore, major);
while (true)
{
@@ -122,9 +124,35 @@ public class LazilyCompactedRowTest exte
assert in2.available() == 0;
}
}
+
+ private void assertDigest(ColumnFamilyStore cfs, int gcBefore, boolean major) throws IOException, NoSuchAlgorithmException
+ {
+ Collection<SSTableReader> sstables = cfs.getSSTables();
+ CompactionIterator ci1 = new PreCompactingIterator(cfs, sstables, gcBefore, major);
+ CompactionIterator ci2 = new LazyCompactionIterator(cfs, sstables, gcBefore, major);
+
+ while (true)
+ {
+ if (!ci1.hasNext())
+ {
+ assert !ci2.hasNext();
+ break;
+ }
+
+ AbstractCompactedRow row1 = ci1.next();
+ AbstractCompactedRow row2 = ci2.next();
+ MessageDigest digest1 = MessageDigest.getInstance("MD5");
+ MessageDigest digest2 = MessageDigest.getInstance("MD5");
+
+ row1.update(digest1);
+ row2.update(digest2);
+
+ assert MessageDigest.isEqual(digest1.digest(), digest2.digest());
+ }
+ }
@Test
- public void testOneRow() throws IOException, ExecutionException, InterruptedException
+ public void testOneRow() throws IOException, ExecutionException, InterruptedException, NoSuchAlgorithmException
{
CompactionManager.instance.disableAutoCompaction();
@@ -138,17 +166,18 @@ public class LazilyCompactedRowTest exte
cfs.forceBlockingFlush();
assertBytes(cfs, Integer.MAX_VALUE, true);
+ assertDigest(cfs, Integer.MAX_VALUE, true);
}
@Test
- public void testOneRowTwoColumns() throws IOException, ExecutionException, InterruptedException
+ public void testOneRowTwoColumns() throws IOException, ExecutionException, InterruptedException, NoSuchAlgorithmException
{
CompactionManager.instance.disableAutoCompaction();
Table table = Table.open("Keyspace1");
ColumnFamilyStore cfs = table.getColumnFamilyStore("Standard1");
- ByteBuffer key =ByteBuffer.wrap( "k".getBytes() );
+ ByteBuffer key =ByteBuffer.wrap("k".getBytes());
RowMutation rm = new RowMutation("Keyspace1", key);
rm.add(new QueryPath("Standard1", null, ByteBufferUtil.bytes("c")), ByteBufferUtil.EMPTY_BYTE_BUFFER, 0);
rm.add(new QueryPath("Standard1", null, ByteBufferUtil.bytes("d")), ByteBufferUtil.EMPTY_BYTE_BUFFER, 0);
@@ -156,10 +185,33 @@ public class LazilyCompactedRowTest exte
cfs.forceBlockingFlush();
assertBytes(cfs, Integer.MAX_VALUE, true);
+ assertDigest(cfs, Integer.MAX_VALUE, true);
}
@Test
- public void testTwoRows() throws IOException, ExecutionException, InterruptedException
+ public void testOneRowManyColumns() throws IOException, ExecutionException, InterruptedException, NoSuchAlgorithmException
+ {
+ CompactionManager.instance.disableAutoCompaction();
+
+ Table table = Table.open("Keyspace1");
+ ColumnFamilyStore cfs = table.getColumnFamilyStore("Standard1");
+
+ ByteBuffer key = ByteBuffer.wrap("k".getBytes());
+ RowMutation rm = new RowMutation("Keyspace1", key);
+ for (int i = 0; i < 1000; i++)
+ rm.add(new QueryPath("Standard1", null, ByteBufferUtil.bytes(i)), ByteBufferUtil.EMPTY_BYTE_BUFFER, 0);
+ rm.apply();
+ DataOutputBuffer out = new DataOutputBuffer();
+ RowMutation.serializer().serialize(rm, out);
+ assert out.getLength() > DatabaseDescriptor.getColumnIndexSize();
+ cfs.forceBlockingFlush();
+
+ assertBytes(cfs, Integer.MAX_VALUE, true);
+ assertDigest(cfs, Integer.MAX_VALUE, true);
+ }
+
+ @Test
+ public void testTwoRows() throws IOException, ExecutionException, InterruptedException, NoSuchAlgorithmException
{
CompactionManager.instance.disableAutoCompaction();
@@ -176,10 +228,11 @@ public class LazilyCompactedRowTest exte
cfs.forceBlockingFlush();
assertBytes(cfs, Integer.MAX_VALUE, true);
+ assertDigest(cfs, Integer.MAX_VALUE, true);
}
@Test
- public void testTwoRowsTwoColumns() throws IOException, ExecutionException, InterruptedException
+ public void testTwoRowsTwoColumns() throws IOException, ExecutionException, InterruptedException, NoSuchAlgorithmException
{
CompactionManager.instance.disableAutoCompaction();
@@ -197,10 +250,11 @@ public class LazilyCompactedRowTest exte
cfs.forceBlockingFlush();
assertBytes(cfs, Integer.MAX_VALUE, true);
+ assertDigest(cfs, Integer.MAX_VALUE, true);
}
@Test
- public void testManyRows() throws IOException, ExecutionException, InterruptedException
+ public void testManyRows() throws IOException, ExecutionException, InterruptedException, NoSuchAlgorithmException
{
CompactionManager.instance.disableAutoCompaction();
@@ -219,6 +273,7 @@ public class LazilyCompactedRowTest exte
}
assertBytes(cfs, Integer.MAX_VALUE, true);
+ assertDigest(cfs, Integer.MAX_VALUE, true);
}
private static class LazyCompactionIterator extends CompactionIterator
@@ -237,4 +292,21 @@ public class LazilyCompactedRowTest exte
return new LazilyCompactedRow(cfStore, rows, true, Integer.MAX_VALUE);
}
}
+
+ private static class PreCompactingIterator extends CompactionIterator
+ {
+ private final ColumnFamilyStore cfStore;
+
+ public PreCompactingIterator(ColumnFamilyStore cfStore, Iterable<SSTableReader> sstables, int gcBefore, boolean major) throws IOException
+ {
+ super(cfStore, sstables, gcBefore, major);
+ this.cfStore = cfStore;
+ }
+
+ @Override
+ protected AbstractCompactedRow getCompactedRow()
+ {
+ return new PrecompactedRow(cfStore, rows, true, Integer.MAX_VALUE);
+ }
+ }
}