You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2011/05/12 17:20:34 UTC

svn commit: r1102345 - in /cassandra/branches/cassandra-0.8: CHANGES.txt src/java/org/apache/cassandra/db/RowMutation.java src/java/org/apache/cassandra/db/commitlog/CommitLog.java test/unit/org/apache/cassandra/db/RecoveryManagerTest.java

Author: slebresne
Date: Thu May 12 15:20:34 2011
New Revision: 1102345

URL: http://svn.apache.org/viewvc?rev=1102345&view=rev
Log:
Fix counter increment lost after restart
patch by slebresne; reviewed by jbellis for CASSANDRA-2642

Modified:
    cassandra/branches/cassandra-0.8/CHANGES.txt
    cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/RowMutation.java
    cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
    cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/db/RecoveryManagerTest.java

Modified: cassandra/branches/cassandra-0.8/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/CHANGES.txt?rev=1102345&r1=1102344&r2=1102345&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.8/CHANGES.txt Thu May 12 15:20:34 2011
@@ -6,6 +6,7 @@
  * avoid using cached position of a key when GT is requested (CASSANDRA-2633)
  * fix counting bloom filter true positives (CASSANDRA-2637)
  * initialize local ep state prior to gossip startup if needed (CASSANDRA-2638)
+ * fix counter increment lost after restart (CASSANDRA-2642)
 
 
 0.8.0-rc1

Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/RowMutation.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/RowMutation.java?rev=1102345&r1=1102344&r2=1102345&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/RowMutation.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/RowMutation.java Thu May 12 15:20:34 2011
@@ -355,7 +355,7 @@ public class RowMutation implements IMut
             }
         }
 
-        public RowMutation deserialize(DataInputStream dis, int version) throws IOException
+        public RowMutation deserialize(DataInputStream dis, int version, boolean fromRemote) throws IOException
         {
             String table = dis.readUTF();
             ByteBuffer key = ByteBufferUtil.readWithShortLength(dis);
@@ -364,11 +364,15 @@ public class RowMutation implements IMut
             for (int i = 0; i < size; ++i)
             {
                 Integer cfid = Integer.valueOf(dis.readInt());
-                // This is coming from a remote host
-                ColumnFamily cf = ColumnFamily.serializer().deserialize(dis, true, true);
+                ColumnFamily cf = ColumnFamily.serializer().deserialize(dis, true, fromRemote);
                 modifications.put(cfid, cf);
             }
             return new RowMutation(table, key, modifications);
         }
+
+        public RowMutation deserialize(DataInputStream dis, int version) throws IOException
+        {
+            return deserialize(dis, version, true);
+        }
     }
 }

Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/commitlog/CommitLog.java?rev=1102345&r1=1102344&r2=1102345&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/commitlog/CommitLog.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/commitlog/CommitLog.java Thu May 12 15:20:34 2011
@@ -268,7 +268,7 @@ public class CommitLog
                     {
                         // assuming version here. We've gone to lengths to make sure what gets written to the CL is in
                         // the current version.  so do make sure the CL is drained prior to upgrading a node.
-                        rm = RowMutation.serializer().deserialize(new DataInputStream(bufIn), MessagingService.version_);
+                        rm = RowMutation.serializer().deserialize(new DataInputStream(bufIn), MessagingService.version_, false);
                     }
                     catch (UnserializableColumnFamilyException ex)
                     {

Modified: cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/db/RecoveryManagerTest.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/db/RecoveryManagerTest.java?rev=1102345&r1=1102344&r2=1102345&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/db/RecoveryManagerTest.java (original)
+++ cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/db/RecoveryManagerTest.java Thu May 12 15:20:34 2011
@@ -26,6 +26,7 @@ import org.junit.Test;
 
 import org.apache.cassandra.CleanupHelper;
 import org.apache.cassandra.db.commitlog.CommitLog;
+import org.apache.cassandra.utils.ByteBufferUtil;
 
 import org.apache.cassandra.Util;
 import static org.apache.cassandra.Util.column;
@@ -69,4 +70,36 @@ public class RecoveryManagerTest extends
         assertColumns(Util.getColumnFamily(table1, dk, "Standard1"), "col1");
         assertColumns(Util.getColumnFamily(table2, dk, "Standard3"), "col2");
     }
+
+    @Test
+    public void testRecoverCounter() throws IOException, ExecutionException, InterruptedException
+    {
+        Table table1 = Table.open("Keyspace1");
+
+        RowMutation rm;
+        DecoratedKey dk = Util.dk("key");
+        ColumnFamily cf;
+
+        for (int i = 0; i < 10; ++i)
+        {
+            rm = new RowMutation("Keyspace1", dk.key);
+            cf = ColumnFamily.create("Keyspace1", "Counter1");
+            cf.addColumn(new CounterColumn(ByteBufferUtil.bytes("col"), 1L, 1L));
+            rm.add(cf);
+            rm.apply();
+        }
+
+        table1.getColumnFamilyStore("Counter1").clearUnsafe();
+
+        CommitLog.instance.resetUnsafe(); // disassociate segments from live CL
+        CommitLog.recover();
+
+        cf = Util.getColumnFamily(table1, dk, "Counter1");
+
+        assert cf.getColumnCount() == 1;
+        IColumn c = cf.getColumn(ByteBufferUtil.bytes("col"));
+
+        assert c != null;
+        assert ((CounterColumn)c).total() == 10L;
+    }
 }