You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2014/01/27 23:40:01 UTC

[1/3] New counters implementation

Updated Branches:
  refs/heads/trunk 1218bcacb -> 714c42336


http://git-wip-us.apache.org/repos/asf/cassandra/blob/714c4233/test/unit/org/apache/cassandra/db/context/CounterContextTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/context/CounterContextTest.java b/test/unit/org/apache/cassandra/db/context/CounterContextTest.java
index 5c88fd6..ea5dd3e 100644
--- a/test/unit/org/apache/cassandra/db/context/CounterContextTest.java
+++ b/test/unit/org/apache/cassandra/db/context/CounterContextTest.java
@@ -26,7 +26,8 @@ import java.nio.ByteBuffer;
 
 import org.junit.Test;
 
-import org.apache.cassandra.db.context.IContext.ContextRelationship;
+import org.apache.cassandra.db.ClockAndCount;
+import org.apache.cassandra.db.context.CounterContext.Relationship;
 import org.apache.cassandra.Util;
 import org.apache.cassandra.utils.*;
 
@@ -92,7 +93,7 @@ public class CounterContextTest
         left.writeRemote(CounterId.fromInt(9), 1L, 0L);
         right = ContextState.wrap(ByteBufferUtil.clone(left.context));
 
-        assertEquals(ContextRelationship.EQUAL, cc.diff(left.context, right.context));
+        assertEquals(Relationship.EQUAL, cc.diff(left.context, right.context));
 
         // greater than: left has superset of nodes (counts equal)
         left = ContextState.allocate(0, 0, 4, allocator);
@@ -106,7 +107,7 @@ public class CounterContextTest
         right.writeRemote(CounterId.fromInt(6), 2L, 0L);
         right.writeRemote(CounterId.fromInt(9), 1L, 0L);
 
-        assertEquals(ContextRelationship.GREATER_THAN, cc.diff(left.context, right.context));
+        assertEquals(Relationship.GREATER_THAN, cc.diff(left.context, right.context));
 
         // less than: left has subset of nodes (counts equal)
         left = ContextState.allocate(0, 0, 3, allocator);
@@ -120,7 +121,7 @@ public class CounterContextTest
         right.writeRemote(CounterId.fromInt(9),  1L, 0L);
         right.writeRemote(CounterId.fromInt(12), 0L, 0L);
 
-        assertEquals(ContextRelationship.LESS_THAN, cc.diff(left.context, right.context));
+        assertEquals(Relationship.LESS_THAN, cc.diff(left.context, right.context));
 
         // greater than: equal nodes, but left has higher counts
         left = ContextState.allocate(0, 0, 3, allocator);
@@ -133,7 +134,7 @@ public class CounterContextTest
         right.writeRemote(CounterId.fromInt(6), 2L, 0L);
         right.writeRemote(CounterId.fromInt(9), 1L, 0L);
 
-        assertEquals(ContextRelationship.GREATER_THAN, cc.diff(left.context, right.context));
+        assertEquals(Relationship.GREATER_THAN, cc.diff(left.context, right.context));
 
         // less than: equal nodes, but right has higher counts
         left = ContextState.allocate(0, 0, 3, allocator);
@@ -146,7 +147,7 @@ public class CounterContextTest
         right.writeRemote(CounterId.fromInt(6), 9L, 0L);
         right.writeRemote(CounterId.fromInt(9), 3L, 0L);
 
-        assertEquals(ContextRelationship.LESS_THAN, cc.diff(left.context, right.context));
+        assertEquals(Relationship.LESS_THAN, cc.diff(left.context, right.context));
 
         // disjoint: right and left have disjoint node sets
         left = ContextState.allocate(0, 0, 3, allocator);
@@ -159,7 +160,7 @@ public class CounterContextTest
         right.writeRemote(CounterId.fromInt(6), 1L, 0L);
         right.writeRemote(CounterId.fromInt(9), 1L, 0L);
 
-        assertEquals(ContextRelationship.DISJOINT, cc.diff(left.context, right.context));
+        assertEquals(Relationship.DISJOINT, cc.diff(left.context, right.context));
 
         left = ContextState.allocate(0, 0, 3, allocator);
         left.writeRemote(CounterId.fromInt(3), 1L, 0L);
@@ -171,7 +172,7 @@ public class CounterContextTest
         right.writeRemote(CounterId.fromInt(6),  1L, 0L);
         right.writeRemote(CounterId.fromInt(12), 1L, 0L);
 
-        assertEquals(ContextRelationship.DISJOINT, cc.diff(left.context, right.context));
+        assertEquals(Relationship.DISJOINT, cc.diff(left.context, right.context));
 
         // disjoint: equal nodes, but right and left have higher counts in differing nodes
         left = ContextState.allocate(0, 0, 3, allocator);
@@ -184,7 +185,7 @@ public class CounterContextTest
         right.writeRemote(CounterId.fromInt(6), 1L, 0L);
         right.writeRemote(CounterId.fromInt(9), 5L, 0L);
 
-        assertEquals(ContextRelationship.DISJOINT, cc.diff(left.context, right.context));
+        assertEquals(Relationship.DISJOINT, cc.diff(left.context, right.context));
 
         left = ContextState.allocate(0, 0, 3, allocator);
         left.writeRemote(CounterId.fromInt(3), 2L, 0L);
@@ -196,7 +197,7 @@ public class CounterContextTest
         right.writeRemote(CounterId.fromInt(6), 9L, 0L);
         right.writeRemote(CounterId.fromInt(9), 5L, 0L);
 
-        assertEquals(ContextRelationship.DISJOINT, cc.diff(left.context, right.context));
+        assertEquals(Relationship.DISJOINT, cc.diff(left.context, right.context));
 
         // disjoint: left has more nodes, but lower counts
         left = ContextState.allocate(0, 0, 4, allocator);
@@ -210,7 +211,7 @@ public class CounterContextTest
         right.writeRemote(CounterId.fromInt(6), 9L, 0L);
         right.writeRemote(CounterId.fromInt(9), 5L, 0L);
 
-        assertEquals(ContextRelationship.DISJOINT, cc.diff(left.context, right.context));
+        assertEquals(Relationship.DISJOINT, cc.diff(left.context, right.context));
 
         // disjoint: left has less nodes, but higher counts
         left = ContextState.allocate(0, 0, 3, allocator);
@@ -224,7 +225,7 @@ public class CounterContextTest
         right.writeRemote(CounterId.fromInt(9),  2L, 0L);
         right.writeRemote(CounterId.fromInt(12), 1L, 0L);
 
-        assertEquals(ContextRelationship.DISJOINT, cc.diff(left.context, right.context));
+        assertEquals(Relationship.DISJOINT, cc.diff(left.context, right.context));
 
         // disjoint: mixed nodes and counts
         left = ContextState.allocate(0, 0, 3, allocator);
@@ -238,7 +239,7 @@ public class CounterContextTest
         right.writeRemote(CounterId.fromInt(9),  2L, 0L);
         right.writeRemote(CounterId.fromInt(12), 1L, 0L);
 
-        assertEquals(ContextRelationship.DISJOINT, cc.diff(left.context, right.context));
+        assertEquals(Relationship.DISJOINT, cc.diff(left.context, right.context));
 
         left = ContextState.allocate(0, 0, 4, allocator);
         left.writeRemote(CounterId.fromInt(3), 5L, 0L);
@@ -251,7 +252,7 @@ public class CounterContextTest
         right.writeRemote(CounterId.fromInt(6), 3L, 0L);
         right.writeRemote(CounterId.fromInt(9), 2L, 0L);
 
-        assertEquals(ContextRelationship.DISJOINT, cc.diff(left.context, right.context));
+        assertEquals(Relationship.DISJOINT, cc.diff(left.context, right.context));
     }
 
     @Test
@@ -499,4 +500,67 @@ public class CounterContextTest
         cleared = cc.clearAllLocal(marked);
         assertSame(cleared, marked);
     }
+
+    @Test
+    public void testFindPositionOf()
+    {
+        ContextState state = ContextState.allocate(3, 3, 3, HeapAllocator.instance);
+
+        state.writeGlobal(CounterId.fromInt(1), 1L, 1L);
+        state.writeRemote(CounterId.fromInt(2), 2L, 2L);
+        state.writeLocal( CounterId.fromInt(3), 3L, 3L);
+        state.writeGlobal(CounterId.fromInt(4), 4L, 4L);
+        state.writeRemote(CounterId.fromInt(5), 5L, 5L);
+        state.writeLocal( CounterId.fromInt(6), 6L, 6L);
+        state.writeGlobal(CounterId.fromInt(7), 7L, 7L);
+        state.writeRemote(CounterId.fromInt(8), 8L, 8L);
+        state.writeLocal(CounterId.fromInt(9), 9L, 9L);
+
+        int headerLength = headerSizeLength + 6 * headerEltLength;
+        assertEquals(headerLength, cc.findPositionOf(state.context, CounterId.fromInt(1)));
+        assertEquals(headerLength + stepLength, cc.findPositionOf(state.context, CounterId.fromInt(2)));
+        assertEquals(headerLength + 2 * stepLength, cc.findPositionOf(state.context, CounterId.fromInt(3)));
+        assertEquals(headerLength + 3 * stepLength, cc.findPositionOf(state.context, CounterId.fromInt(4)));
+        assertEquals(headerLength + 4 * stepLength, cc.findPositionOf(state.context, CounterId.fromInt(5)));
+        assertEquals(headerLength + 5 * stepLength, cc.findPositionOf(state.context, CounterId.fromInt(6)));
+        assertEquals(headerLength + 6 * stepLength, cc.findPositionOf(state.context, CounterId.fromInt(7)));
+        assertEquals(headerLength + 7 * stepLength, cc.findPositionOf(state.context, CounterId.fromInt(8)));
+        assertEquals(headerLength + 8 * stepLength, cc.findPositionOf(state.context, CounterId.fromInt(9)));
+
+        assertEquals(-1, cc.findPositionOf(state.context, CounterId.fromInt(0)));
+        assertEquals(-1, cc.findPositionOf(state.context, CounterId.fromInt(10)));
+        assertEquals(-1, cc.findPositionOf(state.context, CounterId.fromInt(15)));
+        assertEquals(-1, cc.findPositionOf(state.context, CounterId.fromInt(20)));
+    }
+
+    @Test
+    public void testGetGlockAndCountOf()
+    {
+        ContextState state = ContextState.allocate(3, 3, 3, HeapAllocator.instance);
+
+        state.writeGlobal(CounterId.fromInt(1), 1L, 1L);
+        state.writeRemote(CounterId.fromInt(2), 2L, 2L);
+        state.writeLocal( CounterId.fromInt(3), 3L, 3L);
+        state.writeGlobal(CounterId.fromInt(4), 4L, 4L);
+        state.writeRemote(CounterId.fromInt(5), 5L, 5L);
+        state.writeLocal( CounterId.fromInt(6), 6L, 6L);
+        state.writeGlobal(CounterId.fromInt(7), 7L, 7L);
+        state.writeRemote(CounterId.fromInt(8), 8L, 8L);
+        state.writeLocal(CounterId.fromInt(9), 9L, 9L);
+
+        assertEquals(ClockAndCount.create(1L, 1L), cc.getClockAndCountOf(state.context, CounterId.fromInt(1)));
+        assertEquals(ClockAndCount.create(2L, 2L), cc.getClockAndCountOf(state.context, CounterId.fromInt(2)));
+        assertEquals(ClockAndCount.create(3L, 3L), cc.getClockAndCountOf(state.context, CounterId.fromInt(3)));
+        assertEquals(ClockAndCount.create(4L, 4L), cc.getClockAndCountOf(state.context, CounterId.fromInt(4)));
+        assertEquals(ClockAndCount.create(5L, 5L), cc.getClockAndCountOf(state.context, CounterId.fromInt(5)));
+        assertEquals(ClockAndCount.create(6L, 6L), cc.getClockAndCountOf(state.context, CounterId.fromInt(6)));
+        assertEquals(ClockAndCount.create(7L, 7L), cc.getClockAndCountOf(state.context, CounterId.fromInt(7)));
+        assertEquals(ClockAndCount.create(8L, 8L), cc.getClockAndCountOf(state.context, CounterId.fromInt(8)));
+        assertEquals(ClockAndCount.create(9L, 9L), cc.getClockAndCountOf(state.context, CounterId.fromInt(9)));
+
+        assertEquals(ClockAndCount.create(0L, 0L), cc.getClockAndCountOf(state.context, CounterId.fromInt(0)));
+        assertEquals(ClockAndCount.create(0L, 0L), cc.getClockAndCountOf(state.context, CounterId.fromInt(10)));
+        assertEquals(ClockAndCount.create(0L, 0L), cc.getClockAndCountOf(state.context, CounterId.fromInt(15)));
+        assertEquals(ClockAndCount.create(0L, 0L), cc.getClockAndCountOf(state.context, CounterId.fromInt(20)));
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/714c4233/test/unit/org/apache/cassandra/tools/SSTableExportTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/tools/SSTableExportTest.java b/test/unit/org/apache/cassandra/tools/SSTableExportTest.java
index ea7516f..5f0fb5c 100644
--- a/test/unit/org/apache/cassandra/tools/SSTableExportTest.java
+++ b/test/unit/org/apache/cassandra/tools/SSTableExportTest.java
@@ -23,7 +23,6 @@ import static org.junit.Assert.assertNotNull;
 import static org.apache.cassandra.io.sstable.SSTableUtils.tempSSTableFile;
 import static org.apache.cassandra.utils.ByteBufferUtil.bytesToHex;
 import static org.apache.cassandra.utils.ByteBufferUtil.hexToBytes;
-import static org.junit.Assert.assertTrue;
 
 import java.io.File;
 import java.io.FileReader;
@@ -216,7 +215,7 @@ public class SSTableExportTest extends SchemaLoader
         SSTableWriter writer = new SSTableWriter(tempSS.getPath(), 2);
 
         // Add rowA
-        cfamily.addColumn(new CounterCell(Util.cellname("colA"), 42, System.currentTimeMillis()));
+        cfamily.addColumn(CounterCell.createLocal(Util.cellname("colA"), 42, System.currentTimeMillis(), Long.MIN_VALUE));
         writer.append(Util.dk("rowA"), cfamily);
         cfamily.clear();
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/714c4233/test/unit/org/apache/cassandra/utils/CounterIdTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/utils/CounterIdTest.java b/test/unit/org/apache/cassandra/utils/CounterIdTest.java
new file mode 100644
index 0000000..e7975f2
--- /dev/null
+++ b/test/unit/org/apache/cassandra/utils/CounterIdTest.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.utils;
+
+import java.io.IOException;
+
+import org.junit.Test;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.db.SystemKeyspace;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class CounterIdTest extends SchemaLoader
+{
+    @Test
+    public void testGetCurrentIdFromSystemKeyspace() throws IOException
+    {
+        // Renew the local id a few times, checking each time that the system
+        // keyspace returns the same id as the one held in memory
+        CounterId id0 = CounterId.getLocalId();
+        assertEquals(id0, SystemKeyspace.getCurrentLocalCounterId());
+
+        CounterId.renewLocalId();
+        CounterId id1 = CounterId.getLocalId();
+        assertEquals(id1, SystemKeyspace.getCurrentLocalCounterId());
+        assertTrue(id1.compareTo(id0) == 1);
+
+        CounterId.renewLocalId();
+        CounterId id2 = CounterId.getLocalId();
+        assertEquals(id2, SystemKeyspace.getCurrentLocalCounterId());
+        assertTrue(id2.compareTo(id1) == 1);
+    }
+}
+


[3/3] git commit: New counters implementation

Posted by al...@apache.org.
New counters implementation

patch by Aleksey Yeschenko; reviewed by Sylvain Lebresne for
CASSANDRA-6504


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/714c4233
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/714c4233
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/714c4233

Branch: refs/heads/trunk
Commit: 714c423360c36da2a2b365efaf9c5c4f623ed133
Parents: 1218bca
Author: Aleksey Yeschenko <al...@apache.org>
Authored: Fri Jan 24 02:52:43 2014 -0800
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Mon Jan 27 16:37:41 2014 -0600

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 NEWS.txt                                        |   5 +
 conf/cassandra.yaml                             |  33 ++-
 doc/cql3/CQL.textile                            |   1 -
 interface/cassandra.thrift                      |   3 +-
 pylib/cqlshlib/cql3handling.py                  |   1 -
 pylib/cqlshlib/test/test_cqlsh_output.py        |   1 -
 .../apache/cassandra/cache/AutoSavingCache.java |   3 +-
 .../apache/cassandra/cache/CounterCacheKey.java |  89 ++++++++
 .../org/apache/cassandra/concurrent/Stage.java  |   6 +-
 .../cassandra/concurrent/StageManager.java      |  14 +-
 .../org/apache/cassandra/config/CFMetaData.java |  20 --
 .../org/apache/cassandra/config/Config.java     |  14 +-
 .../cassandra/config/DatabaseDescriptor.java    |  75 ++++++-
 .../cassandra/cql/AlterTableStatement.java      |   1 -
 .../org/apache/cassandra/cql/CFPropDefs.java    |   3 +-
 .../cql/CreateColumnFamilyStatement.java        |   1 -
 .../cql3/statements/BatchStatement.java         |   7 +-
 .../cassandra/cql3/statements/CFPropDefs.java   |   5 +-
 .../cql3/statements/ModificationStatement.java  |   6 +-
 src/java/org/apache/cassandra/db/Cell.java      |  13 +-
 .../org/apache/cassandra/db/ClockAndCount.java  |  73 +++++++
 .../apache/cassandra/db/ColumnFamilyStore.java  |  89 ++++++--
 .../apache/cassandra/db/ConsistencyLevel.java   |  11 +-
 .../org/apache/cassandra/db/CounterCell.java    |  37 +---
 .../apache/cassandra/db/CounterMutation.java    | 210 +++++++++++++-----
 .../apache/cassandra/db/CounterUpdateCell.java  |  19 +-
 .../org/apache/cassandra/db/DeletedCell.java    |   7 -
 .../org/apache/cassandra/db/ExpiringCell.java   |   7 -
 src/java/org/apache/cassandra/db/IMutation.java |   2 +-
 src/java/org/apache/cassandra/db/Mutation.java  |   6 +
 .../org/apache/cassandra/db/SystemKeyspace.java |  21 --
 .../db/compaction/CompactionManager.java        |  23 +-
 .../cassandra/db/context/CounterContext.java    | 214 +++++++++----------
 .../apache/cassandra/db/context/IContext.java   |  75 -------
 .../io/sstable/AbstractSSTableSimpleWriter.java |   2 +-
 .../apache/cassandra/net/MessagingService.java  |   2 +-
 .../service/AbstractWriteResponseHandler.java   |   7 +-
 .../apache/cassandra/service/CacheService.java  | 127 ++++++++++-
 .../cassandra/service/CacheServiceMBean.java    |  10 +
 .../apache/cassandra/service/StorageProxy.java  |  38 ++--
 .../cassandra/service/StorageProxyMBean.java    |   2 +
 .../cassandra/service/StorageService.java       |  18 +-
 .../cassandra/thrift/CassandraServer.java       |   7 +-
 .../org/apache/cassandra/tools/NodeProbe.java   |  15 +-
 .../org/apache/cassandra/tools/NodeTool.java    |  42 +++-
 .../org/apache/cassandra/utils/CounterId.java   | 106 ++-------
 .../unit/org/apache/cassandra/SchemaLoader.java |   8 +
 test/unit/org/apache/cassandra/Util.java        |   4 +-
 .../org/apache/cassandra/config/DefsTest.java   |   1 -
 .../org/apache/cassandra/db/CleanupTest.java    |   5 +-
 .../cassandra/db/ColumnFamilyStoreTest.java     |   7 +-
 .../apache/cassandra/db/CounterCacheTest.java   |  96 +++++++++
 .../apache/cassandra/db/CounterCellTest.java    |  44 ++--
 .../cassandra/db/CounterMutationTest.java       | 180 ++++++++++++++--
 .../cassandra/db/RecoveryManagerTest.java       |   2 +-
 .../db/context/CounterContextTest.java          |  92 ++++++--
 .../cassandra/tools/SSTableExportTest.java      |   3 +-
 .../apache/cassandra/utils/CounterIdTest.java   |  51 +++++
 59 files changed, 1333 insertions(+), 632 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/714c4233/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 10548fa..cc406c8 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -25,6 +25,7 @@
  * CF id is changed to be non-deterministic. Data dir/key cache are created
    uniquely for CF id (CASSANDRA-5202)
  * Cassandra won't start by default without jna (CASSANDRA-6575)
+ * New counters implementation (CASSANDRA-6504)
 
 
 2.0.5

http://git-wip-us.apache.org/repos/asf/cassandra/blob/714c4233/NEWS.txt
----------------------------------------------------------------------
diff --git a/NEWS.txt b/NEWS.txt
index 3070500..4e00faf 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -38,6 +38,11 @@ Upgrading
      cold_reads_to_omit compaction option; 0.0 omits nothing (the old
      behavior) and 1.0 omits everything.
    - Multithreaded compaction has been removed.
+   - Counters implementation has been changed, replaced by a safer one with
+     fewer caveats, but different performance characteristics. You might have
+     to change your data model to accommodate the new implementation.
+     (See https://issues.apache.org/jira/browse/CASSANDRA-6504 and the dev
+     blog post at http://www.datastax.com/dev/blog/<PLACEHOLDER> for details).
 
 2.0.5
 =====

http://git-wip-us.apache.org/repos/asf/cassandra/blob/714c4233/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index bdbb9ff..06cb33f 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -171,6 +171,32 @@ row_cache_save_period: 0
 # Disabled by default, meaning all keys are going to be saved
 # row_cache_keys_to_save: 100
 
+# Maximum size of the counter cache in memory.
+#
+# Counter cache helps to reduce counter locks' contention for hot counter cells.
+# In case of RF = 1 a counter cache hit will cause Cassandra to skip the read before
+# write entirely. With RF > 1 a counter cache hit will still help to reduce the duration
+# of the lock hold, helping with hot counter cell updates, but will not allow skipping
+# the read entirely. Only the local (clock, count) tuple of a counter cell is kept
+# in memory, not the whole counter, so it's relatively cheap.
+#
+# NOTE: if you reduce the size, you may not get your hottest keys loaded on startup.
+#
+# Default value is empty to make it "auto" (min(2.5% of Heap (in MB), 50MB)). Set to 0 to disable counter cache.
+# NOTE: if you perform counter deletes and rely on low gc_grace_seconds, you should disable the counter cache.
+counter_cache_size_in_mb:
+
+# Duration in seconds after which Cassandra should
+# save the counter cache (keys only). Caches are saved to saved_caches_directory as
+# specified in this configuration file.
+#
+# Default is 7200 or 2 hours.
+counter_cache_save_period: 7200
+
+# Number of keys from the counter cache to save
+# Disabled by default, meaning all keys are going to be saved
+# counter_cache_keys_to_save: 100
+
 # The off-heap memory allocator.  Affects storage engine metadata as
 # well as caches.  Experiments show that JEMAlloc saves some memory
 # than the native GCC allocator (i.e., JEMalloc is more
@@ -234,13 +260,16 @@ seed_provider:
 # bottleneck will be reads that need to fetch data from
 # disk. "concurrent_reads" should be set to (16 * number_of_drives) in
 # order to allow the operations to enqueue low enough in the stack
-# that the OS and drives can reorder them.
+# that the OS and drives can reorder them. Same applies to
+# "concurrent_counter_writes", since counter writes read the current
+# values before incrementing and writing them back.
 #
 # On the other hand, since writes are almost never IO bound, the ideal
 # number of "concurrent_writes" is dependent on the number of cores in
 # your system; (8 * number_of_cores) is a good rule of thumb.
 concurrent_reads: 32
 concurrent_writes: 32
+concurrent_counter_writes: 32
 
 # Total memory to use for sstable-reading buffers.  Defaults to
 # the smaller of 1/4 of heap or 512MB.
@@ -491,6 +520,8 @@ read_request_timeout_in_ms: 5000
 range_request_timeout_in_ms: 10000
 # How long the coordinator should wait for writes to complete
 write_request_timeout_in_ms: 2000
+# How long the coordinator should wait for counter writes to complete
+counter_write_request_timeout_in_ms: 5000
 # How long a coordinator should continue to retry a CAS operation
 # that contends with other proposals for the same row
 cas_contention_timeout_in_ms: 1000

http://git-wip-us.apache.org/repos/asf/cassandra/blob/714c4233/doc/cql3/CQL.textile
----------------------------------------------------------------------
diff --git a/doc/cql3/CQL.textile b/doc/cql3/CQL.textile
index 5fff402..6d68584 100644
--- a/doc/cql3/CQL.textile
+++ b/doc/cql3/CQL.textile
@@ -308,7 +308,6 @@ Table creation supports the following other @<property>@:
 |@bloom_filter_fp_chance@     | _simple_ | 0.00075     | The target probability of false positive of the sstable bloom filters. Said bloom filters will be sized to provide the provided probability (thus lowering this value impact the size of bloom filters in-memory and on-disk)|
 |@compaction@                 | _map_    | _see below_ | The compaction options to use, see below.|
 |@compression@                | _map_    | _see below_ | Compression options, see below. |
-|@replicate_on_write@         | _simple_ | true        | Whether to replicate data on write. This can only be set to false for tables with counters values. Disabling this is dangerous and can result in random lose of counters, don't disable unless you are sure to know what you are doing|
 |@caching@                    | _simple_ | keys_only   | Whether to cache keys ("key cache") and/or rows ("row cache") for this table. Valid values are: @all@, @keys_only@, @rows_only@ and @none@. |
 
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/714c4233/interface/cassandra.thrift
----------------------------------------------------------------------
diff --git a/interface/cassandra.thrift b/interface/cassandra.thrift
index 780ffc7..289be1f 100644
--- a/interface/cassandra.thrift
+++ b/interface/cassandra.thrift
@@ -459,7 +459,6 @@ struct CfDef {
     16: optional i32 id,
     17: optional i32 min_compaction_threshold,
     18: optional i32 max_compaction_threshold,
-    24: optional bool replicate_on_write,
     26: optional string key_validation_class,
     28: optional binary key_alias,
     29: optional string compaction_strategy,
@@ -492,6 +491,8 @@ struct CfDef {
     /** @deprecated */
     23: optional double memtable_operations_in_millions,
     /** @deprecated */
+    24: optional bool replicate_on_write,
+    /** @deprecated */
     25: optional double merge_shards_chance,
     /** @deprecated */
     27: optional string row_cache_provider,

http://git-wip-us.apache.org/repos/asf/cassandra/blob/714c4233/pylib/cqlshlib/cql3handling.py
----------------------------------------------------------------------
diff --git a/pylib/cqlshlib/cql3handling.py b/pylib/cqlshlib/cql3handling.py
index 421ab27..c4fa97d 100644
--- a/pylib/cqlshlib/cql3handling.py
+++ b/pylib/cqlshlib/cql3handling.py
@@ -68,7 +68,6 @@ class Cql3ParsingRuleSet(CqlParsingRuleSet):
         ('gc_grace_seconds', None),
         ('index_interval', None),
         ('read_repair_chance', None),
-        ('replicate_on_write', None),
         ('populate_io_cache_on_flush', None),
         ('default_time_to_live', None),
         ('speculative_retry', None),

http://git-wip-us.apache.org/repos/asf/cassandra/blob/714c4233/pylib/cqlshlib/test/test_cqlsh_output.py
----------------------------------------------------------------------
diff --git a/pylib/cqlshlib/test/test_cqlsh_output.py b/pylib/cqlshlib/test/test_cqlsh_output.py
index f89127d..102a040 100644
--- a/pylib/cqlshlib/test/test_cqlsh_output.py
+++ b/pylib/cqlshlib/test/test_cqlsh_output.py
@@ -661,7 +661,6 @@ class TestCqlshOutput(BaseTestCase):
               gc_grace_seconds=864000 AND
               index_interval=128 AND
               read_repair_chance=0.100000 AND
-              replicate_on_write='true' AND
               populate_io_cache_on_flush='false' AND
               default_time_to_live=0 AND
               speculative_retry='NONE' AND

http://git-wip-us.apache.org/repos/asf/cassandra/blob/714c4233/src/java/org/apache/cassandra/cache/AutoSavingCache.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cache/AutoSavingCache.java b/src/java/org/apache/cassandra/cache/AutoSavingCache.java
index 3ed2c2c..f94999e 100644
--- a/src/java/org/apache/cassandra/cache/AutoSavingCache.java
+++ b/src/java/org/apache/cassandra/cache/AutoSavingCache.java
@@ -127,7 +127,8 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K
                 for (Future<Pair<K, V>> future : futures)
                 {
                     Pair<K, V> entry = future.get();
-                    put(entry.left, entry.right);
+                    if (entry != null)
+                        put(entry.left, entry.right);
                 }
             }
             catch (Exception e)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/714c4233/src/java/org/apache/cassandra/cache/CounterCacheKey.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cache/CounterCacheKey.java b/src/java/org/apache/cassandra/cache/CounterCacheKey.java
new file mode 100644
index 0000000..acbe323
--- /dev/null
+++ b/src/java/org/apache/cassandra/cache/CounterCacheKey.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.cache;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.UUID;
+
+import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.db.composites.CellName;
+import org.apache.cassandra.utils.*;
+
+public class CounterCacheKey implements CacheKey
+{
+    public final UUID cfId;
+    public final byte[] partitionKey;
+    public final byte[] cellName;
+
+    private CounterCacheKey(UUID cfId, ByteBuffer partitionKey, CellName cellName)
+    {
+        this.cfId = cfId;
+        this.partitionKey = ByteBufferUtil.getArray(partitionKey);
+        this.cellName = ByteBufferUtil.getArray(cellName.toByteBuffer());
+    }
+
+    public static CounterCacheKey create(UUID cfId, ByteBuffer partitionKey, CellName cellName)
+    {
+        return new CounterCacheKey(cfId, partitionKey, cellName);
+    }
+
+    public PathInfo getPathInfo()
+    {
+        Pair<String, String> cf = Schema.instance.getCF(cfId);
+        return new PathInfo(cf.left, cf.right, cfId);
+    }
+
+    public long memorySize()
+    {
+        return ObjectSizes.getFieldSize(3 * ObjectSizes.getReferenceSize())
+             + ObjectSizes.getArraySize(partitionKey)
+             + ObjectSizes.getArraySize(cellName);
+    }
+
+    @Override
+    public String toString()
+    {
+        return String.format("CounterCacheKey(%s, %s, %s)",
+                             cfId,
+                             ByteBufferUtil.bytesToHex(ByteBuffer.wrap(partitionKey)),
+                             ByteBufferUtil.bytesToHex(ByteBuffer.wrap(cellName)));
+    }
+
+    @Override
+    public int hashCode()
+    {
+        return Arrays.deepHashCode(new Object[]{cfId, partitionKey, cellName});
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+        if (this == o)
+            return true;
+
+        if (!(o instanceof CounterCacheKey))
+            return false;
+
+        CounterCacheKey cck = (CounterCacheKey) o;
+
+        return cfId.equals(cck.cfId)
+            && Arrays.equals(partitionKey, cck.partitionKey)
+            && Arrays.equals(cellName, cck.cellName);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/714c4233/src/java/org/apache/cassandra/concurrent/Stage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/concurrent/Stage.java b/src/java/org/apache/cassandra/concurrent/Stage.java
index f2907e2..6192cab 100644
--- a/src/java/org/apache/cassandra/concurrent/Stage.java
+++ b/src/java/org/apache/cassandra/concurrent/Stage.java
@@ -21,6 +21,7 @@ public enum Stage
 {
     READ,
     MUTATION,
+    COUNTER_MUTATION,
     GOSSIP,
     REQUEST_RESPONSE,
     ANTI_ENTROPY,
@@ -28,8 +29,7 @@ public enum Stage
     MISC,
     TRACING,
     INTERNAL_RESPONSE,
-    READ_REPAIR,
-    REPLICATE_ON_WRITE;
+    READ_REPAIR;
 
     public String getJmxType()
     {
@@ -43,9 +43,9 @@ public enum Stage
             case INTERNAL_RESPONSE:
                 return "internal";
             case MUTATION:
+            case COUNTER_MUTATION:
             case READ:
             case REQUEST_RESPONSE:
-            case REPLICATE_ON_WRITE:
             case READ_REPAIR:
                 return "request";
             default:

http://git-wip-us.apache.org/repos/asf/cassandra/blob/714c4233/src/java/org/apache/cassandra/concurrent/StageManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/concurrent/StageManager.java b/src/java/org/apache/cassandra/concurrent/StageManager.java
index 2960f22..512d64a 100644
--- a/src/java/org/apache/cassandra/concurrent/StageManager.java
+++ b/src/java/org/apache/cassandra/concurrent/StageManager.java
@@ -43,15 +43,13 @@ public class StageManager
 
     public static final long KEEPALIVE = 60; // seconds to keep "extra" threads alive for when idle
 
-    public static final int MAX_REPLICATE_ON_WRITE_TASKS = 1024 * FBUtilities.getAvailableProcessors();
-
     static
     {
         stages.put(Stage.MUTATION, multiThreadedConfigurableStage(Stage.MUTATION, getConcurrentWriters()));
+        stages.put(Stage.COUNTER_MUTATION, multiThreadedConfigurableStage(Stage.COUNTER_MUTATION, getConcurrentCounterWriters()));
         stages.put(Stage.READ, multiThreadedConfigurableStage(Stage.READ, getConcurrentReaders()));
         stages.put(Stage.REQUEST_RESPONSE, multiThreadedStage(Stage.REQUEST_RESPONSE, FBUtilities.getAvailableProcessors()));
         stages.put(Stage.INTERNAL_RESPONSE, multiThreadedStage(Stage.INTERNAL_RESPONSE, FBUtilities.getAvailableProcessors()));
-        stages.put(Stage.REPLICATE_ON_WRITE, multiThreadedConfigurableStage(Stage.REPLICATE_ON_WRITE, getConcurrentReplicators(), MAX_REPLICATE_ON_WRITE_TASKS));
         // the rest are all single-threaded
         stages.put(Stage.GOSSIP, new JMXEnabledThreadPoolExecutor(Stage.GOSSIP));
         stages.put(Stage.ANTI_ENTROPY, new JMXEnabledThreadPoolExecutor(Stage.ANTI_ENTROPY));
@@ -99,16 +97,6 @@ public class StageManager
                                                      stage.getJmxType());
     }
 
-    private static JMXConfigurableThreadPoolExecutor multiThreadedConfigurableStage(Stage stage, int numThreads, int maxTasksBeforeBlock)
-    {
-        return new JMXConfigurableThreadPoolExecutor(numThreads,
-                                                     KEEPALIVE,
-                                                     TimeUnit.SECONDS,
-                                                     new LinkedBlockingQueue<Runnable>(maxTasksBeforeBlock),
-                                                     new NamedThreadFactory(stage.getJmxName()),
-                                                     stage.getJmxType());
-    }
-
     /**
      * Retrieve a stage from the StageManager
      * @param stage name of the stage to be retrieved.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/714c4233/src/java/org/apache/cassandra/config/CFMetaData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/CFMetaData.java b/src/java/org/apache/cassandra/config/CFMetaData.java
index 817d4a3..f377734 100644
--- a/src/java/org/apache/cassandra/config/CFMetaData.java
+++ b/src/java/org/apache/cassandra/config/CFMetaData.java
@@ -76,7 +76,6 @@ public final class CFMetaData
 
     public final static double DEFAULT_READ_REPAIR_CHANCE = 0.1;
     public final static double DEFAULT_DCLOCAL_READ_REPAIR_CHANCE = 0.0;
-    public final static boolean DEFAULT_REPLICATE_ON_WRITE = true;
     public final static int DEFAULT_GC_GRACE_SECONDS = 864000;
     public final static int DEFAULT_MIN_COMPACTION_THRESHOLD = 4;
     public final static int DEFAULT_MAX_COMPACTION_THRESHOLD = 32;
@@ -128,7 +127,6 @@ public final class CFMetaData
                                                                     + "comment text,"
                                                                     + "read_repair_chance double,"
                                                                     + "local_read_repair_chance double,"
-                                                                    + "replicate_on_write boolean,"
                                                                     + "gc_grace_seconds int,"
                                                                     + "default_validator text,"
                                                                     + "key_validator text,"
@@ -395,7 +393,6 @@ public final class CFMetaData
     private volatile String comment = "";
     private volatile double readRepairChance = DEFAULT_READ_REPAIR_CHANCE;
     private volatile double dcLocalReadRepairChance = DEFAULT_DCLOCAL_READ_REPAIR_CHANCE;
-    private volatile boolean replicateOnWrite = DEFAULT_REPLICATE_ON_WRITE;
     private volatile int gcGraceSeconds = DEFAULT_GC_GRACE_SECONDS;
     private volatile AbstractType<?> defaultValidator = BytesType.instance;
     private volatile AbstractType<?> keyValidator = BytesType.instance;
@@ -437,7 +434,6 @@ public final class CFMetaData
     public CFMetaData comment(String prop) { comment = enforceCommentNotNull(prop); return this;}
     public CFMetaData readRepairChance(double prop) {readRepairChance = prop; return this;}
     public CFMetaData dcLocalReadRepairChance(double prop) {dcLocalReadRepairChance = prop; return this;}
-    public CFMetaData replicateOnWrite(boolean prop) {replicateOnWrite = prop; return this;}
     public CFMetaData gcGraceSeconds(int prop) {gcGraceSeconds = prop; return this;}
     public CFMetaData defaultValidator(AbstractType<?> prop) {defaultValidator = prop; return this;}
     public CFMetaData keyValidator(AbstractType<?> prop) {keyValidator = prop; return this;}
@@ -624,7 +620,6 @@ public final class CFMetaData
                       .comment(oldCFMD.comment)
                       .readRepairChance(oldCFMD.readRepairChance)
                       .dcLocalReadRepairChance(oldCFMD.dcLocalReadRepairChance)
-                      .replicateOnWrite(oldCFMD.replicateOnWrite)
                       .gcGraceSeconds(oldCFMD.gcGraceSeconds)
                       .defaultValidator(oldCFMD.defaultValidator)
                       .keyValidator(oldCFMD.keyValidator)
@@ -691,11 +686,6 @@ public final class CFMetaData
         return ReadRepairDecision.NONE;
     }
 
-    public boolean getReplicateOnWrite()
-    {
-        return replicateOnWrite;
-    }
-
     public boolean populateIoCacheOnFlush()
     {
         return populateIoCacheOnFlush;
@@ -869,7 +859,6 @@ public final class CFMetaData
             .append(comment, rhs.comment)
             .append(readRepairChance, rhs.readRepairChance)
             .append(dcLocalReadRepairChance, rhs.dcLocalReadRepairChance)
-            .append(replicateOnWrite, rhs.replicateOnWrite)
             .append(gcGraceSeconds, rhs.gcGraceSeconds)
             .append(defaultValidator, rhs.defaultValidator)
             .append(keyValidator, rhs.keyValidator)
@@ -902,7 +891,6 @@ public final class CFMetaData
             .append(comment)
             .append(readRepairChance)
             .append(dcLocalReadRepairChance)
-            .append(replicateOnWrite)
             .append(gcGraceSeconds)
             .append(defaultValidator)
             .append(keyValidator)
@@ -956,8 +944,6 @@ public final class CFMetaData
     {
         if (!cf_def.isSetComment())
             cf_def.setComment("");
-        if (!cf_def.isSetReplicate_on_write())
-            cf_def.setReplicate_on_write(CFMetaData.DEFAULT_REPLICATE_ON_WRITE);
         if (!cf_def.isSetPopulate_io_cache_on_flush())
             cf_def.setPopulate_io_cache_on_flush(CFMetaData.DEFAULT_POPULATE_IO_CACHE_ON_FLUSH);
         if (!cf_def.isSetMin_compaction_threshold())
@@ -1047,7 +1033,6 @@ public final class CFMetaData
 
             return newCFMD.addAllColumnDefinitions(ColumnDefinition.fromThrift(newCFMD, cf_def.column_metadata))
                           .comment(cf_def.comment)
-                          .replicateOnWrite(cf_def.replicate_on_write)
                           .defaultValidator(TypeParser.parse(cf_def.default_validation_class))
                           .compressionParameters(cp)
                           .rebuild();
@@ -1125,7 +1110,6 @@ public final class CFMetaData
         comment = enforceCommentNotNull(cfm.comment);
         readRepairChance = cfm.readRepairChance;
         dcLocalReadRepairChance = cfm.dcLocalReadRepairChance;
-        replicateOnWrite = cfm.replicateOnWrite;
         gcGraceSeconds = cfm.gcGraceSeconds;
         defaultValidator = cfm.defaultValidator;
         keyValidator = cfm.keyValidator;
@@ -1265,7 +1249,6 @@ public final class CFMetaData
         def.setComment(enforceCommentNotNull(comment));
         def.setRead_repair_chance(readRepairChance);
         def.setDclocal_read_repair_chance(dcLocalReadRepairChance);
-        def.setReplicate_on_write(replicateOnWrite);
         def.setPopulate_io_cache_on_flush(populateIoCacheOnFlush);
         def.setGc_grace_seconds(gcGraceSeconds);
         def.setDefault_validation_class(defaultValidator == null ? null : defaultValidator.toString());
@@ -1628,7 +1611,6 @@ public final class CFMetaData
         adder.add("comment", comment);
         adder.add("read_repair_chance", readRepairChance);
         adder.add("local_read_repair_chance", dcLocalReadRepairChance);
-        adder.add("replicate_on_write", replicateOnWrite);
         adder.add("populate_io_cache_on_flush", populateIoCacheOnFlush);
         adder.add("gc_grace_seconds", gcGraceSeconds);
         adder.add("default_validator", defaultValidator.toString());
@@ -1692,7 +1674,6 @@ public final class CFMetaData
 
             cfm.readRepairChance(result.getDouble("read_repair_chance"));
             cfm.dcLocalReadRepairChance(result.getDouble("local_read_repair_chance"));
-            cfm.replicateOnWrite(result.getBoolean("replicate_on_write"));
             cfm.gcGraceSeconds(result.getInt("gc_grace_seconds"));
             cfm.defaultValidator(TypeParser.parse(result.getString("default_validator")));
             cfm.keyValidator(TypeParser.parse(result.getString("key_validator")));
@@ -2172,7 +2153,6 @@ public final class CFMetaData
             .append("comment", comment)
             .append("readRepairChance", readRepairChance)
             .append("dclocalReadRepairChance", dcLocalReadRepairChance)
-            .append("replicateOnWrite", replicateOnWrite)
             .append("gcGraceSeconds", gcGraceSeconds)
             .append("defaultValidator", defaultValidator)
             .append("keyValidator", keyValidator)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/714c4233/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index 2ea8e38..5a944a2 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -58,6 +58,8 @@ public class Config
 
     public volatile Long write_request_timeout_in_ms = 2000L;
 
+    public volatile Long counter_write_request_timeout_in_ms = 5000L;
+
     public volatile Long cas_contention_timeout_in_ms = 1000L;
 
     public volatile Long truncate_request_timeout_in_ms = 60000L;
@@ -70,7 +72,10 @@ public class Config
 
     public Integer concurrent_reads = 32;
     public Integer concurrent_writes = 32;
-    public Integer concurrent_replicates = 32;
+    public Integer concurrent_counter_writes = 32;
+
+    @Deprecated
+    public Integer concurrent_replicates = null;
 
     public Integer memtable_flush_writers = null; // will get set to the length of data dirs in DatabaseDescriptor
     public Integer memtable_total_space_in_mb;
@@ -165,7 +170,12 @@ public class Config
 
     public long row_cache_size_in_mb = 0;
     public volatile int row_cache_save_period = 0;
-    public int row_cache_keys_to_save = Integer.MAX_VALUE;
+    public volatile int row_cache_keys_to_save = Integer.MAX_VALUE;
+
+    public Long counter_cache_size_in_mb = null;
+    public volatile int counter_cache_save_period = 7200;
+    public volatile int counter_cache_keys_to_save = Integer.MAX_VALUE;
+
     public String memory_allocator = NativeAllocator.class.getSimpleName();
     public boolean populate_io_cache_on_flush = false;
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/714c4233/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 4b627c8..eca8881 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -86,6 +86,7 @@ public class DatabaseDescriptor
     private static RequestSchedulerOptions requestSchedulerOptions;
 
     private static long keyCacheSizeInMB;
+    private static long counterCacheSizeInMB;
     private static IAllocator memoryAllocator;
     private static long indexSummaryCapacityInMB;
 
@@ -248,10 +249,11 @@ public class DatabaseDescriptor
             throw new ConfigurationException("concurrent_writes must be at least 2");
         }
 
-        if (conf.concurrent_replicates != null && conf.concurrent_replicates < 2)
-        {
-            throw new ConfigurationException("concurrent_replicates must be at least 2");
-        }
+        if (conf.concurrent_counter_writes != null && conf.concurrent_counter_writes < 2)
+            throw new ConfigurationException("concurrent_counter_writes must be at least 2");
+
+        if (conf.concurrent_replicates != null)
+            logger.warn("concurrent_replicates has been deprecated and should be removed from cassandra.yaml");
 
         if (conf.file_cache_size_in_mb == null)
             conf.file_cache_size_in_mb = Math.min(512, (int) (Runtime.getRuntime().maxMemory() / (4 * 1048576)));
@@ -446,6 +448,22 @@ public class DatabaseDescriptor
                     + conf.key_cache_size_in_mb + "', supported values are <integer> >= 0.");
         }
 
+        try
+        {
+            // if counter_cache_size_in_mb is unset (null, i.e. "auto"), size the cache to min(2.5% of heap (in MB), 50MB), with a 1MB floor
+            counterCacheSizeInMB = (conf.counter_cache_size_in_mb == null)
+                    ? Math.min(Math.max(1, (int) (Runtime.getRuntime().totalMemory() * 0.025 / 1024 / 1024)), 50)
+                    : conf.counter_cache_size_in_mb;
+
+            if (counterCacheSizeInMB < 0)
+                throw new NumberFormatException(); // reuse the catch block below to avoid duplicating the error message
+        }
+        catch (NumberFormatException e)
+        {
+            throw new ConfigurationException("counter_cache_size_in_mb option was set incorrectly to '"
+                    + conf.counter_cache_size_in_mb + "', supported values are <integer> >= 0.");
+        }
+
         // if set to empty/"auto" then use 5% of Heap size
         indexSummaryCapacityInMB = (conf.index_summary_capacity_in_mb == null)
             ? Math.max(1, (int) (Runtime.getRuntime().totalMemory() * 0.05 / 1024 / 1024))
@@ -780,6 +798,16 @@ public class DatabaseDescriptor
         conf.write_request_timeout_in_ms = timeOutInMillis;
     }
 
+    public static long getCounterWriteRpcTimeout()
+    {
+        return conf.counter_write_request_timeout_in_ms;
+    }
+
+    public static void setCounterWriteRpcTimeout(Long timeOutInMillis)
+    {
+        conf.counter_write_request_timeout_in_ms = timeOutInMillis;
+    }
+
     public static long getCasContentionTimeout()
     {
         return conf.cas_contention_timeout_in_ms;
@@ -818,8 +846,9 @@ public class DatabaseDescriptor
                 return getTruncateRpcTimeout();
             case READ_REPAIR:
             case MUTATION:
-            case COUNTER_MUTATION:
                 return getWriteRpcTimeout();
+            case COUNTER_MUTATION:
+                return getCounterWriteRpcTimeout();
             default:
                 return getRpcTimeout();
         }
@@ -830,7 +859,12 @@ public class DatabaseDescriptor
      */
     public static long getMinRpcTimeout()
     {
-        return Longs.min(getRpcTimeout(), getReadRpcTimeout(), getRangeRpcTimeout(), getWriteRpcTimeout(), getTruncateRpcTimeout());
+        return Longs.min(getRpcTimeout(),
+                         getReadRpcTimeout(),
+                         getRangeRpcTimeout(),
+                         getWriteRpcTimeout(),
+                         getCounterWriteRpcTimeout(),
+                         getTruncateRpcTimeout());
     }
 
     public static double getPhiConvictThreshold()
@@ -853,9 +887,9 @@ public class DatabaseDescriptor
         return conf.concurrent_writes;
     }
 
-    public static int getConcurrentReplicators()
+    public static int getConcurrentCounterWriters()
     {
-        return conf.concurrent_replicates;
+        return conf.concurrent_counter_writes;
     }
 
     public static int getFlushWriters()
@@ -1283,6 +1317,31 @@ public class DatabaseDescriptor
         return conf.row_cache_keys_to_save;
     }
 
+    public static long getCounterCacheSizeInMB()
+    {
+        return counterCacheSizeInMB;
+    }
+
+    public static int getCounterCacheSavePeriod()
+    {
+        return conf.counter_cache_save_period;
+    }
+
+    public static void setCounterCacheSavePeriod(int counterCacheSavePeriod)
+    {
+        conf.counter_cache_save_period = counterCacheSavePeriod;
+    }
+
+    public static int getCounterCacheKeysToSave()
+    {
+        return conf.counter_cache_keys_to_save;
+    }
+
+    public static void setCounterCacheKeysToSave(int counterCacheKeysToSave)
+    {
+        conf.counter_cache_keys_to_save = counterCacheKeysToSave;
+    }
+
     public static IAllocator getoffHeapMemoryAllocator()
     {
         return memoryAllocator;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/714c4233/src/java/org/apache/cassandra/cql/AlterTableStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql/AlterTableStatement.java b/src/java/org/apache/cassandra/cql/AlterTableStatement.java
index 0d767b2..b5ac464 100644
--- a/src/java/org/apache/cassandra/cql/AlterTableStatement.java
+++ b/src/java/org/apache/cassandra/cql/AlterTableStatement.java
@@ -173,7 +173,6 @@ public class AlterTableStatement
         cfm.readRepairChance(cfProps.getPropertyDouble(CFPropDefs.KW_READREPAIRCHANCE, cfm.getReadRepairChance()));
         cfm.dcLocalReadRepairChance(cfProps.getPropertyDouble(CFPropDefs.KW_DCLOCALREADREPAIRCHANCE, cfm.getDcLocalReadRepair()));
         cfm.gcGraceSeconds(cfProps.getPropertyInt(CFPropDefs.KW_GCGRACESECONDS, cfm.getGcGraceSeconds()));
-        cfm.replicateOnWrite(cfProps.getPropertyBoolean(CFPropDefs.KW_REPLICATEONWRITE, cfm.getReplicateOnWrite()));
         int minCompactionThreshold = cfProps.getPropertyInt(CFPropDefs.KW_MINCOMPACTIONTHRESHOLD, cfm.getMinCompactionThreshold());
         int maxCompactionThreshold = cfProps.getPropertyInt(CFPropDefs.KW_MAXCOMPACTIONTHRESHOLD, cfm.getMaxCompactionThreshold());
         if (minCompactionThreshold <= 0 || maxCompactionThreshold <= 0)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/714c4233/src/java/org/apache/cassandra/cql/CFPropDefs.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql/CFPropDefs.java b/src/java/org/apache/cassandra/cql/CFPropDefs.java
index a7d3147..2131d06 100644
--- a/src/java/org/apache/cassandra/cql/CFPropDefs.java
+++ b/src/java/org/apache/cassandra/cql/CFPropDefs.java
@@ -47,7 +47,6 @@ public class CFPropDefs {
     public static final String KW_DEFAULTVALIDATION = "default_validation";
     public static final String KW_MINCOMPACTIONTHRESHOLD = "min_compaction_threshold";
     public static final String KW_MAXCOMPACTIONTHRESHOLD = "max_compaction_threshold";
-    public static final String KW_REPLICATEONWRITE = "replicate_on_write";
     public static final String KW_COMPACTION_STRATEGY_CLASS = "compaction_strategy_class";
     public static final String KW_CACHING = "caching";
     public static final String KW_DEFAULT_TIME_TO_LIVE = "default_time_to_live";
@@ -90,7 +89,6 @@ public class CFPropDefs {
         keywords.add(KW_DEFAULTVALIDATION);
         keywords.add(KW_MINCOMPACTIONTHRESHOLD);
         keywords.add(KW_MAXCOMPACTIONTHRESHOLD);
-        keywords.add(KW_REPLICATEONWRITE);
         keywords.add(KW_COMPACTION_STRATEGY_CLASS);
         keywords.add(KW_CACHING);
         keywords.add(KW_DEFAULT_TIME_TO_LIVE);
@@ -107,6 +105,7 @@ public class CFPropDefs {
         obsoleteKeywords.add("memtable_operations_in_millions");
         obsoleteKeywords.add("memtable_flush_after_mins");
         obsoleteKeywords.add("row_cache_provider");
+        obsoleteKeywords.add("replicate_on_write");
 
         allowedKeywords.addAll(keywords);
         allowedKeywords.addAll(obsoleteKeywords);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/714c4233/src/java/org/apache/cassandra/cql/CreateColumnFamilyStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql/CreateColumnFamilyStatement.java b/src/java/org/apache/cassandra/cql/CreateColumnFamilyStatement.java
index a71707c..7b5cbaf 100644
--- a/src/java/org/apache/cassandra/cql/CreateColumnFamilyStatement.java
+++ b/src/java/org/apache/cassandra/cql/CreateColumnFamilyStatement.java
@@ -188,7 +188,6 @@ public class CreateColumnFamilyStatement
                    .comment(cfProps.getProperty(CFPropDefs.KW_COMMENT))
                    .readRepairChance(getPropertyDouble(CFPropDefs.KW_READREPAIRCHANCE, CFMetaData.DEFAULT_READ_REPAIR_CHANCE))
                    .dcLocalReadRepairChance(getPropertyDouble(CFPropDefs.KW_DCLOCALREADREPAIRCHANCE, CFMetaData.DEFAULT_DCLOCAL_READ_REPAIR_CHANCE))
-                   .replicateOnWrite(getPropertyBoolean(CFPropDefs.KW_REPLICATEONWRITE, CFMetaData.DEFAULT_REPLICATE_ON_WRITE))
                    .gcGraceSeconds(getPropertyInt(CFPropDefs.KW_GCGRACESECONDS, CFMetaData.DEFAULT_GC_GRACE_SECONDS))
                    .defaultValidator(cfProps.getValidator())
                    .minCompactionThreshold(minCompactionThreshold)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/714c4233/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
index 25f61fb..e500359 100644
--- a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
@@ -25,6 +25,7 @@ import org.github.jamm.MemoryMeter;
 import org.apache.cassandra.cql3.*;
 import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.db.IMutation;
+import org.apache.cassandra.db.Mutation;
 import org.apache.cassandra.exceptions.*;
 import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.service.QueryState;
@@ -174,7 +175,11 @@ public class BatchStatement implements CQLStatement, MeasurableForPreparedCache
     public ResultMessage executeInternal(QueryState queryState) throws RequestValidationException, RequestExecutionException
     {
         for (IMutation mutation : getMutations(Collections.<ByteBuffer>emptyList(), true, null, queryState.getTimestamp()))
-            mutation.apply();
+        {
+            // We don't use counters internally.
+            assert mutation instanceof Mutation;
+            ((Mutation) mutation).apply();
+        }
         return null;
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/714c4233/src/java/org/apache/cassandra/cql3/statements/CFPropDefs.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/CFPropDefs.java b/src/java/org/apache/cassandra/cql3/statements/CFPropDefs.java
index 6ce6406..3929f3c 100644
--- a/src/java/org/apache/cassandra/cql3/statements/CFPropDefs.java
+++ b/src/java/org/apache/cassandra/cql3/statements/CFPropDefs.java
@@ -34,7 +34,6 @@ public class CFPropDefs extends PropertyDefinitions
     public static final String KW_GCGRACESECONDS = "gc_grace_seconds";
     public static final String KW_MINCOMPACTIONTHRESHOLD = "min_threshold";
     public static final String KW_MAXCOMPACTIONTHRESHOLD = "max_threshold";
-    public static final String KW_REPLICATEONWRITE = "replicate_on_write";
     public static final String KW_CACHING = "caching";
     public static final String KW_DEFAULT_TIME_TO_LIVE = "default_time_to_live";
     public static final String KW_INDEX_INTERVAL = "index_interval";
@@ -57,7 +56,6 @@ public class CFPropDefs extends PropertyDefinitions
         keywords.add(KW_READREPAIRCHANCE);
         keywords.add(KW_DCLOCALREADREPAIRCHANCE);
         keywords.add(KW_GCGRACESECONDS);
-        keywords.add(KW_REPLICATEONWRITE);
         keywords.add(KW_CACHING);
         keywords.add(KW_DEFAULT_TIME_TO_LIVE);
         keywords.add(KW_INDEX_INTERVAL);
@@ -67,6 +65,8 @@ public class CFPropDefs extends PropertyDefinitions
         keywords.add(KW_COMPACTION);
         keywords.add(KW_COMPRESSION);
         keywords.add(KW_MEMTABLE_FLUSH_PERIOD);
+
+        obsoleteKeywords.add("replicate_on_write");
     }
 
     private Class<? extends AbstractCompactionStrategy> compactionStrategyClass = null;
@@ -146,7 +146,6 @@ public class CFPropDefs extends PropertyDefinitions
         cfm.readRepairChance(getDouble(KW_READREPAIRCHANCE, cfm.getReadRepairChance()));
         cfm.dcLocalReadRepairChance(getDouble(KW_DCLOCALREADREPAIRCHANCE, cfm.getDcLocalReadRepair()));
         cfm.gcGraceSeconds(getInt(KW_GCGRACESECONDS, cfm.getGcGraceSeconds()));
-        cfm.replicateOnWrite(getBoolean(KW_REPLICATEONWRITE, cfm.getReplicateOnWrite()));
         int minCompactionThreshold = toInt(KW_MINCOMPACTIONTHRESHOLD, getCompactionOptions().get(KW_MINCOMPACTIONTHRESHOLD), cfm.getMinCompactionThreshold());
         int maxCompactionThreshold = toInt(KW_MAXCOMPACTIONTHRESHOLD, getCompactionOptions().get(KW_MAXCOMPACTIONTHRESHOLD), cfm.getMaxCompactionThreshold());
         if (minCompactionThreshold <= 0 || maxCompactionThreshold <= 0)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/714c4233/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
index c2a0080..ecbf4e1 100644
--- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
@@ -479,7 +479,11 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
             throw new UnsupportedOperationException();
 
         for (IMutation mutation : getMutations(Collections.<ByteBuffer>emptyList(), true, null, queryState.getTimestamp(), false))
-            mutation.apply();
+        {
+            // We don't use counters internally.
+            assert mutation instanceof Mutation;
+            ((Mutation) mutation).apply();
+        }
         return null;
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/714c4233/src/java/org/apache/cassandra/db/Cell.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Cell.java b/src/java/org/apache/cassandra/db/Cell.java
index 3e04f9b..d3cf085 100644
--- a/src/java/org/apache/cassandra/db/Cell.java
+++ b/src/java/org/apache/cassandra/db/Cell.java
@@ -234,17 +234,13 @@ public class Cell implements OnDiskAtom
     {
         if (this == o)
             return true;
+
         if (o == null || getClass() != o.getClass())
             return false;
 
         Cell cell = (Cell)o;
 
-        if (timestamp != cell.timestamp)
-            return false;
-        if (!name.equals(cell.name))
-            return false;
-
-        return value.equals(cell.value);
+        return timestamp == cell.timestamp && name.equals(cell.name) && value.equals(cell.value);
     }
 
     @Override
@@ -256,11 +252,6 @@ public class Cell implements OnDiskAtom
         return result;
     }
 
-    public Cell localCopy(ColumnFamilyStore cfs)
-    {
-        return localCopy(cfs, HeapAllocator.instance);
-    }
-
     public Cell localCopy(ColumnFamilyStore cfs, Allocator allocator)
     {
         return new Cell(name.copy(allocator), allocator.clone(value), timestamp);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/714c4233/src/java/org/apache/cassandra/db/ClockAndCount.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ClockAndCount.java b/src/java/org/apache/cassandra/db/ClockAndCount.java
new file mode 100644
index 0000000..1678c8c
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/ClockAndCount.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import com.google.common.base.Objects;
+
+import org.apache.cassandra.cache.IMeasurableMemory;
+import org.apache.cassandra.utils.ObjectSizes;
+
+public class ClockAndCount implements IMeasurableMemory
+{
+    public static ClockAndCount BLANK = ClockAndCount.create(0L, 0L);
+
+    public final long clock;
+    public final long count;
+
+    private ClockAndCount(long clock, long count)
+    {
+        this.clock = clock;
+        this.count = count;
+    }
+
+    public static ClockAndCount create(long clock, long count)
+    {
+        return new ClockAndCount(clock, count);
+    }
+
+    public long memorySize()
+    {
+        return ObjectSizes.getFieldSize(TypeSizes.NATIVE.sizeof(clock))
+             + ObjectSizes.getFieldSize(TypeSizes.NATIVE.sizeof(count));
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+        if (this == o)
+            return true;
+
+        if (!(o instanceof ClockAndCount))
+            return false;
+
+        ClockAndCount other = (ClockAndCount) o;
+        return clock == other.clock && count == other.count;
+    }
+
+    @Override
+    public int hashCode()
+    {
+        return Objects.hashCode(clock, count);
+    }
+
+    @Override
+    public String toString()
+    {
+        return String.format("ClockAndCount(%s,%s)", clock, count);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/714c4233/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index fe77d09..2237214 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -25,6 +25,7 @@ import java.util.*;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Lock;
 import java.util.regex.Pattern;
 import javax.management.*;
 
@@ -32,13 +33,12 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Function;
 import com.google.common.collect.*;
 import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.Striped;
 import com.google.common.util.concurrent.Uninterruptibles;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.cache.IRowCacheEntry;
-import org.apache.cassandra.cache.RowCacheKey;
-import org.apache.cassandra.cache.RowCacheSentinel;
+import org.apache.cassandra.cache.*;
 import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
 import org.apache.cassandra.config.*;
 import org.apache.cassandra.config.CFMetaData.SpeculativeRetry;
@@ -46,6 +46,7 @@ import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
 import org.apache.cassandra.db.commitlog.CommitLog;
 import org.apache.cassandra.db.commitlog.ReplayPosition;
 import org.apache.cassandra.db.compaction.*;
+import org.apache.cassandra.db.composites.CellName;
 import org.apache.cassandra.db.filter.*;
 import org.apache.cassandra.db.composites.CellNameType;
 import org.apache.cassandra.db.composites.Composite;
@@ -105,6 +106,8 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
     public final ColumnFamilyMetrics metric;
     public volatile long sampleLatencyNanos;
 
+    private final Striped<Lock> counterLocks = Striped.lazyWeakLock(DatabaseDescriptor.getConcurrentCounterWriters() * 128);
+
     public void reload()
     {
         // metadata object has been mutated directly. make all the members jibe with new settings.
@@ -332,9 +335,18 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         data.unreferenceSSTables();
         indexManager.invalidate();
 
-        for (RowCacheKey key : CacheService.instance.rowCache.getKeySet())
-            if (key.cfId == metadata.cfId)
-                invalidateCachedRow(key);
+        invalidateCaches();
+    }
+
+    /**
+     * Obtain a lock for this CF's part of a counter mutation
+     * @param key the key for the CounterMutation
+     * @return the striped lock instance
+     */
+    public Lock counterLockFor(ByteBuffer key)
+    {
+        assert metadata.isCounter();
+        return counterLocks.get(key);
     }
 
     /**
@@ -562,13 +574,29 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
 
         int cachedRowsRead = CacheService.instance.rowCache.loadSaved(this);
         if (cachedRowsRead > 0)
-            logger.info("completed loading ({} ms; {} keys) row cache for {}.{}",
+            logger.info("Completed loading ({} ms; {} keys) row cache for {}.{}",
                         TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start),
                         cachedRowsRead,
                         keyspace.getName(),
                         name);
     }
 
+    public void initCounterCache()
+    {
+        if (!metadata.isCounter() || CacheService.instance.counterCache.getCapacity() == 0)
+            return;
+
+        long start = System.nanoTime();
+
+        int cachedShardsRead = CacheService.instance.counterCache.loadSaved(this);
+        if (cachedShardsRead > 0)
+            logger.info("Completed loading ({} ms; {} shards) counter cache for {}.{}",
+                        TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start),
+                        cachedShardsRead,
+                        keyspace.getName(),
+                        name);
+    }
+
     /**
      * See #{@code StorageService.loadNewSSTables(String, String)} for more info
      *
@@ -1073,9 +1101,9 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         return maxFile;
     }
 
-    public void forceCleanup(CounterId.OneShotRenewer renewer) throws ExecutionException, InterruptedException
+    public void forceCleanup() throws ExecutionException, InterruptedException
     {
-        CompactionManager.instance.performCleanup(ColumnFamilyStore.this, renewer);
+        CompactionManager.instance.performCleanup(ColumnFamilyStore.this);
     }
 
     public void scrub(boolean disableSnapshot) throws ExecutionException, InterruptedException
@@ -1535,6 +1563,16 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
             if (key.cfId == metadata.cfId && !Range.isInRanges(dk.token, ranges))
                 invalidateCachedRow(dk);
         }
+
+        if (metadata.isCounter())
+        {
+            for (CounterCacheKey key : CacheService.instance.counterCache.getKeySet())
+            {
+                DecoratedKey dk = partitioner.decorateKey(ByteBuffer.wrap(key.partitionKey));
+                if (key.cfId == metadata.cfId && !Range.isInRanges(dk.token, ranges))
+                    CacheService.instance.counterCache.remove(key);
+            }
+        }
     }
 
     public static abstract class AbstractScanIterator extends AbstractIterator<Row> implements CloseableIterator<Row>
@@ -1886,6 +1924,18 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         return cached == null || cached instanceof RowCacheSentinel ? null : (ColumnFamily) cached;
     }
 
+    private void invalidateCaches()
+    {
+        for (RowCacheKey key : CacheService.instance.rowCache.getKeySet())
+            if (key.cfId == metadata.cfId)
+                invalidateCachedRow(key);
+
+        if (metadata.isCounter())
+            for (CounterCacheKey key : CacheService.instance.counterCache.getKeySet())
+                if (key.cfId == metadata.cfId)
+                    CacheService.instance.counterCache.remove(key);
+    }
+
     /**
      * @return true if @param key is contained in the row cache
      */
@@ -1908,6 +1958,20 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         invalidateCachedRow(new RowCacheKey(cfId, key));
     }
 
+    public ClockAndCount getCachedCounter(ByteBuffer partitionKey, CellName cellName)
+    {
+        if (CacheService.instance.counterCache.getCapacity() == 0L) // counter cache disabled.
+            return null;
+        return CacheService.instance.counterCache.get(CounterCacheKey.create(metadata.cfId, partitionKey, cellName));
+    }
+
+    public void putCachedCounter(ByteBuffer partitionKey, CellName cellName, ClockAndCount clockAndCount)
+    {
+        if (CacheService.instance.counterCache.getCapacity() == 0L) // counter cache disabled.
+            return;
+        CacheService.instance.counterCache.put(CounterCacheKey.create(metadata.cfId, partitionKey, cellName), clockAndCount);
+    }
+
     public void forceMajorCompaction() throws InterruptedException, ExecutionException
     {
         CompactionManager.instance.performMaximal(this);
@@ -2017,12 +2081,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
 
                 SystemKeyspace.saveTruncationRecord(ColumnFamilyStore.this, truncatedAt, replayAfter);
 
-                logger.debug("cleaning out row cache");
-                for (RowCacheKey key : CacheService.instance.rowCache.getKeySet())
-                {
-                    if (key.cfId == metadata.cfId)
-                        invalidateCachedRow(key);
-                }
+                invalidateCaches();
             }
         };
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/714c4233/src/java/org/apache/cassandra/db/ConsistencyLevel.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ConsistencyLevel.java b/src/java/org/apache/cassandra/db/ConsistencyLevel.java
index 6d04314..c97e71f 100644
--- a/src/java/org/apache/cassandra/db/ConsistencyLevel.java
+++ b/src/java/org/apache/cassandra/db/ConsistencyLevel.java
@@ -341,17 +341,10 @@ public enum ConsistencyLevel
     public void validateCounterForWrite(CFMetaData metadata) throws InvalidRequestException
     {
         if (this == ConsistencyLevel.ANY)
-        {
             throw new InvalidRequestException("Consistency level ANY is not yet supported for counter columnfamily " + metadata.cfName);
-        }
-        else if (!metadata.getReplicateOnWrite() && !(this == ConsistencyLevel.ONE || this == ConsistencyLevel.LOCAL_ONE))
-        {
-            throw new InvalidRequestException("cannot achieve CL > CL.ONE without replicate_on_write on columnfamily " + metadata.cfName);
-        }
-        else if (isSerialConsistency())
-        {
+
+        if (isSerialConsistency())
             throw new InvalidRequestException("Counter operations are inherently non-serializable");
-        }
     }
 
     private void requireNetworkTopologyStrategy(String keyspaceName) throws InvalidRequestException

http://git-wip-us.apache.org/repos/asf/cassandra/blob/714c4233/src/java/org/apache/cassandra/db/CounterCell.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/CounterCell.java b/src/java/org/apache/cassandra/db/CounterCell.java
index 76949d4..426d876 100644
--- a/src/java/org/apache/cassandra/db/CounterCell.java
+++ b/src/java/org/apache/cassandra/db/CounterCell.java
@@ -25,7 +25,6 @@ import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.db.composites.CellName;
 import org.apache.cassandra.db.composites.CellNameType;
 import org.apache.cassandra.db.context.CounterContext;
-import org.apache.cassandra.db.context.IContext.ContextRelationship;
 import org.apache.cassandra.io.util.DataOutputBuffer;
 import org.apache.cassandra.serializers.MarshalException;
 import org.apache.cassandra.utils.Allocator;
@@ -40,16 +39,6 @@ public class CounterCell extends Cell
 
     private final long timestampOfLastDelete;
 
-    public CounterCell(CellName name, long value, long timestamp)
-    {
-        this(name, contextManager.createLocal(value, HeapAllocator.instance), timestamp);
-    }
-
-    public CounterCell(CellName name, long value, long timestamp, long timestampOfLastDelete)
-    {
-        this(name, contextManager.createLocal(value, HeapAllocator.instance), timestamp, timestampOfLastDelete);
-    }
-
     public CounterCell(CellName name, ByteBuffer value, long timestamp)
     {
         this(name, value, timestamp, Long.MIN_VALUE);
@@ -68,6 +57,12 @@ public class CounterCell extends Cell
         return new CounterCell(name, value, timestamp, timestampOfLastDelete);
     }
 
+    // For use only by tests of compatibility with pre-2.1 counters.
+    public static CounterCell createLocal(CellName name, long value, long timestamp, long timestampOfLastDelete)
+    {
+        return new CounterCell(name, contextManager.createLocal(value, HeapAllocator.instance), timestamp, timestampOfLastDelete);
+    }
+
     @Override
     public Cell withUpdatedName(CellName newName)
     {
@@ -110,12 +105,12 @@ public class CounterCell extends Cell
         // merging a CounterCell with a tombstone never return a tombstone
         // unless that tombstone timestamp is greater that the CounterCell
         // one.
-        assert !(cell instanceof DeletedCell) : "Wrong class type: " + cell.getClass();
+        assert cell instanceof CounterCell : "Wrong class type: " + cell.getClass();
 
         if (timestampOfLastDelete() < ((CounterCell) cell).timestampOfLastDelete())
             return cell;
-        ContextRelationship rel = contextManager.diff(cell.value(), value());
-        if (ContextRelationship.GREATER_THAN == rel || ContextRelationship.DISJOINT == rel)
+        CounterContext.Relationship rel = contextManager.diff(cell.value(), value());
+        if (rel == CounterContext.Relationship.GREATER_THAN || rel == CounterContext.Relationship.DISJOINT)
             return cell;
         return null;
     }
@@ -195,12 +190,6 @@ public class CounterCell extends Cell
     }
 
     @Override
-    public Cell localCopy(ColumnFamilyStore cfs)
-    {
-        return localCopy(cfs, HeapAllocator.instance);
-    }
-
-    @Override
     public Cell localCopy(ColumnFamilyStore cfs, Allocator allocator)
     {
         return new CounterCell(name.copy(allocator), allocator.clone(value), timestamp, timestampOfLastDelete);
@@ -231,14 +220,6 @@ public class CounterCell extends Cell
         contextManager.validateContext(value());
     }
 
-    /**
-     * Check if a given counterId is found in this CounterCell context.
-     */
-    public boolean hasCounterId(CounterId id)
-    {
-        return contextManager.hasCounterId(value(), id);
-    }
-
     public Cell markLocalToBeCleared()
     {
         ByteBuffer marked = contextManager.markLocalToBeCleared(value);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/714c4233/src/java/org/apache/cassandra/db/CounterMutation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/CounterMutation.java b/src/java/org/apache/cassandra/db/CounterMutation.java
index 7dcb05c..6884c80 100644
--- a/src/java/org/apache/cassandra/db/CounterMutation.java
+++ b/src/java/org/apache/cassandra/db/CounterMutation.java
@@ -21,22 +21,21 @@ import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.Collection;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.SortedSet;
-import java.util.TreeSet;
-import java.util.UUID;
-
-import com.google.common.collect.Iterables;
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
 
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.composites.CellName;
+import org.apache.cassandra.db.context.CounterContext;
 import org.apache.cassandra.db.filter.NamesQueryFilter;
+import org.apache.cassandra.exceptions.WriteTimeoutException;
 import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.HeapAllocator;
+import org.apache.cassandra.service.CacheService;
+import org.apache.cassandra.tracing.Tracing;
+import org.apache.cassandra.utils.*;
 
 public class CounterMutation implements IMutation
 {
@@ -76,67 +75,177 @@ public class CounterMutation implements IMutation
         return consistency;
     }
 
-    public Mutation makeReplicationMutation()
+    public MessageOut<CounterMutation> makeMutationMessage()
+    {
+        return new MessageOut<>(MessagingService.Verb.COUNTER_MUTATION, this, serializer);
+    }
+
+    /**
+     * Applies the counter mutation, returns the result Mutation (for replication to other nodes).
+     *
+     * 1. Grabs the striped CF-level lock(s)
+     * 2. Gets the current values of the counters-to-be-modified from the counter cache
+     * 3. Reads the rest of the current values (cache misses) from the CF
+     * 4. Writes the updated counter values
+     * 5. Updates the counter cache
+     * 6. Releases the lock(s)
+     *
+     * See CASSANDRA-4775 and CASSANDRA-6504 for further details.
+     *
+     * @return the applied resulting Mutation
+     * @throws WriteTimeoutException if a counter lock is not acquired within getTimeout() ms, or the wait is interrupted
+     */
+    public Mutation apply() throws WriteTimeoutException
     {
-        List<ReadCommand> readCommands = new LinkedList<>();
-        long timestamp = System.currentTimeMillis();
-        for (ColumnFamily columnFamily : mutation.getColumnFamilies())
+        Mutation result = new Mutation(getKeyspaceName(), ByteBufferUtil.clone(key()));
+        Keyspace keyspace = Keyspace.open(getKeyspaceName());
+
+        ArrayList<UUID> cfIds = new ArrayList<>(getColumnFamilyIds());
+        Collections.sort(cfIds); // will lock in the sorted order, to avoid a potential deadlock.
+        ArrayList<Lock> locks = new ArrayList<>(cfIds.size());
+        try
         {
-            if (!columnFamily.metadata().getReplicateOnWrite())
-                continue;
-            addReadCommandFromColumnFamily(mutation.getKeyspaceName(), mutation.key(), columnFamily, timestamp, readCommands);
+            Tracing.trace("Acquiring {} counter locks", cfIds.size());
+            for (UUID cfId : cfIds)
+            {
+                Lock lock = keyspace.getColumnFamilyStore(cfId).counterLockFor(key());
+                if (!lock.tryLock(getTimeout(), TimeUnit.MILLISECONDS))
+                    throw new WriteTimeoutException(WriteType.COUNTER, consistency(), 0, consistency().blockFor(keyspace));
+                locks.add(lock);
+            }
+
+            for (ColumnFamily cf : getColumnFamilies())
+                result.add(processModifications(cf));
+
+            result.apply();
+            updateCounterCache(result, keyspace);
+            return result;
+        }
+        catch (InterruptedException e)
+        {
+            Thread.currentThread().interrupt(); // restore the interrupt status cleared when tryLock threw
+            throw new WriteTimeoutException(WriteType.COUNTER, consistency(), 0, consistency().blockFor(keyspace));
+        }
+        finally
+        {
+            for (Lock lock : locks)
+                lock.unlock();
         }
+    }
+
+    // Replaces all the CounterUpdateCell-s with updated regular CounterCell-s
+    private ColumnFamily processModifications(ColumnFamily changesCF)
+    {
+        Allocator allocator = HeapAllocator.instance;
+        ColumnFamilyStore cfs = Keyspace.open(getKeyspaceName()).getColumnFamilyStore(changesCF.id());
 
-        // create a replication Mutation
-        Mutation replicationMutation = new Mutation(mutation.getKeyspaceName(), mutation.key());
-        for (ReadCommand readCommand : readCommands)
+        ColumnFamily resultCF = changesCF.cloneMeShallow();
+
+        List<CounterUpdateCell> counterUpdateCells = new ArrayList<>(changesCF.getColumnCount());
+        for (Cell cell : changesCF)
         {
-            Keyspace keyspace = Keyspace.open(readCommand.ksName);
-            Row row = readCommand.getRow(keyspace);
-            if (row == null || row.cf == null)
-                continue;
+            if (cell instanceof CounterUpdateCell)
+                counterUpdateCells.add((CounterUpdateCell)cell);
+            else
+                resultCF.addColumn(cell.localCopy(cfs, allocator));
+        }
 
-            ColumnFamily cf = row.cf;
-            replicationMutation.add(cf);
+        if (counterUpdateCells.isEmpty())
+            return resultCF; // only DELETEs
+
+        ClockAndCount[] currentValues = getCurrentValues(counterUpdateCells, cfs);
+        for (int i = 0; i < counterUpdateCells.size(); i++)
+        {
+            ClockAndCount currentValue = currentValues[i];
+            CounterUpdateCell update = counterUpdateCells.get(i);
+
+            long clock = currentValue.clock + 1L;
+            long count = currentValue.count + update.delta();
+
+            resultCF.addColumn(new CounterCell(update.name().copy(allocator),
+                                               CounterContext.instance().createGlobal(CounterId.getLocalId(), clock, count, allocator),
+                                               update.timestamp()));
         }
-        return replicationMutation;
+
+        return resultCF;
     }
 
-    private void addReadCommandFromColumnFamily(String keyspaceName, ByteBuffer key, ColumnFamily columnFamily, long timestamp, List<ReadCommand> commands)
+    // Attempt to load the current value(s) from cache. If that fails, read the rest from the cfs.
+    private ClockAndCount[] getCurrentValues(List<CounterUpdateCell> counterUpdateCells, ColumnFamilyStore cfs)
     {
-        SortedSet<CellName> s = new TreeSet<>(columnFamily.metadata().comparator);
-        Iterables.addAll(s, columnFamily.getColumnNames());
-        commands.add(new SliceByNamesReadCommand(keyspaceName, key, columnFamily.metadata().cfName, timestamp, new NamesQueryFilter(s)));
+        ClockAndCount[] currentValues = new ClockAndCount[counterUpdateCells.size()];
+        int remaining = counterUpdateCells.size();
+
+        if (CacheService.instance.counterCache.getCapacity() != 0)
+        {
+            Tracing.trace("Fetching {} counter values from cache", counterUpdateCells.size());
+            remaining = getCurrentValuesFromCache(counterUpdateCells, cfs, currentValues);
+            if (remaining == 0)
+                return currentValues;
+        }
+
+        Tracing.trace("Reading {} counter values from the CF", remaining);
+        getCurrentValuesFromCFS(counterUpdateCells, cfs, currentValues);
+
+        return currentValues;
     }
 
-    public MessageOut<CounterMutation> makeMutationMessage()
+    // Returns the count of cache misses.
+    private int getCurrentValuesFromCache(List<CounterUpdateCell> counterUpdateCells,
+                                          ColumnFamilyStore cfs,
+                                          ClockAndCount[] currentValues)
     {
-        return new MessageOut<>(MessagingService.Verb.COUNTER_MUTATION, this, serializer);
+        int cacheMisses = 0;
+        for (int i = 0; i < counterUpdateCells.size(); i++)
+        {
+            ClockAndCount cached = cfs.getCachedCounter(key(), counterUpdateCells.get(i).name());
+            if (cached != null)
+                currentValues[i] = cached;
+            else
+                cacheMisses++;
+        }
+        return cacheMisses;
     }
 
-    public boolean shouldReplicateOnWrite()
+    // Reads the missing current values from the CFS.
+    private void getCurrentValuesFromCFS(List<CounterUpdateCell> counterUpdateCells,
+                                         ColumnFamilyStore cfs,
+                                         ClockAndCount[] currentValues)
     {
-        for (ColumnFamily cf : mutation.getColumnFamilies())
-            if (cf.metadata().getReplicateOnWrite())
-                return true;
-        return false;
+        SortedSet<CellName> names = new TreeSet<>(cfs.metadata.comparator);
+        for (int i = 0; i < currentValues.length; i++)
+            if (currentValues[i] == null)
+                names.add(counterUpdateCells.get(i).name());
+
+        ReadCommand cmd = new SliceByNamesReadCommand(getKeyspaceName(), key(), cfs.metadata.cfName, Long.MIN_VALUE, new NamesQueryFilter(names));
+        Row row = cmd.getRow(cfs.keyspace);
+        ColumnFamily cf = row == null ? null : row.cf;
+
+        for (int i = 0; i < currentValues.length; i++)
+        {
+            if (currentValues[i] != null)
+                continue;
+
+            Cell cell = cf == null ? null : cf.getColumn(counterUpdateCells.get(i).name());
+            if (cell == null || cell.isMarkedForDelete(Long.MIN_VALUE)) // absent or a tombstone.
+                currentValues[i] = ClockAndCount.BLANK;
+            else
+                currentValues[i] = CounterContext.instance().getLocalClockAndCount(cell.value());
+        }
     }
 
-    public void apply()
+    private void updateCounterCache(Mutation applied, Keyspace keyspace)
     {
-        // transform all CounterUpdateCell to CounterCell: accomplished by localCopy
-        Mutation m = new Mutation(mutation.getKeyspaceName(), ByteBufferUtil.clone(mutation.key()));
-        Keyspace keyspace = Keyspace.open(m.getKeyspaceName());
+        if (CacheService.instance.counterCache.getCapacity() == 0)
+            return;
 
-        for (ColumnFamily cf_ : mutation.getColumnFamilies())
+        for (ColumnFamily cf : applied.getColumnFamilies())
         {
-            ColumnFamily cf = cf_.cloneMeShallow();
             ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(cf.id());
-            for (Cell cell : cf_)
-                cf.addColumn(cell.localCopy(cfs), HeapAllocator.instance);
-            m.add(cf);
+            for (Cell cell : cf)
+                if (cell instanceof CounterCell)
+                    cfs.putCachedCounter(key(), cell.name(), CounterContext.instance().getLocalClockAndCount(cell.value()));
         }
-        m.apply();
     }
 
     public void addAll(IMutation m)
@@ -147,6 +254,11 @@ public class CounterMutation implements IMutation
         mutation.addAll(cm.mutation);
     }
 
+    public long getTimeout()
+    {
+        return DatabaseDescriptor.getCounterWriteRpcTimeout();
+    }
+
     @Override
     public String toString()
     {
@@ -176,7 +288,7 @@ public class CounterMutation implements IMutation
         public long serializedSize(CounterMutation cm, int version)
         {
             return Mutation.serializer.serializedSize(cm.mutation, version)
-                    + TypeSizes.NATIVE.sizeof(cm.consistency.name());
+                 + TypeSizes.NATIVE.sizeof(cm.consistency.name());
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/714c4233/src/java/org/apache/cassandra/db/CounterUpdateCell.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/CounterUpdateCell.java b/src/java/org/apache/cassandra/db/CounterUpdateCell.java
index dd2bf2a..f7a0ef1 100644
--- a/src/java/org/apache/cassandra/db/CounterUpdateCell.java
+++ b/src/java/org/apache/cassandra/db/CounterUpdateCell.java
@@ -21,10 +21,8 @@ import java.nio.ByteBuffer;
 
 import org.apache.cassandra.db.composites.CellName;
 import org.apache.cassandra.db.composites.CellNameType;
-import org.apache.cassandra.db.context.CounterContext;
 import org.apache.cassandra.utils.Allocator;
 import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.HeapAllocator;
 
 /**
  * A counter update while it hasn't been applied yet by the leader replica.
@@ -63,13 +61,12 @@ public class CounterUpdateCell extends Cell
         // The only time this could happen is if a batchAdd ships two
         // increment for the same cell. Hence we simply sums the delta.
 
-        assert (cell instanceof CounterUpdateCell) || (cell instanceof DeletedCell) : "Wrong class type.";
-
         // tombstones take precedence
         if (cell.isMarkedForDelete(Long.MIN_VALUE)) // can't be an expired cell, so the current time is irrelevant
             return timestamp() > cell.timestamp() ? this : cell;
 
         // neither is tombstoned
+        assert cell instanceof CounterUpdateCell : "Wrong class type.";
         CounterUpdateCell c = (CounterUpdateCell) cell;
         return new CounterUpdateCell(name(), delta() + c.delta(), Math.max(timestamp(), c.timestamp()));
     }
@@ -81,21 +78,9 @@ public class CounterUpdateCell extends Cell
     }
 
     @Override
-    public CounterCell localCopy(ColumnFamilyStore cfs)
-    {
-        return new CounterCell(name.copy(HeapAllocator.instance),
-                               CounterContext.instance().createLocal(delta(), HeapAllocator.instance),
-                               timestamp(),
-                               Long.MIN_VALUE);
-    }
-
-    @Override
     public Cell localCopy(ColumnFamilyStore cfs, Allocator allocator)
     {
-        return new CounterCell(name.copy(allocator),
-                               CounterContext.instance().createLocal(delta(), allocator),
-                               timestamp(),
-                               Long.MIN_VALUE);
+        throw new UnsupportedOperationException();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/714c4233/src/java/org/apache/cassandra/db/DeletedCell.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DeletedCell.java b/src/java/org/apache/cassandra/db/DeletedCell.java
index 5b89e1d..13d1358 100644
--- a/src/java/org/apache/cassandra/db/DeletedCell.java
+++ b/src/java/org/apache/cassandra/db/DeletedCell.java
@@ -27,7 +27,6 @@ import org.apache.cassandra.io.util.DataOutputBuffer;
 import org.apache.cassandra.serializers.MarshalException;
 import org.apache.cassandra.utils.Allocator;
 import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.HeapAllocator;
 
 public class DeletedCell extends Cell
 {
@@ -98,12 +97,6 @@ public class DeletedCell extends Cell
     }
 
     @Override
-    public Cell localCopy(ColumnFamilyStore cfs)
-    {
-        return new DeletedCell(name.copy(HeapAllocator.instance), ByteBufferUtil.clone(value), timestamp);
-    }
-
-    @Override
     public Cell localCopy(ColumnFamilyStore cfs, Allocator allocator)
     {
         return new DeletedCell(name.copy(allocator), allocator.clone(value), timestamp);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/714c4233/src/java/org/apache/cassandra/db/ExpiringCell.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ExpiringCell.java b/src/java/org/apache/cassandra/db/ExpiringCell.java
index b15514e..a2f68da 100644
--- a/src/java/org/apache/cassandra/db/ExpiringCell.java
+++ b/src/java/org/apache/cassandra/db/ExpiringCell.java
@@ -27,7 +27,6 @@ import org.apache.cassandra.db.composites.CellNameType;
 import org.apache.cassandra.io.util.DataOutputBuffer;
 import org.apache.cassandra.serializers.MarshalException;
 import org.apache.cassandra.utils.Allocator;
-import org.apache.cassandra.utils.HeapAllocator;
 
 /**
  * Alternative to Cell that have an expiring time.
@@ -133,12 +132,6 @@ public class ExpiringCell extends Cell
     }
 
     @Override
-    public Cell localCopy(ColumnFamilyStore cfs)
-    {
-        return localCopy(cfs, HeapAllocator.instance);
-    }
-
-    @Override
     public Cell localCopy(ColumnFamilyStore cfs, Allocator allocator)
     {
         return new ExpiringCell(name.copy(allocator), allocator.clone(value), timestamp, timeToLive, localExpirationTime);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/714c4233/src/java/org/apache/cassandra/db/IMutation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/IMutation.java b/src/java/org/apache/cassandra/db/IMutation.java
index 70bd79c..44df104 100644
--- a/src/java/org/apache/cassandra/db/IMutation.java
+++ b/src/java/org/apache/cassandra/db/IMutation.java
@@ -26,7 +26,7 @@ public interface IMutation
     public String getKeyspaceName();
     public Collection<UUID> getColumnFamilyIds();
     public ByteBuffer key();
-    public void apply();
+    public long getTimeout();
     public String toString(boolean shallow);
     public void addAll(IMutation m);
     public Collection<ColumnFamily> getColumnFamilies();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/714c4233/src/java/org/apache/cassandra/db/Mutation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Mutation.java b/src/java/org/apache/cassandra/db/Mutation.java
index d70d7f9..31d9503 100644
--- a/src/java/org/apache/cassandra/db/Mutation.java
+++ b/src/java/org/apache/cassandra/db/Mutation.java
@@ -26,6 +26,7 @@ import java.util.*;
 import org.apache.commons.lang3.StringUtils;
 
 import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.db.composites.CellName;
 import org.apache.cassandra.db.composites.Composite;
@@ -219,6 +220,11 @@ public class Mutation implements IMutation
         return new MessageOut<>(verb, this, serializer);
     }
 
+    public long getTimeout()
+    {
+        return DatabaseDescriptor.getWriteRpcTimeout();
+    }
+
     public String toString()
     {
         return toString(false);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/714c4233/src/java/org/apache/cassandra/db/SystemKeyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SystemKeyspace.java b/src/java/org/apache/cassandra/db/SystemKeyspace.java
index 39bdd15..f8637c1 100644
--- a/src/java/org/apache/cassandra/db/SystemKeyspace.java
+++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java
@@ -693,27 +693,6 @@ public class SystemKeyspace
         forceBlockingFlush(COUNTER_ID_CF);
     }
 
-    public static List<CounterId.CounterIdRecord> getOldLocalCounterIds()
-    {
-        List<CounterId.CounterIdRecord> l = new ArrayList<CounterId.CounterIdRecord>();
-
-        Keyspace keyspace = Keyspace.open(Keyspace.SYSTEM_KS);
-        QueryFilter filter = QueryFilter.getIdentityFilter(decorate(ALL_LOCAL_NODE_ID_KEY), COUNTER_ID_CF, System.currentTimeMillis());
-        ColumnFamily cf = keyspace.getColumnFamilyStore(COUNTER_ID_CF).getColumnFamily(filter);
-
-        CounterId previous = null;
-        for (Cell c : cf)
-        {
-            if (previous != null)
-                l.add(new CounterId.CounterIdRecord(previous, c.timestamp()));
-
-            // this will ignore the last column on purpose since it is the
-            // current local node id
-            previous = CounterId.wrap(c.name().toByteBuffer());
-        }
-        return l;
-    }
-
     /**
      * @param cfName The name of the ColumnFamily responsible for part of the schema (keyspace, ColumnFamily, columns)
      * @return CFS responsible to hold low-level serialized schema

http://git-wip-us.apache.org/repos/asf/cassandra/blob/714c4233/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 2a8d68d..1414c3f 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -261,7 +261,7 @@ public class CompactionManager implements CompactionManagerMBean
         });
     }
 
-    public void performCleanup(ColumnFamilyStore cfStore, final CounterId.OneShotRenewer renewer) throws InterruptedException, ExecutionException
+    public void performCleanup(ColumnFamilyStore cfStore) throws InterruptedException, ExecutionException
     {
         performAllSSTableOperation(cfStore, new AllSSTablesOperation()
         {
@@ -272,7 +272,7 @@ public class CompactionManager implements CompactionManagerMBean
                 List<SSTableReader> sortedSSTables = Lists.newArrayList(sstables);
                 Collections.sort(sortedSSTables, new SSTableReader.SizeComparator());
 
-                doCleanupCompaction(store, sortedSSTables, renewer);
+                doCleanupCompaction(store, sortedSSTables);
             }
         });
     }
@@ -508,7 +508,7 @@ public class CompactionManager implements CompactionManagerMBean
      *
      * @throws IOException
      */
-    private void doCleanupCompaction(final ColumnFamilyStore cfs, Collection<SSTableReader> sstables, CounterId.OneShotRenewer renewer) throws IOException
+    private void doCleanupCompaction(final ColumnFamilyStore cfs, Collection<SSTableReader> sstables) throws IOException
     {
         assert !cfs.isIndex();
         Keyspace keyspace = cfs.keyspace;
@@ -520,7 +520,7 @@ public class CompactionManager implements CompactionManagerMBean
         }
 
         boolean hasIndexes = cfs.indexManager.hasIndexes();
-        CleanupStrategy cleanupStrategy = CleanupStrategy.get(cfs, ranges, renewer);
+        CleanupStrategy cleanupStrategy = CleanupStrategy.get(cfs, ranges);
 
         for (SSTableReader sstable : sstables)
         {
@@ -614,12 +614,11 @@ public class CompactionManager implements CompactionManagerMBean
 
     private static abstract class CleanupStrategy
     {
-        public static CleanupStrategy get(ColumnFamilyStore cfs, Collection<Range<Token>> ranges, CounterId.OneShotRenewer renewer)
+        public static CleanupStrategy get(ColumnFamilyStore cfs, Collection<Range<Token>> ranges)
         {
-            if (cfs.indexManager.hasIndexes() || cfs.metadata.isCounter())
-                return new Full(cfs, ranges, renewer);
-
-            return new Bounded(cfs, ranges);
+            return cfs.indexManager.hasIndexes()
+                 ? new Full(cfs, ranges)
+                 : new Bounded(cfs, ranges);
         }
 
         public abstract ICompactionScanner getScanner(SSTableReader sstable, RateLimiter limiter);
@@ -660,14 +659,12 @@ public class CompactionManager implements CompactionManagerMBean
             private final Collection<Range<Token>> ranges;
             private final ColumnFamilyStore cfs;
             private List<Cell> indexedColumnsInRow;
-            private final CounterId.OneShotRenewer renewer;
 
-            public Full(ColumnFamilyStore cfs, Collection<Range<Token>> ranges, CounterId.OneShotRenewer renewer)
+            public Full(ColumnFamilyStore cfs, Collection<Range<Token>> ranges)
             {
                 this.cfs = cfs;
                 this.ranges = ranges;
                 this.indexedColumnsInRow = null;
-                this.renewer = renewer;
             }
 
             @Override
@@ -690,8 +687,6 @@ public class CompactionManager implements CompactionManagerMBean
                 while (row.hasNext())
                 {
                     OnDiskAtom column = row.next();
-                    if (column instanceof CounterCell)
-                        renewer.maybeRenew((CounterCell) column);
 
                     if (column instanceof Cell && cfs.indexManager.indexes((Cell) column))
                     {


[2/3] New counters implementation

Posted by al...@apache.org.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/714c4233/src/java/org/apache/cassandra/db/context/CounterContext.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/context/CounterContext.java b/src/java/org/apache/cassandra/db/context/CounterContext.java
index 1fa4d60..d759f6d 100644
--- a/src/java/org/apache/cassandra/db/context/CounterContext.java
+++ b/src/java/org/apache/cassandra/db/context/CounterContext.java
@@ -26,6 +26,7 @@ import com.google.common.annotations.VisibleForTesting;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.db.ClockAndCount;
 import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.serializers.MarshalException;
@@ -72,7 +73,7 @@ import org.apache.cassandra.utils.*;
  * rules work this way, see CASSANDRA-1938 - specifically the 1938_discussion
  * attachment (doesn't cover global shards, see CASSANDRA-4775 for that).
  */
-public class CounterContext implements IContext
+public class CounterContext
 {
     private static final int HEADER_SIZE_LENGTH = TypeSizes.NATIVE.sizeof(Short.MAX_VALUE);
     private static final int HEADER_ELT_LENGTH = TypeSizes.NATIVE.sizeof(Short.MAX_VALUE);
@@ -82,6 +83,11 @@ public class CounterContext implements IContext
 
     private static final Logger logger = LoggerFactory.getLogger(CounterContext.class);
 
+    public static enum Relationship
+    {
+        EQUAL, GREATER_THAN, LESS_THAN, DISJOINT
+    }
+
     // lazy-load singleton
     private static class LazyHolder
     {
@@ -94,7 +100,18 @@ public class CounterContext implements IContext
     }
 
     /**
+     * Creates a counter context with a single global, 2.1+ shard (a result of increment).
+     */
+    public ByteBuffer createGlobal(CounterId id, long clock, long count, Allocator allocator)
+    {
+        ContextState state = ContextState.allocate(1, 0, 0, allocator);
+        state.writeGlobal(id, clock, count);
+        return state.context;
+    }
+
+    /**
      * Creates a counter context with a single local shard.
+     * For use by tests of compatibility with pre-2.1 counters only.
      */
     public ByteBuffer createLocal(long count, Allocator allocator)
     {
@@ -105,6 +122,7 @@ public class CounterContext implements IContext
 
     /**
      * Creates a counter context with a single remote shard.
+     * For use by tests of compatibility with pre-2.1 counters only.
      */
     public ByteBuffer createRemote(CounterId id, long clock, long count, Allocator allocator)
     {
@@ -135,11 +153,11 @@ public class CounterContext implements IContext
      *
      * @param left counter context.
      * @param right counter context.
-     * @return the ContextRelationship between the contexts.
+     * @return the Relationship between the contexts.
      */
-    public ContextRelationship diff(ByteBuffer left, ByteBuffer right)
+    public Relationship diff(ByteBuffer left, ByteBuffer right)
     {
-        ContextRelationship relationship = ContextRelationship.EQUAL;
+        Relationship relationship = Relationship.EQUAL;
         ContextState leftState = ContextState.wrap(left);
         ContextState rightState = ContextState.wrap(right);
 
@@ -165,45 +183,25 @@ public class CounterContext implements IContext
                     {
                         // Inconsistent shard (see the corresponding code in merge()). We return DISJOINT in this
                         // case so that it will be treated as a difference, allowing read-repair to work.
-                        return ContextRelationship.DISJOINT;
-                    }
-                    else
-                    {
-                        continue;
+                        return Relationship.DISJOINT;
                     }
                 }
                 else if ((leftClock >= 0 && rightClock > 0 && leftClock > rightClock)
                       || (leftClock < 0 && (rightClock > 0 || leftClock < rightClock)))
                 {
-                    if (relationship == ContextRelationship.EQUAL)
-                    {
-                        relationship = ContextRelationship.GREATER_THAN;
-                    }
-                    else if (relationship == ContextRelationship.GREATER_THAN)
-                    {
-                        continue;
-                    }
-                    else
-                    {
-                        // relationship == ContextRelationship.LESS_THAN
-                        return ContextRelationship.DISJOINT;
-                    }
+                    if (relationship == Relationship.EQUAL)
+                        relationship = Relationship.GREATER_THAN;
+                    else if (relationship == Relationship.LESS_THAN)
+                        return Relationship.DISJOINT;
+                    // relationship == Relationship.GREATER_THAN
                 }
                 else
                 {
-                    if (relationship == ContextRelationship.EQUAL)
-                    {
-                        relationship = ContextRelationship.LESS_THAN;
-                    }
-                    else if (relationship == ContextRelationship.GREATER_THAN)
-                    {
-                        return ContextRelationship.DISJOINT;
-                    }
-                    else
-                    {
-                        // relationship == ContextRelationship.LESS_THAN
-                        continue;
-                    }
+                    if (relationship == Relationship.EQUAL)
+                        relationship = Relationship.LESS_THAN;
+                    else if (relationship == Relationship.GREATER_THAN)
+                        return Relationship.DISJOINT;
+                    // relationship == Relationship.LESS_THAN
                 }
             }
             else if (compareId > 0)
@@ -211,63 +209,40 @@ public class CounterContext implements IContext
                 // only advance the right context
                 rightState.moveToNext();
 
-                if (relationship == ContextRelationship.EQUAL)
-                {
-                    relationship = ContextRelationship.LESS_THAN;
-                }
-                else if (relationship == ContextRelationship.GREATER_THAN)
-                {
-                    return ContextRelationship.DISJOINT;
-                }
-                else
-                {
-                    // relationship == ContextRelationship.LESS_THAN
-                    continue;
-                }
+                if (relationship == Relationship.EQUAL)
+                    relationship = Relationship.LESS_THAN;
+                else if (relationship == Relationship.GREATER_THAN)
+                    return Relationship.DISJOINT;
+                // relationship == Relationship.LESS_THAN
             }
             else // compareId < 0
             {
                 // only advance the left context
                 leftState.moveToNext();
 
-                if (relationship == ContextRelationship.EQUAL)
-                {
-                    relationship = ContextRelationship.GREATER_THAN;
-                }
-                else if (relationship == ContextRelationship.GREATER_THAN)
-                {
-                    continue;
-                }
-                else
-                // relationship == ContextRelationship.LESS_THAN
-                {
-                    return ContextRelationship.DISJOINT;
-                }
+                if (relationship == Relationship.EQUAL)
+                    relationship = Relationship.GREATER_THAN;
+                else if (relationship == Relationship.LESS_THAN)
+                    return Relationship.DISJOINT;
+                // relationship == Relationship.GREATER_THAN
             }
         }
 
         // check final lengths
         if (leftState.hasRemaining())
         {
-            if (relationship == ContextRelationship.EQUAL)
-            {
-                return ContextRelationship.GREATER_THAN;
-            }
-            else if (relationship == ContextRelationship.LESS_THAN)
-            {
-                return ContextRelationship.DISJOINT;
-            }
+            if (relationship == Relationship.EQUAL)
+                return Relationship.GREATER_THAN;
+            else if (relationship == Relationship.LESS_THAN)
+                return Relationship.DISJOINT;
         }
-        else if (rightState.hasRemaining())
+
+        if (rightState.hasRemaining())
         {
-            if (relationship == ContextRelationship.EQUAL)
-            {
-                return ContextRelationship.LESS_THAN;
-            }
-            else if (relationship == ContextRelationship.GREATER_THAN)
-            {
-                return ContextRelationship.DISJOINT;
-            }
+            if (relationship == Relationship.EQUAL)
+                return Relationship.LESS_THAN;
+            else if (relationship == Relationship.GREATER_THAN)
+                return Relationship.DISJOINT;
         }
 
         return relationship;
@@ -527,14 +502,9 @@ public class CounterContext implements IContext
     public long total(ByteBuffer context)
     {
         long total = 0L;
-
         // we could use a ContextState but it is easy enough that we avoid the object creation
         for (int offset = context.position() + headerLength(context); offset < context.limit(); offset += STEP_LENGTH)
-        {
-            long count = context.getLong(offset + CounterId.LENGTH + CLOCK_LENGTH);
-            total += count;
-        }
-
+            total += context.getLong(offset + CounterId.LENGTH + CLOCK_LENGTH);
         return total;
     }
 
@@ -642,24 +612,54 @@ public class CounterContext implements IContext
     }
 
     /**
-     * Checks whether the provided context has a count for the provided
-     * CounterId.
-     *
-     * TODO: since the context is sorted, we could implement a binary search.
-     * This is however not called in any critical path and contexts will be
-     * fairly small so it doesn't matter much.
+     * Returns the clock and the count associated with the local counter id, or (0, 0) if no such shard is present.
      */
-    public boolean hasCounterId(ByteBuffer context, CounterId id)
+    public ClockAndCount getLocalClockAndCount(ByteBuffer context)
     {
-        // we could use a ContextState but it is easy enough that we avoid the object creation
-        for (int offset = context.position() + headerLength(context); offset < context.limit(); offset += STEP_LENGTH)
+        return getClockAndCountOf(context, CounterId.getLocalId());
+    }
+
+    /**
+     * Returns the clock and the count associated with the given counter id, or (0, 0) if no such shard is present.
+     */
+    @VisibleForTesting
+    public ClockAndCount getClockAndCountOf(ByteBuffer context, CounterId id)
+    {
+        int position = findPositionOf(context, id);
+        if (position == -1)
+            return ClockAndCount.BLANK;
+
+        long clock = context.getLong(position + CounterId.LENGTH);
+        long count = context.getLong(position + CounterId.LENGTH + CLOCK_LENGTH);
+        return ClockAndCount.create(clock, count);
+    }
+
+    /**
+     * Finds the position of a shard with the given id within the context (via binary search).
+     */
+    @VisibleForTesting
+    public int findPositionOf(ByteBuffer context, CounterId id)
+    {
+        int headerLength = headerLength(context);
+        int offset = context.position() + headerLength;
+
+        int left = 0;
+        int right = (context.remaining() - headerLength) / STEP_LENGTH - 1;
+
+        while (right >= left)
         {
-            if (id.equals(CounterId.wrap(context, offset)))
-            {
-                return true;
-            }
+            int middle = (left + right) / 2;
+            int cmp = compareId(context, offset + middle * STEP_LENGTH, id.bytes(), id.bytes().position());
+
+            if (cmp == -1)
+                left = middle + 1;
+            else if (cmp == 0)
+                return offset + middle * STEP_LENGTH;
+            else
+                right = middle - 1;
         }
-        return false;
+
+        return -1; // position not found
     }
 
     /**
@@ -754,20 +754,7 @@ public class CounterContext implements IContext
 
         public void copyTo(ContextState other)
         {
-            ByteBufferUtil.arrayCopy(context,
-                                     context.position() + bodyOffset,
-                                     other.context,
-                                     other.context.position() + other.bodyOffset,
-                                     STEP_LENGTH);
-
-            if (currentIsGlobal)
-                other.context.putShort(other.context.position() + other.headerOffset, (short) (other.getElementIndex() + Short.MIN_VALUE));
-            else if (currentIsLocal)
-                context.putShort(other.context.position() + other.headerOffset, (short) other.getElementIndex());
-
-            other.currentIsGlobal = currentIsGlobal;
-            other.currentIsLocal = currentIsLocal;
-            other.moveToNext();
+            other.writeElement(getCounterId(), getClock(), getCount(), currentIsGlobal, currentIsLocal);
         }
 
         public int compareIdTo(ContextState other)
@@ -802,17 +789,18 @@ public class CounterContext implements IContext
             return context.getLong(context.position() + bodyOffset + CounterId.LENGTH + CLOCK_LENGTH);
         }
 
-        // In 2.0 only used by the unit tests.
         public void writeGlobal(CounterId id, long clock, long count)
         {
             writeElement(id, clock, count, true, false);
         }
 
+        // In 2.1 only used by the unit tests.
         public void writeLocal(CounterId id, long clock, long count)
         {
             writeElement(id, clock, count, false, true);
         }
 
+        // In 2.1 only used by the unit tests.
         public void writeRemote(CounterId id, long clock, long count)
         {
             writeElement(id, clock, count, false, false);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/714c4233/src/java/org/apache/cassandra/db/context/IContext.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/context/IContext.java b/src/java/org/apache/cassandra/db/context/IContext.java
deleted file mode 100644
index ab10f55..0000000
--- a/src/java/org/apache/cassandra/db/context/IContext.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db.context;
-
-import java.nio.ByteBuffer;
-
-import org.apache.cassandra.utils.Allocator;
-
-/**
- * An opaque commutative context.
- *
- * Maintains a ByteBuffer context that represents a partitioned commutative value.
- */
-public interface IContext
-{
-    public static enum ContextRelationship
-    {
-        EQUAL,
-        GREATER_THAN,
-        LESS_THAN,
-        DISJOINT
-    };
-
-    /**
-     * Determine the relationship between two contexts.
-     *
-     * EQUAL:        Equal set of nodes and every count is equal.
-     * GREATER_THAN: Superset of nodes and every count is equal or greater than its corollary.
-     * LESS_THAN:    Subset of nodes and every count is equal or less than its corollary.
-     * DISJOINT:     Node sets are not equal and/or counts are not all greater or less than.
-     *
-     * @param left
-     *            context.
-     * @param right
-     *            context.
-     * @return the ContextRelationship between the contexts.
-     */
-    public ContextRelationship diff(ByteBuffer left, ByteBuffer right);
-
-    /**
-     * Return a context w/ an aggregated count for each node id.
-     *
-     * @param left
-     *            context.
-     * @param right
-     *            context.
-     * @param allocator
-     *            an allocator to allocate the new context from.
-     */
-    public ByteBuffer merge(ByteBuffer left, ByteBuffer right, Allocator allocator);
-
-    /**
-     * Human-readable String from context.
-     *
-     * @param context
-     *            context.
-     * @return a human-readable String of the context.
-     */
-    public String toString(ByteBuffer context);
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/714c4233/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java b/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
index 9e17fd7..587d932 100644
--- a/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
@@ -156,7 +156,7 @@ public abstract class AbstractSSTableSimpleWriter
     public void addCounterColumn(ByteBuffer name, long value)
     {
         addColumn(new CounterCell(metadata.comparator.cellFromByteBuffer(name),
-                                  CounterContext.instance().createRemote(counterid, 1L, value, HeapAllocator.instance),
+                                  CounterContext.instance().createGlobal(counterid, 1L, value, HeapAllocator.instance),
                                   System.currentTimeMillis()));
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/714c4233/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index f4043c1..432fe5e 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -130,9 +130,9 @@ public final class MessagingService implements MessagingServiceMBean
     public static final EnumMap<MessagingService.Verb, Stage> verbStages = new EnumMap<MessagingService.Verb, Stage>(MessagingService.Verb.class)
     {{
         put(Verb.MUTATION, Stage.MUTATION);
+        put(Verb.COUNTER_MUTATION, Stage.COUNTER_MUTATION);
         put(Verb.READ_REPAIR, Stage.MUTATION);
         put(Verb.TRUNCATE, Stage.MUTATION);
-        put(Verb.COUNTER_MUTATION, Stage.MUTATION);
         put(Verb.PAXOS_PREPARE, Stage.MUTATION);
         put(Verb.PAXOS_PROPOSE, Stage.MUTATION);
         put(Verb.PAXOS_COMMIT, Stage.MUTATION);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/714c4233/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java b/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
index 295ed51..c75aac2 100644
--- a/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
+++ b/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
@@ -44,7 +44,6 @@ public abstract class AbstractWriteResponseHandler implements IAsyncCallback
     private final WriteType writeType;
 
     /**
-     * @param pendingEndpoints
      * @param callback A callback to be called when the write is successful.
      */
     protected AbstractWriteResponseHandler(Keyspace keyspace,
@@ -65,7 +64,11 @@ public abstract class AbstractWriteResponseHandler implements IAsyncCallback
 
     public void get() throws WriteTimeoutException
     {
-        long timeout = TimeUnit.MILLISECONDS.toNanos(DatabaseDescriptor.getWriteRpcTimeout()) - (System.nanoTime() - start);
+        long requestTimeout = writeType == WriteType.COUNTER
+                            ? DatabaseDescriptor.getCounterWriteRpcTimeout()
+                            : DatabaseDescriptor.getWriteRpcTimeout();
+
+        long timeout = TimeUnit.MILLISECONDS.toNanos(requestTimeout) - (System.nanoTime() - start);
 
         boolean success;
         try

http://git-wip-us.apache.org/repos/asf/cassandra/blob/714c4233/src/java/org/apache/cassandra/service/CacheService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/CacheService.java b/src/java/org/apache/cassandra/service/CacheService.java
index 59fb59e..4a2fc18 100644
--- a/src/java/org/apache/cassandra/service/CacheService.java
+++ b/src/java/org/apache/cassandra/service/CacheService.java
@@ -28,9 +28,11 @@ import java.util.List;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
+import java.util.concurrent.locks.Lock;
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
 
+import com.google.common.collect.ImmutableSortedSet;
 import com.google.common.util.concurrent.Futures;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -42,14 +44,14 @@ import org.apache.cassandra.concurrent.StageManager;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.Schema;
-import org.apache.cassandra.db.ColumnFamily;
-import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.db.RowIndexEntry;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.composites.CellName;
+import org.apache.cassandra.db.context.CounterContext;
 import org.apache.cassandra.db.filter.QueryFilter;
 import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.CounterId;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.Pair;
 
@@ -62,7 +64,8 @@ public class CacheService implements CacheServiceMBean
     public static enum CacheType
     {
         KEY_CACHE("KeyCache"),
-        ROW_CACHE("RowCache");
+        ROW_CACHE("RowCache"),
+        COUNTER_CACHE("CounterCache");
 
         private final String name;
 
@@ -81,6 +84,7 @@ public class CacheService implements CacheServiceMBean
 
     public final AutoSavingCache<KeyCacheKey, RowIndexEntry> keyCache;
     public final AutoSavingCache<RowCacheKey, IRowCacheEntry> rowCache;
+    public final AutoSavingCache<CounterCacheKey, ClockAndCount> counterCache;
 
     private CacheService()
     {
@@ -97,6 +101,7 @@ public class CacheService implements CacheServiceMBean
 
         keyCache = initKeyCache();
         rowCache = initRowCache();
+        counterCache = initCounterCache();
     }
 
     /**
@@ -112,7 +117,7 @@ public class CacheService implements CacheServiceMBean
         // where 48 = 40 bytes (average size of the key) + 8 bytes (size of value)
         ICache<KeyCacheKey, RowIndexEntry> kc;
         kc = ConcurrentLinkedHashCache.create(keyCacheInMemoryCapacity);
-        AutoSavingCache<KeyCacheKey, RowIndexEntry> keyCache = new AutoSavingCache<KeyCacheKey, RowIndexEntry>(kc, CacheType.KEY_CACHE, new KeyCacheSerializer());
+        AutoSavingCache<KeyCacheKey, RowIndexEntry> keyCache = new AutoSavingCache<>(kc, CacheType.KEY_CACHE, new KeyCacheSerializer());
 
         int keyCacheKeysToSave = DatabaseDescriptor.getKeyCacheKeysToSave();
 
@@ -132,7 +137,7 @@ public class CacheService implements CacheServiceMBean
 
         // cache object
         ICache<RowCacheKey, IRowCacheEntry> rc = new SerializingCacheProvider().create(rowCacheInMemoryCapacity);
-        AutoSavingCache<RowCacheKey, IRowCacheEntry> rowCache = new AutoSavingCache<RowCacheKey, IRowCacheEntry>(rc, CacheType.ROW_CACHE, new RowCacheSerializer());
+        AutoSavingCache<RowCacheKey, IRowCacheEntry> rowCache = new AutoSavingCache<>(rc, CacheType.ROW_CACHE, new RowCacheSerializer());
 
         int rowCacheKeysToSave = DatabaseDescriptor.getRowCacheKeysToSave();
 
@@ -141,6 +146,28 @@ public class CacheService implements CacheServiceMBean
         return rowCache;
     }
 
+    private AutoSavingCache<CounterCacheKey, ClockAndCount> initCounterCache()
+    {
+        logger.info("Initializing counter cache with capacity of {} MBs", DatabaseDescriptor.getCounterCacheSizeInMB());
+
+        long capacity = DatabaseDescriptor.getCounterCacheSizeInMB() * 1024 * 1024;
+
+        AutoSavingCache<CounterCacheKey, ClockAndCount> cache =
+            new AutoSavingCache<>(ConcurrentLinkedHashCache.<CounterCacheKey, ClockAndCount>create(capacity),
+                                  CacheType.COUNTER_CACHE,
+                                  new CounterCacheSerializer());
+
+        int keysToSave = DatabaseDescriptor.getCounterCacheKeysToSave();
+
+        logger.info("Scheduling counter cache save to every {} seconds (going to save {} keys).",
+                    DatabaseDescriptor.getCounterCacheSavePeriod(),
+                    keysToSave == Integer.MAX_VALUE ? "all" : keysToSave);
+
+        cache.scheduleSaving(DatabaseDescriptor.getCounterCacheSavePeriod(), keysToSave);
+
+        return cache;
+    }
+
     public long getKeyCacheHits()
     {
         return keyCache.getMetrics().hits.count();
@@ -199,6 +226,20 @@ public class CacheService implements CacheServiceMBean
         keyCache.scheduleSaving(seconds, DatabaseDescriptor.getKeyCacheKeysToSave());
     }
 
+    public int getCounterCacheSavePeriodInSeconds()
+    {
+        return DatabaseDescriptor.getCounterCacheSavePeriod();
+    }
+
+    public void setCounterCacheSavePeriodInSeconds(int seconds)
+    {
+        if (seconds < 0)
+            throw new RuntimeException("CounterCacheSavePeriodInSeconds must be non-negative.");
+
+        DatabaseDescriptor.setCounterCacheSavePeriod(seconds);
+        counterCache.scheduleSaving(seconds, DatabaseDescriptor.getCounterCacheKeysToSave());
+    }
+
     public int getRowCacheKeysToSave()
     {
         return DatabaseDescriptor.getRowCacheKeysToSave();
@@ -225,6 +266,19 @@ public class CacheService implements CacheServiceMBean
         keyCache.scheduleSaving(getKeyCacheSavePeriodInSeconds(), count);
     }
 
+    public int getCounterCacheKeysToSave()
+    {
+        return DatabaseDescriptor.getCounterCacheKeysToSave();
+    }
+
+    public void setCounterCacheKeysToSave(int count)
+    {
+        if (count < 0)
+            throw new RuntimeException("CounterCacheKeysToSave must be non-negative.");
+        DatabaseDescriptor.setCounterCacheKeysToSave(count);
+        counterCache.scheduleSaving(getCounterCacheSavePeriodInSeconds(), count);
+    }
+
     public void invalidateKeyCache()
     {
         keyCache.clear();
@@ -235,6 +289,11 @@ public class CacheService implements CacheServiceMBean
         rowCache.clear();
     }
 
+    public void invalidateCounterCache()
+    {
+        counterCache.clear();
+    }
+
     public long getRowCacheCapacityInBytes()
     {
         return rowCache.getMetrics().capacity.value();
@@ -271,6 +330,14 @@ public class CacheService implements CacheServiceMBean
         keyCache.setCapacity(capacity * 1024 * 1024);
     }
 
+    public void setCounterCacheCapacityInMB(long capacity)
+    {
+        if (capacity < 0)
+            throw new RuntimeException("capacity should not be negative.");
+
+        counterCache.setCapacity(capacity * 1024 * 1024);
+    }
+
     public long getRowCacheSize()
     {
         return rowCache.getMetrics().size.value();
@@ -293,16 +360,60 @@ public class CacheService implements CacheServiceMBean
 
     public void saveCaches() throws ExecutionException, InterruptedException
     {
-        List<Future<?>> futures = new ArrayList<Future<?>>(2);
+        List<Future<?>> futures = new ArrayList<>(3);
         logger.debug("submitting cache saves");
 
         futures.add(keyCache.submitWrite(DatabaseDescriptor.getKeyCacheKeysToSave()));
         futures.add(rowCache.submitWrite(DatabaseDescriptor.getRowCacheKeysToSave()));
+        futures.add(counterCache.submitWrite(DatabaseDescriptor.getCounterCacheKeysToSave()));
 
         FBUtilities.waitOnFutures(futures);
         logger.debug("cache saves completed");
     }
 
+    public class CounterCacheSerializer implements CacheSerializer<CounterCacheKey, ClockAndCount>
+    {
+        public void serialize(CounterCacheKey key, DataOutput out) throws IOException
+        {
+            ByteBufferUtil.writeWithLength(key.partitionKey, out);
+            ByteBufferUtil.writeWithLength(key.cellName, out);
+        }
+
+        public Future<Pair<CounterCacheKey, ClockAndCount>> deserialize(DataInputStream in, final ColumnFamilyStore cfs) throws IOException
+        {
+            final ByteBuffer partitionKey = ByteBufferUtil.readWithLength(in);
+            final CellName cellName = cfs.metadata.comparator.cellFromByteBuffer(ByteBufferUtil.readWithLength(in));
+            return StageManager.getStage(Stage.READ).submit(new Callable<Pair<CounterCacheKey, ClockAndCount>>()
+            {
+                public Pair<CounterCacheKey, ClockAndCount> call() throws Exception
+                {
+                    DecoratedKey key = cfs.partitioner.decorateKey(partitionKey);
+                    Lock lock = cfs.counterLockFor(partitionKey);
+                    lock.lock();
+                    try
+                    {
+                        QueryFilter filter = QueryFilter.getNamesFilter(key,
+                                                                        cfs.metadata.cfName,
+                                                                        FBUtilities.singleton(cellName, cfs.metadata.comparator),
+                                                                        Long.MIN_VALUE);
+                        ColumnFamily cf = cfs.getTopLevelColumns(filter, Integer.MIN_VALUE);
+                        if (cf == null)
+                            return null;
+                        Cell cell = cf.getColumn(cellName);
+                        if (cell == null || cell.isMarkedForDelete(Long.MIN_VALUE))
+                            return null;
+                        ClockAndCount clockAndCount = CounterContext.instance().getLocalClockAndCount(cell.value());
+                        return Pair.create(CounterCacheKey.create(cfs.metadata.cfId, partitionKey, cellName), clockAndCount);
+                    }
+                    finally
+                    {
+                        lock.unlock();
+                    }
+                }
+            });
+        }
+    }
+
     public class RowCacheSerializer implements CacheSerializer<RowCacheKey, IRowCacheEntry>
     {
         public void serialize(RowCacheKey key, DataOutput out) throws IOException

http://git-wip-us.apache.org/repos/asf/cassandra/blob/714c4233/src/java/org/apache/cassandra/service/CacheServiceMBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/CacheServiceMBean.java b/src/java/org/apache/cassandra/service/CacheServiceMBean.java
index 6d93f95..28e9d3b 100644
--- a/src/java/org/apache/cassandra/service/CacheServiceMBean.java
+++ b/src/java/org/apache/cassandra/service/CacheServiceMBean.java
@@ -27,12 +27,18 @@ public interface CacheServiceMBean
     public int getKeyCacheSavePeriodInSeconds();
     public void setKeyCacheSavePeriodInSeconds(int kcspis);
 
+    public int getCounterCacheSavePeriodInSeconds();
+    public void setCounterCacheSavePeriodInSeconds(int ccspis);
+
     public int getRowCacheKeysToSave();
     public void setRowCacheKeysToSave(int rckts);
 
     public int getKeyCacheKeysToSave();
     public void setKeyCacheKeysToSave(int kckts);
 
+    public int getCounterCacheKeysToSave();
+    public void setCounterCacheKeysToSave(int cckts);
+
     /**
      * invalidate the key cache; for use after invalidating row cache
      */
@@ -43,10 +49,14 @@ public interface CacheServiceMBean
      */
     public void invalidateRowCache();
 
+    public void invalidateCounterCache();
+
     public void setRowCacheCapacityInMB(long capacity);
 
     public void setKeyCacheCapacityInMB(long capacity);
 
+    public void setCounterCacheCapacityInMB(long capacity);
+
     /**
      * save row and key caches
      *

http://git-wip-us.apache.org/repos/asf/cassandra/blob/714c4233/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index cf8636b..6e8d231 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -129,8 +129,8 @@ public class StorageProxy implements StorageProxyMBean
 
         /*
          * We execute counter writes in 2 places: either directly in the coordinator node if it is a replica, or
-         * in CounterMutationVerbHandler on a replica othewise. The write must be executed on the MUTATION stage
-         * but on the latter case, the verb handler already run on the MUTATION stage, so we must not execute the
+         * in CounterMutationVerbHandler on a replica otherwise. The write must be executed on the COUNTER_MUTATION stage
+         * but in the latter case, the verb handler already runs on the COUNTER_MUTATION stage, so we must not execute the
          * underlying on the stage otherwise we risk a deadlock. Hence two different performer.
          */
         counterWritePerformer = new WritePerformer()
@@ -153,7 +153,8 @@ public class StorageProxy implements StorageProxyMBean
                               String localDataCenter,
                               ConsistencyLevel consistencyLevel)
             {
-                StageManager.getStage(Stage.MUTATION).execute(counterWriteTask(mutation, targets, responseHandler, localDataCenter));
+                StageManager.getStage(Stage.COUNTER_MUTATION)
+                            .execute(counterWriteTask(mutation, targets, responseHandler, localDataCenter));
             }
         };
     }
@@ -993,7 +994,7 @@ public class StorageProxy implements StorageProxyMBean
                 IMutation processed = SinkManager.processWriteRequest(mutation);
                 if (processed != null)
                 {
-                    processed.apply();
+                    ((Mutation) processed).apply();
                     responseHandler.response(null);
                 }
             }
@@ -1102,34 +1103,22 @@ public class StorageProxy implements StorageProxyMBean
     {
         return new DroppableRunnable(MessagingService.Verb.COUNTER_MUTATION)
         {
-            public void runMayThrow()
+            public void runMayThrow() throws OverloadedException, WriteTimeoutException
             {
                 IMutation processed = SinkManager.processWriteRequest(mutation);
                 if (processed == null)
                     return;
 
                 assert processed instanceof CounterMutation;
-                final CounterMutation cm = (CounterMutation) processed;
+                CounterMutation cm = (CounterMutation) processed;
 
-                // apply mutation
-                cm.apply();
+                Mutation result = cm.apply();
                 responseHandler.response(null);
 
-                // then send to replicas, if any
-                final Set<InetAddress> remotes = Sets.difference(ImmutableSet.copyOf(targets), ImmutableSet.of(FBUtilities.getBroadcastAddress()));
-                if (!remotes.isEmpty() && cm.shouldReplicateOnWrite())
-                {
-                    // We do the replication on another stage because it involves a read (see CM.makeReplicationMutation)
-                    // and we want to avoid blocking too much the MUTATION stage
-                    StageManager.getStage(Stage.REPLICATE_ON_WRITE).execute(new DroppableRunnable(MessagingService.Verb.READ)
-                    {
-                        public void runMayThrow() throws OverloadedException
-                        {
-                            // send mutation to other replica
-                            sendToHintedEndpoints(cm.makeReplicationMutation(), remotes, responseHandler, localDataCenter);
-                        }
-                    });
-                }
+                Set<InetAddress> remotes = Sets.difference(ImmutableSet.copyOf(targets),
+                                                           ImmutableSet.of(FBUtilities.getBroadcastAddress()));
+                if (!remotes.isEmpty())
+                    sendToHintedEndpoints(result, remotes, responseHandler, localDataCenter);
             }
         };
     }
@@ -2146,6 +2135,9 @@ public class StorageProxy implements StorageProxyMBean
     public Long getWriteRpcTimeout() { return DatabaseDescriptor.getWriteRpcTimeout(); }
     public void setWriteRpcTimeout(Long timeoutInMillis) { DatabaseDescriptor.setWriteRpcTimeout(timeoutInMillis); }
 
+    public Long getCounterWriteRpcTimeout() { return DatabaseDescriptor.getCounterWriteRpcTimeout(); }
+    public void setCounterWriteRpcTimeout(Long timeoutInMillis) { DatabaseDescriptor.setCounterWriteRpcTimeout(timeoutInMillis); }
+
     public Long getCasContentionTimeout() { return DatabaseDescriptor.getCasContentionTimeout(); }
     public void setCasContentionTimeout(Long timeoutInMillis) { DatabaseDescriptor.setCasContentionTimeout(timeoutInMillis); }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/714c4233/src/java/org/apache/cassandra/service/StorageProxyMBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxyMBean.java b/src/java/org/apache/cassandra/service/StorageProxyMBean.java
index ad7d4c7..24dd069 100644
--- a/src/java/org/apache/cassandra/service/StorageProxyMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageProxyMBean.java
@@ -85,6 +85,8 @@ public interface StorageProxyMBean
     public void setReadRpcTimeout(Long timeoutInMillis);
     public Long getWriteRpcTimeout();
     public void setWriteRpcTimeout(Long timeoutInMillis);
+    public Long getCounterWriteRpcTimeout();
+    public void setCounterWriteRpcTimeout(Long timeoutInMillis);
     public Long getCasContentionTimeout();
     public void setCasContentionTimeout(Long timeoutInMillis);
     public Long getRangeRpcTimeout();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/714c4233/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index af2d682..b5b2445 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -538,14 +538,20 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
             CounterId.renewLocalId();
         }
 
+        // Can't do this in CassandraDaemon before the SS start b/c local counter id can be renewed afterwards.
+        for (ColumnFamilyStore cfs : ColumnFamilyStore.all())
+            if (cfs.metadata.isCounter())
+                cfs.initCounterCache();
+
         // daemon threads, like our executors', continue to run while shutdown hooks are invoked
         Thread drainOnShutdown = new Thread(new WrappedRunnable()
         {
             @Override
             public void runMayThrow() throws ExecutionException, InterruptedException, IOException
             {
+                ExecutorService counterMutationStage = StageManager.getStage(Stage.COUNTER_MUTATION);
                 ExecutorService mutationStage = StageManager.getStage(Stage.MUTATION);
-                if (mutationStage.isShutdown())
+                if (mutationStage.isShutdown() && counterMutationStage.isShutdown())
                     return; // drained already
 
                 shutdownClientServers();
@@ -555,7 +561,9 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
                 // In-progress writes originating here could generate hints to be written, so shut down MessagingService
                 // before mutation stage, so we can get all the hints saved before shutting down
                 MessagingService.instance().shutdown();
+                counterMutationStage.shutdown();
                 mutationStage.shutdown();
+                counterMutationStage.awaitTermination(3600, TimeUnit.SECONDS);
                 mutationStage.awaitTermination(3600, TimeUnit.SECONDS);
                 StorageProxy.instance.verifyNoHintsInProgress();
 
@@ -2102,10 +2110,9 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         if (keyspaceName.equals(Keyspace.SYSTEM_KS))
             throw new RuntimeException("Cleanup of the system keyspace is neither necessary nor wise");
 
-        CounterId.OneShotRenewer counterIdRenewer = new CounterId.OneShotRenewer();
         for (ColumnFamilyStore cfStore : getValidColumnFamilies(false, false, keyspaceName, columnFamilies))
         {
-            cfStore.forceCleanup(counterIdRenewer);
+            cfStore.forceCleanup();
         }
     }
 
@@ -3335,8 +3342,9 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
      */
     public synchronized void drain() throws IOException, InterruptedException, ExecutionException
     {
+        ExecutorService counterMutationStage = StageManager.getStage(Stage.COUNTER_MUTATION);
         ExecutorService mutationStage = StageManager.getStage(Stage.MUTATION);
-        if (mutationStage.isTerminated())
+        if (mutationStage.isTerminated() && counterMutationStage.isTerminated())
         {
             logger.warn("Cannot drain node (did it already happen?)");
             return;
@@ -3350,7 +3358,9 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         MessagingService.instance().shutdown();
 
         setMode(Mode.DRAINING, "clearing mutation stage", false);
+        counterMutationStage.shutdown();
         mutationStage.shutdown();
+        counterMutationStage.awaitTermination(3600, TimeUnit.SECONDS);
         mutationStage.awaitTermination(3600, TimeUnit.SECONDS);
 
         StorageProxy.instance.verifyNoHintsInProgress();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/714c4233/src/java/org/apache/cassandra/thrift/CassandraServer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/CassandraServer.java b/src/java/org/apache/cassandra/thrift/CassandraServer.java
index 2c2e821..fe9dc3f 100644
--- a/src/java/org/apache/cassandra/thrift/CassandraServer.java
+++ b/src/java/org/apache/cassandra/thrift/CassandraServer.java
@@ -33,6 +33,7 @@ import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import com.google.common.primitives.Longs;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -1076,7 +1077,11 @@ public class CassandraServer implements Cassandra.Iface
         if (mutations.isEmpty())
             return;
 
-        schedule(DatabaseDescriptor.getWriteRpcTimeout());
+        long timeout = Long.MAX_VALUE;
+        for (IMutation m : mutations)
+            timeout = Longs.min(timeout, m.getTimeout());
+
+        schedule(timeout);
         try
         {
             StorageProxy.mutateWithTriggers(mutations, consistencyLevel, mutateAtomically);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/714c4233/src/java/org/apache/cassandra/tools/NodeProbe.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java b/src/java/org/apache/cassandra/tools/NodeProbe.java
index cbd43e9..830097c 100644
--- a/src/java/org/apache/cassandra/tools/NodeProbe.java
+++ b/src/java/org/apache/cassandra/tools/NodeProbe.java
@@ -276,6 +276,11 @@ public class NodeProbe implements AutoCloseable
         ssProxy.forceKeyspaceRepairRange(beginToken, endToken, keyspaceName, isSequential, isLocal, columnFamilies);
     }
 
+    public void invalidateCounterCache()
+    {
+        cacheService.invalidateCounterCache();
+    }
+
     public void invalidateKeyCache()
     {
         cacheService.invalidateKeyCache();
@@ -540,7 +545,7 @@ public class NodeProbe implements AutoCloseable
         ssProxy.setIncrementalBackupsEnabled(enabled);
     }
 
-    public void setCacheCapacities(int keyCacheCapacity, int rowCacheCapacity)
+    public void setCacheCapacities(int keyCacheCapacity, int rowCacheCapacity, int counterCacheCapacity)
     {
         try
         {
@@ -548,6 +553,7 @@ public class NodeProbe implements AutoCloseable
             CacheServiceMBean cacheMBean = JMX.newMBeanProxy(mbeanServerConn, new ObjectName(keyCachePath), CacheServiceMBean.class);
             cacheMBean.setKeyCacheCapacityInMB(keyCacheCapacity);
             cacheMBean.setRowCacheCapacityInMB(rowCacheCapacity);
+            cacheMBean.setCounterCacheCapacityInMB(counterCacheCapacity);
         }
         catch (MalformedObjectNameException e)
         {
@@ -555,7 +561,7 @@ public class NodeProbe implements AutoCloseable
         }
     }
 
-    public void setCacheKeysToSave(int keyCacheKeysToSave, int rowCacheKeysToSave)
+    public void setCacheKeysToSave(int keyCacheKeysToSave, int rowCacheKeysToSave, int counterCacheKeysToSave)
     {
         try
         {
@@ -563,6 +569,7 @@ public class NodeProbe implements AutoCloseable
             CacheServiceMBean cacheMBean = JMX.newMBeanProxy(mbeanServerConn, new ObjectName(keyCachePath), CacheServiceMBean.class);
             cacheMBean.setKeyCacheKeysToSave(keyCacheKeysToSave);
             cacheMBean.setRowCacheKeysToSave(rowCacheKeysToSave);
+            cacheMBean.setCounterCacheKeysToSave(counterCacheKeysToSave);
         }
         catch (MalformedObjectNameException e)
         {
@@ -910,8 +917,8 @@ public class NodeProbe implements AutoCloseable
 
     // JMX getters for the o.a.c.metrics API below.
     /**
-     * Retrieve cache metrics based on the cache type (KeyCache or RowCache)
-     * @param cacheType KeyCache or RowCache
+     * Retrieve cache metrics based on the cache type (KeyCache, RowCache, or CounterCache)
+     * @param cacheType KeyCache, RowCache, or CounterCache
      * @param metricName Capacity, Entries, HitRate, Size, Requests or Hits.
      */
     public Object getCacheMetric(String cacheType, String metricName)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/714c4233/src/java/org/apache/cassandra/tools/NodeTool.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeTool.java b/src/java/org/apache/cassandra/tools/NodeTool.java
index 9d8978e..8baabaa 100644
--- a/src/java/org/apache/cassandra/tools/NodeTool.java
+++ b/src/java/org/apache/cassandra/tools/NodeTool.java
@@ -102,6 +102,7 @@ public class NodeTool
                 GossipInfo.class,
                 InvalidateKeyCache.class,
                 InvalidateRowCache.class,
+                InvalidateCounterCache.class,
                 Join.class,
                 Move.class,
                 PauseHandoff.class,
@@ -342,6 +343,17 @@ public class NodeTool
                     probe.getCacheMetric("RowCache", "HitRate"),
                     cacheService.getRowCacheSavePeriodInSeconds());
 
+            // Counter Cache: Hits, Requests, RecentHitRate, SavePeriodInSeconds
+            System.out.printf("%-17s: entries %d, size %d (bytes), capacity %d (bytes), %d hits, %d requests, %.3f recent hit rate, %d save period in seconds%n",
+                    "Counter Cache",
+                    probe.getCacheMetric("CounterCache", "Entries"),
+                    probe.getCacheMetric("CounterCache", "Size"),
+                    probe.getCacheMetric("CounterCache", "Capacity"),
+                    probe.getCacheMetric("CounterCache", "Hits"),
+                    probe.getCacheMetric("CounterCache", "Requests"),
+                    probe.getCacheMetric("CounterCache", "HitRate"),
+                    cacheService.getCounterCacheSavePeriodInSeconds());
+
             // Tokens
             List<String> tokens = probe.getTokens();
             if (tokens.size() == 1 || this.tokens)
@@ -1331,6 +1343,16 @@ public class NodeTool
         }
     }
 
+    @Command(name = "invalidatecountercache", description = "Invalidate the counter cache")
+    public static class InvalidateCounterCache extends NodeToolCmd
+    {
+        @Override
+        public void execute(NodeProbe probe)
+        {
+            probe.invalidateCounterCache();
+        }
+    }
+
     @Command(name = "join", description = "Join the ring")
     public static class Join extends NodeToolCmd
     {
@@ -1532,17 +1554,20 @@ public class NodeTool
         }
     }
 
-    @Command(name = "setcachecapacity", description = "Set global key and row cache capacities (in MB units)")
+    @Command(name = "setcachecapacity", description = "Set global key, row, and counter cache capacities (in MB units)")
     public static class SetCacheCapacity extends NodeToolCmd
     {
-        @Arguments(title = "<key-cache-capacity> <row-cache-capacity>", usage = "<key-cache-capacity> <row-cache-capacity>", description = "Key cache and row cache (in MB)", required = true)
+        @Arguments(title = "<key-cache-capacity> <row-cache-capacity> <counter-cache-capacity>",
+                   usage = "<key-cache-capacity> <row-cache-capacity> <counter-cache-capacity>",
+                   description = "Key cache, row cache, and counter cache (in MB)",
+                   required = true)
         private List<Integer> args = new ArrayList<>();
 
         @Override
         public void execute(NodeProbe probe)
         {
-            checkArgument(args.size() == 2, "setcachecapacity requires key-cache-capacity, and row-cache-capacity args.");
-            probe.setCacheCapacities(args.get(0), args.get(1));
+            checkArgument(args.size() == 3, "setcachecapacity requires key-cache-capacity, row-cache-capacity, and counter-cache-capacity args.");
+            probe.setCacheCapacities(args.get(0), args.get(1), args.get(2));
         }
     }
 
@@ -2083,14 +2108,17 @@ public class NodeTool
     @Command(name = "setcachekeystosave", description = "Set number of keys saved by each cache for faster post-restart warmup. 0 to disable")
     public static class SetCacheKeysToSave extends NodeToolCmd
     {
-        @Arguments(title = "<key-cache-keys-to-save> <row-cache-keys-to-save>", usage = "<key-cache-keys-to-save> <row-cache-keys-to-save>", description = "The number of keys saved by each cache. 0 to disable", required = true)
+        @Arguments(title = "<key-cache-keys-to-save> <row-cache-keys-to-save> <counter-cache-keys-to-save>",
+                   usage = "<key-cache-keys-to-save> <row-cache-keys-to-save> <counter-cache-keys-to-save>",
+                   description = "The number of keys saved by each cache. 0 to disable",
+                   required = true)
         private List<Integer> args = new ArrayList<>();
 
         @Override
         public void execute(NodeProbe probe)
         {
-            checkArgument(args.size() == 2, "setcachekeystosave requires key-cache-keys-to-save, and row-cache-keys-to-save args.");
-            probe.setCacheKeysToSave(args.get(0), args.get(1));
+            checkArgument(args.size() == 3, "setcachekeystosave requires key-cache-keys-to-save, row-cache-keys-to-save, and counter-cache-keys-to-save args.");
+            probe.setCacheKeysToSave(args.get(0), args.get(1), args.get(2));
         }
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/714c4233/src/java/org/apache/cassandra/utils/CounterId.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/CounterId.java b/src/java/org/apache/cassandra/utils/CounterId.java
index a8545c0..b978879 100644
--- a/src/java/org/apache/cassandra/utils/CounterId.java
+++ b/src/java/org/apache/cassandra/utils/CounterId.java
@@ -18,15 +18,11 @@
 package org.apache.cassandra.utils;
 
 import java.nio.ByteBuffer;
-import java.util.List;
 import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.CopyOnWriteArrayList;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import com.google.common.base.Objects;
 
-import org.apache.cassandra.db.CounterCell;
 import org.apache.cassandra.db.SystemKeyspace;
 
 public class CounterId implements Comparable<CounterId>
@@ -37,21 +33,21 @@ public class CounterId implements Comparable<CounterId>
 
     // Lazy holder because this opens the system keyspace and we want to avoid
     // having this triggered during class initialization
-    private static class LocalIds
+    private static class LocalId
     {
-        static final LocalCounterIdHistory instance = new LocalCounterIdHistory();
+        static final LocalCounterIdHolder instance = new LocalCounterIdHolder();
     }
 
     private final ByteBuffer id;
 
-    private static LocalCounterIdHistory localIds()
+    private static LocalCounterIdHolder localId()
     {
-        return LocalIds.instance;
+        return LocalId.instance;
     }
 
     public static CounterId getLocalId()
     {
-        return localIds().current.get();
+        return localId().get();
     }
 
     /**
@@ -59,24 +55,9 @@ public class CounterId implements Comparable<CounterId>
      * To use only when this strictly necessary, as using this will make all
      * counter context grow with time.
      */
-    public static void renewLocalId()
+    public static synchronized void renewLocalId()
     {
-        renewLocalId(FBUtilities.timestampMicros());
-    }
-
-    public static synchronized void renewLocalId(long now)
-    {
-        localIds().renewCurrent(now);
-    }
-
-    /**
-     * Return the list of old local counter id of this node.
-     * It is guaranteed that the returned list is sorted by growing counter id
-     * (and hence the first item will be the oldest counter id for this host)
-     */
-    public static List<CounterIdRecord> getOldLocalCounterIds()
-    {
-        return localIds().olds;
+        localId().renew(FBUtilities.timestampMicros());
     }
 
     /**
@@ -163,94 +144,39 @@ public class CounterId implements Comparable<CounterId>
         return id.hashCode();
     }
 
-    public static class OneShotRenewer
-    {
-        private boolean renewed;
-        private final CounterId initialId;
-
-        public OneShotRenewer()
-        {
-            renewed = false;
-            initialId = getLocalId();
-        }
-
-        public void maybeRenew(CounterCell column)
-        {
-            if (!renewed && column.hasCounterId(initialId))
-            {
-                renewLocalId();
-                renewed = true;
-            }
-        }
-    }
-
-    private static class LocalCounterIdHistory
+    private static class LocalCounterIdHolder
     {
         private final AtomicReference<CounterId> current;
-        private final List<CounterIdRecord> olds;
 
-        LocalCounterIdHistory()
+        LocalCounterIdHolder()
         {
             CounterId id = SystemKeyspace.getCurrentLocalCounterId();
+
             if (id == null)
             {
                 // no recorded local counter id, generating a new one and saving it
                 id = generate();
                 logger.info("No saved local counter id, using newly generated: {}", id);
                 SystemKeyspace.writeCurrentLocalCounterId(id, FBUtilities.timestampMicros());
-                current = new AtomicReference<>(id);
-                olds = new CopyOnWriteArrayList<>();
             }
             else
             {
-                logger.info("Saved local counter id: {}", id);
-                current = new AtomicReference<>(id);
-                olds = new CopyOnWriteArrayList<>(SystemKeyspace.getOldLocalCounterIds());
+                logger.info("Using saved local counter id: {}", id);
             }
+
+            current = new AtomicReference<>(id);
         }
 
-        synchronized void renewCurrent(long now)
+        synchronized void renew(long now)
         {
             CounterId newCounterId = generate();
-            CounterId old = current.get();
             SystemKeyspace.writeCurrentLocalCounterId(newCounterId, now);
             current.set(newCounterId);
-            olds.add(new CounterIdRecord(old, now));
-        }
-    }
-
-    public static class CounterIdRecord
-    {
-        public final CounterId id;
-        public final long timestamp;
-
-        public CounterIdRecord(CounterId id, long timestamp)
-        {
-            this.id = id;
-            this.timestamp = timestamp;
-        }
-
-        @Override
-        public boolean equals(Object o)
-        {
-            if (this == o)
-                return true;
-            if (o == null || getClass() != o.getClass())
-                return false;
-
-            CounterIdRecord otherRecord = (CounterIdRecord)o;
-            return id.equals(otherRecord.id) && timestamp == otherRecord.timestamp;
-        }
-
-        @Override
-        public int hashCode()
-        {
-            return Objects.hashCode(id, timestamp);
         }
 
-        public String toString()
+        CounterId get()
         {
-            return String.format("(%s, %d)", id.toString(), timestamp);
+            return current.get();
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/714c4233/test/unit/org/apache/cassandra/SchemaLoader.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/SchemaLoader.java b/test/unit/org/apache/cassandra/SchemaLoader.java
index 6866402..774efeb 100644
--- a/test/unit/org/apache/cassandra/SchemaLoader.java
+++ b/test/unit/org/apache/cassandra/SchemaLoader.java
@@ -102,6 +102,7 @@ public class SchemaLoader
         String ks6 = "Keyspace6";
         String ks_kcs = "KeyCacheSpace";
         String ks_rcs = "RowCacheSpace";
+        String ks_ccs = "CounterCacheSpace";
         String ks_nocommit = "NoCommitlogSpace";
         String ks_prsi = "PerRowSecondaryIndex";
         String ks_cql = "cql_keyspace";
@@ -234,6 +235,13 @@ public class SchemaLoader
                                            standardCFMD(ks_rcs, "CFWithoutCache").caching(CFMetaData.Caching.NONE),
                                            standardCFMD(ks_rcs, "CachedCF").caching(CFMetaData.Caching.ALL)));
 
+        // CounterCacheSpace
+        schema.add(KSMetaData.testMetadata(ks_ccs,
+                                           simple,
+                                           opts_rf1,
+                                           standardCFMD(ks_ccs, "Counter1").defaultValidator(CounterColumnType.instance),
+                                           standardCFMD(ks_ccs, "Counter2").defaultValidator(CounterColumnType.instance)));
+
         schema.add(KSMetaData.testMetadataNotDurable(ks_nocommit,
                                                      simple,
                                                      opts_rf1,

http://git-wip-us.apache.org/repos/asf/cassandra/blob/714c4233/test/unit/org/apache/cassandra/Util.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/Util.java b/test/unit/org/apache/cassandra/Util.java
index ec8f9d5..65a01a1 100644
--- a/test/unit/org/apache/cassandra/Util.java
+++ b/test/unit/org/apache/cassandra/Util.java
@@ -188,13 +188,13 @@ public class Util
      * @param mutations A group of Mutations for the same keyspace and column family.
      * @return The ColumnFamilyStore that was used.
      */
-    public static ColumnFamilyStore writeColumnFamily(List<IMutation> mutations) throws IOException, ExecutionException, InterruptedException
+    public static ColumnFamilyStore writeColumnFamily(List<Mutation> mutations) throws IOException, ExecutionException, InterruptedException
     {
         IMutation first = mutations.get(0);
         String keyspaceName = first.getKeyspaceName();
         UUID cfid = first.getColumnFamilyIds().iterator().next();
 
-        for (IMutation rm : mutations)
+        for (Mutation rm : mutations)
             rm.apply();
 
         ColumnFamilyStore store = Keyspace.open(keyspaceName).getColumnFamilyStore(cfid);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/714c4233/test/unit/org/apache/cassandra/config/DefsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/config/DefsTest.java b/test/unit/org/apache/cassandra/config/DefsTest.java
index 28d3589..d1fc54e 100644
--- a/test/unit/org/apache/cassandra/config/DefsTest.java
+++ b/test/unit/org/apache/cassandra/config/DefsTest.java
@@ -64,7 +64,6 @@ public class DefsTest extends SchemaLoader
 
         cfm.comment("No comment")
            .readRepairChance(0.5)
-           .replicateOnWrite(false)
            .gcGraceSeconds(100000)
            .minCompactionThreshold(500)
            .maxCompactionThreshold(500);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/714c4233/test/unit/org/apache/cassandra/db/CleanupTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/CleanupTest.java b/test/unit/org/apache/cassandra/db/CleanupTest.java
index a068f1b..9d420bc 100644
--- a/test/unit/org/apache/cassandra/db/CleanupTest.java
+++ b/test/unit/org/apache/cassandra/db/CleanupTest.java
@@ -42,7 +42,6 @@ import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.locator.TokenMetadata;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.CounterId;
 import org.junit.Test;
 
 public class CleanupTest extends SchemaLoader
@@ -79,7 +78,7 @@ public class CleanupTest extends SchemaLoader
         assertEquals(LOOPS, rows.size());
 
         // with one token in the ring, owned by the local node, cleanup should be a no-op
-        CompactionManager.instance.performCleanup(cfs, new CounterId.OneShotRenewer());
+        CompactionManager.instance.performCleanup(cfs);
 
         // ensure max timestamp of the sstables are retained post-cleanup
         assert expectedMaxTimestamps.equals(getMaxTimestampList(cfs));
@@ -125,7 +124,7 @@ public class CleanupTest extends SchemaLoader
         tmd.updateNormalToken(new BytesToken(tk1), InetAddress.getByName("127.0.0.1"));
         tmd.updateNormalToken(new BytesToken(tk2), InetAddress.getByName("127.0.0.2"));
 
-        CompactionManager.instance.performCleanup(cfs, new CounterId.OneShotRenewer());
+        CompactionManager.instance.performCleanup(cfs);
 
         // row data should be gone
         rows = Util.getRangeSlice(cfs);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/714c4233/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java b/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
index 5ca468f..92ca14e 100644
--- a/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
+++ b/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
@@ -106,9 +106,8 @@ public class ColumnFamilyStoreTest extends SchemaLoader
         ColumnFamilyStore cfs = keyspace.getColumnFamilyStore("Standard1");
         cfs.truncateBlocking();
 
-        List<IMutation> rms = new LinkedList<IMutation>();
-        Mutation rm;
-        rm = new Mutation("Keyspace1", ByteBufferUtil.bytes("key1"));
+        List<Mutation> rms = new LinkedList<>();
+        Mutation rm = new Mutation("Keyspace1", ByteBufferUtil.bytes("key1"));
         rm.add("Standard1", cellname("Column1"), ByteBufferUtil.bytes("asdf"), 0);
         rm.add("Standard1", cellname("Column2"), ByteBufferUtil.bytes("asdf"), 0);
         rms.add(rm);
@@ -850,7 +849,7 @@ public class ColumnFamilyStoreTest extends SchemaLoader
     private ColumnFamilyStore insertKey1Key2() throws IOException, ExecutionException, InterruptedException
     {
         ColumnFamilyStore cfs = Keyspace.open("Keyspace2").getColumnFamilyStore("Standard1");
-        List<IMutation> rms = new LinkedList<IMutation>();
+        List<Mutation> rms = new LinkedList<>();
         Mutation rm;
         rm = new Mutation("Keyspace2", ByteBufferUtil.bytes("key1"));
         rm.add("Standard1", cellname("Column1"), ByteBufferUtil.bytes("asdf"), 0);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/714c4233/test/unit/org/apache/cassandra/db/CounterCacheTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/CounterCacheTest.java b/test/unit/org/apache/cassandra/db/CounterCacheTest.java
new file mode 100644
index 0000000..78e7c80
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/CounterCacheTest.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import java.util.concurrent.ExecutionException;
+
+import org.junit.AfterClass;
+import org.junit.Test;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.exceptions.WriteTimeoutException;
+import org.apache.cassandra.service.CacheService;
+import org.apache.cassandra.utils.FBUtilities;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+import static org.apache.cassandra.Util.cellname;
+import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
+
+public class CounterCacheTest extends SchemaLoader
+{
+    private static final String KS = "CounterCacheSpace"; // keyspace added to SchemaLoader for these tests
+    private static final String CF = "Counter1"; // table in KS declared with CounterColumnType as default validator
+
+    @AfterClass
+    public static void cleanup()
+    {
+        cleanupSavedCaches();
+    }
+
+    @Test
+    public void testReadWrite()
+    {
+        ColumnFamilyStore cfs = Keyspace.open(KS).getColumnFamilyStore(CF);
+        CacheService.instance.invalidateCounterCache();
+        // after invalidation the cache is empty and holds no entry for any (key, cell) pair
+        assertEquals(0, CacheService.instance.counterCache.size());
+        assertNull(cfs.getCachedCounter(bytes(1), cellname(1)));
+        assertNull(cfs.getCachedCounter(bytes(1), cellname(2)));
+        assertNull(cfs.getCachedCounter(bytes(2), cellname(1)));
+        assertNull(cfs.getCachedCounter(bytes(2), cellname(2)));
+        // populate four entries: two keys with two cells each
+        cfs.putCachedCounter(bytes(1), cellname(1), ClockAndCount.create(1L, 1L));
+        cfs.putCachedCounter(bytes(1), cellname(2), ClockAndCount.create(1L, 2L));
+        cfs.putCachedCounter(bytes(2), cellname(1), ClockAndCount.create(2L, 1L));
+        cfs.putCachedCounter(bytes(2), cellname(2), ClockAndCount.create(2L, 2L));
+        // every entry put above must be counted and read back equal to what was stored
+        assertEquals(4, CacheService.instance.counterCache.size());
+        assertEquals(ClockAndCount.create(1L, 1L), cfs.getCachedCounter(bytes(1), cellname(1)));
+        assertEquals(ClockAndCount.create(1L, 2L), cfs.getCachedCounter(bytes(1), cellname(2)));
+        assertEquals(ClockAndCount.create(2L, 1L), cfs.getCachedCounter(bytes(2), cellname(1)));
+        assertEquals(ClockAndCount.create(2L, 2L), cfs.getCachedCounter(bytes(2), cellname(2)));
+    }
+
+    @Test
+    public void testSaveLoad() throws ExecutionException, InterruptedException, WriteTimeoutException
+    {
+        ColumnFamilyStore cfs = Keyspace.open(KS).getColumnFamilyStore(CF);
+        CacheService.instance.invalidateCounterCache();
+
+        ColumnFamily cells = UnsortedColumns.factory.create(cfs.metadata);
+        cells.addColumn(new CounterUpdateCell(cellname(1), 1L, FBUtilities.timestampMicros()));
+        cells.addColumn(new CounterUpdateCell(cellname(2), 2L, FBUtilities.timestampMicros()));
+        new CounterMutation(new Mutation(KS, bytes(1), cells), ConsistencyLevel.ONE).apply();
+        new CounterMutation(new Mutation(KS, bytes(2), cells), ConsistencyLevel.ONE).apply();
+
+        // write the counter cache out (blocking on submitWrite(...).get()), then invalidate the in-memory cache
+        CacheService.instance.counterCache.submitWrite(Integer.MAX_VALUE).get();
+        CacheService.instance.invalidateCounterCache();
+        assertEquals(0, CacheService.instance.counterCache.size());
+
+        // reload the saved entries with loadSaved and validate that all four (2 keys x 2 cells) are back
+        CacheService.instance.counterCache.loadSaved(cfs);
+        assertEquals(4, CacheService.instance.counterCache.size());
+        assertEquals(ClockAndCount.create(1L, 1L), cfs.getCachedCounter(bytes(1), cellname(1)));
+        assertEquals(ClockAndCount.create(1L, 2L), cfs.getCachedCounter(bytes(1), cellname(2)));
+        assertEquals(ClockAndCount.create(1L, 1L), cfs.getCachedCounter(bytes(2), cellname(1)));
+        assertEquals(ClockAndCount.create(1L, 2L), cfs.getCachedCounter(bytes(2), cellname(2)));
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/714c4233/test/unit/org/apache/cassandra/db/CounterCellTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/CounterCellTest.java b/test/unit/org/apache/cassandra/db/CounterCellTest.java
index aaa4931..24d8f1c 100644
--- a/test/unit/org/apache/cassandra/db/CounterCellTest.java
+++ b/test/unit/org/apache/cassandra/db/CounterCellTest.java
@@ -65,15 +65,17 @@ public class CounterCellTest extends SchemaLoader
     public void testCreate() throws UnknownHostException
     {
         long delta = 3L;
-        CounterUpdateCell cuc = new CounterUpdateCell(cellname("x"), delta, 1L);
-        CounterCell cell = cuc.localCopy(Keyspace.open("Keyspace5").getColumnFamilyStore("Counter1"));
-
-        assert delta == cell.total();
-        assert 1 == cell.value().getShort(0);
-        assert 0 == cell.value().getShort(2);
-        assert CounterId.wrap(cell.value(), 4).isLocalId();
-        assert 1L == cell.value().getLong(4 + idLength);
-        assert delta == cell.value().getLong(4 + idLength + clockLength);
+        CounterCell cell = new CounterCell(Util.cellname("x"),
+                                           CounterContext.instance().createLocal(delta, HeapAllocator.instance),
+                                           1L,
+                                           Long.MIN_VALUE);
+
+        Assert.assertEquals(delta, cell.total());
+        Assert.assertEquals(1, cell.value().getShort(0));
+        Assert.assertEquals(0, cell.value().getShort(2));
+        Assert.assertTrue(CounterId.wrap(cell.value(), 4).isLocalId());
+        Assert.assertEquals(1L, cell.value().getLong(4 + idLength));
+        Assert.assertEquals(delta, cell.value().getLong(4 + idLength + clockLength));
     }
 
     @Test
@@ -96,25 +98,25 @@ public class CounterCellTest extends SchemaLoader
 
         // tombstone > live
         left  = new DeletedCell(cellname("x"), 1, 2L);
-        right = new CounterCell(cellname("x"), 0L, 1L);
+        right = CounterCell.createLocal(cellname("x"), 0L, 1L, Long.MIN_VALUE);
 
         assert left.reconcile(right) == left;
 
         // tombstone < live last delete
         left  = new DeletedCell(cellname("x"), 1, 1L);
-        right = new CounterCell(cellname("x"), 0L, 4L, 2L);
+        right = CounterCell.createLocal(cellname("x"), 0L, 4L, 2L);
 
         assert left.reconcile(right) == right;
 
         // tombstone == live last delete
         left  = new DeletedCell(cellname("x"), 1, 2L);
-        right = new CounterCell(cellname("x"), 0L, 4L, 2L);
+        right = CounterCell.createLocal(cellname("x"), 0L, 4L, 2L);
 
         assert left.reconcile(right) == right;
 
         // tombstone > live last delete
         left  = new DeletedCell(cellname("x"), 1, 4L);
-        right = new CounterCell(cellname("x"), 0L, 9L, 1L);
+        right = CounterCell.createLocal(cellname("x"), 0L, 9L, 1L);
 
         reconciled = left.reconcile(right);
         assert reconciled.name() == right.name();
@@ -123,25 +125,25 @@ public class CounterCellTest extends SchemaLoader
         assert ((CounterCell)reconciled).timestampOfLastDelete() == left.getMarkedForDeleteAt();
 
         // live < tombstone
-        left  = new CounterCell(cellname("x"), 0L, 1L);
+        left  = CounterCell.createLocal(cellname("x"), 0L, 1L, Long.MIN_VALUE);
         right = new DeletedCell(cellname("x"), 1, 2L);
 
         assert left.reconcile(right) == right;
 
         // live last delete > tombstone
-        left  = new CounterCell(cellname("x"), 0L, 4L, 2L);
+        left  = CounterCell.createLocal(cellname("x"), 0L, 4L, 2L);
         right = new DeletedCell(cellname("x"), 1, 1L);
 
         assert left.reconcile(right) == left;
 
         // live last delete == tombstone
-        left  = new CounterCell(cellname("x"), 0L, 4L, 2L);
+        left  = CounterCell.createLocal(cellname("x"), 0L, 4L, 2L);
         right = new DeletedCell(cellname("x"), 1, 2L);
 
         assert left.reconcile(right) == left;
 
         // live last delete < tombstone
-        left  = new CounterCell(cellname("x"), 0L, 9L, 1L);
+        left  = CounterCell.createLocal(cellname("x"), 0L, 9L, 1L);
         right = new DeletedCell(cellname("x"), 1, 4L);
 
         reconciled = left.reconcile(right);
@@ -213,15 +215,15 @@ public class CounterCellTest extends SchemaLoader
         CounterCell rightCell;
 
         // timestamp
-        leftCell = new CounterCell(cellname("x"), 0, 1L);
-        rightCell = new CounterCell(cellname("x"), 0, 2L);
+        leftCell = CounterCell.createLocal(cellname("x"), 0, 1L, Long.MIN_VALUE);
+        rightCell = CounterCell.createLocal(cellname("x"), 0, 2L, Long.MIN_VALUE);
 
         assert rightCell == leftCell.diff(rightCell);
         assert null      == rightCell.diff(leftCell);
 
         // timestampOfLastDelete
-        leftCell = new CounterCell(cellname("x"), 0, 1L, 1L);
-        rightCell = new CounterCell(cellname("x"), 0, 1L, 2L);
+        leftCell = CounterCell.createLocal(cellname("x"), 0, 1L, 1L);
+        rightCell = CounterCell.createLocal(cellname("x"), 0, 1L, 2L);
 
         assert rightCell == leftCell.diff(rightCell);
         assert null      == rightCell.diff(leftCell);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/714c4233/test/unit/org/apache/cassandra/db/CounterMutationTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/CounterMutationTest.java b/test/unit/org/apache/cassandra/db/CounterMutationTest.java
index 389b7b7..3676ef9 100644
--- a/test/unit/org/apache/cassandra/db/CounterMutationTest.java
+++ b/test/unit/org/apache/cassandra/db/CounterMutationTest.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -17,29 +17,179 @@
  */
 package org.apache.cassandra.db;
 
-import java.io.IOException;
-import java.util.List;
-
 import org.junit.Test;
 
 import org.apache.cassandra.SchemaLoader;
-import org.apache.cassandra.utils.*;
+import org.apache.cassandra.db.context.CounterContext;
+import org.apache.cassandra.db.filter.QueryFilter;
+import org.apache.cassandra.exceptions.WriteTimeoutException;
+import org.apache.cassandra.utils.FBUtilities;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+import static org.apache.cassandra.Util.cellname;
+import static org.apache.cassandra.Util.dk;
+import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
 
 public class CounterMutationTest extends SchemaLoader
 {
+    private static final String KS = "CounterCacheSpace";
+    private static final String CF1 = "Counter1";
+    private static final String CF2 = "Counter2";
+
     @Test
-    public void testGetOldShardFromSystemKeyspace() throws IOException
+    public void testSingleCell() throws WriteTimeoutException
     {
-        // Renewing a bunch of times and checking we get the same thing from
-        // the system keyspace that what is in memory
-        CounterId.renewLocalId();
-        CounterId.renewLocalId();
-        CounterId.renewLocalId();
+        ColumnFamilyStore cfs = Keyspace.open(KS).getColumnFamilyStore(CF1);
+        cfs.truncateBlocking();
+
+        // Do the initial update (+1)
+        ColumnFamily cells = UnsortedColumns.factory.create(cfs.metadata);
+        cells.addCounter(cellname(1), 1L);
+        new CounterMutation(new Mutation(KS, bytes(1), cells), ConsistencyLevel.ONE).apply();
+        ColumnFamily current = cfs.getColumnFamily(QueryFilter.getIdentityFilter(dk(bytes(1)), CF1, System.currentTimeMillis()));
+        assertEquals(1L, CounterContext.instance().total(current.getColumn(cellname(1)).value()));
 
-        List<CounterId.CounterIdRecord> inMem = CounterId.getOldLocalCounterIds();
-        List<CounterId.CounterIdRecord> onDisk = SystemKeyspace.getOldLocalCounterIds();
+        // Make another increment (+2)
+        cells = UnsortedColumns.factory.create(cfs.metadata);
+        cells.addCounter(cellname(1), 2L);
+        new CounterMutation(new Mutation(KS, bytes(1), cells), ConsistencyLevel.ONE).apply();
+        current = cfs.getColumnFamily(QueryFilter.getIdentityFilter(dk(bytes(1)), CF1, System.currentTimeMillis()));
+        assertEquals(3L, CounterContext.instance().total(current.getColumn(cellname(1)).value()));
 
-        assert inMem.equals(onDisk);
+        // Decrement to 0 (-3)
+        cells = UnsortedColumns.factory.create(cfs.metadata);
+        cells.addCounter(cellname(1), -3L);
+        new CounterMutation(new Mutation(KS, bytes(1), cells), ConsistencyLevel.ONE).apply();
+        current = cfs.getColumnFamily(QueryFilter.getIdentityFilter(dk(bytes(1)), CF1, System.currentTimeMillis()));
+        assertEquals(0L, CounterContext.instance().total(current.getColumn(cellname(1)).value()));
+        assertEquals(ClockAndCount.create(3L, 0L), cfs.getCachedCounter(bytes(1), cellname(1)));
     }
-}
 
+    @Test
+    public void testTwoCells() throws WriteTimeoutException
+    {
+        ColumnFamilyStore cfs = Keyspace.open(KS).getColumnFamilyStore(CF1);
+        cfs.truncateBlocking();
+
+        // Do the initial update (+1 to cell 1, -1 to cell 2), then read the row back and check both totals
+        ColumnFamily cells = UnsortedColumns.factory.create(cfs.metadata);
+        cells.addCounter(cellname(1), 1L);
+        cells.addCounter(cellname(2), -1L);
+        new CounterMutation(new Mutation(KS, bytes(1), cells), ConsistencyLevel.ONE).apply();
+        ColumnFamily current = cfs.getColumnFamily(QueryFilter.getIdentityFilter(dk(bytes(1)), CF1, System.currentTimeMillis()));
+        assertEquals(1L, CounterContext.instance().total(current.getColumn(cellname(1)).value()));
+        assertEquals(-1L, CounterContext.instance().total(current.getColumn(cellname(2)).value()));
+
+        // Apply a second update (+2 to cell 1, -2 to cell 2); only cell 1's total (3) is asserted at this step
+        cells = UnsortedColumns.factory.create(cfs.metadata);
+        cells.addCounter(cellname(1), 2L);
+        cells.addCounter(cellname(2), -2L);
+        new CounterMutation(new Mutation(KS, bytes(1), cells), ConsistencyLevel.ONE).apply();
+        current = cfs.getColumnFamily(QueryFilter.getIdentityFilter(dk(bytes(1)), CF1, System.currentTimeMillis()));
+        assertEquals(3L, CounterContext.instance().total(current.getColumn(cellname(1)).value()));
+
+        // Bring both counters back to 0 (-3 to cell 1, +3 to cell 2) and check both totals
+        cells = UnsortedColumns.factory.create(cfs.metadata);
+        cells.addCounter(cellname(1), -3L);
+        cells.addCounter(cellname(2), 3L);
+        new CounterMutation(new Mutation(KS, bytes(1), cells), ConsistencyLevel.ONE).apply();
+        current = cfs.getColumnFamily(QueryFilter.getIdentityFilter(dk(bytes(1)), CF1, System.currentTimeMillis()));
+        assertEquals(0L, CounterContext.instance().total(current.getColumn(cellname(1)).value()));
+        assertEquals(0L, CounterContext.instance().total(current.getColumn(cellname(2)).value()));
+
+        // Check the cached ClockAndCount of each cell separately; after the three updates both must equal create(3L, 0L)
+        assertEquals(ClockAndCount.create(3L, 0L), cfs.getCachedCounter(bytes(1), cellname(1)));
+        assertEquals(ClockAndCount.create(3L, 0L), cfs.getCachedCounter(bytes(1), cellname(2)));
+    }
+
+    @Test
+    public void testBatch() throws WriteTimeoutException
+    {
+        ColumnFamilyStore cfs1 = Keyspace.open(KS).getColumnFamilyStore(CF1);
+        ColumnFamilyStore cfs2 = Keyspace.open(KS).getColumnFamilyStore(CF2);
+
+        cfs1.truncateBlocking();
+        cfs2.truncateBlocking();
+
+        // Build a single mutation for key 1 that spans both tables: (+1, -1) for CF1 and (+2, -2) for CF2
+        ColumnFamily cells1 = UnsortedColumns.factory.create(cfs1.metadata);
+        cells1.addCounter(cellname(1), 1L);
+        cells1.addCounter(cellname(2), -1L);
+
+        ColumnFamily cells2 = UnsortedColumns.factory.create(cfs2.metadata);
+        cells2.addCounter(cellname(1), 2L);
+        cells2.addCounter(cellname(2), -2L);
+
+        Mutation mutation = new Mutation(KS, bytes(1));
+        mutation.add(cells1);
+        mutation.add(cells2);
+
+        new CounterMutation(mutation, ConsistencyLevel.ONE).apply();
+
+        // Read the row back from each table and validate all four totals
+        ColumnFamily current1 = cfs1.getColumnFamily(QueryFilter.getIdentityFilter(dk(bytes(1)), CF1, System.currentTimeMillis()));
+        ColumnFamily current2 = cfs2.getColumnFamily(QueryFilter.getIdentityFilter(dk(bytes(1)), CF2, System.currentTimeMillis()));
+
+        assertEquals(1L, CounterContext.instance().total(current1.getColumn(cellname(1)).value()));
+        assertEquals(-1L, CounterContext.instance().total(current1.getColumn(cellname(2)).value()));
+        assertEquals(2L, CounterContext.instance().total(current2.getColumn(cellname(1)).value()));
+        assertEquals(-2L, CounterContext.instance().total(current2.getColumn(cellname(2)).value()));
+
+        // Check each table's cached ClockAndCount separately, one lookup per (table, cell)
+        assertEquals(ClockAndCount.create(1L, 1L), cfs1.getCachedCounter(bytes(1), cellname(1)));
+        assertEquals(ClockAndCount.create(1L, -1L), cfs1.getCachedCounter(bytes(1), cellname(2)));
+        assertEquals(ClockAndCount.create(1L, 2L), cfs2.getCachedCounter(bytes(1), cellname(1)));
+        assertEquals(ClockAndCount.create(1L, -2L), cfs2.getCachedCounter(bytes(1), cellname(2)));
+    }
+
+    @Test
+    public void testDeletes() throws WriteTimeoutException
+    {
+        ColumnFamilyStore cfs = Keyspace.open(KS).getColumnFamilyStore(CF1);
+        cfs.truncateBlocking();
+
+        // Do the initial update (+1, +1)
+        ColumnFamily cells = UnsortedColumns.factory.create(cfs.metadata);
+        cells.addCounter(cellname(1), 1L);
+        cells.addCounter(cellname(2), 1L);
+        new CounterMutation(new Mutation(KS, bytes(1), cells), ConsistencyLevel.ONE).apply();
+        ColumnFamily current = cfs.getColumnFamily(QueryFilter.getIdentityFilter(dk(bytes(1)), CF1, System.currentTimeMillis()));
+        assertEquals(1L, CounterContext.instance().total(current.getColumn(cellname(1)).value()));
+        assertEquals(1L, CounterContext.instance().total(current.getColumn(cellname(2)).value()));
+
+        // Remove the first counter, increment the second; millis are divided to seconds BEFORE narrowing to int
+        cells = UnsortedColumns.factory.create(cfs.metadata);
+        cells.addTombstone(cellname(1), (int) (System.currentTimeMillis() / 1000), FBUtilities.timestampMicros());
+        cells.addCounter(cellname(2), 1L);
+        new CounterMutation(new Mutation(KS, bytes(1), cells), ConsistencyLevel.ONE).apply();
+        current = cfs.getColumnFamily(QueryFilter.getIdentityFilter(dk(bytes(1)), CF1, System.currentTimeMillis()));
+        assertNull(current.getColumn(cellname(1)));
+        assertEquals(2L, CounterContext.instance().total(current.getColumn(cellname(2)).value()));
+
+        // Increment the first counter, make sure it's still shadowed by the tombstone
+        cells = UnsortedColumns.factory.create(cfs.metadata);
+        cells.addCounter(cellname(1), 1L);
+        new CounterMutation(new Mutation(KS, bytes(1), cells), ConsistencyLevel.ONE).apply();
+        current = cfs.getColumnFamily(QueryFilter.getIdentityFilter(dk(bytes(1)), CF1, System.currentTimeMillis()));
+        assertNull(current.getColumn(cellname(1)));
+
+        // Get rid of the complete partition
+        Mutation mutation = new Mutation(KS, bytes(1));
+        mutation.delete(CF1, FBUtilities.timestampMicros());
+        new CounterMutation(mutation, ConsistencyLevel.ONE).apply();
+        current = cfs.getColumnFamily(QueryFilter.getIdentityFilter(dk(bytes(1)), CF1, System.currentTimeMillis()));
+        assertNull(current.getColumn(cellname(1)));
+        assertNull(current.getColumn(cellname(2)));
+
+        // Increment both counters, ensure that both stay dead
+        cells = UnsortedColumns.factory.create(cfs.metadata);
+        cells.addCounter(cellname(1), 1L);
+        cells.addCounter(cellname(2), 1L);
+        new CounterMutation(new Mutation(KS, bytes(1), cells), ConsistencyLevel.ONE).apply();
+        current = cfs.getColumnFamily(QueryFilter.getIdentityFilter(dk(bytes(1)), CF1, System.currentTimeMillis()));
+        assertNull(current.getColumn(cellname(1)));
+        assertNull(current.getColumn(cellname(2)));
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/714c4233/test/unit/org/apache/cassandra/db/RecoveryManagerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/RecoveryManagerTest.java b/test/unit/org/apache/cassandra/db/RecoveryManagerTest.java
index 9f73ff2..ce0d2b9 100644
--- a/test/unit/org/apache/cassandra/db/RecoveryManagerTest.java
+++ b/test/unit/org/apache/cassandra/db/RecoveryManagerTest.java
@@ -84,7 +84,7 @@ public class RecoveryManagerTest extends SchemaLoader
         for (int i = 0; i < 10; ++i)
         {
             cf = TreeMapBackedSortedColumns.factory.create("Keyspace1", "Counter1");
-            cf.addColumn(new CounterCell(cellname("col"), 1L, 1L));
+            cf.addColumn(CounterCell.createLocal(cellname("col"), 1L, 1L, Long.MIN_VALUE));
             rm = new Mutation("Keyspace1", dk.key, cf);
             rm.apply();
         }