You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2011/11/23 08:59:46 UTC

svn commit: r1205316 - in /cassandra/branches/cassandra-0.8: CHANGES.txt src/java/org/apache/cassandra/db/context/CounterContext.java test/unit/org/apache/cassandra/db/context/CounterContextTest.java

Author: slebresne
Date: Wed Nov 23 07:59:45 2011
New Revision: 1205316

URL: http://svn.apache.org/viewvc?rev=1205316&view=rev
Log:
Fix array out of bounds error in counter shard removal
patch by slebresne; reviewed by yukim for CASSANDRA-3514

Modified:
    cassandra/branches/cassandra-0.8/CHANGES.txt
    cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/context/CounterContext.java
    cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/db/context/CounterContextTest.java

Modified: cassandra/branches/cassandra-0.8/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/CHANGES.txt?rev=1205316&r1=1205315&r2=1205316&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.8/CHANGES.txt Wed Nov 23 07:59:45 2011
@@ -36,7 +36,7 @@
  * CFMetaData.convertToThrift method to set RowCacheProvider (CASSANDRA-3405)
  * acquire compactionlock during truncate (CASSANDRA-3399)
  * fix displaying cfdef entries for super columnfamilies (CASSANDRA-3415)
- * Make counter shard merging thread safe (CASSANDRA-3178)
+ * Make counter shard merging thread safe (CASSANDRA-3178, -3514)
  * Fix bug preventing the use of efficient cross-DC writes (CASSANDRA-3472)
  * (Hadoop) skip empty rows when entire row is requested, redux (CASSANDRA-2855)
  * fix concurrence issue in the FailureDetector (CASSANDRA-3519)

Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/context/CounterContext.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/context/CounterContext.java?rev=1205316&r1=1205315&r2=1205316&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/context/CounterContext.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/context/CounterContext.java Wed Nov 23 07:59:45 2011
@@ -628,6 +628,7 @@ public class CounterContext implements I
         int hlength = headerLength(context);
         ContextState state = new ContextState(context, hlength);
         int removedShards = 0;
+        int removedDelta = 0;
         while (state.hasRemaining())
         {
             long clock = state.getClock();
@@ -655,7 +656,11 @@ public class CounterContext implements I
                 }
 
                 if (-((int)(clock / 1000)) < gcBefore)
+                {
                     removedShards++;
+                    if (state.isDelta())
+                        removedDelta++;
+                }
             }
             state.moveToNext();
         }
@@ -663,9 +668,9 @@ public class CounterContext implements I
         if (removedShards == 0)
             return context;
 
-
-        int removedHeaderSize = removedShards * HEADER_ELT_LENGTH;
-        int newSize = context.remaining() - removedHeaderSize - (removedShards * STEP_LENGTH);
+        int removedHeaderSize = removedDelta * HEADER_ELT_LENGTH;
+        int removedBodySize = removedShards * STEP_LENGTH;
+        int newSize = context.remaining() - removedHeaderSize - removedBodySize;
         int newHlength = hlength - removedHeaderSize;
         ByteBuffer cleanedContext = ByteBuffer.allocate(newSize);
         cleanedContext.putShort(cleanedContext.position(), (short) ((newHlength - HEADER_SIZE_LENGTH) / HEADER_ELT_LENGTH));
@@ -675,10 +680,11 @@ public class CounterContext implements I
         while (state.hasRemaining())
         {
             long clock = state.getClock();
-            if (!(clock < 0 && state.getCount() == 0))
+            if (clock >= 0 || state.getCount() != 0 || -((int)(clock / 1000)) >= gcBefore)
             {
                 state.copyTo(cleaned);
             }
+
             state.moveToNext();
         }
         return cleanedContext;

Modified: cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/db/context/CounterContextTest.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/db/context/CounterContextTest.java?rev=1205316&r1=1205315&r2=1205316&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/db/context/CounterContextTest.java (original)
+++ cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/db/context/CounterContextTest.java Wed Nov 23 07:59:45 2011
@@ -381,4 +381,81 @@ public class CounterContextTest
         assert cc.total(ctx.context) == cc.total(cleaned);
         assert cleaned.remaining() == ctx.context.remaining() - stepLength - 2;
     }
+
+    @Test
+    public void testRemoveOldShardsNotAllExpiring()
+    {
+        runRemoveOldShardsNotAllExpiring(HeapAllocator.instance);
+        runRemoveOldShardsNotAllExpiring(bumpedSlab());
+    }
+
+    private void runRemoveOldShardsNotAllExpiring(Allocator allocator)
+    {
+        NodeId id1 = NodeId.fromInt(1);
+        NodeId id3 = NodeId.fromInt(3);
+        NodeId id6 = NodeId.fromInt(6);
+        List<NodeId.NodeIdRecord> records = new ArrayList<NodeId.NodeIdRecord>();
+        records.add(new NodeId.NodeIdRecord(id1, 2L));
+        records.add(new NodeId.NodeIdRecord(id3, 4L));
+        records.add(new NodeId.NodeIdRecord(id6, 10L));
+
+        ContextState ctx = ContextState.allocate(6, 3, allocator);
+        ctx.writeElement(id1, 0L, 1L, true);
+        ctx.writeElement(NodeId.fromInt(2), 0L, 2L);
+        ctx.writeElement(id3, 0L, 3L, true);
+        ctx.writeElement(NodeId.fromInt(4), 0L, 3L);
+        ctx.writeElement(NodeId.fromInt(5), 0L, 3L, true);
+        ctx.writeElement(id6, 0L, 6L);
+
+        int timeFirstMerge = (int)(System.currentTimeMillis() / 1000);
+
+        // First, only merge the first id
+        ByteBuffer merger = cc.computeOldShardMerger(ctx.context, records, 3L);
+        ByteBuffer merged = cc.merge(ctx.context, merger, allocator);
+        assert cc.total(ctx.context) == cc.total(merged);
+
+        try
+        {
+            Thread.sleep(2000);
+        }
+        catch (InterruptedException e)
+        {
+            throw new AssertionError();
+        }
+
+        // merge the second one
+        ByteBuffer merger2 = cc.computeOldShardMerger(merged, records, 7L);
+        ByteBuffer merged2 = cc.merge(merged, merger2, allocator);
+        assert cc.total(ctx.context) == cc.total(merged2);
+
+        ByteBuffer cleaned = cc.removeOldShards(merged2, timeFirstMerge + 1);
+        assert cc.total(ctx.context) == cc.total(cleaned);
+        assert cleaned.remaining() == ctx.context.remaining();
+
+        // We should have cleaned id1 but not id3
+        ContextState m = new ContextState(cleaned);
+        m.moveToNext();
+        assert m.getNodeId().equals(id3);
+
+    }
+
+    @Test
+    public void testRemoveNotDeltaOldShards()
+    {
+        runRemoveNotDeltaOldShards(HeapAllocator.instance);
+        runRemoveNotDeltaOldShards(bumpedSlab());
+    }
+
+    private void runRemoveNotDeltaOldShards(Allocator allocator)
+    {
+        ContextState ctx = ContextState.allocate(4, 1, allocator);
+        ctx.writeElement(NodeId.fromInt(1), 1L, 1L, true);
+        ctx.writeElement(NodeId.fromInt(2), -System.currentTimeMillis(), 0L);
+        ctx.writeElement(NodeId.fromInt(3), -System.currentTimeMillis(), 0L);
+        ctx.writeElement(NodeId.fromInt(4), -System.currentTimeMillis(), 0L);
+
+        ByteBuffer cleaned = cc.removeOldShards(ctx.context, (int)(System.currentTimeMillis() / 1000) + 1);
+        assert cc.total(ctx.context) == cc.total(cleaned);
+        assert cleaned.remaining() == ctx.context.remaining() - 3 * stepLength;
+    }
 }