You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2011/11/07 14:36:29 UTC

svn commit: r1198732 - in /cassandra/trunk: ./ contrib/ interface/thrift/gen-java/org/apache/cassandra/thrift/ src/java/org/apache/cassandra/config/ src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/db/compaction/ src/java/org/apache/cass...

Author: slebresne
Date: Mon Nov  7 13:36:28 2011
New Revision: 1198732

URL: http://svn.apache.org/viewvc?rev=1198732&view=rev
Log:
merge from 1.0.

Modified:
    cassandra/trunk/   (props changed)
    cassandra/trunk/CHANGES.txt
    cassandra/trunk/contrib/   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java   (props changed)
    cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java
    cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
    cassandra/trunk/src/java/org/apache/cassandra/db/CounterColumn.java
    cassandra/trunk/src/java/org/apache/cassandra/db/CounterMutation.java
    cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java
    cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionController.java
    cassandra/trunk/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
    cassandra/trunk/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java
    cassandra/trunk/src/java/org/apache/cassandra/db/context/CounterContext.java
    cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
    cassandra/trunk/test/unit/org/apache/cassandra/db/CounterMutationTest.java
    cassandra/trunk/test/unit/org/apache/cassandra/db/context/CounterContextTest.java

Propchange: cassandra/trunk/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Nov  7 13:36:28 2011
@@ -4,7 +4,7 @@
 /cassandra/branches/cassandra-0.8:1090934-1125013,1125019-1196956
 /cassandra/branches/cassandra-0.8.0:1125021-1130369
 /cassandra/branches/cassandra-0.8.1:1101014-1125018
-/cassandra/branches/cassandra-1.0:1167085-1196958,1197123,1197130,1197417,1197420,1197426,1197494,1197771,1198726
+/cassandra/branches/cassandra-1.0:1167085-1196958,1197123,1197130,1197417,1197420,1197426,1197494,1197771,1198726,1198731
 /cassandra/branches/cassandra-1.0.0:1167104-1167229,1167232-1181093,1181741,1181816,1181820,1182951,1183243
 /cassandra/tags/cassandra-0.7.0-rc3:1051699-1053689
 /cassandra/tags/cassandra-0.8.0-rc1:1102511-1125020

Modified: cassandra/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1198732&r1=1198731&r2=1198732&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Mon Nov  7 13:36:28 2011
@@ -3,6 +3,8 @@
  * EACH_QUORUM is only supported for writes (CASSANDRA-3272)
  * replace compactionlock use in schema migration by checking CFS.isInvalidD
  * recognize that "SELECT first ... *" isn't really "SELECT *" (CASSANDRA-3445)
+Merged from 0.8:
+ * Make counter shard merging thread safe (CASSANDRA-3178)
 
 1.0.2
  * "defragment" rows for name-based queries under STCS (CASSANDRA-2503)

Propchange: cassandra/trunk/contrib/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Nov  7 13:36:28 2011
@@ -4,7 +4,7 @@
 /cassandra/branches/cassandra-0.8/contrib:1090934-1125013,1125019-1196956
 /cassandra/branches/cassandra-0.8.0/contrib:1125021-1130369
 /cassandra/branches/cassandra-0.8.1/contrib:1101014-1125018
-/cassandra/branches/cassandra-1.0/contrib:1167085-1196958,1197123,1197130,1197417,1197420,1197426,1197494,1197771,1198726
+/cassandra/branches/cassandra-1.0/contrib:1167085-1196958,1197123,1197130,1197417,1197420,1197426,1197494,1197771,1198726,1198731
 /cassandra/branches/cassandra-1.0.0/contrib:1167104-1167229,1167232-1181093,1181741,1181816,1181820,1182951,1183243
 /cassandra/tags/cassandra-0.7.0-rc3/contrib:1051699-1053689
 /cassandra/tags/cassandra-0.8.0-rc1/contrib:1102511-1125020

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Nov  7 13:36:28 2011
@@ -4,7 +4,7 @@
 /cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1090934-1125013,1125019-1196956
 /cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1125021-1130369
 /cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1101014-1125018
-/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1167085-1196958,1197123,1197130,1197417,1197420,1197426,1197494,1197771,1198726
+/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1167085-1196958,1197123,1197130,1197417,1197420,1197426,1197494,1197771,1198726,1198731
 /cassandra/branches/cassandra-1.0.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1167104-1167229,1167232-1181093,1181741,1181816,1181820,1182951,1183243
 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1051699-1053689
 /cassandra/tags/cassandra-0.8.0-rc1/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1102511-1125020

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Nov  7 13:36:28 2011
@@ -4,7 +4,7 @@
 /cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1090934-1125013,1125019-1196956
 /cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1125021-1130369
 /cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1101014-1125018
-/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1167085-1196958,1197123,1197130,1197417,1197420,1197426,1197494,1197771,1198726
+/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1167085-1196958,1197123,1197130,1197417,1197420,1197426,1197494,1197771,1198726,1198731
 /cassandra/branches/cassandra-1.0.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1167104-1167229,1167232-1181093,1181741,1181816,1181820,1182951,1183243
 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1051699-1053689
 /cassandra/tags/cassandra-0.8.0-rc1/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1102511-1125020

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Nov  7 13:36:28 2011
@@ -4,7 +4,7 @@
 /cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1090934-1125013,1125019-1196956
 /cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1125021-1130369
 /cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1101014-1125018
-/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1167085-1196958,1197123,1197130,1197417,1197420,1197426,1197494,1197771,1198726
+/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1167085-1196958,1197123,1197130,1197417,1197420,1197426,1197494,1197771,1198726,1198731
 /cassandra/branches/cassandra-1.0.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1167104-1167229,1167232-1181093,1181741,1181816,1181820,1182951,1183243
 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1051699-1053689
 /cassandra/tags/cassandra-0.8.0-rc1/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1102511-1125020

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Nov  7 13:36:28 2011
@@ -4,7 +4,7 @@
 /cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1090934-1125013,1125019-1196956
 /cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1125021-1130369
 /cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1101014-1125018
-/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1167085-1196958,1197123,1197130,1197417,1197420,1197426,1197494,1197771,1198726
+/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1167085-1196958,1197123,1197130,1197417,1197420,1197426,1197494,1197771,1198726,1198731
 /cassandra/branches/cassandra-1.0.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1167104-1167229,1167232-1181093,1181741,1181816,1181820,1182951,1183243
 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1051699-1053689
 /cassandra/tags/cassandra-0.8.0-rc1/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1102511-1125020

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Nov  7 13:36:28 2011
@@ -4,7 +4,7 @@
 /cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1090934-1125013,1125019-1196956
 /cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1125021-1130369
 /cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1101014-1125018
-/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1167085-1196958,1197123,1197130,1197417,1197420,1197426,1197494,1197771,1198726
+/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1167085-1196958,1197123,1197130,1197417,1197420,1197426,1197494,1197771,1198726,1198731
 /cassandra/branches/cassandra-1.0.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1167104-1167229,1167232-1181093,1181741,1181816,1181820,1182951,1183243
 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1051699-1053689
 /cassandra/tags/cassandra-0.8.0-rc1/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1102511-1125020

Modified: cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java?rev=1198732&r1=1198731&r2=1198732&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java Mon Nov  7 13:36:28 2011
@@ -119,6 +119,8 @@ public final class CFMetaData
     private int rowCacheSavePeriodInSeconds;          // default 0 (off)
     private int keyCacheSavePeriodInSeconds;          // default 3600 (1 hour)
     private int rowCacheKeysToSave;                   // default max int (aka feature is off)
+    // mergeShardsChance is now obsolete, but left here so as to not break
+    // thrift compatibility
     private double mergeShardsChance;                 // default 0.1, chance [0.0, 1.0] of merging old shards during replication
     private IRowCacheProvider rowCacheProvider;
     private ByteBuffer keyAlias;                      // default NULL

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=1198732&r1=1198731&r2=1198732&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Mon Nov  7 13:36:28 2011
@@ -1904,4 +1904,16 @@ public class ColumnFamilyStore implement
             this.memtables = memtables;
         }
     }
+
+    /**
+     * Returns the creation time of the oldest memtable not fully flushed yet.
+     */
+    public long oldestUnflushedMemtable()
+    {
+        DataTracker.View view = data.getView();
+        long oldest = view.memtable.creationTime();
+        for (Memtable memtable : view.memtablesPendingFlush)
+            oldest = Math.min(oldest, memtable.creationTime());
+        return oldest;
+    }
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/CounterColumn.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/CounterColumn.java?rev=1198732&r1=1198731&r2=1198732&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/CounterColumn.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/CounterColumn.java Mon Nov  7 13:36:28 2011
@@ -20,20 +20,29 @@ package org.apache.cassandra.db;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.net.InetAddress;
 import java.security.MessageDigest;
+import java.util.concurrent.TimeoutException;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Collection;
 
+import com.google.common.collect.Multimap;
 import org.apache.log4j.Logger;
 
 import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.context.CounterContext;
 import org.apache.cassandra.db.context.IContext.ContextRelationship;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.marshal.MarshalException;
 import org.apache.cassandra.io.util.DataOutputBuffer;
 import org.apache.cassandra.utils.Allocator;
-import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.HeapAllocator;
-import org.apache.cassandra.utils.NodeId;
+import org.apache.cassandra.service.IWriteResponseHandler;
+import org.apache.cassandra.service.StorageProxy;
+import org.apache.cassandra.thrift.ConsistencyLevel;
+import org.apache.cassandra.thrift.UnavailableException;
+import org.apache.cassandra.utils.*;
 
 /**
  * A column that represents a partitioned counter.
@@ -236,9 +245,9 @@ public class CounterColumn extends Colum
         return contextManager.hasNodeId(value(), id);
     }
 
-    public CounterColumn computeOldShardMerger()
+    private CounterColumn computeOldShardMerger(int mergeBefore)
     {
-        ByteBuffer bb = contextManager.computeOldShardMerger(value(), NodeId.getOldLocalNodeIds());
+        ByteBuffer bb = contextManager.computeOldShardMerger(value(), NodeId.getOldLocalNodeIds(), mergeBefore);
         if (bb == null)
             return null;
         else
@@ -256,17 +265,48 @@ public class CounterColumn extends Colum
         }
     }
 
-    public static void removeOldShards(ColumnFamily cf, int gcBefore)
+    public static void mergeAndRemoveOldShards(DecoratedKey key, ColumnFamily cf, int gcBefore, int mergeBefore)
     {
+        mergeAndRemoveOldShards(key, cf, gcBefore, mergeBefore, true);
+    }
+
+    /**
+     * There are two phases to the removal of old shards.
+     * First phase: we merge the old shard value into the current shard and
+     * 'nullify' the old one. We then send the counter context with the old
+     * shard nullified to all other replicas.
+     * Second phase: once an old shard has been nullified for longer than
+     * gc_grace (to be sure all other replicas have been made aware of the
+     * merge), we simply remove that old shard from the context (its value is 0).
+     * This method does both phases.
+     * (Note that the sendToOtherReplica flag is here only to facilitate
+     * testing. It should be true in real code, so preferably use the
+     * method above.)
+     */
+    public static void mergeAndRemoveOldShards(DecoratedKey key, ColumnFamily cf, int gcBefore, int mergeBefore, boolean sendToOtherReplica)
+    {
+        ColumnFamily remoteMerger = null;
         if (!cf.isSuper())
         {
             for (IColumn c : cf)
             {
                 if (!(c instanceof CounterColumn))
                     continue;
-                CounterColumn cleaned = ((CounterColumn) c).removeOldShards(gcBefore);
-                if (cleaned != c)
-                    cf.replace(c, cleaned);
+                CounterColumn cc = (CounterColumn) c;
+                CounterColumn shardMerger = cc.computeOldShardMerger(mergeBefore);
+                CounterColumn merged = cc;
+                if (shardMerger != null)
+                {
+                    merged = (CounterColumn) cc.reconcile(shardMerger);
+                    if (remoteMerger == null)
+                        remoteMerger = cf.cloneMeShallow();
+                    remoteMerger.addColumn(merged);
+                }
+                CounterColumn cleaned = merged.removeOldShards(gcBefore);
+                if (cleaned != cc)
+                {
+                    cf.replace(cc, cleaned);
+                }
             }
         }
         else
@@ -278,16 +318,61 @@ public class CounterColumn extends Colum
                 {
                     if (!(subColumn instanceof CounterColumn))
                         continue;
-                    CounterColumn cleaned = ((CounterColumn) subColumn).removeOldShards(gcBefore);
+                    CounterColumn cc = (CounterColumn) subColumn;
+                    CounterColumn shardMerger = cc.computeOldShardMerger(mergeBefore);
+                    CounterColumn merged = cc;
+                    if (shardMerger != null)
+                    {
+                        merged = (CounterColumn) cc.reconcile(shardMerger);
+                        if (remoteMerger == null)
+                            remoteMerger = cf.cloneMeShallow();
+                        remoteMerger.addColumn(c.name(), merged);
+                    }
+                    CounterColumn cleaned = merged.removeOldShards(gcBefore);
                     if (cleaned != subColumn)
                         c.replace(subColumn, cleaned);
                 }
             }
         }
+
+        if (remoteMerger != null && sendToOtherReplica)
+        {
+            try
+            {
+                sendToOtherReplica(key, remoteMerger);
+            }
+            catch (Exception e)
+            {
+                logger.error("Error while sending shard merger mutation to remote endpoints", e);
+            }
+        }
     }
 
     public IColumn markDeltaToBeCleared()
     {
         return new CounterColumn(name, contextManager.markDeltaToBeCleared(value), timestamp, timestampOfLastDelete);
     }
+
+    private static void sendToOtherReplica(DecoratedKey key, ColumnFamily cf) throws UnavailableException, TimeoutException, IOException
+    {
+        RowMutation rm = new RowMutation(cf.metadata().ksName, key.key);
+        rm.add(cf);
+
+        final InetAddress local = FBUtilities.getBroadcastAddress();
+        String localDataCenter = DatabaseDescriptor.getEndpointSnitch().getDatacenter(local);
+
+        StorageProxy.performWrite(rm, ConsistencyLevel.ANY, localDataCenter, new StorageProxy.WritePerformer()
+        {
+            public void apply(IMutation mutation, Collection<InetAddress> targets, IWriteResponseHandler responseHandler, String localDataCenter, ConsistencyLevel consistency_level) throws IOException, TimeoutException
+            {
+                // We should only send to the remote replica, not the local one
+                targets.remove(local);
+                // Fake local response to be a good lad but we won't wait on the responseHandler
+                responseHandler.response(null);
+                StorageProxy.sendToHintedEndpoints((RowMutation) mutation, targets, responseHandler, localDataCenter, consistency_level);
+            }
+        });
+
+        // we don't wait for answers
+    }
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/CounterMutation.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/CounterMutation.java?rev=1198732&r1=1198731&r2=1198732&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/CounterMutation.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/CounterMutation.java Mon Nov  7 13:36:28 2011
@@ -100,7 +100,6 @@ public class CounterMutation implements 
             if (row == null || row.cf == null)
                 continue;
 
-            row = mergeOldShards(readCommand.table, row);
             ColumnFamily cf = row.cf;
             if (cf.isSuper())
                 cf.retainAll(rowMutation.getColumnFamily(cf.metadata().cfId));
@@ -115,73 +114,6 @@ public class CounterMutation implements 
         commands.add(new SliceByNamesReadCommand(table, key, queryPath, columnFamily.getColumnNames()));
     }
 
-    private Row mergeOldShards(String table, Row row) throws IOException
-    {
-        ColumnFamily cf = row.cf;
-        // random check for merging to allow lessening the performance impact
-        if (cf.metadata().getMergeShardsChance() > FBUtilities.threadLocalRandom().nextDouble())
-        {
-            ColumnFamily merger = computeShardMerger(cf);
-            if (merger != null)
-            {
-                RowMutation localMutation = new RowMutation(table, row.key.key);
-                localMutation.add(merger);
-                localMutation.apply();
-
-                cf.addAll(merger, HeapAllocator.instance);
-            }
-        }
-        return row;
-    }
-
-    private ColumnFamily computeShardMerger(ColumnFamily cf)
-    {
-        ColumnFamily merger = null;
-
-        // CF type: regular
-        if (!cf.isSuper())
-        {
-            for (IColumn column : cf)
-            {
-                if (!(column instanceof CounterColumn))
-                    continue;
-                IColumn c = ((CounterColumn)column).computeOldShardMerger();
-                if (c != null)
-                {
-                    if (merger == null)
-                        merger = cf.cloneMeShallow();
-                    merger.addColumn(c);
-                }
-            }
-        }
-        else // CF type: super
-        {
-            for (IColumn superColumn : cf)
-            {
-                IColumn mergerSuper = null;
-                for (IColumn column : superColumn.getSubColumns())
-                {
-                    if (!(column instanceof CounterColumn))
-                        continue;
-                    IColumn c = ((CounterColumn)column).computeOldShardMerger();
-                    if (c != null)
-                    {
-                        if (mergerSuper == null)
-                            mergerSuper = ((SuperColumn)superColumn).cloneMeShallow();
-                        mergerSuper.addColumn(c);
-                    }
-                }
-                if (mergerSuper != null)
-                {
-                    if (merger == null)
-                        merger = cf.cloneMeShallow();
-                    merger.addColumn(mergerSuper);
-                }
-            }
-        }
-        return merger;
-    }
-
     public Message makeMutationMessage(int version) throws IOException
     {
         byte[] bytes = FBUtilities.serialize(this, serializer, version);

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java?rev=1198732&r1=1198731&r2=1198732&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java Mon Nov  7 13:36:28 2011
@@ -76,12 +76,14 @@ public class Memtable
 
     private final ConcurrentNavigableMap<DecoratedKey, ColumnFamily> columnFamilies = new ConcurrentSkipListMap<DecoratedKey, ColumnFamily>();
     public final ColumnFamilyStore cfs;
+    private final long creationTime;
 
     private SlabAllocator allocator = new SlabAllocator();
 
     public Memtable(ColumnFamilyStore cfs)
     {
         this.cfs = cfs;
+        this.creationTime = System.currentTimeMillis();
 
         Callable<Set<Object>> provider = new Callable<Set<Object>>()
         {
@@ -397,4 +399,9 @@ public class Memtable
     {
         columnFamilies.clear();
     }
+
+    public long creationTime()
+    {
+        return creationTime;
+    }
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionController.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionController.java?rev=1198732&r1=1198731&r2=1198732&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionController.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionController.java Mon Nov  7 13:36:28 2011
@@ -45,6 +45,7 @@ public class CompactionController
 
     public final int gcBefore;
     public boolean keyExistenceIsExpensive;
+    public final int mergeShardBefore;
 
     public CompactionController(ColumnFamilyStore cfs, Collection<SSTableReader> sstables, int gcBefore, boolean forceDeserialize)
     {
@@ -52,6 +53,11 @@ public class CompactionController
         this.cfs = cfs;
         this.sstables = new HashSet<SSTableReader>(sstables);
         this.gcBefore = gcBefore;
+        // If we merge an old NodeId, we must make sure that no further increments for that id are in an active memtable.
+        // For that, we must make sure that this id was renewed before the creation of the oldest unflushed memtable. We
+        // add 5 minutes to be sure we're on the safe side in terms of thread safety (though we should be fine in our
+        // current 'stop all writes during memtable switch' situation). NOTE(review): 5 * 3600 ms is 18 seconds, not 5 minutes - confirm intent.
+        this.mergeShardBefore = (int) ((cfs.oldestUnflushedMemtable() + 5 * 3600) / 1000);
         this.forceDeserialize = forceDeserialize;
         keyExistenceIsExpensive = cfs.getCompactionStrategy().isKeyExistenceExpensive(this.sstables);
     }

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java?rev=1198732&r1=1198731&r2=1198732&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java Mon Nov  7 13:36:28 2011
@@ -222,7 +222,7 @@ public class LazilyCompactedRow extends 
 
         protected IColumn getReduced()
         {
-            ColumnFamily purged = PrecompactedRow.removeDeletedAndOldShards(shouldPurge, controller, container);
+            ColumnFamily purged = PrecompactedRow.removeDeletedAndOldShards(key, shouldPurge, controller, container);
             if (purged == null || !purged.iterator().hasNext())
             {
                 container.clear();

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java?rev=1198732&r1=1198731&r2=1198732&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java Mon Nov  7 13:36:28 2011
@@ -83,17 +83,17 @@ public class PrecompactedRow extends Abs
             if (shouldPurge == null)
                 shouldPurge = controller.shouldPurge(key);
             if (shouldPurge)
-                CounterColumn.removeOldShards(compacted, controller.gcBefore);
+                CounterColumn.mergeAndRemoveOldShards(key, compacted, controller.gcBefore, controller.mergeShardBefore);
         }
 
         return compacted;
     }
 
-    public static ColumnFamily removeDeletedAndOldShards(boolean shouldPurge, CompactionController controller, ColumnFamily cf)
+    public static ColumnFamily removeDeletedAndOldShards(DecoratedKey key, boolean shouldPurge, CompactionController controller, ColumnFamily cf)
     {
         ColumnFamily compacted = shouldPurge ? ColumnFamilyStore.removeDeleted(cf, controller.gcBefore) : cf;
         if (shouldPurge && compacted != null && compacted.metadata().getDefaultValidator().isCommutative())
-            CounterColumn.removeOldShards(compacted, controller.gcBefore);
+            CounterColumn.mergeAndRemoveOldShards(key, compacted, controller.gcBefore, controller.mergeShardBefore);
         return compacted;
     }
 

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/context/CounterContext.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/context/CounterContext.java?rev=1198732&r1=1198731&r2=1198732&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/context/CounterContext.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/context/CounterContext.java Mon Nov  7 13:36:28 2011
@@ -71,10 +71,6 @@ public class CounterContext implements I
 
     private static final Logger logger = Logger.getLogger(CounterContext.class);
 
-    // Time in ms since a node id has been renewed before we consider using it
-    // during a merge
-    private static final long MIN_MERGE_DELAY = 5 * 60 * 1000; // should be aplenty
-
     // lazy-load singleton
     private static class LazyHolder
     {
@@ -531,84 +527,123 @@ public class CounterContext implements I
 
     /**
      * Compute a new context such that if applied to context yields the same
-     * total but with the older local node id merged into the second to older one
-     * (excluding current local node id) if need be.
+     * total but with old local node ids nullified and their content merged into
+     * the current localNodeId. Returns null when there is nothing to merge.
      */
-    public ByteBuffer computeOldShardMerger(ByteBuffer context, List<NodeId.NodeIdRecord> oldIds)
+    public ByteBuffer computeOldShardMerger(ByteBuffer context, List<NodeId.NodeIdRecord> oldIds, long mergeBefore)
     {
         long now = System.currentTimeMillis();
         int hlength = headerLength(context);
-
-        // Don't bother if we know we can't find what we are looking for
-        if (oldIds.size() < 2
-         || now - oldIds.get(0).timestamp < MIN_MERGE_DELAY
-         || now - oldIds.get(1).timestamp < MIN_MERGE_DELAY
-         || context.remaining() - hlength < 2 * STEP_LENGTH)
-            return null;
+        NodeId localId = NodeId.getLocalId();
 
         Iterator<NodeId.NodeIdRecord> recordIterator = oldIds.iterator();
-        NodeId.NodeIdRecord currRecord = recordIterator.next();
+        NodeId.NodeIdRecord currRecord = recordIterator.hasNext() ? recordIterator.next() : null;
 
         ContextState state = new ContextState(context, hlength);
         ContextState foundState = null;
 
+        List<NodeId> toMerge = new ArrayList<NodeId>();
+        long mergeTotal = 0;
         while (state.hasRemaining() && currRecord != null)
         {
-            if (now - currRecord.timestamp < MIN_MERGE_DELAY)
-                return context;
+            assert !currRecord.id.equals(localId);
 
-            assert !currRecord.id.equals(NodeId.getLocalId());
+            NodeId nodeId = state.getNodeId();
+            int c = nodeId.compareTo(currRecord.id);
 
-            int c = state.getNodeId().compareTo(currRecord.id);
-            if (c == 0)
+            if (c > 0)
+            {
+                currRecord = recordIterator.hasNext() ? recordIterator.next() : null;
+                continue;
+            }
+
+            if (state.isDelta())
             {
-                if (foundState == null)
+                if (state.getClock() < 0)
                 {
-                    // We found a canditate for being merged
-                    if (state.getClock() < 0)
-                        return null;
-
-                    foundState = state.duplicate();
-                    currRecord = recordIterator.hasNext() ? recordIterator.next() : null;
-                    state.moveToNext();
+                    // Already merged shard, waiting to be collected
+
+                    if (nodeId.equals(localId))
+                        // we should not get there, but we have been creating problematic context prior to #2968
+                        throw new RuntimeException("Current nodeId with a negative clock (likely due to #2968). You need to restart this node with -Dcassandra.renew_counter_id=true to fix.");
+
+                    if (state.getCount() != 0)
+                    {
+                        // This should not happen, but previous bugs have generated this (#2968 in particular) so fixing it.
+                        logger.error(String.format("Invalid counter context (clock is %d and count is %d for NodeId %s), will fix", state.getClock(), state.getCount(), nodeId.toString()));
+                        toMerge.add(nodeId);
+                        mergeTotal += state.getCount();
+                    }
                 }
-                else
+                else if (c == 0)
                 {
-                    assert !foundState.getNodeId().equals(state.getNodeId());
-
-                    // Found someone to merge it to
-                    int nbDelta = foundState.isDelta() ? 1 : 0;
-                    nbDelta += state.isDelta() ? 1 : 0;
-                    ContextState merger = ContextState.allocate(2, nbDelta, HeapAllocator.instance);
-
-                    long fclock = foundState.getClock();
-                    long fcount = foundState.getCount();
-                    long clock = state.getClock();
-                    long count = state.getCount();
+                    // Found an old id. However, merging an oldId that has just been renewed isn't safe, so
+                    // we check that it has been renewed before mergeBefore.
+                    if (currRecord.timestamp < mergeBefore)
+                    {
+                        toMerge.add(nodeId);
+                        mergeTotal += state.getCount();
+                    }
+                }
+            }
 
-                    if (foundState.isDelta())
-                        merger.writeElement(foundState.getNodeId(), -now - fclock, -fcount, true);
-                    else
-                        merger.writeElement(foundState.getNodeId(), -now, 0);
+            if (c == 0)
+                currRecord = recordIterator.hasNext() ? recordIterator.next() : null;
 
-                    if (state.isDelta())
-                        merger.writeElement(state.getNodeId(), fclock + clock, fcount, true);
-                    else
-                        merger.writeElement(state.getNodeId(), fclock + clock, fcount + count);
+            state.moveToNext();
+        }
+        // Continuing the iteration so that we can repair invalid shards
+        while (state.hasRemaining())
+        {
+            NodeId nodeId = state.getNodeId();
+            if (state.isDelta() && state.getClock() < 0)
+            {
+                if (nodeId.equals(localId))
+                    // we should not get there, but we have been creating problematic context prior to #2968
+                    throw new RuntimeException("Current nodeId with a negative clock (likely due to #2968). You need to restart this node with -Dcassandra.renew_counter_id=true to fix.");
 
-                    return merger.context;
+                if (state.getCount() != 0)
+                {
+                    // This should not happen, but previous bugs have generated this (#2968 in particular) so fixing it.
+                    logger.error(String.format("Invalid counter context (clock is %d and count is %d for NodeId %s), will fix", state.getClock(), state.getCount(), nodeId.toString()));
+                    toMerge.add(nodeId);
+                    mergeTotal += state.getCount();
                 }
             }
-            else if (c < 0) // nodeid < record
+            state.moveToNext();
+        }
+
+        if (toMerge.isEmpty())
+            return null;
+
+        ContextState merger = ContextState.allocate(toMerge.size() + 1, toMerge.size() + 1);
+        state.reset();
+        int i = 0;
+        long removedTotal = 0; // long like mergeTotal: an int here would silently truncate on +=
+        boolean localWritten = false;
+        while (state.hasRemaining())
+        {
+            NodeId nodeId = state.getNodeId();
+            if (!localWritten && nodeId.compareTo(localId) > 0) // write the local shard once, at its sorted position
             {
-                state.moveToNext();
+                merger.writeElement(localId, 1L, mergeTotal, true);
+                localWritten = true;
             }
-            else // c > 0, nodeid > record
+            if (i < toMerge.size() && nodeId.compareTo(toMerge.get(i)) == 0) // not "else": this shard may also need nullifying
             {
-                currRecord = recordIterator.hasNext() ? recordIterator.next() : null;
+                long count = state.getCount();
+                removedTotal += count;
+                merger.writeElement(nodeId, -now - state.getClock(), -count, true);
+                ++i;
             }
+            state.moveToNext();
         }
-        return null;
+        if (!localWritten)
+            merger.writeElement(localId, 1L, mergeTotal, true);
+
+        // sanity check: everything nullified above must have been folded into the local shard
+        assert mergeTotal == removedTotal;
+        return merger.context;
     }
 
     /**
@@ -621,59 +656,61 @@ public class CounterContext implements I
     {
         int hlength = headerLength(context);
         ContextState state = new ContextState(context, hlength);
-        int removedBodySize = 0, removedHeaderSize = 0;
-        boolean forceFixing = false;
+        int removedShards = 0;
         while (state.hasRemaining())
         {
             long clock = state.getClock();
-            if (clock < 0 && -((int)(clock / 1000)) < gcBefore && (state.getCount() == 0 || !state.isDelta()))
-            {
-                removedBodySize += STEP_LENGTH;
-                if (state.isDelta())
-                    removedHeaderSize += HEADER_ELT_LENGTH;
-            }
-            else if (clock < 0 && state.getCount() != 0 && state.isDelta())
+            if (clock < 0)
             {
-                forceFixing = true;
+                // We should never have a count != 0 when clock < 0.
+                // We know that previous bugs may have created such situations though, so:
+                //   * for a delta shard: we throw an exception since computeOldShardMerger should
+                //     have corrected that situation
+                //   * for a non-delta shard: it is a much worse situation because there is
+                //     not much we can do since we are not responsible for that shard. So we simply
+                //     ignore the shard.
+                if (state.getCount() != 0)
+                {
+                    if (state.isDelta())
+                    {
+                        throw new IllegalStateException("Counter shard with negative clock but count != 0; context = " + toString(context));
+                    }
+                    else
+                    {
+                        logger.debug("Ignoring non-removable non-delta corrupted shard in context " + toString(context));
+                        state.moveToNext();
+                        continue;
+                    }
+                }
+
+                if (-((int)(clock / 1000)) < gcBefore)
+                    removedShards++;
             }
             state.moveToNext();
         }
 
-        if (removedBodySize == 0 && !forceFixing)
+        if (removedShards == 0)
             return context;
 
-        int newSize = context.remaining() - removedHeaderSize - removedBodySize;
+
+        int removedHeaderSize = removedShards * HEADER_ELT_LENGTH;
+        int newSize = context.remaining() - removedHeaderSize - (removedShards * STEP_LENGTH);
         int newHlength = hlength - removedHeaderSize;
-        ByteBuffer cleanedContext = ByteBuffer.allocate(newSize);
+        ByteBuffer cleanedContext = HeapAllocator.instance.allocate(newSize);
         cleanedContext.putShort(cleanedContext.position(), (short) ((newHlength - HEADER_SIZE_LENGTH) / HEADER_ELT_LENGTH));
         ContextState cleaned = new ContextState(cleanedContext, newHlength);
 
         state.reset();
-        long toAddBack = 0;
         while (state.hasRemaining())
         {
             long clock = state.getClock();
-            if (!(clock < 0 && -((int)(clock / 1000)) < gcBefore && (state.getCount() == 0 || !state.isDelta())))
+            if (!(clock < 0 && state.getCount() == 0))
             {
-                if (clock < 0 && state.getCount() != 0 && state.isDelta())
-                {
-                    // we should not get there, but we have been creating problematic context prior to #2968
-                    if (state.getNodeId().equals(NodeId.getLocalId()))
-                        throw new RuntimeException("Merged counter shard with a count != 0 (likely due to #2968). You need to restart this node with -Dcassandra.renew_counter_id=true to fix.");
-
-                    // we will "fix" it, but log a message
-                    logger.info("Collectable old shard with a count != 0. Will fix.");
-                    cleaned.writeElement(state.getNodeId(), clock - 1L, 0, true);
-                    toAddBack += state.getCount();
-                }
-                else
-                {
-                    state.copyTo(cleaned);
-                }
+                state.copyTo(cleaned);
             }
             state.moveToNext();
         }
-        return toAddBack == 0 ? cleanedContext : merge(cleanedContext, create(toAddBack, HeapAllocator.instance), HeapAllocator.instance);
+        return cleanedContext;
     }
 
     /**

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java?rev=1198732&r1=1198731&r2=1198732&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java Mon Nov  7 13:36:28 2011
@@ -273,7 +273,7 @@ public class StorageProxy implements Sto
      *
      * @throws TimeoutException if the hints cannot be written/enqueued 
      */
-    private static void sendToHintedEndpoints(final RowMutation rm, 
+    public static void sendToHintedEndpoints(final RowMutation rm, 
                                               Collection<InetAddress> targets,
                                               IWriteResponseHandler responseHandler,
                                               String localDataCenter,
@@ -1226,7 +1226,7 @@ public class StorageProxy implements Sto
         return !Gossiper.instance.getUnreachableMembers().isEmpty();
     }
 
-    private interface WritePerformer
+    public interface WritePerformer
     {
         public void apply(IMutation mutation, Collection<InetAddress> targets, IWriteResponseHandler responseHandler, String localDataCenter, ConsistencyLevel consistency_level) throws IOException, TimeoutException;
     }

Modified: cassandra/trunk/test/unit/org/apache/cassandra/db/CounterMutationTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/db/CounterMutationTest.java?rev=1198732&r1=1198731&r2=1198732&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/db/CounterMutationTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/db/CounterMutationTest.java Mon Nov  7 13:36:28 2011
@@ -19,6 +19,7 @@ package org.apache.cassandra.db;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.Collections;
 import java.util.List;
 
 import org.junit.Test;
@@ -29,6 +30,7 @@ import org.apache.cassandra.db.filter.*;
 import org.apache.cassandra.CleanupHelper;
 import org.apache.cassandra.thrift.*;
 import org.apache.cassandra.utils.*;
+import org.apache.cassandra.Util;
 import static org.apache.cassandra.db.context.CounterContext.ContextState;
 import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
 
@@ -40,13 +42,15 @@ public class CounterMutationTest extends
         RowMutation rm;
         CounterMutation cm;
 
+        NodeId id1 = NodeId.getLocalId();
+
         rm = new RowMutation("Keyspace1", ByteBufferUtil.bytes("key1"));
         rm.addCounter(new QueryPath("Counter1", null, ByteBufferUtil.bytes("Column1")), 3);
         cm = new CounterMutation(rm, ConsistencyLevel.ONE);
         cm.apply();
 
         NodeId.renewLocalId(2L); // faking time of renewal for test
-        NodeId id1 = NodeId.getLocalId();
+        NodeId id2 = NodeId.getLocalId();
 
         rm = new RowMutation("Keyspace1", ByteBufferUtil.bytes("key1"));
         rm.addCounter(new QueryPath("Counter1", null, ByteBufferUtil.bytes("Column1")), 4);
@@ -54,7 +58,7 @@ public class CounterMutationTest extends
         cm.apply();
 
         NodeId.renewLocalId(4L); // faking time of renewal for test
-        NodeId id2 = NodeId.getLocalId();
+        NodeId id3 = NodeId.getLocalId();
 
         rm = new RowMutation("Keyspace1", ByteBufferUtil.bytes("key1"));
         rm.addCounter(new QueryPath("Counter1", null, ByteBufferUtil.bytes("Column1")), 5);
@@ -62,20 +66,39 @@ public class CounterMutationTest extends
         cm = new CounterMutation(rm, ConsistencyLevel.ONE);
         cm.apply();
 
-        RowMutation reprm = cm.makeReplicationMutation();
-        ColumnFamily cf = reprm.getColumnFamilies().iterator().next();
-        CounterColumn.removeOldShards(cf, Integer.MAX_VALUE);
+        DecoratedKey dk = Util.dk("key1");
+        ColumnFamily cf = Util.getColumnFamily(Table.open("Keyspace1"), dk, "Counter1");
+
+        // First merges old shards
+        CounterColumn.mergeAndRemoveOldShards(dk, cf, Integer.MIN_VALUE, Integer.MAX_VALUE, false);
+        long now = System.currentTimeMillis();
         IColumn c = cf.getColumn(ByteBufferUtil.bytes("Column1"));
         assert c != null;
         assert c instanceof CounterColumn;
-
         assert ((CounterColumn)c).total() == 12L;
         ContextState s = new ContextState(c.value());
         assert s.getNodeId().equals(id1);
-        assert s.getCount() == 7;
+        assert s.getCount() == 0L;
+        assert -s.getClock() > now - 1000 : " >";
+        assert -s.getClock() <= now;
         s.moveToNext();
         assert s.getNodeId().equals(id2);
-        assert s.getCount() == 5;
+        assert s.getCount() == 0L;
+        assert -s.getClock() > now - 1000;
+        assert -s.getClock() <= now;
+        s.moveToNext();
+        assert s.getNodeId().equals(id3);
+        assert s.getCount() == 12L;
+
+        // Then collect old shards
+        CounterColumn.mergeAndRemoveOldShards(dk, cf, Integer.MAX_VALUE, Integer.MIN_VALUE, false);
+        c = cf.getColumn(ByteBufferUtil.bytes("Column1"));
+        assert c != null;
+        assert c instanceof CounterColumn;
+        assert ((CounterColumn)c).total() == 12L;
+        s = new ContextState(c.value());
+        assert s.getNodeId().equals(id3);
+        assert s.getCount() == 12L;
     }
 
     @Test
@@ -131,6 +154,7 @@ public class CounterMutationTest extends
     public void testRemoveOldShardFixCorrupted() throws IOException
     {
         CounterContext ctx = CounterContext.instance();
+        int now = (int) (System.currentTimeMillis() / 1000);
 
         // Check that corrupted context created prior to #2968 are fixed by removeOldShards
         NodeId id1 = NodeId.getLocalId();
@@ -140,19 +164,21 @@ public class CounterMutationTest extends
         ContextState state = ContextState.allocate(3, 2);
         state.writeElement(NodeId.fromInt(1), 1, 4, false);
         state.writeElement(id1, 3, 2, true);
-        state.writeElement(id2, -System.currentTimeMillis(), 5, true); // corrupted!
+        state.writeElement(id2, -100, 5, true); // corrupted!
 
         assert ctx.total(state.context) == 11;
 
         try
         {
-            ctx.removeOldShards(state.context, Integer.MAX_VALUE);
+            ByteBuffer merger = ctx.computeOldShardMerger(state.context, Collections.<NodeId.NodeIdRecord>emptyList(), 0);
+            ctx.removeOldShards(ctx.merge(state.context, merger, HeapAllocator.instance), now);
             fail("RemoveOldShards should throw an exception if the current id is non-sensical");
         }
         catch (RuntimeException e) {}
 
         NodeId.renewLocalId();
-        ByteBuffer cleaned = ctx.removeOldShards(state.context, Integer.MAX_VALUE);
+        ByteBuffer merger = ctx.computeOldShardMerger(state.context, Collections.<NodeId.NodeIdRecord>emptyList(), 0);
+        ByteBuffer cleaned = ctx.removeOldShards(ctx.merge(state.context, merger, HeapAllocator.instance), now);
         assert ctx.total(cleaned) == 11;
 
         // Check it is not corrupted anymore

Modified: cassandra/trunk/test/unit/org/apache/cassandra/db/context/CounterContextTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/db/context/CounterContextTest.java?rev=1198732&r1=1198731&r2=1198732&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/db/context/CounterContextTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/db/context/CounterContextTest.java Mon Nov  7 13:36:28 2011
@@ -362,54 +362,31 @@ public class CounterContextTest
         records.add(new NodeId.NodeIdRecord(id1, 2L));
         records.add(new NodeId.NodeIdRecord(id3, 4L));
 
-        // Destination of merge is a delta
-        ContextState ctx = ContextState.allocate(5, 2, allocator);
-        ctx.writeElement(id1, 1L, 1L);
+        ContextState ctx = ContextState.allocate(5, 3, allocator);
+        ctx.writeElement(id1, 1L, 1L, true);
         ctx.writeElement(NodeId.fromInt(2), 2L, 2L);
         ctx.writeElement(id3, 3L, 3L, true);
         ctx.writeElement(NodeId.fromInt(4), 6L, 3L);
         ctx.writeElement(NodeId.fromInt(5), 7L, 3L, true);
 
-        ByteBuffer merger = cc.computeOldShardMerger(ctx.context, records);
+        ByteBuffer merger = cc.computeOldShardMerger(ctx.context, records, Integer.MAX_VALUE);
+
         ContextState m = new ContextState(merger);
 
         assert m.getNodeId().equals(id1);
         assert m.getClock() <= -now;
-        assert m.getCount() == 0;
+        assert m.getCount() == -1L;
+        assert m.isDelta();
         m.moveToNext();
         assert m.getNodeId().equals(id3);
-        assert m.getClock() == 4L;
-        assert m.getCount() == 1L;
-        assert cc.total(ctx.context) == cc.total(cc.merge(ctx.context, merger, allocator));
-
-        // Source of merge is a delta
-        ctx = ContextState.allocate(4, 1, allocator);
-        ctx.writeElement(id1, 1L, 1L, true);
-        ctx.writeElement(NodeId.fromInt(2), 2L, 2L);
-        ctx.writeElement(id3, 3L, 3L);
-        ctx.writeElement(NodeId.fromInt(4), 6L, 3L);
-
-        merger = cc.computeOldShardMerger(ctx.context, records);
-        assert cc.total(ctx.context) == cc.total(cc.merge(ctx.context, merger, allocator));
-
-        // source and destination of merge are deltas
-        ctx = ContextState.allocate(4, 2, allocator);
-        ctx.writeElement(id1, 1L, 1L, true);
-        ctx.writeElement(NodeId.fromInt(2), 2L, 2L);
-        ctx.writeElement(id3, 3L, 3L, true);
-        ctx.writeElement(NodeId.fromInt(4), 6L, 3L);
-
-        merger = cc.computeOldShardMerger(ctx.context, records);
-        assert cc.total(ctx.context) == cc.total(cc.merge(ctx.context, merger, allocator));
-
-        // none of source and destination of merge are deltas
-        ctx = ContextState.allocate(4, 0, allocator);
-        ctx.writeElement(id1, 1L, 1L);
-        ctx.writeElement(NodeId.fromInt(2), 2L, 2L);
-        ctx.writeElement(id3, 3L, 3L);
-        ctx.writeElement(NodeId.fromInt(4), 6L, 3L);
-
-        merger = cc.computeOldShardMerger(ctx.context, records);
+        assert m.getClock() <= -now;
+        assert m.getCount() == -3L;
+        assert m.isDelta();
+        m.moveToNext();
+        assert m.getNodeId().equals(NodeId.getLocalId());
+        assert m.getClock() == 1L;
+        assert m.getCount() == 4L;
+        assert m.isDelta();
         assert cc.total(ctx.context) == cc.total(cc.merge(ctx.context, merger, allocator));
     }
 
@@ -430,28 +407,20 @@ public class CounterContextTest
         records.add(new NodeId.NodeIdRecord(id3, 4L));
         records.add(new NodeId.NodeIdRecord(id6, 10L));
 
-        ContextState ctx = ContextState.allocate(6, 2, allocator);
-        ctx.writeElement(id1, 1L, 1L);
+        ContextState ctx = ContextState.allocate(6, 3, allocator);
+        ctx.writeElement(id1, 1L, 1L, true);
         ctx.writeElement(NodeId.fromInt(2), 2L, 2L);
         ctx.writeElement(id3, 3L, 3L, true);
         ctx.writeElement(NodeId.fromInt(4), 6L, 3L);
         ctx.writeElement(NodeId.fromInt(5), 7L, 3L, true);
         ctx.writeElement(id6, 5L, 6L);
 
-        ByteBuffer merger = cc.computeOldShardMerger(ctx.context, records);
+        ByteBuffer merger = cc.computeOldShardMerger(ctx.context, records, Integer.MAX_VALUE);
         ByteBuffer merged = cc.merge(ctx.context, merger, allocator);
         assert cc.total(ctx.context) == cc.total(merged);
 
         ByteBuffer cleaned = cc.removeOldShards(merged, (int)(System.currentTimeMillis() / 1000) + 1);
         assert cc.total(ctx.context) == cc.total(cleaned);
-        assert cleaned.remaining() == ctx.context.remaining() - stepLength;
-
-        merger = cc.computeOldShardMerger(cleaned, records);
-        merged = cc.merge(cleaned, merger, allocator);
-        assert cc.total(ctx.context) == cc.total(merged);
-
-        cleaned = cc.removeOldShards(merged, (int)(System.currentTimeMillis() / 1000) + 1);
-        assert cc.total(ctx.context) == cc.total(cleaned);
-        assert cleaned.remaining() == ctx.context.remaining() - 2 * stepLength - 2;
+        assert cleaned.remaining() == ctx.context.remaining() - stepLength - 2;
     }
 }