You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2014/01/16 20:08:03 UTC

[2/2] git commit: Add support for 2.1 global counter shards

Add support for 2.1 global counter shards

patch by Aleksey Yeschenko; reviewed by Sylvain Lebresne for
CASSANDRA-6505


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/83cd80b2
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/83cd80b2
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/83cd80b2

Branch: refs/heads/cassandra-2.0
Commit: 83cd80b277fa6e73be1c7d66b80669731f0f92c0
Parents: 1d4caf4
Author: Aleksey Yeschenko <al...@apache.org>
Authored: Thu Jan 16 22:07:20 2014 +0300
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Thu Jan 16 22:07:20 2014 +0300

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../org/apache/cassandra/db/CounterColumn.java  | 125 +--
 .../cassandra/db/CounterUpdateColumn.java       |   4 +-
 .../db/compaction/CompactionController.java     |   6 -
 .../db/compaction/LazilyCompactedRow.java       |   6 +-
 .../compaction/ParallelCompactionIterable.java  |   2 +-
 .../db/compaction/PrecompactedRow.java          |  32 +-
 .../cassandra/db/context/CounterContext.java    | 765 +++++++++----------
 .../io/sstable/AbstractSSTableSimpleWriter.java |   5 +-
 .../cassandra/io/sstable/SSTableWriter.java     |   2 +-
 .../apache/cassandra/db/CounterColumnTest.java  | 105 +--
 .../cassandra/db/CounterMutationTest.java       | 115 ---
 .../db/context/CounterContextTest.java          | 703 ++++++++---------
 .../streaming/StreamingTransferTest.java        |  19 +-
 14 files changed, 789 insertions(+), 1101 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/83cd80b2/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index ac8c428..d994612 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -6,6 +6,7 @@
  * Improve replica pinning for cache efficiency in DES (CASSANDRA-6485)
  * Fix LOCAL_SERIAL from thrift (CASSANDRA-6584)
  * Don't special case received counts in CAS timeout exceptions (CASSANDRA-6595)
+ * Add support for 2.1 global counter shards (CASSANDRA-6505)
 Merged from 1.2:
  * fsync compression metadata (CASSANDRA-6531)
  * Validate CF existence on execution for prepared statement (CASSANDRA-6535)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/83cd80b2/src/java/org/apache/cassandra/db/CounterColumn.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/CounterColumn.java b/src/java/org/apache/cassandra/db/CounterColumn.java
index b470c5a..28a2ba5 100644
--- a/src/java/org/apache/cassandra/db/CounterColumn.java
+++ b/src/java/org/apache/cassandra/db/CounterColumn.java
@@ -19,27 +19,18 @@ package org.apache.cassandra.db;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.net.InetAddress;
 import java.security.MessageDigest;
-import java.util.Set;
 
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Sets;
-import org.apache.cassandra.serializers.MarshalException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.context.CounterContext;
 import org.apache.cassandra.db.context.IContext.ContextRelationship;
 import org.apache.cassandra.db.marshal.AbstractType;
-import org.apache.cassandra.exceptions.OverloadedException;
-import org.apache.cassandra.exceptions.RequestExecutionException;
 import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.serializers.MarshalException;
 import org.apache.cassandra.utils.Allocator;
-import org.apache.cassandra.service.AbstractWriteResponseHandler;
-import org.apache.cassandra.service.StorageProxy;
 import org.apache.cassandra.utils.*;
 
 /**
@@ -55,12 +46,12 @@ public class CounterColumn extends Column
 
     public CounterColumn(ByteBuffer name, long value, long timestamp)
     {
-        this(name, contextManager.create(value, HeapAllocator.instance), timestamp);
+        this(name, contextManager.createLocal(value, HeapAllocator.instance), timestamp);
     }
 
     public CounterColumn(ByteBuffer name, long value, long timestamp, long timestampOfLastDelete)
     {
-        this(name, contextManager.create(value, HeapAllocator.instance), timestamp, timestampOfLastDelete);
+        this(name, contextManager.createLocal(value, HeapAllocator.instance), timestamp, timestampOfLastDelete);
     }
 
     public CounterColumn(ByteBuffer name, ByteBuffer value, long timestamp)
@@ -76,10 +67,8 @@ public class CounterColumn extends Column
 
     public static CounterColumn create(ByteBuffer name, ByteBuffer value, long timestamp, long timestampOfLastDelete, ColumnSerializer.Flag flag)
     {
-        // #elt being negative means we have to clean delta
-        short count = value.getShort(value.position());
-        if (flag == ColumnSerializer.Flag.FROM_REMOTE || (flag == ColumnSerializer.Flag.LOCAL && count < 0))
-            value = CounterContext.instance().clearAllDelta(value);
+        if (flag == ColumnSerializer.Flag.FROM_REMOTE || (flag == ColumnSerializer.Flag.LOCAL && contextManager.shouldClearLocal(value)))
+            value = contextManager.clearAllLocal(value);
         return new CounterColumn(name, value, timestamp, timestampOfLastDelete);
     }
 
@@ -265,107 +254,9 @@ public class CounterColumn extends Column
         return contextManager.hasCounterId(value(), id);
     }
 
-    private CounterColumn computeOldShardMerger(int mergeBefore)
-    {
-        ByteBuffer bb = contextManager.computeOldShardMerger(value(), CounterId.getOldLocalCounterIds(), mergeBefore);
-        if (bb == null)
-            return null;
-        else
-            return new CounterColumn(name(), bb, timestamp(), timestampOfLastDelete);
-    }
-
-    private CounterColumn removeOldShards(int gcBefore)
-    {
-        ByteBuffer bb = contextManager.removeOldShards(value(), gcBefore);
-        if (bb == value())
-            return this;
-        else
-        {
-            return new CounterColumn(name(), bb, timestamp(), timestampOfLastDelete);
-        }
-    }
-
-    public static void mergeAndRemoveOldShards(DecoratedKey key, ColumnFamily cf, int gcBefore, int mergeBefore)
-    {
-        mergeAndRemoveOldShards(key, cf, gcBefore, mergeBefore, true);
-    }
-
-    /**
-     * There is two phase to the removal of old shards.
-     * First phase: we merge the old shard value to the current shard and
-     * 'nulify' the old one. We then send the counter context with the old
-     * shard nulified to all other replica.
-     * Second phase: once an old shard has been nulified for longer than
-     * gc_grace (to be sure all other replica had been aware of the merge), we
-     * simply remove that old shard from the context (it's value is 0).
-     * This method does both phases.
-     * (Note that the sendToOtherReplica flag is here only to facilitate
-     * testing. It should be true in real code so use the method above
-     * preferably)
-     */
-    public static void mergeAndRemoveOldShards(DecoratedKey key, ColumnFamily cf, int gcBefore, int mergeBefore, boolean sendToOtherReplica)
-    {
-        ColumnFamily remoteMerger = null;
-
-        for (Column c : cf)
-        {
-            if (!(c instanceof CounterColumn))
-                continue;
-            CounterColumn cc = (CounterColumn) c;
-            CounterColumn shardMerger = cc.computeOldShardMerger(mergeBefore);
-            CounterColumn merged = cc;
-            if (shardMerger != null)
-            {
-                merged = (CounterColumn) cc.reconcile(shardMerger);
-                if (remoteMerger == null)
-                    remoteMerger = cf.cloneMeShallow();
-                remoteMerger.addColumn(merged);
-            }
-            CounterColumn cleaned = merged.removeOldShards(gcBefore);
-            if (cleaned != cc)
-            {
-                cf.replace(cc, cleaned);
-            }
-        }
-
-        if (remoteMerger != null && sendToOtherReplica)
-        {
-            try
-            {
-                sendToOtherReplica(key, remoteMerger);
-            }
-            catch (Exception e)
-            {
-                logger.error("Error while sending shard merger mutation to remote endpoints", e);
-            }
-        }
-    }
-
-    public Column markDeltaToBeCleared()
+    public Column markLocalToBeCleared()
     {
-        return new CounterColumn(name, contextManager.markDeltaToBeCleared(value), timestamp, timestampOfLastDelete);
-    }
-
-    private static void sendToOtherReplica(DecoratedKey key, ColumnFamily cf) throws RequestExecutionException
-    {
-        RowMutation rm = new RowMutation(cf.metadata().ksName, key.key, cf);
-
-        final InetAddress local = FBUtilities.getBroadcastAddress();
-        String localDataCenter = DatabaseDescriptor.getEndpointSnitch().getDatacenter(local);
-
-        StorageProxy.performWrite(rm, ConsistencyLevel.ANY, localDataCenter, new StorageProxy.WritePerformer()
-        {
-            public void apply(IMutation mutation, Iterable<InetAddress> targets, AbstractWriteResponseHandler responseHandler, String localDataCenter, ConsistencyLevel consistency_level)
-            throws OverloadedException
-            {
-                // We should only send to the remote replica, not the local one
-                Set<InetAddress> remotes = Sets.difference(ImmutableSet.copyOf(targets), ImmutableSet.of(local));
-                // Fake local response to be a good lad but we won't wait on the responseHandler
-                responseHandler.response(null);
-                StorageProxy.sendToHintedEndpoints((RowMutation) mutation, remotes, responseHandler, localDataCenter);
-            }
-        }, null, WriteType.SIMPLE);
-
-        // we don't wait for answers
+        ByteBuffer marked = contextManager.markLocalToBeCleared(value);
+        return marked == value ? this : new CounterColumn(name, marked, timestamp, timestampOfLastDelete);
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/83cd80b2/src/java/org/apache/cassandra/db/CounterUpdateColumn.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/CounterUpdateColumn.java b/src/java/org/apache/cassandra/db/CounterUpdateColumn.java
index 1ae7dd7..422beee 100644
--- a/src/java/org/apache/cassandra/db/CounterUpdateColumn.java
+++ b/src/java/org/apache/cassandra/db/CounterUpdateColumn.java
@@ -82,7 +82,7 @@ public class CounterUpdateColumn extends Column
     public CounterColumn localCopy(ColumnFamilyStore cfs)
     {
         return new CounterColumn(cfs.internOrCopy(name, HeapAllocator.instance),
-                                 CounterContext.instance().create(delta(), HeapAllocator.instance),
+                                 CounterContext.instance().createLocal(delta(), HeapAllocator.instance),
                                  timestamp(),
                                  Long.MIN_VALUE);
     }
@@ -91,7 +91,7 @@ public class CounterUpdateColumn extends Column
     public Column localCopy(ColumnFamilyStore cfs, Allocator allocator)
     {
         return new CounterColumn(cfs.internOrCopy(name, allocator),
-                                 CounterContext.instance().create(delta(), allocator),
+                                 CounterContext.instance().createLocal(delta(), allocator),
                                  timestamp(),
                                  Long.MIN_VALUE);
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/83cd80b2/src/java/org/apache/cassandra/db/compaction/CompactionController.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionController.java b/src/java/org/apache/cassandra/db/compaction/CompactionController.java
index 7edc60e..fba659d 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionController.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionController.java
@@ -49,7 +49,6 @@ public class CompactionController
     private final Set<SSTableReader> compacting;
 
     public final int gcBefore;
-    public final int mergeShardBefore;
 
     /**
      * Constructor that subclasses may use when overriding shouldPurge to not need overlappingTree
@@ -65,11 +64,6 @@ public class CompactionController
         this.cfs = cfs;
         this.gcBefore = gcBefore;
         this.compacting = compacting;
-        // If we merge an old CounterId id, we must make sure that no further increment for that id are in an active memtable.
-        // For that, we must make sure that this id was renewed before the creation of the oldest unflushed memtable. We
-        // add 5 minutes to be sure we're on the safe side in terms of thread safety (though we should be fine in our
-        // current 'stop all write during memtable switch' situation).
-        this.mergeShardBefore = (int) ((cfs.oldestUnflushedMemtable() + 5 * 3600) / 1000);
         Set<SSTableReader> overlapping = compacting == null ? null : cfs.getAndReferenceOverlappingSSTables(compacting);
         this.overlappingSSTables = overlapping == null ? Collections.<SSTableReader>emptySet() : overlapping;
         this.overlappingTree = overlapping == null ? null : DataTracker.buildIntervalTree(overlapping);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/83cd80b2/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java b/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
index 0ad3de2..998f8cc 100644
--- a/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
+++ b/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
@@ -259,7 +259,7 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements Iterable
             {
                 // when we clear() the container, it removes the deletion info, so this needs to be reset each time
                 container.delete(maxRowTombstone);
-                ColumnFamily purged = PrecompactedRow.removeDeletedAndOldShards(key, shouldPurge, controller, container);
+                ColumnFamily purged = PrecompactedRow.removeDeleted(key, shouldPurge, controller, container);
                 if (purged == null || !purged.iterator().hasNext())
                 {
                     container.clear();
@@ -268,8 +268,8 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements Iterable
                 Column reduced = purged.iterator().next();
                 container.clear();
 
-                // PrecompactedRow.removeDeletedAndOldShards have only checked the top-level CF deletion times,
-                // not the range tombstone. For that we use the columnIndexer tombstone tracker.
+                // PrecompactedRow.removeDeleted has only checked the top-level CF deletion times,
+                // not the range tombstones. For that we use the columnIndexer tombstone tracker.
                 if (indexBuilder.tombstoneTracker().isDeleted(reduced))
                 {
                     indexer.remove(reduced);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/83cd80b2/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java b/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java
index fdddc4b..f1790d8 100644
--- a/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java
+++ b/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java
@@ -187,7 +187,7 @@ public class ParallelCompactionIterable extends AbstractCompactionIterable
                 }
 
                 PrecompactedRow.merge(returnCF, data, controller.cfs.indexManager.updaterFor(rows.get(0).key));
-                return PrecompactedRow.removeDeletedAndOldShards(rows.get(0).key, controller, returnCF);
+                return PrecompactedRow.removeDeleted(rows.get(0).key, controller, returnCF);
             }
         }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/83cd80b2/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java b/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java
index b225f53..d45bffa 100644
--- a/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java
+++ b/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java
@@ -44,14 +44,14 @@ public class PrecompactedRow extends AbstractCompactedRow
 {
     private final ColumnFamily compactedCf;
 
-    /** it is caller's responsibility to call removeDeleted + removeOldShards from the cf before calling this constructor */
+    // it is caller's responsibility to call removeDeleted from the cf before calling this constructor
     public PrecompactedRow(DecoratedKey key, ColumnFamily cf)
     {
         super(key);
         compactedCf = cf;
     }
 
-    public static ColumnFamily removeDeletedAndOldShards(DecoratedKey key, CompactionController controller, ColumnFamily cf)
+    public static ColumnFamily removeDeleted(DecoratedKey key, CompactionController controller, ColumnFamily cf)
     {
         assert key != null;
         assert controller != null;
@@ -59,7 +59,7 @@ public class PrecompactedRow extends AbstractCompactedRow
 
         // avoid calling shouldPurge unless we actually need to: it can be very expensive if LCS
         // gets behind and has hundreds of overlapping L0 sstables.  Essentially, this method is an
-        // ugly refactor of removeDeletedAndOldShards(controller.shouldPurge(key), controller, cf),
+        // ugly refactor of removeDeleted(controller.shouldPurge(key), controller, cf),
         // taking this into account.
         Boolean shouldPurge = null;
 
@@ -69,34 +69,20 @@ public class PrecompactedRow extends AbstractCompactedRow
         // We should only gc tombstone if shouldPurge == true. But otherwise,
         // it is still ok to collect column that shadowed by their (deleted)
         // container, which removeDeleted(cf, Integer.MAX_VALUE) will do
-        ColumnFamily compacted = ColumnFamilyStore.removeDeleted(cf, shouldPurge != null && shouldPurge ? controller.gcBefore : Integer.MIN_VALUE);
-
-        if (compacted != null && compacted.metadata().getDefaultValidator().isCommutative())
-        {
-            if (shouldPurge == null)
-                shouldPurge = controller.shouldPurge(key, cf.deletionInfo().maxTimestamp());
-            if (shouldPurge)
-                CounterColumn.mergeAndRemoveOldShards(key, compacted, controller.gcBefore, controller.mergeShardBefore);
-        }
-
-        return compacted;
+        return ColumnFamilyStore.removeDeleted(cf, shouldPurge != null && shouldPurge ? controller.gcBefore : Integer.MIN_VALUE);
     }
 
-    public static ColumnFamily removeDeletedAndOldShards(DecoratedKey key, boolean shouldPurge, CompactionController controller, ColumnFamily cf)
+    public static ColumnFamily removeDeleted(DecoratedKey key, boolean shouldPurge, CompactionController controller, ColumnFamily cf)
     {
         // See comment in preceding method
-        ColumnFamily compacted = ColumnFamilyStore.removeDeleted(cf,
-                                                                 shouldPurge ? controller.gcBefore : Integer.MIN_VALUE,
-                                                                 controller.cfs.indexManager.updaterFor(key));
-        if (shouldPurge && compacted != null && compacted.metadata().getDefaultValidator().isCommutative())
-            CounterColumn.mergeAndRemoveOldShards(key, compacted, controller.gcBefore, controller.mergeShardBefore);
-        return compacted;
+        return ColumnFamilyStore.removeDeleted(cf,
+                                               shouldPurge ? controller.gcBefore : Integer.MIN_VALUE,
+                                               controller.cfs.indexManager.updaterFor(key));
     }
 
     public PrecompactedRow(CompactionController controller, List<SSTableIdentityIterator> rows)
     {
-        this(rows.get(0).getKey(),
-             removeDeletedAndOldShards(rows.get(0).getKey(), controller, merge(rows, controller)));
+        this(rows.get(0).getKey(), removeDeleted(rows.get(0).getKey(), controller, merge(rows, controller)));
     }
 
     private static ColumnFamily merge(List<SSTableIdentityIterator> rows, CompactionController controller)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/83cd80b2/src/java/org/apache/cassandra/db/context/CounterContext.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/context/CounterContext.java b/src/java/org/apache/cassandra/db/context/CounterContext.java
index 634ee22..1fa4d60 100644
--- a/src/java/org/apache/cassandra/db/context/CounterContext.java
+++ b/src/java/org/apache/cassandra/db/context/CounterContext.java
@@ -20,27 +20,27 @@ package org.apache.cassandra.db.context;
 import java.nio.ByteBuffer;
 import java.security.MessageDigest;
 import java.util.ArrayList;
-import java.util.Iterator;
 import java.util.List;
 
-import org.apache.cassandra.serializers.MarshalException;
+import com.google.common.annotations.VisibleForTesting;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.db.compaction.CompactionManager;
+import org.apache.cassandra.serializers.MarshalException;
 import org.apache.cassandra.utils.*;
 
 /**
  * An implementation of a partitioned counter context.
  *
  * A context is primarily a list of tuples (counter id, clock, count) -- called
- * shard in the following. But with some shard are flagged as delta (with
+ * shards, with some shards flagged as global or local (with
  * special resolution rules in merge()).
  *
  * The data structure has two parts:
- *   a) a header containing the lists of "delta" (a list of references to the second parts)
- *   b) a list of shard -- (counter id, logical clock, count) tuples -- (the so-called 'body' below)
+ *   a) a header containing the lists of global and local shard indexes in the body
+ *   b) a list of shards -- (counter id, logical clock, count) tuples -- (the so-called 'body' below)
  *
  * The exact layout is:
  *            | header  |   body   |
@@ -49,6 +49,9 @@ import org.apache.cassandra.utils.*;
  *             |   list of indices in the body list (2*#elt bytes)
  *    #elt in rest of header (2 bytes)
  *
+ * Non-negative indices refer to local shards. Global shard indices are encoded as [idx + Short.MIN_VALUE],
+ * and are thus always negative.
+ *
  * The body layout being:
  *
  * body:     |----|----|----|----|----|----|....
@@ -57,14 +60,17 @@ import org.apache.cassandra.utils.*;
  *             |  clock_1      |  clock_2
  *       counterid_1         counterid_2
  *
- * The rules when merging two shard with the same counterid are:
- *   - delta + delta = sum counts (and logical clock)
- *   - delta + other = keep the delta one
- *   - other + other = keep the shard with highest logical clock
+ * The rules when merging two shards with the same counter id are:
+ *   - global + global = keep the shard with the highest logical clock
+ *   - global + local  = keep the global one
+ *   - global + remote = keep the global one
+ *   - local  + local  = sum counts (and logical clocks)
+ *   - local  + remote = keep the local one
+ *   - remote + remote = keep the shard with the highest logical clock
  *
- * For a detailed description of the meaning of a delta and why the merging
+ * For a detailed description of the meaning of a local shard and why the merging
  * rules work this way, see CASSANDRA-1938 - specifically the 1938_discussion
- * attachment.
+ * attachment (doesn't cover global shards, see CASSANDRA-4775 for that).
  */
 public class CounterContext implements IContext
 {
@@ -88,45 +94,23 @@ public class CounterContext implements IContext
     }
 
     /**
-     * Creates an initial counter context with an initial value for the local node.
-     *
-     *
-     * @param value the value for this initial update
-     *
-     * @param allocator
-     * @return an empty counter context.
+     * Creates a counter context with a single local shard.
      */
-    public ByteBuffer create(long value, Allocator allocator)
+    public ByteBuffer createLocal(long count, Allocator allocator)
     {
-        ByteBuffer context = allocator.allocate(HEADER_SIZE_LENGTH + HEADER_ELT_LENGTH + STEP_LENGTH);
-        // The first (and only) elt is a delta
-        context.putShort(context.position(), (short)1);
-        context.putShort(context.position() + HEADER_SIZE_LENGTH, (short)0);
-        writeElementAtOffset(context, context.position() + HEADER_SIZE_LENGTH + HEADER_ELT_LENGTH, CounterId.getLocalId(), 1L, value);
-        return context;
+        ContextState state = ContextState.allocate(0, 1, 0, allocator);
+        state.writeLocal(CounterId.getLocalId(), 1L, count);
+        return state.context;
     }
 
-    // Provided for use by unit tests
-    public ByteBuffer create(CounterId id, long clock, long value, boolean isDelta)
-    {
-        ByteBuffer context = ByteBuffer.allocate(HEADER_SIZE_LENGTH + (isDelta ? HEADER_ELT_LENGTH : 0) + STEP_LENGTH);
-        context.putShort(context.position(), (short)(isDelta ? 1 : 0));
-        if (isDelta)
-        {
-            context.putShort(context.position() + HEADER_SIZE_LENGTH, (short)0);
-        }
-        writeElementAtOffset(context, context.position() + HEADER_SIZE_LENGTH + (isDelta ? HEADER_ELT_LENGTH : 0), id, clock, value);
-        return context;
-    }
-
-    // write a tuple (counter id, clock, count) at an absolute (bytebuffer-wise) offset
-    private static void writeElementAtOffset(ByteBuffer context, int offset, CounterId id, long clock, long count)
+    /**
+     * Creates a counter context with a single remote shard.
+     */
+    public ByteBuffer createRemote(CounterId id, long clock, long count, Allocator allocator)
     {
-        context = context.duplicate();
-        context.position(offset);
-        context.put(id.bytes().duplicate());
-        context.putLong(clock);
-        context.putLong(count);
+        ContextState state = ContextState.allocate(0, 0, 1, allocator);
+        state.writeRemote(id, clock, count);
+        return state.context;
     }
 
     private static int headerLength(ByteBuffer context)
@@ -156,8 +140,8 @@ public class CounterContext implements IContext
     public ContextRelationship diff(ByteBuffer left, ByteBuffer right)
     {
         ContextRelationship relationship = ContextRelationship.EQUAL;
-        ContextState leftState = new ContextState(left, headerLength(left));
-        ContextState rightState = new ContextState(right, headerLength(right));
+        ContextState leftState = ContextState.wrap(left);
+        ContextState rightState = ContextState.wrap(right);
 
         while (leftState.hasRemaining() && rightState.hasRemaining())
         {
@@ -298,107 +282,90 @@ public class CounterContext implements IContext
      */
     public ByteBuffer merge(ByteBuffer left, ByteBuffer right, Allocator allocator)
     {
-        ContextState leftState = new ContextState(left, headerLength(left));
-        ContextState rightState = new ContextState(right, headerLength(right));
+        int globalCount = 0;
+        int localCount = 0;
+        int remoteCount = 0;
 
-        // Compute size of result
-        int mergedHeaderLength = HEADER_SIZE_LENGTH;
-        int mergedBodyLength = 0;
+        ContextState leftState = ContextState.wrap(left);
+        ContextState rightState = ContextState.wrap(right);
 
         while (leftState.hasRemaining() && rightState.hasRemaining())
         {
             int cmp = leftState.compareIdTo(rightState);
             if (cmp == 0)
             {
-                mergedBodyLength += STEP_LENGTH;
-                if (leftState.isDelta() || rightState.isDelta())
-                    mergedHeaderLength += HEADER_ELT_LENGTH;
+                if (leftState.isGlobal() || rightState.isGlobal())
+                    globalCount += 1;
+                else if (leftState.isLocal() || rightState.isLocal())
+                    localCount += 1;
+                else
+                    remoteCount += 1;
+
                 leftState.moveToNext();
                 rightState.moveToNext();
             }
             else if (cmp > 0)
             {
-                mergedBodyLength += STEP_LENGTH;
-                if (rightState.isDelta())
-                    mergedHeaderLength += HEADER_ELT_LENGTH;
+                if (rightState.isGlobal())
+                    globalCount += 1;
+                else if (rightState.isLocal())
+                    localCount += 1;
+                else
+                    remoteCount += 1;
+
                 rightState.moveToNext();
             }
             else // cmp < 0
             {
-                mergedBodyLength += STEP_LENGTH;
-                if (leftState.isDelta())
-                    mergedHeaderLength += HEADER_ELT_LENGTH;
+                if (leftState.isGlobal())
+                    globalCount += 1;
+                else if (leftState.isLocal())
+                    localCount += 1;
+                else
+                    remoteCount += 1;
+
                 leftState.moveToNext();
             }
         }
-        mergedHeaderLength += leftState.remainingHeaderLength() + rightState.remainingHeaderLength();
-        mergedBodyLength += leftState.remainingBodyLength() + rightState.remainingBodyLength();
 
-        // Do the actual merge
-        ByteBuffer merged = allocator.allocate(mergedHeaderLength + mergedBodyLength);
-        merged.putShort(merged.position(), (short) ((mergedHeaderLength - HEADER_SIZE_LENGTH) / HEADER_ELT_LENGTH));
-        ContextState mergedState = new ContextState(merged, mergedHeaderLength);
+        while (leftState.hasRemaining())
+        {
+            if (leftState.isGlobal())
+                globalCount += 1;
+            else if (leftState.isLocal())
+                localCount += 1;
+            else
+                remoteCount += 1;
+
+            leftState.moveToNext();
+        }
+
+        while (rightState.hasRemaining())
+        {
+            if (rightState.isGlobal())
+                globalCount += 1;
+            else if (rightState.isLocal())
+                localCount += 1;
+            else
+                remoteCount += 1;
+
+            rightState.moveToNext();
+        }
+
         leftState.reset();
         rightState.reset();
+
+        return merge(ContextState.allocate(globalCount, localCount, remoteCount, allocator), leftState, rightState);
+    }
+
+    private ByteBuffer merge(ContextState mergedState, ContextState leftState, ContextState rightState)
+    {
         while (leftState.hasRemaining() && rightState.hasRemaining())
         {
             int cmp = leftState.compareIdTo(rightState);
             if (cmp == 0)
             {
-                if (leftState.isDelta() || rightState.isDelta())
-                {
-                    // Local id and at least one is a delta
-                    if (leftState.isDelta() && rightState.isDelta())
-                    {
-                        // both delta, sum
-                        long clock = leftState.getClock() + rightState.getClock();
-                        long count = leftState.getCount() + rightState.getCount();
-                        mergedState.writeElement(leftState.getCounterId(), clock, count, true);
-                    }
-                    else
-                    {
-                        // Only one have delta, keep that one
-                        (leftState.isDelta() ? leftState : rightState).copyTo(mergedState);
-                    }
-                }
-                else
-                {
-                    long leftClock = leftState.getClock();
-                    long rightClock = rightState.getClock();
-
-                    if (leftClock == rightClock)
-                    {
-                        // We should never see non-delta shards w/ same id+clock but different counts. However, if we do
-                        // we should "heal" the problem by being deterministic in our selection of shard - and
-                        // log the occurrence so that the operator will know something is wrong.
-                        long leftCount = leftState.getCount();
-                        long rightCount = rightState.getCount();
-
-                        if (leftCount != rightCount && CompactionManager.isCompactionManager.get())
-                        {
-                            logger.warn("invalid counter shard detected; ({}, {}, {}) and ({}, {}, {}) differ only in "
-                                        + "count; will pick highest to self-heal on compaction",
-                                        leftState.getCounterId(), leftClock, leftCount, rightState.getCounterId(), rightClock, rightCount);
-                        }
-
-                        if (leftCount > rightCount)
-                        {
-                            leftState.copyTo(mergedState);
-                        }
-                        else
-                        {
-                            rightState.copyTo(mergedState);
-                        }
-                    }
-                    else
-                    {
-                        if ((leftClock >= 0 && rightClock > 0 && leftClock >= rightClock)
-                                || (leftClock < 0 && (rightClock > 0 || leftClock < rightClock)))
-                            leftState.copyTo(mergedState);
-                        else
-                            rightState.copyTo(mergedState);
-                    }
-                }
+                mergeTie(mergedState, leftState, rightState);
                 rightState.moveToNext();
                 leftState.moveToNext();
             }
@@ -413,18 +380,110 @@ public class CounterContext implements IContext
                 leftState.moveToNext();
             }
         }
+
         while (leftState.hasRemaining())
         {
             leftState.copyTo(mergedState);
             leftState.moveToNext();
         }
+
         while (rightState.hasRemaining())
         {
             rightState.copyTo(mergedState);
             rightState.moveToNext();
         }
 
-        return merged;
+        return mergedState.context;
+    }
+
+    private void mergeTie(ContextState mergedState, ContextState leftState, ContextState rightState)
+    {
+        if (leftState.isGlobal() || rightState.isGlobal())
+        {
+            if (leftState.isGlobal() && rightState.isGlobal())
+            {
+                long leftClock = leftState.getClock();
+                long rightClock = rightState.getClock();
+
+                if (leftClock == rightClock)
+                {
+                    long leftCount = leftState.getCount();
+                    long rightCount = rightState.getCount();
+
+                    // Can happen if an sstable gets lost and disk failure policy is set to 'best effort'
+                    if (leftCount != rightCount && CompactionManager.isCompactionManager.get())
+                    {
+                        logger.warn("invalid global counter shard detected; ({}, {}, {}) and ({}, {}, {}) differ only in "
+                                    + "count; will pick highest to self-heal on compaction",
+                                    leftState.getCounterId(), leftClock, leftCount,
+                                    rightState.getCounterId(), rightClock, rightCount);
+                    }
+
+                    if (leftCount > rightCount)
+                        leftState.copyTo(mergedState);
+                    else
+                        rightState.copyTo(mergedState);
+                }
+                else
+                {
+                    (leftClock > rightClock ? leftState : rightState).copyTo(mergedState);
+                }
+            }
+            else // exactly one is global - keep the global one
+            {
+                (leftState.isGlobal() ? leftState : rightState).copyTo(mergedState);
+            }
+        }
+        else if (leftState.isLocal() || rightState.isLocal())
+        {
+            // Neither shard is global and at least one of them is local.
+            if (leftState.isLocal() && rightState.isLocal())
+            {
+                // both local - sum clocks and counts into a single local shard
+                long clock = leftState.getClock() + rightState.getClock();
+                long count = leftState.getCount() + rightState.getCount();
+                mergedState.writeLocal(leftState.getCounterId(), clock, count);
+            }
+            else // exactly one is local - keep the local one
+            {
+                (leftState.isLocal() ? leftState : rightState).copyTo(mergedState);
+            }
+        }
+        else // both are remote shards
+        {
+            long leftClock = leftState.getClock();
+            long rightClock = rightState.getClock();
+
+            if (leftClock == rightClock)
+            {
+                // We should never see non-local shards w/ same id+clock but different counts. However, if we do
+                // we should "heal" the problem by being deterministic in our selection of shard - and
+                // log the occurrence so that the operator will know something is wrong.
+                long leftCount = leftState.getCount();
+                long rightCount = rightState.getCount();
+
+                if (leftCount != rightCount && CompactionManager.isCompactionManager.get())
+                {
+                    logger.warn("invalid remote counter shard detected; ({}, {}, {}) and ({}, {}, {}) differ only in "
+                                + "count; will pick highest to self-heal on compaction",
+                                leftState.getCounterId(), leftClock, leftCount,
+                                rightState.getCounterId(), rightClock, rightCount);
+                }
+
+                if (leftCount > rightCount)
+                    leftState.copyTo(mergedState);
+                else
+                    rightState.copyTo(mergedState);
+            }
+            else
+            {
+                if ((leftClock >= 0 && rightClock > 0 && leftClock >= rightClock)
+                        || (leftClock < 0 && (rightClock > 0 || leftClock < rightClock)))
+                    leftState.copyTo(mergedState);
+                else
+                    rightState.copyTo(mergedState);
+            }
+        }
     }
 
     /**
@@ -435,25 +494,23 @@ public class CounterContext implements IContext
      */
     public String toString(ByteBuffer context)
     {
-        ContextState state = new ContextState(context, headerLength(context));
+        ContextState state = ContextState.wrap(context);
         StringBuilder sb = new StringBuilder();
         sb.append("[");
 
         while (state.hasRemaining())
         {
-            if (state.elementIdx() > 0)
-            {
+            if (state.getElementIndex() > 0)
                 sb.append(",");
-            }
             sb.append("{");
             sb.append(state.getCounterId().toString()).append(", ");
-            sb.append(state.getClock()).append(", ");;
+            sb.append(state.getClock()).append(", ");
             sb.append(state.getCount());
             sb.append("}");
-            if (state.isDelta())
-            {
+            if (state.isGlobal())
+                sb.append("$");
+            else if (state.isLocal())
                 sb.append("*");
-            }
             state.moveToNext();
         }
 
@@ -481,49 +538,87 @@ public class CounterContext implements IContext
         return total;
     }
 
+    public boolean shouldClearLocal(ByteBuffer context)
+    {
+        // A negative #elt means the context was marked by markLocalToBeCleared and its local shards must be cleared.
+        return context.getShort(context.position()) < 0;
+    }
+
     /**
-     * Mark context to delete delta afterward.
+     * Mark context so that its local shards get cleared afterward.
      * Marking is done by multiply #elt by -1 to preserve header length
-     * and #elt count in order to clear all delta later.
+     * and #elt count in order to clear all local shards later.
      *
      * @param context a counter context
-     * @return context that marked to delete delta
+     * @return a marked copy of the context, or the context itself if it is already marked or has no local shards
      */
-    public ByteBuffer markDeltaToBeCleared(ByteBuffer context)
+    public ByteBuffer markLocalToBeCleared(ByteBuffer context)
     {
-        int headerLength = headerLength(context);
-        if (headerLength == HEADER_SIZE_LENGTH)
-            return context;
-
-        ByteBuffer marked = context.duplicate();
         short count = context.getShort(context.position());
-        // negate #elt to mark as deleted, without changing its size.
-        if (count > 0)
-            marked.putShort(marked.position(), (short) (count * -1));
+        if (count <= 0)
+            return context; // already marked or all are remote.
+
+        boolean hasLocalShards = false;
+        for (int i = 0; i < count; i++)
+        {
+            if (context.getShort(context.position() + HEADER_SIZE_LENGTH + i * HEADER_ELT_LENGTH) >= 0)
+            {
+                hasLocalShards = true;
+                break;
+            }
+        }
+
+        if (!hasLocalShards)
+            return context; // all shards are global or remote.
+
+        ByteBuffer marked = ByteBuffer.allocate(context.remaining());
+        marked.putShort(marked.position(), (short) (count * -1));
+        ByteBufferUtil.arrayCopy(context,
+                                 context.position() + HEADER_SIZE_LENGTH,
+                                 marked,
+                                 marked.position() + HEADER_SIZE_LENGTH,
+                                 context.remaining() - HEADER_SIZE_LENGTH);
         return marked;
     }
 
     /**
-     * Remove all the delta of a context (i.e, set an empty header).
+     * Remove all the local shards of a context (but keep the global ones).
      *
      * @param context a counter context
-     * @return a version of {@code context} where no count are a delta.
+     * @return a version of {@code context} where no shards are local.
      */
-    public ByteBuffer clearAllDelta(ByteBuffer context)
+    public ByteBuffer clearAllLocal(ByteBuffer context)
     {
-        int headerLength = headerLength(context);
-        if (headerLength == HEADER_SIZE_LENGTH)
-            return context;
-
-        ByteBuffer cleaned = ByteBuffer.allocate(context.remaining() - headerLength + HEADER_SIZE_LENGTH);
-        cleaned.putShort(cleaned.position(), (short)0);
-        ByteBufferUtil.arrayCopy(
-                context,
-                context.position() + headerLength,
-                cleaned,
-                cleaned.position() + HEADER_SIZE_LENGTH,
-                context.remaining() - headerLength);
-        return cleaned;
+        int count = Math.abs(context.getShort(context.position()));
+        if (count == 0)
+            return context; // no local or global shards present.
+
+        List<Short> globalShardIndexes = new ArrayList<>(count);
+        for (int i = 0; i < count; i++)
+        {
+            short elt = context.getShort(context.position() + HEADER_SIZE_LENGTH + i * HEADER_ELT_LENGTH);
+            if (elt < 0)
+                globalShardIndexes.add(elt);
+        }
+
+        if (count == globalShardIndexes.size())
+            return context; // no local shards detected.
+
+        // allocate a smaller ByteBuffer for the cleared context - with no local header elts.
+        ByteBuffer cleared = ByteBuffer.allocate(context.remaining() - (count - globalShardIndexes.size()) * HEADER_ELT_LENGTH);
+
+        cleared.putShort(cleared.position(), (short) globalShardIndexes.size());
+        for (int i = 0; i < globalShardIndexes.size(); i++)
+            cleared.putShort(cleared.position() + HEADER_SIZE_LENGTH + i * HEADER_ELT_LENGTH, globalShardIndexes.get(i));
+
+        int origHeaderLength = headerLength(context);
+        ByteBufferUtil.arrayCopy(context,
+                                 context.position() + origHeaderLength,
+                                 cleared,
+                                 cleared.position() + headerLength(cleared),
+                                 context.remaining() - origHeaderLength);
+
+        return cleared;
     }
 
     public void validateContext(ByteBuffer context) throws MarshalException
@@ -535,15 +630,14 @@ public class CounterContext implements IContext
     /**
      * Update a MessageDigest with the content of a context.
      * Note that this skips the header entirely since the header information
-     * has local meaning only, while digests a meant for comparison across
+     * has local meaning only, while digests are meant for comparison across
      * nodes. This means in particular that we always have:
-     *  updateDigest(ctx) == updateDigest(clearAllDelta(ctx))
+     *  updateDigest(ctx) == updateDigest(clearAllLocal(ctx))
      */
     public void updateDigest(MessageDigest message, ByteBuffer context)
     {
-        int hlength = headerLength(context);
         ByteBuffer dup = context.duplicate();
-        dup.position(context.position() + hlength);
+        dup.position(context.position() + headerLength(context));
         message.update(dup);
     }
 
@@ -569,199 +663,6 @@ public class CounterContext implements IContext
     }
 
     /**
-     * Compute a new context such that if applied to context yields the same
-     * total but with old local counter ids nulified and there content merged to
-     * the current localCounterId.
-     */
-    public ByteBuffer computeOldShardMerger(ByteBuffer context, List<CounterId.CounterIdRecord> oldIds, long mergeBefore)
-    {
-        long now = System.currentTimeMillis();
-        int hlength = headerLength(context);
-        CounterId localId = CounterId.getLocalId();
-
-        Iterator<CounterId.CounterIdRecord> recordIterator = oldIds.iterator();
-        CounterId.CounterIdRecord currRecord = recordIterator.hasNext() ? recordIterator.next() : null;
-
-        ContextState state = new ContextState(context, hlength);
-
-        List<CounterId> toMerge = new ArrayList<CounterId>();
-        long mergeTotal = 0;
-        while (state.hasRemaining() && currRecord != null)
-        {
-            assert !currRecord.id.equals(localId);
-
-            CounterId counterId = state.getCounterId();
-            int c = counterId.compareTo(currRecord.id);
-
-            if (c > 0)
-            {
-                currRecord = recordIterator.hasNext() ? recordIterator.next() : null;
-                continue;
-            }
-
-            if (state.isDelta())
-            {
-                if (state.getClock() < 0)
-                {
-                    // Already merged shard, waiting to be collected
-
-                    if (counterId.equals(localId))
-                        // we should not get there, but we have been creating problematic context prior to #2968
-                        throw new RuntimeException("Current counterId with a negative clock (likely due to #2968). You need to restart this node with -Dcassandra.renew_counter_id=true to fix.");
-
-                    if (state.getCount() != 0)
-                    {
-                        // This should not happen, but previous bugs have generated this (#2968 in particular) so fixing it.
-                        logger.error(String.format("Invalid counter context (clock is %d and count is %d for CounterId %s), will fix", state.getCount(), state.getCount(), counterId.toString()));
-                        toMerge.add(counterId);
-                        mergeTotal += state.getCount();
-                    }
-                }
-                else if (c == 0)
-                {
-                    // Found an old id. However, merging an oldId that has just been renewed isn't safe, so
-                    // we check that it has been renewed before mergeBefore.
-                    if (currRecord.timestamp < mergeBefore)
-                    {
-                        toMerge.add(counterId);
-                        mergeTotal += state.getCount();
-                    }
-                }
-            }
-
-            if (c == 0)
-                currRecord = recordIterator.hasNext() ? recordIterator.next() : null;
-
-            state.moveToNext();
-        }
-        // Continuing the iteration so that we can repair invalid shards
-        while (state.hasRemaining())
-        {
-            CounterId counterId = state.getCounterId();
-            if (state.isDelta() && state.getClock() < 0)
-            {
-                if (counterId.equals(localId))
-                    // we should not get there, but we have been creating problematic context prior to #2968
-                    throw new RuntimeException("Current counterId with a negative clock (likely due to #2968). You need to restart this node with -Dcassandra.renew_counter_id=true to fix.");
-
-                if (state.getCount() != 0)
-                {
-                    // This should not happen, but previous bugs have generated this (#2968 in particular) so fixing it.
-                    logger.error(String.format("Invalid counter context (clock is %d and count is %d for CounterId %s), will fix", state.getClock(), state.getCount(), counterId.toString()));
-                    toMerge.add(counterId);
-                    mergeTotal += state.getCount();
-                }
-            }
-            state.moveToNext();
-        }
-
-        if (toMerge.isEmpty())
-            return null;
-
-        ContextState merger = ContextState.allocate(toMerge.size() + 1, toMerge.size() + 1);
-        state.reset();
-        int i = 0;
-        int removedTotal = 0;
-        boolean localWritten = false;
-        while (state.hasRemaining())
-        {
-            CounterId counterId = state.getCounterId();
-            if (counterId.compareTo(localId) > 0)
-            {
-                merger.writeElement(localId, 1L, mergeTotal, true);
-                localWritten = true;
-            }
-            else if (i < toMerge.size() && counterId.compareTo(toMerge.get(i)) == 0)
-            {
-                long count = state.getCount();
-                removedTotal += count;
-                merger.writeElement(counterId, -now - state.getClock(), -count, true);
-                ++i;
-            }
-            state.moveToNext();
-        }
-        if (!localWritten)
-            merger.writeElement(localId, 1L, mergeTotal, true);
-
-        // sanity check
-        assert mergeTotal == removedTotal;
-        return merger.context;
-    }
-
-    /**
-     * Remove shards that have been canceled through computeOldShardMerger
-     * since a time older than gcBefore.
-     * Used by compaction to strip context of unecessary information,
-     * shrinking them.
-     */
-    public ByteBuffer removeOldShards(ByteBuffer context, int gcBefore)
-    {
-        int hlength = headerLength(context);
-        ContextState state = new ContextState(context, hlength);
-        int removedShards = 0;
-        int removedDelta = 0;
-        while (state.hasRemaining())
-        {
-            long clock = state.getClock();
-            if (clock < 0)
-            {
-                // We should never have a count != 0 when clock < 0.
-                // We know that previous may have created those situation though, so:
-                //   * for delta shard: we throw an exception since computeOldShardMerger should
-                //     have corrected that situation
-                //   * for non-delta shard: it is a much more crappier situation because there is
-                //     not much we can do since we are not responsible for that shard. So we simply
-                //     ignore the shard.
-                if (state.getCount() != 0)
-                {
-                    if (state.isDelta())
-                    {
-                        throw new IllegalStateException("Counter shard with negative clock but count != 0; context = " + toString(context));
-                    }
-                    else
-                    {
-                        logger.debug("Ignoring non-removable non-delta corrupted shard in context " + toString(context));
-                        state.moveToNext();
-                        continue;
-                    }
-                }
-
-                if (-((int)(clock / 1000)) < gcBefore)
-                {
-                    removedShards++;
-                    if (state.isDelta())
-                        removedDelta++;
-                }
-            }
-            state.moveToNext();
-        }
-
-        if (removedShards == 0)
-            return context;
-
-        int removedHeaderSize = removedDelta * HEADER_ELT_LENGTH;
-        int removedBodySize = removedShards * STEP_LENGTH;
-        int newSize = context.remaining() - removedHeaderSize - removedBodySize;
-        int newHlength = hlength - removedHeaderSize;
-        ByteBuffer cleanedContext = HeapAllocator.instance.allocate(newSize);
-        cleanedContext.putShort(cleanedContext.position(), (short) ((newHlength - HEADER_SIZE_LENGTH) / HEADER_ELT_LENGTH));
-        ContextState cleaned = new ContextState(cleanedContext, newHlength);
-
-        state.reset();
-        while (state.hasRemaining())
-        {
-            long clock = state.getClock();
-            if (clock >= 0 || state.getCount() != 0 || -((int)(clock / 1000)) >= gcBefore)
-            {
-                state.copyTo(cleaned);
-            }
-
-            state.moveToNext();
-        }
-        return cleanedContext;
-    }
-
-    /**
      * Helper class to work on contexts (works by iterating over them).
      * A context being abstractly a list of tuple (counterid, clock, count), a
      * ContextState encapsulate a context and a position to one of the tuple.
@@ -775,74 +676,97 @@ public class CounterContext implements IContext
     {
         public final ByteBuffer context;
         public final int headerLength;
-        private int headerOffset;  // offset from context.position()
-        private int bodyOffset;    // offset from context.position()
-        private boolean currentIsDelta;
 
-        public ContextState(ByteBuffer context, int headerLength)
+        private int headerOffset;        // offset from context.position()
+        private int bodyOffset;          // offset from context.position()
+        private boolean currentIsGlobal; // current header elt == element index + Short.MIN_VALUE
+        private boolean currentIsLocal;  // current header elt == element index
+
+        private ContextState(ByteBuffer context)
         {
-            this(context, headerLength, HEADER_SIZE_LENGTH, headerLength, false);
-            updateIsDelta();
+            this.context = context;
+            this.headerLength = this.bodyOffset = headerLength(context);
+            this.headerOffset = HEADER_SIZE_LENGTH;
+            updateIsGlobalOrLocal();
         }
 
-        public ContextState(ByteBuffer context)
+        public static ContextState wrap(ByteBuffer context)
         {
-            this(context, headerLength(context));
+            return new ContextState(context);
         }
 
-        private ContextState(ByteBuffer context, int headerLength, int headerOffset, int bodyOffset, boolean currentIsDelta)
+        /**
+         * Allocate a new context big enough for globalCount + localCount + remoteCount elements
+         * and return a ContextState positioned on its first element.
+         */
+        public static ContextState allocate(int globalCount, int localCount, int remoteCount, Allocator allocator)
         {
-            this.context = context;
-            this.headerLength = headerLength;
-            this.headerOffset = headerOffset;
-            this.bodyOffset = bodyOffset;
-            this.currentIsDelta = currentIsDelta;
+            int headerLength = HEADER_SIZE_LENGTH + (globalCount + localCount) * HEADER_ELT_LENGTH;
+            int bodyLength = (globalCount + localCount + remoteCount) * STEP_LENGTH;
+
+            ByteBuffer buffer = allocator.allocate(headerLength + bodyLength);
+            buffer.putShort(buffer.position(), (short) (globalCount + localCount));
+
+            return ContextState.wrap(buffer);
         }
 
-        public boolean isDelta()
+        public boolean isGlobal()
         {
-            return currentIsDelta;
+            return currentIsGlobal;
         }
 
-        private void updateIsDelta()
+        public boolean isLocal()
         {
-            currentIsDelta = (headerOffset < headerLength) && context.getShort(context.position() + headerOffset) == (short) elementIdx();
+            return currentIsLocal;
         }
 
-        public boolean hasRemaining()
+        public boolean isRemote()
         {
-            return bodyOffset < context.remaining();
+            return !(currentIsGlobal || currentIsLocal);
         }
 
-        public int remainingHeaderLength()
+        private void updateIsGlobalOrLocal()
         {
-            return headerLength - headerOffset;
+            if (headerOffset >= headerLength)
+            {
+                currentIsGlobal = currentIsLocal = false;
+            }
+            else
+            {
+                short headerElt = context.getShort(context.position() + headerOffset);
+                currentIsGlobal = headerElt == getElementIndex() + Short.MIN_VALUE;
+                currentIsLocal = headerElt == getElementIndex();
+            }
         }
 
-        public int remainingBodyLength()
+        public boolean hasRemaining()
         {
-            return context.remaining() - bodyOffset;
+            return bodyOffset < context.remaining();
         }
 
         public void moveToNext()
         {
             bodyOffset += STEP_LENGTH;
-            if (currentIsDelta)
-            {
+            if (currentIsGlobal || currentIsLocal)
                 headerOffset += HEADER_ELT_LENGTH;
-            }
-            updateIsDelta();
+            updateIsGlobalOrLocal();
         }
 
-        // This advance other to the next position (but not this)
         public void copyTo(ContextState other)
         {
-            ByteBufferUtil.arrayCopy(context, context.position() + bodyOffset, other.context, other.context.position() + other.bodyOffset, STEP_LENGTH);
-            if (currentIsDelta)
-            {
-                other.context.putShort(other.context.position() + other.headerOffset, (short) other.elementIdx());
-            }
-            other.currentIsDelta = currentIsDelta;
+            ByteBufferUtil.arrayCopy(context,
+                                     context.position() + bodyOffset,
+                                     other.context,
+                                     other.context.position() + other.bodyOffset,
+                                     STEP_LENGTH);
+            // Mark the copied shard in *other*'s header (not ours): global as index + Short.MIN_VALUE, local as index.
+            if (currentIsGlobal)
+                other.context.putShort(other.context.position() + other.headerOffset, (short) (other.getElementIndex() + Short.MIN_VALUE));
+            else if (currentIsLocal)
+                other.context.putShort(other.context.position() + other.headerOffset, (short) other.getElementIndex());
+            // Propagate the shard kind so other.moveToNext() also advances other's header offset; this state is not advanced.
+            other.currentIsGlobal = currentIsGlobal;
+            other.currentIsLocal = currentIsLocal;
             other.moveToNext();
         }
 
@@ -855,7 +779,12 @@ public class CounterContext implements IContext
         {
             this.headerOffset = HEADER_SIZE_LENGTH;
             this.bodyOffset = headerLength;
-            updateIsDelta();
+            updateIsGlobalOrLocal();
+        }
+
+        public int getElementIndex()
+        {
+            return (bodyOffset - headerLength) / STEP_LENGTH;
         }
 
         public CounterId getCounterId()
@@ -873,50 +802,44 @@ public class CounterContext implements IContext
             return context.getLong(context.position() + bodyOffset + CounterId.LENGTH + CLOCK_LENGTH);
         }
 
-        // Advance this to the next position
-        public void writeElement(CounterId id, long clock, long count, boolean isDelta)
+        // Writes a global shard at the current position, then advances. In 2.0 only used by the unit tests.
+        public void writeGlobal(CounterId id, long clock, long count)
         {
-            writeElementAtOffset(context, context.position() + bodyOffset, id, clock, count);
-            if (isDelta)
-            {
-                context.putShort(context.position() + headerOffset, (short)elementIdx());
-            }
-            currentIsDelta = isDelta;
-            moveToNext();
+            writeElement(id, clock, count, true, false);
         }
 
-        public void writeElement(CounterId id, long clock, long count)
+        public void writeLocal(CounterId id, long clock, long count)
         {
-            writeElement(id, clock, count, false);
+            writeElement(id, clock, count, false, true);
         }
 
-        public int elementIdx()
+        public void writeRemote(CounterId id, long clock, long count)
         {
-            return (bodyOffset - headerLength) / STEP_LENGTH;
+            writeElement(id, clock, count, false, false);
         }
 
-        public ContextState duplicate()
+        private void writeElement(CounterId id, long clock, long count, boolean isGlobal, boolean isLocal)
         {
-            return new ContextState(context, headerLength, headerOffset, bodyOffset, currentIsDelta);
-        }
+            writeElementAtOffset(context, context.position() + bodyOffset, id, clock, count);
 
-        /*
-         * Allocate a new context big enough for {@code elementCount} elements
-         * with {@code deltaCount} of them being delta, and return the initial
-         * ContextState corresponding.
-         */
-        public static ContextState allocate(int elementCount, int deltaCount)
-        {
-            return allocate(elementCount, deltaCount, HeapAllocator.instance);
+            if (isGlobal)
+                context.putShort(context.position() + headerOffset, (short) (getElementIndex() + Short.MIN_VALUE));
+            else if (isLocal)
+                context.putShort(context.position() + headerOffset, (short) getElementIndex());
+            // Record the shard kind so that moveToNext() also advances the header offset.
+            currentIsGlobal = isGlobal;
+            currentIsLocal = isLocal;
+            moveToNext();
         }
 
-        public static ContextState allocate(int elementCount, int deltaCount, Allocator allocator)
+        // write a tuple (counter id, clock, count) at an absolute (bytebuffer-wise) offset
+        private void writeElementAtOffset(ByteBuffer ctx, int offset, CounterId id, long clock, long count)
         {
-            assert deltaCount <= elementCount;
-            int hlength = HEADER_SIZE_LENGTH + deltaCount * HEADER_ELT_LENGTH;
-            ByteBuffer context = allocator.allocate(hlength + elementCount * STEP_LENGTH);
-            context.putShort(context.position(), (short)deltaCount);
-            return new ContextState(context, hlength);
+            ctx = ctx.duplicate();
+            ctx.position(offset);
+            ctx.put(id.bytes().duplicate());
+            ctx.putLong(clock);
+            ctx.putLong(count);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/83cd80b2/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java b/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
index 0059fda..e337185 100644
--- a/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
@@ -31,6 +31,7 @@ import org.apache.cassandra.db.context.CounterContext;
 import org.apache.cassandra.db.marshal.CompositeType;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.utils.CounterId;
+import org.apache.cassandra.utils.HeapAllocator;
 import org.apache.cassandra.utils.Pair;
 
 public abstract class AbstractSSTableSimpleWriter
@@ -154,7 +155,9 @@ public abstract class AbstractSSTableSimpleWriter
      */
     public void addCounterColumn(ByteBuffer name, long value)
     {
-        addColumn(new CounterColumn(name, CounterContext.instance().create(counterid, 1L, value, false), System.currentTimeMillis()));
+        addColumn(new CounterColumn(name,
+                                    CounterContext.instance().createRemote(counterid, 1L, value, HeapAllocator.instance),
+                                    System.currentTimeMillis()));
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/83cd80b2/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
index 3d19d83..81b3c27 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
@@ -246,7 +246,7 @@ public class SSTableWriter extends SSTable
                 if (atom == null)
                     break;
                 if (atom instanceof CounterColumn)
-                    atom = ((CounterColumn) atom).markDeltaToBeCleared();
+                    atom = ((CounterColumn) atom).markLocalToBeCleared();
 
                 int deletionTime = atom.getLocalDeletionTime();
                 if (deletionTime < Integer.MAX_VALUE)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/83cd80b2/test/unit/org/apache/cassandra/db/CounterColumnTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/CounterColumnTest.java b/test/unit/org/apache/cassandra/db/CounterColumnTest.java
index 7c2be9f..1f2c078 100644
--- a/test/unit/org/apache/cassandra/db/CounterColumnTest.java
+++ b/test/unit/org/apache/cassandra/db/CounterColumnTest.java
@@ -32,10 +32,11 @@ import org.junit.Test;
 import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.Util;
 import org.apache.cassandra.db.context.CounterContext;
-import static org.apache.cassandra.db.context.CounterContext.ContextState;
 import org.apache.cassandra.io.util.DataOutputBuffer;
 import org.apache.cassandra.utils.*;
 
+import static org.apache.cassandra.db.context.CounterContext.ContextState;
+
 public class CounterColumnTest extends SchemaLoader
 {
     private static final CounterContext cc = new CounterContext();
@@ -66,8 +67,8 @@ public class CounterColumnTest extends SchemaLoader
         assert 1 == column.value().getShort(0);
         assert 0 == column.value().getShort(2);
         assert CounterId.wrap(column.value(), 4).isLocalId();
-        assert 1L == column.value().getLong(4 + 0*stepLength + idLength);
-        assert delta == column.value().getLong(4 + 0*stepLength + idLength + clockLength);
+        assert 1L == column.value().getLong(4 + idLength);
+        assert delta == column.value().getLong(4 + idLength + clockLength);
     }
 
     @Test
@@ -79,6 +80,8 @@ public class CounterColumnTest extends SchemaLoader
 
         ByteBuffer context;
 
+        Allocator allocator = HeapAllocator.instance;
+
         // tombstone + tombstone
         left  = new DeletedColumn(ByteBufferUtil.bytes("x"), 1, 1L);
         right = new DeletedColumn(ByteBufferUtil.bytes("x"), 2, 2L);
@@ -143,20 +146,20 @@ public class CounterColumnTest extends SchemaLoader
         assert ((CounterColumn)reconciled).timestampOfLastDelete() == right.getMarkedForDeleteAt();
 
         // live < live last delete
-        left  = new CounterColumn(ByteBufferUtil.bytes("x"), cc.create(CounterId.fromInt(1), 2L, 3L, false), 1L, Long.MIN_VALUE);
-        right = new CounterColumn(ByteBufferUtil.bytes("x"), cc.create(CounterId.fromInt(1), 1L, 1L, false), 4L, 3L);
+        left  = new CounterColumn(ByteBufferUtil.bytes("x"), cc.createRemote(CounterId.fromInt(1), 2L, 3L, allocator), 1L, Long.MIN_VALUE);
+        right = new CounterColumn(ByteBufferUtil.bytes("x"), cc.createRemote(CounterId.fromInt(1), 1L, 1L, allocator), 4L, 3L);
 
         assert left.reconcile(right) == right;
 
         // live last delete > live
-        left  = new CounterColumn(ByteBufferUtil.bytes("x"), cc.create(CounterId.fromInt(1), 2L, 3L, false), 6L, 5L);
-        right = new CounterColumn(ByteBufferUtil.bytes("x"), cc.create(CounterId.fromInt(1), 1L, 1L, false), 4L, 3L);
+        left  = new CounterColumn(ByteBufferUtil.bytes("x"), cc.createRemote(CounterId.fromInt(1), 2L, 3L, allocator), 6L, 5L);
+        right = new CounterColumn(ByteBufferUtil.bytes("x"), cc.createRemote(CounterId.fromInt(1), 1L, 1L, allocator), 4L, 3L);
 
         assert left.reconcile(right) == left;
 
         // live + live
-        left = new CounterColumn(ByteBufferUtil.bytes("x"), cc.create(CounterId.fromInt(1), 1L, 1L, false), 4L, Long.MIN_VALUE);
-        right = new CounterColumn(ByteBufferUtil.bytes("x"), cc.create(CounterId.fromInt(1), 2L, 3L, false), 1L, Long.MIN_VALUE);
+        left = new CounterColumn(ByteBufferUtil.bytes("x"), cc.createRemote(CounterId.fromInt(1), 1L, 1L, allocator), 4L, Long.MIN_VALUE);
+        right = new CounterColumn(ByteBufferUtil.bytes("x"), cc.createRemote(CounterId.fromInt(1), 2L, 3L, allocator), 1L, Long.MIN_VALUE);
 
         reconciled = left.reconcile(right);
         assert reconciled.name().equals(left.name());
@@ -164,7 +167,7 @@ public class CounterColumnTest extends SchemaLoader
         assert reconciled.timestamp() == 4L;
 
         left = reconciled;
-        right = new CounterColumn(ByteBufferUtil.bytes("x"), cc.create(CounterId.fromInt(2), 1L, 5L, false), 2L, Long.MIN_VALUE);
+        right = new CounterColumn(ByteBufferUtil.bytes("x"), cc.createRemote(CounterId.fromInt(2), 1L, 5L, allocator), 2L, Long.MIN_VALUE);
 
         reconciled = left.reconcile(right);
         assert reconciled.name().equals(left.name());
@@ -172,7 +175,7 @@ public class CounterColumnTest extends SchemaLoader
         assert reconciled.timestamp() == 4L;
 
         left = reconciled;
-        right = new CounterColumn(ByteBufferUtil.bytes("x"), cc.create(CounterId.fromInt(2), 2L, 2L, false), 6L, Long.MIN_VALUE);
+        right = new CounterColumn(ByteBufferUtil.bytes("x"), cc.createRemote(CounterId.fromInt(2), 2L, 2L, allocator), 6L, Long.MIN_VALUE);
 
         reconciled = left.reconcile(right);
         assert reconciled.name().equals(left.name());
@@ -183,13 +186,13 @@ public class CounterColumnTest extends SchemaLoader
         int hd = 2; // header
         assert hd + 2 * stepLength == context.remaining();
 
-        assert Util.equalsCounterId(CounterId.fromInt(1), context, hd + 0 * stepLength);
-        assert 2L == context.getLong(hd + 0*stepLength + idLength);
-        assert 3L == context.getLong(hd + 0*stepLength + idLength + clockLength);
+        assert Util.equalsCounterId(CounterId.fromInt(1), context, hd);
+        assert 2L == context.getLong(hd + idLength);
+        assert 3L == context.getLong(hd + idLength + clockLength);
 
-        assert Util.equalsCounterId(CounterId.fromInt(2), context, hd + 1 * stepLength);
-        assert 2L == context.getLong(hd + 1*stepLength + idLength);
-        assert 2L == context.getLong(hd + 1*stepLength + idLength + clockLength);
+        assert Util.equalsCounterId(CounterId.fromInt(2), context, hd + stepLength);
+        assert 2L == context.getLong(hd + stepLength + idLength);
+        assert 2L == context.getLong(hd + stepLength + idLength + clockLength);
 
         assert ((CounterColumn)reconciled).timestampOfLastDelete() == Long.MIN_VALUE;
     }
@@ -219,27 +222,27 @@ public class CounterColumnTest extends SchemaLoader
         assert null     == rightCol.diff(leftCol);
 
         // equality: equal nodes, all counts same
-        left = ContextState.allocate(3, 0, allocator);
-        left.writeElement(CounterId.fromInt(3), 3L, 0L);
-        left.writeElement(CounterId.fromInt(6), 2L, 0L);
-        left.writeElement(CounterId.fromInt(9), 1L, 0L);
-        right = new ContextState(ByteBufferUtil.clone(left.context), 2);
+        left = ContextState.allocate(0, 0, 3, allocator);
+        left.writeRemote(CounterId.fromInt(3), 3L, 0L);
+        left.writeRemote(CounterId.fromInt(6), 2L, 0L);
+        left.writeRemote(CounterId.fromInt(9), 1L, 0L);
+        right = ContextState.wrap(ByteBufferUtil.clone(left.context));
 
         leftCol  = new CounterColumn(ByteBufferUtil.bytes("x"), left.context,  1L);
         rightCol = new CounterColumn(ByteBufferUtil.bytes("x"), right.context, 1L);
         assert leftCol.diff(rightCol) == null;
 
         // greater than: left has superset of nodes (counts equal)
-        left = ContextState.allocate(4, 0, allocator);
-        left.writeElement(CounterId.fromInt(3), 3L, 0L);
-        left.writeElement(CounterId.fromInt(6), 2L, 0L);
-        left.writeElement(CounterId.fromInt(9), 1L, 0L);
-        left.writeElement(CounterId.fromInt(12), 0L, 0L);
+        left = ContextState.allocate(0, 0, 4, allocator);
+        left.writeRemote(CounterId.fromInt(3), 3L, 0L);
+        left.writeRemote(CounterId.fromInt(6), 2L, 0L);
+        left.writeRemote(CounterId.fromInt(9), 1L, 0L);
+        left.writeRemote(CounterId.fromInt(12), 0L, 0L);
 
-        right = ContextState.allocate(3, 0, allocator);
-        right.writeElement(CounterId.fromInt(3), 3L, 0L);
-        right.writeElement(CounterId.fromInt(6), 2L, 0L);
-        right.writeElement(CounterId.fromInt(9), 1L, 0L);
+        right = ContextState.allocate(0, 0, 3, allocator);
+        right.writeRemote(CounterId.fromInt(3), 3L, 0L);
+        right.writeRemote(CounterId.fromInt(6), 2L, 0L);
+        right.writeRemote(CounterId.fromInt(9), 1L, 0L);
 
         leftCol  = new CounterColumn(ByteBufferUtil.bytes("x"), left.context,  1L);
         rightCol = new CounterColumn(ByteBufferUtil.bytes("x"), right.context, 1L);
@@ -249,15 +252,15 @@ public class CounterColumnTest extends SchemaLoader
         assert leftCol == rightCol.diff(leftCol);
 
         // disjoint: right and left have disjoint node sets
-        left = ContextState.allocate(3, 0, allocator);
-        left.writeElement(CounterId.fromInt(3), 1L, 0L);
-        left.writeElement(CounterId.fromInt(4), 1L, 0L);
-        left.writeElement(CounterId.fromInt(9), 1L, 0L);
+        left = ContextState.allocate(0, 0, 3, allocator);
+        left.writeRemote(CounterId.fromInt(3), 1L, 0L);
+        left.writeRemote(CounterId.fromInt(4), 1L, 0L);
+        left.writeRemote(CounterId.fromInt(9), 1L, 0L);
 
-        right = ContextState.allocate(3, 0, allocator);
-        right.writeElement(CounterId.fromInt(3), 1L, 0L);
-        right.writeElement(CounterId.fromInt(6), 1L, 0L);
-        right.writeElement(CounterId.fromInt(9), 1L, 0L);
+        right = ContextState.allocate(0, 0, 3, allocator);
+        right.writeRemote(CounterId.fromInt(3), 1L, 0L);
+        right.writeRemote(CounterId.fromInt(6), 1L, 0L);
+        right.writeRemote(CounterId.fromInt(9), 1L, 0L);
 
         leftCol  = new CounterColumn(ByteBufferUtil.bytes("x"), left.context,  1L);
         rightCol = new CounterColumn(ByteBufferUtil.bytes("x"), right.context, 1L);
@@ -269,11 +272,11 @@ public class CounterColumnTest extends SchemaLoader
     public void testSerializeDeserialize() throws IOException
     {
         Allocator allocator = HeapAllocator.instance;
-        CounterContext.ContextState state = CounterContext.ContextState.allocate(4, 2, allocator);
-        state.writeElement(CounterId.fromInt(1), 4L, 4L);
-        state.writeElement(CounterId.fromInt(2), 4L, 4L, true);
-        state.writeElement(CounterId.fromInt(3), 4L, 4L);
-        state.writeElement(CounterId.fromInt(4), 4L, 4L, true);
+        CounterContext.ContextState state = CounterContext.ContextState.allocate(0, 2, 2, allocator);
+        state.writeRemote(CounterId.fromInt(1), 4L, 4L);
+        state.writeLocal(CounterId.fromInt(2), 4L, 4L);
+        state.writeRemote(CounterId.fromInt(3), 4L, 4L);
+        state.writeLocal(CounterId.fromInt(4), 4L, 4L);
 
         CounterColumn original = new CounterColumn(ByteBufferUtil.bytes("x"), state.context, 1L);
         DataOutputBuffer bufOut = new DataOutputBuffer();
@@ -288,7 +291,7 @@ public class CounterColumnTest extends SchemaLoader
         CounterColumn deserializedOnRemote = (CounterColumn) Column.serializer.deserialize(new DataInputStream(bufIn), ColumnSerializer.Flag.FROM_REMOTE);
         assert deserializedOnRemote.name().equals(original.name());
         assert deserializedOnRemote.total() == original.total();
-        assert deserializedOnRemote.value().equals(cc.clearAllDelta(original.value()));
+        assert deserializedOnRemote.value().equals(cc.clearAllLocal(original.value()));
         assert deserializedOnRemote.timestamp() == deserialized.timestamp();
         assert deserializedOnRemote.timestampOfLastDelete() == deserialized.timestampOfLastDelete();
     }
@@ -300,14 +303,14 @@ public class CounterColumnTest extends SchemaLoader
         MessageDigest digest1 = MessageDigest.getInstance("md5");
         MessageDigest digest2 = MessageDigest.getInstance("md5");
 
-        CounterContext.ContextState state = CounterContext.ContextState.allocate(4, 2, allocator);
-        state.writeElement(CounterId.fromInt(1), 4L, 4L);
-        state.writeElement(CounterId.fromInt(2), 4L, 4L, true);
-        state.writeElement(CounterId.fromInt(3), 4L, 4L);
-        state.writeElement(CounterId.fromInt(4), 4L, 4L, true);
+        CounterContext.ContextState state = CounterContext.ContextState.allocate(0, 2, 2, allocator);
+        state.writeRemote(CounterId.fromInt(1), 4L, 4L);
+        state.writeLocal(CounterId.fromInt(2), 4L, 4L);
+        state.writeRemote(CounterId.fromInt(3), 4L, 4L);
+        state.writeLocal(CounterId.fromInt(4), 4L, 4L);
 
         CounterColumn original = new CounterColumn(ByteBufferUtil.bytes("x"), state.context, 1L);
-        CounterColumn cleared = new CounterColumn(ByteBufferUtil.bytes("x"), cc.clearAllDelta(state.context), 1L);
+        CounterColumn cleared = new CounterColumn(ByteBufferUtil.bytes("x"), cc.clearAllLocal(state.context), 1L);
 
         original.updateDigest(digest1);
         cleared.updateDigest(digest2);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/83cd80b2/test/unit/org/apache/cassandra/db/CounterMutationTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/CounterMutationTest.java b/test/unit/org/apache/cassandra/db/CounterMutationTest.java
index 3cbd030..389b7b7 100644
--- a/test/unit/org/apache/cassandra/db/CounterMutationTest.java
+++ b/test/unit/org/apache/cassandra/db/CounterMutationTest.java
@@ -18,87 +18,16 @@
 package org.apache.cassandra.db;
 
 import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Collections;
 import java.util.List;
 
 import org.junit.Test;
-import static org.junit.Assert.fail;
 
-import org.apache.cassandra.db.context.CounterContext;
 import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.utils.*;
-import org.apache.cassandra.Util;
-import static org.apache.cassandra.db.context.CounterContext.ContextState;
 
 public class CounterMutationTest extends SchemaLoader
 {
     @Test
-    public void testMergeOldShards() throws IOException
-    {
-        RowMutation rm;
-        CounterMutation cm;
-
-        CounterId id1 = CounterId.getLocalId();
-
-        rm = new RowMutation("Keyspace1", ByteBufferUtil.bytes("key1"));
-        rm.addCounter("Counter1", ByteBufferUtil.bytes("Column1"), 3);
-        cm = new CounterMutation(rm, ConsistencyLevel.ONE);
-        cm.apply();
-
-        CounterId.renewLocalId(2L); // faking time of renewal for test
-        CounterId id2 = CounterId.getLocalId();
-
-        rm = new RowMutation("Keyspace1", ByteBufferUtil.bytes("key1"));
-        rm.addCounter("Counter1", ByteBufferUtil.bytes("Column1"), 4);
-        cm = new CounterMutation(rm, ConsistencyLevel.ONE);
-        cm.apply();
-
-        CounterId.renewLocalId(4L); // faking time of renewal for test
-        CounterId id3 = CounterId.getLocalId();
-
-        rm = new RowMutation("Keyspace1", ByteBufferUtil.bytes("key1"));
-        rm.addCounter("Counter1", ByteBufferUtil.bytes("Column1"), 5);
-        rm.addCounter("Counter1", ByteBufferUtil.bytes("Column2"), 1);
-        cm = new CounterMutation(rm, ConsistencyLevel.ONE);
-        cm.apply();
-
-        DecoratedKey dk = Util.dk("key1");
-        ColumnFamily cf = Util.getColumnFamily(Keyspace.open("Keyspace1"), dk, "Counter1");
-
-        // First merges old shards
-        CounterColumn.mergeAndRemoveOldShards(dk, cf, Integer.MIN_VALUE, Integer.MAX_VALUE, false);
-        long now = System.currentTimeMillis();
-        Column c = cf.getColumn(ByteBufferUtil.bytes("Column1"));
-        assert c != null;
-        assert c instanceof CounterColumn;
-        assert ((CounterColumn)c).total() == 12L;
-        ContextState s = new ContextState(c.value());
-        assert s.getCounterId().equals(id1);
-        assert s.getCount() == 0L;
-        assert -s.getClock() > now - 1000 : " >";
-        assert -s.getClock() <= now;
-        s.moveToNext();
-        assert s.getCounterId().equals(id2);
-        assert s.getCount() == 0L;
-        assert -s.getClock() > now - 1000;
-        assert -s.getClock() <= now;
-        s.moveToNext();
-        assert s.getCounterId().equals(id3);
-        assert s.getCount() == 12L;
-
-        // Then collect old shards
-        CounterColumn.mergeAndRemoveOldShards(dk, cf, Integer.MAX_VALUE, Integer.MIN_VALUE, false);
-        c = cf.getColumn(ByteBufferUtil.bytes("Column1"));
-        assert c != null;
-        assert c instanceof CounterColumn;
-        assert ((CounterColumn)c).total() == 12L;
-        s = new ContextState(c.value());
-        assert s.getCounterId().equals(id3);
-        assert s.getCount() == 12L;
-    }
-
-    @Test
     public void testGetOldShardFromSystemKeyspace() throws IOException
     {
         // Renewing a bunch of times and checking we get the same thing from
@@ -112,49 +41,5 @@ public class CounterMutationTest extends SchemaLoader
 
         assert inMem.equals(onDisk);
     }
-
-    @Test
-    public void testRemoveOldShardFixCorrupted() throws IOException
-    {
-        CounterContext ctx = CounterContext.instance();
-        int now = (int) (System.currentTimeMillis() / 1000);
-
-        // Check that corrupted context created prior to #2968 are fixed by removeOldShards
-        CounterId id1 = CounterId.getLocalId();
-        CounterId.renewLocalId();
-        CounterId id2 = CounterId.getLocalId();
-
-        ContextState state = ContextState.allocate(3, 2);
-        state.writeElement(CounterId.fromInt(1), 1, 4, false);
-        state.writeElement(id1, 3, 2, true);
-        state.writeElement(id2, -100, 5, true); // corrupted!
-
-        assert ctx.total(state.context) == 11;
-
-        try
-        {
-            ByteBuffer merger = ctx.computeOldShardMerger(state.context, Collections.<CounterId.CounterIdRecord>emptyList(), 0);
-            ctx.removeOldShards(ctx.merge(state.context, merger, HeapAllocator.instance), now);
-            fail("RemoveOldShards should throw an exception if the current id is non-sensical");
-        }
-        catch (RuntimeException e) {}
-
-        CounterId.renewLocalId();
-        ByteBuffer merger = ctx.computeOldShardMerger(state.context, Collections.<CounterId.CounterIdRecord>emptyList(), 0);
-        ByteBuffer cleaned = ctx.removeOldShards(ctx.merge(state.context, merger, HeapAllocator.instance), now);
-        assert ctx.total(cleaned) == 11;
-
-        // Check it is not corrupted anymore
-        ContextState state2 = new ContextState(cleaned);
-        while (state2.hasRemaining())
-        {
-            assert state2.getClock() >= 0 || state2.getCount() == 0;
-            state2.moveToNext();
-        }
-
-        // Check that if we merge old and clean on another node, we keep the right count
-        ByteBuffer onRemote = ctx.merge(ctx.clearAllDelta(state.context), ctx.clearAllDelta(cleaned), HeapAllocator.instance);
-        assert ctx.total(onRemote) == 11;
-    }
 }