You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2011/02/05 21:40:59 UTC

svn commit: r1067518 - in /cassandra/trunk: src/java/org/apache/cassandra/io/LazilyCompactedRow.java src/java/org/apache/cassandra/io/PrecompactedRow.java test/unit/org/apache/cassandra/io/LazilyCompactedRowTest.java

Author: jbellis
Date: Sat Feb  5 20:40:59 2011
New Revision: 1067518

URL: http://svn.apache.org/viewvc?rev=1067518&view=rev
Log:
Make PrecompactedRow and LazilyCompactedRow digest computations match
patch by Richard Low and jbellis for CASSANDRA-2039

Modified:
    cassandra/trunk/src/java/org/apache/cassandra/io/LazilyCompactedRow.java
    cassandra/trunk/src/java/org/apache/cassandra/io/PrecompactedRow.java
    cassandra/trunk/test/unit/org/apache/cassandra/io/LazilyCompactedRowTest.java

Modified: cassandra/trunk/src/java/org/apache/cassandra/io/LazilyCompactedRow.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/LazilyCompactedRow.java?rev=1067518&r1=1067517&r2=1067518&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/LazilyCompactedRow.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/LazilyCompactedRow.java Sat Feb  5 20:40:59 2011
@@ -122,8 +122,19 @@ public class LazilyCompactedRow extends 
     {
         // no special-case for rows.size == 1, we're actually skipping some bytes here so just
         // blindly updating everything wouldn't be correct
-        digest.update(headerBuffer.getData(), 0, headerBuffer.getLength());
         DataOutputBuffer out = new DataOutputBuffer();
+
+        try
+        {
+            ColumnFamily.serializer().serializeCFInfo(emptyColumnFamily, out);
+            out.writeInt(columnCount);
+            digest.update(out.getData(), 0, out.getLength());
+        }
+        catch (IOException e)
+        {
+            throw new IOError(e);
+        }
+
         Iterator<IColumn> iter = iterator();
         while (iter.hasNext())
         {

Modified: cassandra/trunk/src/java/org/apache/cassandra/io/PrecompactedRow.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/PrecompactedRow.java?rev=1067518&r1=1067517&r2=1067518&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/PrecompactedRow.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/PrecompactedRow.java Sat Feb  5 20:40:59 2011
@@ -34,6 +34,7 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.db.ColumnFamily;
 import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.ColumnIndexer;
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.io.sstable.SSTable;
 import org.apache.cassandra.io.sstable.SSTableIdentityIterator;
@@ -47,18 +48,21 @@ public class PrecompactedRow extends Abs
     private static Logger logger = LoggerFactory.getLogger(PrecompactedRow.class);
 
     private final DataOutputBuffer buffer;
+    private final DataOutputBuffer headerBuffer;
     private int columnCount = 0;
 
     public PrecompactedRow(DecoratedKey key, DataOutputBuffer buffer)
     {
         super(key);
         this.buffer = buffer;
+        this.headerBuffer = new DataOutputBuffer();
     }
 
     public PrecompactedRow(ColumnFamilyStore cfStore, List<SSTableIdentityIterator> rows, boolean major, int gcBefore)
     {
         super(rows.get(0).getKey());
         buffer = new DataOutputBuffer();
+        headerBuffer = new DataOutputBuffer();
 
         Set<SSTable> sstables = new HashSet<SSTable>();
         for (SSTableIdentityIterator row : rows)
@@ -94,7 +98,9 @@ public class PrecompactedRow extends Abs
             ColumnFamily cfPurged = shouldPurge ? ColumnFamilyStore.removeDeleted(cf, gcBefore) : cf;
             if (cfPurged == null)
                 return;
-            columnCount = ColumnFamily.serializer().serializeWithIndexes(cfPurged, buffer);
+            
+            ColumnIndexer.serialize(cfPurged, headerBuffer);
+            columnCount = ColumnFamily.serializer().serializeForSSTable(cfPurged, buffer);
         }
         else
         {
@@ -113,7 +119,8 @@ public class PrecompactedRow extends Abs
 
     public void write(DataOutput out) throws IOException
     {
-        out.writeLong(buffer.getLength());
+        out.writeLong(headerBuffer.getLength() + buffer.getLength());
+        out.write(headerBuffer.getData(), 0, headerBuffer.getLength());
         out.write(buffer.getData(), 0, buffer.getLength());
     }
 

Modified: cassandra/trunk/test/unit/org/apache/cassandra/io/LazilyCompactedRowTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/io/LazilyCompactedRowTest.java?rev=1067518&r1=1067517&r2=1067518&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/io/LazilyCompactedRowTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/io/LazilyCompactedRowTest.java Sat Feb  5 20:40:59 2011
@@ -25,6 +25,8 @@ import static junit.framework.Assert.ass
 
 import java.io.*;
 import java.nio.ByteBuffer;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
 import java.util.Collection;
 import java.util.concurrent.ExecutionException;
 
@@ -51,8 +53,8 @@ public class LazilyCompactedRowTest exte
     private void assertBytes(ColumnFamilyStore cfs, int gcBefore, boolean major) throws IOException
     {
         Collection<SSTableReader> sstables = cfs.getSSTables();
-        CompactionIterator ci1 = new CompactionIterator(cfs, sstables, gcBefore, major);
-        LazyCompactionIterator ci2 = new LazyCompactionIterator(cfs, sstables, gcBefore, major);
+        CompactionIterator ci1 = new PreCompactingIterator(cfs, sstables, gcBefore, major);
+        CompactionIterator ci2 = new LazyCompactionIterator(cfs, sstables, gcBefore, major);
 
         while (true)
         {
@@ -122,9 +124,35 @@ public class LazilyCompactedRowTest exte
             assert in2.available() == 0;
         }
     }
+    
+    private void assertDigest(ColumnFamilyStore cfs, int gcBefore, boolean major) throws IOException, NoSuchAlgorithmException
+    {
+        Collection<SSTableReader> sstables = cfs.getSSTables();
+        CompactionIterator ci1 = new PreCompactingIterator(cfs, sstables, gcBefore, major);
+        CompactionIterator ci2 = new LazyCompactionIterator(cfs, sstables, gcBefore, major);
+
+        while (true)
+        {
+            if (!ci1.hasNext())
+            {
+                assert !ci2.hasNext();
+                break;
+            }
+
+            AbstractCompactedRow row1 = ci1.next();
+            AbstractCompactedRow row2 = ci2.next();
+            MessageDigest digest1 = MessageDigest.getInstance("MD5");
+            MessageDigest digest2 = MessageDigest.getInstance("MD5");
+
+            row1.update(digest1);
+            row2.update(digest2);
+
+            assert MessageDigest.isEqual(digest1.digest(), digest2.digest());
+        }
+    }
 
     @Test
-    public void testOneRow() throws IOException, ExecutionException, InterruptedException
+    public void testOneRow() throws IOException, ExecutionException, InterruptedException, NoSuchAlgorithmException
     {
         CompactionManager.instance.disableAutoCompaction();
 
@@ -138,17 +166,18 @@ public class LazilyCompactedRowTest exte
         cfs.forceBlockingFlush();
 
         assertBytes(cfs, Integer.MAX_VALUE, true);
+        assertDigest(cfs, Integer.MAX_VALUE, true);
     }
 
     @Test
-    public void testOneRowTwoColumns() throws IOException, ExecutionException, InterruptedException
+    public void testOneRowTwoColumns() throws IOException, ExecutionException, InterruptedException, NoSuchAlgorithmException
     {
         CompactionManager.instance.disableAutoCompaction();
 
         Table table = Table.open("Keyspace1");
         ColumnFamilyStore cfs = table.getColumnFamilyStore("Standard1");
 
-        ByteBuffer key =ByteBuffer.wrap( "k".getBytes() );
+        ByteBuffer key =ByteBuffer.wrap("k".getBytes());
         RowMutation rm = new RowMutation("Keyspace1", key);
         rm.add(new QueryPath("Standard1", null, ByteBufferUtil.bytes("c")), ByteBufferUtil.EMPTY_BYTE_BUFFER, 0);
         rm.add(new QueryPath("Standard1", null, ByteBufferUtil.bytes("d")), ByteBufferUtil.EMPTY_BYTE_BUFFER, 0);
@@ -156,10 +185,33 @@ public class LazilyCompactedRowTest exte
         cfs.forceBlockingFlush();
 
         assertBytes(cfs, Integer.MAX_VALUE, true);
+        assertDigest(cfs, Integer.MAX_VALUE, true);
     }
 
     @Test
-    public void testTwoRows() throws IOException, ExecutionException, InterruptedException
+    public void testOneRowManyColumns() throws IOException, ExecutionException, InterruptedException, NoSuchAlgorithmException
+    {
+        CompactionManager.instance.disableAutoCompaction();
+
+        Table table = Table.open("Keyspace1");
+        ColumnFamilyStore cfs = table.getColumnFamilyStore("Standard1");
+
+        ByteBuffer key = ByteBuffer.wrap("k".getBytes());
+        RowMutation rm = new RowMutation("Keyspace1", key);
+        for (int i = 0; i < 1000; i++)
+            rm.add(new QueryPath("Standard1", null, ByteBufferUtil.bytes(i)), ByteBufferUtil.EMPTY_BYTE_BUFFER, 0);
+        rm.apply();
+        DataOutputBuffer out = new DataOutputBuffer();
+        RowMutation.serializer().serialize(rm, out);
+        assert out.getLength() > DatabaseDescriptor.getColumnIndexSize();
+        cfs.forceBlockingFlush();
+
+        assertBytes(cfs, Integer.MAX_VALUE, true);
+        assertDigest(cfs, Integer.MAX_VALUE, true);
+    }
+
+    @Test
+    public void testTwoRows() throws IOException, ExecutionException, InterruptedException, NoSuchAlgorithmException
     {
         CompactionManager.instance.disableAutoCompaction();
 
@@ -176,10 +228,11 @@ public class LazilyCompactedRowTest exte
         cfs.forceBlockingFlush();
 
         assertBytes(cfs, Integer.MAX_VALUE, true);
+        assertDigest(cfs, Integer.MAX_VALUE, true);
     }
 
     @Test
-    public void testTwoRowsTwoColumns() throws IOException, ExecutionException, InterruptedException
+    public void testTwoRowsTwoColumns() throws IOException, ExecutionException, InterruptedException, NoSuchAlgorithmException
     {
         CompactionManager.instance.disableAutoCompaction();
 
@@ -197,10 +250,11 @@ public class LazilyCompactedRowTest exte
         cfs.forceBlockingFlush();
 
         assertBytes(cfs, Integer.MAX_VALUE, true);
+        assertDigest(cfs, Integer.MAX_VALUE, true);
     }
 
     @Test
-    public void testManyRows() throws IOException, ExecutionException, InterruptedException
+    public void testManyRows() throws IOException, ExecutionException, InterruptedException, NoSuchAlgorithmException
     {
         CompactionManager.instance.disableAutoCompaction();
 
@@ -219,6 +273,7 @@ public class LazilyCompactedRowTest exte
         }
 
         assertBytes(cfs, Integer.MAX_VALUE, true);
+        assertDigest(cfs, Integer.MAX_VALUE, true);
     }
 
     private static class LazyCompactionIterator extends CompactionIterator
@@ -237,4 +292,21 @@ public class LazilyCompactedRowTest exte
             return new LazilyCompactedRow(cfStore, rows, true, Integer.MAX_VALUE);
         }
     }
+
+    private static class PreCompactingIterator extends CompactionIterator
+    {
+        private final ColumnFamilyStore cfStore;
+
+        public PreCompactingIterator(ColumnFamilyStore cfStore, Iterable<SSTableReader> sstables, int gcBefore, boolean major) throws IOException
+        {
+            super(cfStore, sstables, gcBefore, major);
+            this.cfStore = cfStore;
+        }
+
+        @Override
+        protected AbstractCompactedRow getCompactedRow()
+        {
+            return new PrecompactedRow(cfStore, rows, true, Integer.MAX_VALUE);
+        }
+    }
 }