You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2014/01/27 23:40:03 UTC

[3/3] git commit: New counters implementation

New counters implementation

patch by Aleksey Yeschenko; reviewed by Sylvain Lebresne for
CASSANDRA-6504


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/714c4233
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/714c4233
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/714c4233

Branch: refs/heads/trunk
Commit: 714c423360c36da2a2b365efaf9c5c4f623ed133
Parents: 1218bca
Author: Aleksey Yeschenko <al...@apache.org>
Authored: Fri Jan 24 02:52:43 2014 -0800
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Mon Jan 27 16:37:41 2014 -0600

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 NEWS.txt                                        |   5 +
 conf/cassandra.yaml                             |  33 ++-
 doc/cql3/CQL.textile                            |   1 -
 interface/cassandra.thrift                      |   3 +-
 pylib/cqlshlib/cql3handling.py                  |   1 -
 pylib/cqlshlib/test/test_cqlsh_output.py        |   1 -
 .../apache/cassandra/cache/AutoSavingCache.java |   3 +-
 .../apache/cassandra/cache/CounterCacheKey.java |  89 ++++++++
 .../org/apache/cassandra/concurrent/Stage.java  |   6 +-
 .../cassandra/concurrent/StageManager.java      |  14 +-
 .../org/apache/cassandra/config/CFMetaData.java |  20 --
 .../org/apache/cassandra/config/Config.java     |  14 +-
 .../cassandra/config/DatabaseDescriptor.java    |  75 ++++++-
 .../cassandra/cql/AlterTableStatement.java      |   1 -
 .../org/apache/cassandra/cql/CFPropDefs.java    |   3 +-
 .../cql/CreateColumnFamilyStatement.java        |   1 -
 .../cql3/statements/BatchStatement.java         |   7 +-
 .../cassandra/cql3/statements/CFPropDefs.java   |   5 +-
 .../cql3/statements/ModificationStatement.java  |   6 +-
 src/java/org/apache/cassandra/db/Cell.java      |  13 +-
 .../org/apache/cassandra/db/ClockAndCount.java  |  73 +++++++
 .../apache/cassandra/db/ColumnFamilyStore.java  |  89 ++++++--
 .../apache/cassandra/db/ConsistencyLevel.java   |  11 +-
 .../org/apache/cassandra/db/CounterCell.java    |  37 +---
 .../apache/cassandra/db/CounterMutation.java    | 210 +++++++++++++-----
 .../apache/cassandra/db/CounterUpdateCell.java  |  19 +-
 .../org/apache/cassandra/db/DeletedCell.java    |   7 -
 .../org/apache/cassandra/db/ExpiringCell.java   |   7 -
 src/java/org/apache/cassandra/db/IMutation.java |   2 +-
 src/java/org/apache/cassandra/db/Mutation.java  |   6 +
 .../org/apache/cassandra/db/SystemKeyspace.java |  21 --
 .../db/compaction/CompactionManager.java        |  23 +-
 .../cassandra/db/context/CounterContext.java    | 214 +++++++++----------
 .../apache/cassandra/db/context/IContext.java   |  75 -------
 .../io/sstable/AbstractSSTableSimpleWriter.java |   2 +-
 .../apache/cassandra/net/MessagingService.java  |   2 +-
 .../service/AbstractWriteResponseHandler.java   |   7 +-
 .../apache/cassandra/service/CacheService.java  | 127 ++++++++++-
 .../cassandra/service/CacheServiceMBean.java    |  10 +
 .../apache/cassandra/service/StorageProxy.java  |  38 ++--
 .../cassandra/service/StorageProxyMBean.java    |   2 +
 .../cassandra/service/StorageService.java       |  18 +-
 .../cassandra/thrift/CassandraServer.java       |   7 +-
 .../org/apache/cassandra/tools/NodeProbe.java   |  15 +-
 .../org/apache/cassandra/tools/NodeTool.java    |  42 +++-
 .../org/apache/cassandra/utils/CounterId.java   | 106 ++-------
 .../unit/org/apache/cassandra/SchemaLoader.java |   8 +
 test/unit/org/apache/cassandra/Util.java        |   4 +-
 .../org/apache/cassandra/config/DefsTest.java   |   1 -
 .../org/apache/cassandra/db/CleanupTest.java    |   5 +-
 .../cassandra/db/ColumnFamilyStoreTest.java     |   7 +-
 .../apache/cassandra/db/CounterCacheTest.java   |  96 +++++++++
 .../apache/cassandra/db/CounterCellTest.java    |  44 ++--
 .../cassandra/db/CounterMutationTest.java       | 180 ++++++++++++++--
 .../cassandra/db/RecoveryManagerTest.java       |   2 +-
 .../db/context/CounterContextTest.java          |  92 ++++++--
 .../cassandra/tools/SSTableExportTest.java      |   3 +-
 .../apache/cassandra/utils/CounterIdTest.java   |  51 +++++
 59 files changed, 1333 insertions(+), 632 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/714c4233/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 10548fa..cc406c8 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -25,6 +25,7 @@
  * CF id is changed to be non-deterministic. Data dir/key cache are created
    uniquely for CF id (CASSANDRA-5202)
  * Cassandra won't start by default without jna (CASSANDRA-6575)
+ * New counters implementation (CASSANDRA-6504)
 
 
 2.0.5

http://git-wip-us.apache.org/repos/asf/cassandra/blob/714c4233/NEWS.txt
----------------------------------------------------------------------
diff --git a/NEWS.txt b/NEWS.txt
index 3070500..4e00faf 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -38,6 +38,11 @@ Upgrading
      cold_reads_to_omit compaction option; 0.0 omits nothing (the old
      behavior) and 1.0 omits everything.
    - Multithreaded compaction has been removed.
+   - Counters implementation has been changed, replaced by a safer one with
+     less caveats, but different performance characteristics. You might have
+     to change your data model to accomodate the new implementation.
+     (See https://issues.apache.org/jira/browse/CASSANDRA-6504 and the dev
+     blog post at http://www.datastax.com/dev/blog/<PLACEHOLDER> for details).
 
 2.0.5
 =====

http://git-wip-us.apache.org/repos/asf/cassandra/blob/714c4233/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index bdbb9ff..06cb33f 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -171,6 +171,32 @@ row_cache_save_period: 0
 # Disabled by default, meaning all keys are going to be saved
 # row_cache_keys_to_save: 100
 
+# Maximum size of the counter cache in memory.
+#
+# Counter cache helps to reduce counter locks' contention for hot counter cells.
+# In case of RF = 1 a counter cache hit will cause Cassandra to skip the read before
+# write entirely. With RF > 1 a counter cache hit will still help to reduce the duration
+# of the lock hold, helping with hot counter cell updates, but will not allow skipping
+# the read entirely. Only the local (clock, count) tuple of a counter cell is kept
+# in memory, not the whole counter, so it's relatively cheap.
+#
+# NOTE: if you reduce the size, you may not get you hottest keys loaded on startup.
+#
+# Default value is empty to make it "auto" (min(2.5% of Heap (in MB), 50MB)). Set to 0 to disable counter cache.
+# NOTE: if you perform counter deletes and rely on low gcgs, you should disable the counter cache.
+counter_cache_size_in_mb:
+
+# Duration in seconds after which Cassandra should
+# save the counter cache (keys only). Caches are saved to saved_caches_directory as
+# specified in this configuration file.
+#
+# Default is 7200 or 2 hours.
+counter_cache_save_period: 7200
+
+# Number of keys from the counter cache to save
+# Disabled by default, meaning all keys are going to be saved
+# counter_cache_keys_to_save: 100
+
 # The off-heap memory allocator.  Affects storage engine metadata as
 # well as caches.  Experiments show that JEMAlloc saves some memory
 # than the native GCC allocator (i.e., JEMalloc is more
@@ -234,13 +260,16 @@ seed_provider:
 # bottleneck will be reads that need to fetch data from
 # disk. "concurrent_reads" should be set to (16 * number_of_drives) in
 # order to allow the operations to enqueue low enough in the stack
-# that the OS and drives can reorder them.
+# that the OS and drives can reorder them. Same applies to
+# "concurrent_counter_writes", since counter writes read the current
+# values before incrementing and writing them back.
 #
 # On the other hand, since writes are almost never IO bound, the ideal
 # number of "concurrent_writes" is dependent on the number of cores in
 # your system; (8 * number_of_cores) is a good rule of thumb.
 concurrent_reads: 32
 concurrent_writes: 32
+concurrent_counter_writes: 32
 
 # Total memory to use for sstable-reading buffers.  Defaults to
 # the smaller of 1/4 of heap or 512MB.
@@ -491,6 +520,8 @@ read_request_timeout_in_ms: 5000
 range_request_timeout_in_ms: 10000
 # How long the coordinator should wait for writes to complete
 write_request_timeout_in_ms: 2000
+# How long the coordinator should wait for counter writes to complete
+counter_write_request_timeout_in_ms: 5000
 # How long a coordinator should continue to retry a CAS operation
 # that contends with other proposals for the same row
 cas_contention_timeout_in_ms: 1000

http://git-wip-us.apache.org/repos/asf/cassandra/blob/714c4233/doc/cql3/CQL.textile
----------------------------------------------------------------------
diff --git a/doc/cql3/CQL.textile b/doc/cql3/CQL.textile
index 5fff402..6d68584 100644
--- a/doc/cql3/CQL.textile
+++ b/doc/cql3/CQL.textile
@@ -308,7 +308,6 @@ Table creation supports the following other @<property>@:
 |@bloom_filter_fp_chance@     | _simple_ | 0.00075     | The target probability of false positive of the sstable bloom filters. Said bloom filters will be sized to provide the provided probability (thus lowering this value impact the size of bloom filters in-memory and on-disk)|
 |@compaction@                 | _map_    | _see below_ | The compaction options to use, see below.|
 |@compression@                | _map_    | _see below_ | Compression options, see below. |
-|@replicate_on_write@         | _simple_ | true        | Whether to replicate data on write. This can only be set to false for tables with counters values. Disabling this is dangerous and can result in random lose of counters, don't disable unless you are sure to know what you are doing|
 |@caching@                    | _simple_ | keys_only   | Whether to cache keys ("key cache") and/or rows ("row cache") for this table. Valid values are: @all@, @keys_only@, @rows_only@ and @none@. |
 
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/714c4233/interface/cassandra.thrift
----------------------------------------------------------------------
diff --git a/interface/cassandra.thrift b/interface/cassandra.thrift
index 780ffc7..289be1f 100644
--- a/interface/cassandra.thrift
+++ b/interface/cassandra.thrift
@@ -459,7 +459,6 @@ struct CfDef {
     16: optional i32 id,
     17: optional i32 min_compaction_threshold,
     18: optional i32 max_compaction_threshold,
-    24: optional bool replicate_on_write,
     26: optional string key_validation_class,
     28: optional binary key_alias,
     29: optional string compaction_strategy,
@@ -492,6 +491,8 @@ struct CfDef {
     /** @deprecated */
     23: optional double memtable_operations_in_millions,
     /** @deprecated */
+    24: optional bool replicate_on_write,
+    /** @deprecated */
     25: optional double merge_shards_chance,
     /** @deprecated */
     27: optional string row_cache_provider,

http://git-wip-us.apache.org/repos/asf/cassandra/blob/714c4233/pylib/cqlshlib/cql3handling.py
----------------------------------------------------------------------
diff --git a/pylib/cqlshlib/cql3handling.py b/pylib/cqlshlib/cql3handling.py
index 421ab27..c4fa97d 100644
--- a/pylib/cqlshlib/cql3handling.py
+++ b/pylib/cqlshlib/cql3handling.py
@@ -68,7 +68,6 @@ class Cql3ParsingRuleSet(CqlParsingRuleSet):
         ('gc_grace_seconds', None),
         ('index_interval', None),
         ('read_repair_chance', None),
-        ('replicate_on_write', None),
         ('populate_io_cache_on_flush', None),
         ('default_time_to_live', None),
         ('speculative_retry', None),

http://git-wip-us.apache.org/repos/asf/cassandra/blob/714c4233/pylib/cqlshlib/test/test_cqlsh_output.py
----------------------------------------------------------------------
diff --git a/pylib/cqlshlib/test/test_cqlsh_output.py b/pylib/cqlshlib/test/test_cqlsh_output.py
index f89127d..102a040 100644
--- a/pylib/cqlshlib/test/test_cqlsh_output.py
+++ b/pylib/cqlshlib/test/test_cqlsh_output.py
@@ -661,7 +661,6 @@ class TestCqlshOutput(BaseTestCase):
               gc_grace_seconds=864000 AND
               index_interval=128 AND
               read_repair_chance=0.100000 AND
-              replicate_on_write='true' AND
               populate_io_cache_on_flush='false' AND
               default_time_to_live=0 AND
               speculative_retry='NONE' AND

http://git-wip-us.apache.org/repos/asf/cassandra/blob/714c4233/src/java/org/apache/cassandra/cache/AutoSavingCache.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cache/AutoSavingCache.java b/src/java/org/apache/cassandra/cache/AutoSavingCache.java
index 3ed2c2c..f94999e 100644
--- a/src/java/org/apache/cassandra/cache/AutoSavingCache.java
+++ b/src/java/org/apache/cassandra/cache/AutoSavingCache.java
@@ -127,7 +127,8 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K
                 for (Future<Pair<K, V>> future : futures)
                 {
                     Pair<K, V> entry = future.get();
-                    put(entry.left, entry.right);
+                    if (entry != null)
+                        put(entry.left, entry.right);
                 }
             }
             catch (Exception e)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/714c4233/src/java/org/apache/cassandra/cache/CounterCacheKey.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cache/CounterCacheKey.java b/src/java/org/apache/cassandra/cache/CounterCacheKey.java
new file mode 100644
index 0000000..acbe323
--- /dev/null
+++ b/src/java/org/apache/cassandra/cache/CounterCacheKey.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.cache;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.UUID;
+
+import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.db.composites.CellName;
+import org.apache.cassandra.utils.*;
+
+public class CounterCacheKey implements CacheKey
+{
+    public final UUID cfId;
+    public final byte[] partitionKey;
+    public final byte[] cellName;
+
+    private CounterCacheKey(UUID cfId, ByteBuffer partitionKey, CellName cellName)
+    {
+        this.cfId = cfId;
+        this.partitionKey = ByteBufferUtil.getArray(partitionKey);
+        this.cellName = ByteBufferUtil.getArray(cellName.toByteBuffer());
+    }
+
+    public static CounterCacheKey create(UUID cfId, ByteBuffer partitionKey, CellName cellName)
+    {
+        return new CounterCacheKey(cfId, partitionKey, cellName);
+    }
+
+    public PathInfo getPathInfo()
+    {
+        Pair<String, String> cf = Schema.instance.getCF(cfId);
+        return new PathInfo(cf.left, cf.right, cfId);
+    }
+
+    public long memorySize()
+    {
+        return ObjectSizes.getFieldSize(3 * ObjectSizes.getReferenceSize())
+             + ObjectSizes.getArraySize(partitionKey)
+             + ObjectSizes.getArraySize(cellName);
+    }
+
+    @Override
+    public String toString()
+    {
+        return String.format("CounterCacheKey(%s, %s, %s)",
+                             cfId,
+                             ByteBufferUtil.bytesToHex(ByteBuffer.wrap(partitionKey)),
+                             ByteBufferUtil.bytesToHex(ByteBuffer.wrap(cellName)));
+    }
+
+    @Override
+    public int hashCode()
+    {
+        return Arrays.deepHashCode(new Object[]{cfId, partitionKey, cellName});
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+        if (this == o)
+            return true;
+
+        if (!(o instanceof CounterCacheKey))
+            return false;
+
+        CounterCacheKey cck = (CounterCacheKey) o;
+
+        return cfId.equals(cck.cfId)
+            && Arrays.equals(partitionKey, cck.partitionKey)
+            && Arrays.equals(cellName, cck.cellName);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/714c4233/src/java/org/apache/cassandra/concurrent/Stage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/concurrent/Stage.java b/src/java/org/apache/cassandra/concurrent/Stage.java
index f2907e2..6192cab 100644
--- a/src/java/org/apache/cassandra/concurrent/Stage.java
+++ b/src/java/org/apache/cassandra/concurrent/Stage.java
@@ -21,6 +21,7 @@ public enum Stage
 {
     READ,
     MUTATION,
+    COUNTER_MUTATION,
     GOSSIP,
     REQUEST_RESPONSE,
     ANTI_ENTROPY,
@@ -28,8 +29,7 @@ public enum Stage
     MISC,
     TRACING,
     INTERNAL_RESPONSE,
-    READ_REPAIR,
-    REPLICATE_ON_WRITE;
+    READ_REPAIR;
 
     public String getJmxType()
     {
@@ -43,9 +43,9 @@ public enum Stage
             case INTERNAL_RESPONSE:
                 return "internal";
             case MUTATION:
+            case COUNTER_MUTATION:
             case READ:
             case REQUEST_RESPONSE:
-            case REPLICATE_ON_WRITE:
             case READ_REPAIR:
                 return "request";
             default:

http://git-wip-us.apache.org/repos/asf/cassandra/blob/714c4233/src/java/org/apache/cassandra/concurrent/StageManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/concurrent/StageManager.java b/src/java/org/apache/cassandra/concurrent/StageManager.java
index 2960f22..512d64a 100644
--- a/src/java/org/apache/cassandra/concurrent/StageManager.java
+++ b/src/java/org/apache/cassandra/concurrent/StageManager.java
@@ -43,15 +43,13 @@ public class StageManager
 
     public static final long KEEPALIVE = 60; // seconds to keep "extra" threads alive for when idle
 
-    public static final int MAX_REPLICATE_ON_WRITE_TASKS = 1024 * FBUtilities.getAvailableProcessors();
-
     static
     {
         stages.put(Stage.MUTATION, multiThreadedConfigurableStage(Stage.MUTATION, getConcurrentWriters()));
+        stages.put(Stage.COUNTER_MUTATION, multiThreadedConfigurableStage(Stage.COUNTER_MUTATION, getConcurrentCounterWriters()));
         stages.put(Stage.READ, multiThreadedConfigurableStage(Stage.READ, getConcurrentReaders()));
         stages.put(Stage.REQUEST_RESPONSE, multiThreadedStage(Stage.REQUEST_RESPONSE, FBUtilities.getAvailableProcessors()));
         stages.put(Stage.INTERNAL_RESPONSE, multiThreadedStage(Stage.INTERNAL_RESPONSE, FBUtilities.getAvailableProcessors()));
-        stages.put(Stage.REPLICATE_ON_WRITE, multiThreadedConfigurableStage(Stage.REPLICATE_ON_WRITE, getConcurrentReplicators(), MAX_REPLICATE_ON_WRITE_TASKS));
         // the rest are all single-threaded
         stages.put(Stage.GOSSIP, new JMXEnabledThreadPoolExecutor(Stage.GOSSIP));
         stages.put(Stage.ANTI_ENTROPY, new JMXEnabledThreadPoolExecutor(Stage.ANTI_ENTROPY));
@@ -99,16 +97,6 @@ public class StageManager
                                                      stage.getJmxType());
     }
 
-    private static JMXConfigurableThreadPoolExecutor multiThreadedConfigurableStage(Stage stage, int numThreads, int maxTasksBeforeBlock)
-    {
-        return new JMXConfigurableThreadPoolExecutor(numThreads,
-                                                     KEEPALIVE,
-                                                     TimeUnit.SECONDS,
-                                                     new LinkedBlockingQueue<Runnable>(maxTasksBeforeBlock),
-                                                     new NamedThreadFactory(stage.getJmxName()),
-                                                     stage.getJmxType());
-    }
-
     /**
      * Retrieve a stage from the StageManager
      * @param stage name of the stage to be retrieved.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/714c4233/src/java/org/apache/cassandra/config/CFMetaData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/CFMetaData.java b/src/java/org/apache/cassandra/config/CFMetaData.java
index 817d4a3..f377734 100644
--- a/src/java/org/apache/cassandra/config/CFMetaData.java
+++ b/src/java/org/apache/cassandra/config/CFMetaData.java
@@ -76,7 +76,6 @@ public final class CFMetaData
 
     public final static double DEFAULT_READ_REPAIR_CHANCE = 0.1;
     public final static double DEFAULT_DCLOCAL_READ_REPAIR_CHANCE = 0.0;
-    public final static boolean DEFAULT_REPLICATE_ON_WRITE = true;
     public final static int DEFAULT_GC_GRACE_SECONDS = 864000;
     public final static int DEFAULT_MIN_COMPACTION_THRESHOLD = 4;
     public final static int DEFAULT_MAX_COMPACTION_THRESHOLD = 32;
@@ -128,7 +127,6 @@ public final class CFMetaData
                                                                     + "comment text,"
                                                                     + "read_repair_chance double,"
                                                                     + "local_read_repair_chance double,"
-                                                                    + "replicate_on_write boolean,"
                                                                     + "gc_grace_seconds int,"
                                                                     + "default_validator text,"
                                                                     + "key_validator text,"
@@ -395,7 +393,6 @@ public final class CFMetaData
     private volatile String comment = "";
     private volatile double readRepairChance = DEFAULT_READ_REPAIR_CHANCE;
     private volatile double dcLocalReadRepairChance = DEFAULT_DCLOCAL_READ_REPAIR_CHANCE;
-    private volatile boolean replicateOnWrite = DEFAULT_REPLICATE_ON_WRITE;
     private volatile int gcGraceSeconds = DEFAULT_GC_GRACE_SECONDS;
     private volatile AbstractType<?> defaultValidator = BytesType.instance;
     private volatile AbstractType<?> keyValidator = BytesType.instance;
@@ -437,7 +434,6 @@ public final class CFMetaData
     public CFMetaData comment(String prop) { comment = enforceCommentNotNull(prop); return this;}
     public CFMetaData readRepairChance(double prop) {readRepairChance = prop; return this;}
     public CFMetaData dcLocalReadRepairChance(double prop) {dcLocalReadRepairChance = prop; return this;}
-    public CFMetaData replicateOnWrite(boolean prop) {replicateOnWrite = prop; return this;}
     public CFMetaData gcGraceSeconds(int prop) {gcGraceSeconds = prop; return this;}
     public CFMetaData defaultValidator(AbstractType<?> prop) {defaultValidator = prop; return this;}
     public CFMetaData keyValidator(AbstractType<?> prop) {keyValidator = prop; return this;}
@@ -624,7 +620,6 @@ public final class CFMetaData
                       .comment(oldCFMD.comment)
                       .readRepairChance(oldCFMD.readRepairChance)
                       .dcLocalReadRepairChance(oldCFMD.dcLocalReadRepairChance)
-                      .replicateOnWrite(oldCFMD.replicateOnWrite)
                       .gcGraceSeconds(oldCFMD.gcGraceSeconds)
                       .defaultValidator(oldCFMD.defaultValidator)
                       .keyValidator(oldCFMD.keyValidator)
@@ -691,11 +686,6 @@ public final class CFMetaData
         return ReadRepairDecision.NONE;
     }
 
-    public boolean getReplicateOnWrite()
-    {
-        return replicateOnWrite;
-    }
-
     public boolean populateIoCacheOnFlush()
     {
         return populateIoCacheOnFlush;
@@ -869,7 +859,6 @@ public final class CFMetaData
             .append(comment, rhs.comment)
             .append(readRepairChance, rhs.readRepairChance)
             .append(dcLocalReadRepairChance, rhs.dcLocalReadRepairChance)
-            .append(replicateOnWrite, rhs.replicateOnWrite)
             .append(gcGraceSeconds, rhs.gcGraceSeconds)
             .append(defaultValidator, rhs.defaultValidator)
             .append(keyValidator, rhs.keyValidator)
@@ -902,7 +891,6 @@ public final class CFMetaData
             .append(comment)
             .append(readRepairChance)
             .append(dcLocalReadRepairChance)
-            .append(replicateOnWrite)
             .append(gcGraceSeconds)
             .append(defaultValidator)
             .append(keyValidator)
@@ -956,8 +944,6 @@ public final class CFMetaData
     {
         if (!cf_def.isSetComment())
             cf_def.setComment("");
-        if (!cf_def.isSetReplicate_on_write())
-            cf_def.setReplicate_on_write(CFMetaData.DEFAULT_REPLICATE_ON_WRITE);
         if (!cf_def.isSetPopulate_io_cache_on_flush())
             cf_def.setPopulate_io_cache_on_flush(CFMetaData.DEFAULT_POPULATE_IO_CACHE_ON_FLUSH);
         if (!cf_def.isSetMin_compaction_threshold())
@@ -1047,7 +1033,6 @@ public final class CFMetaData
 
             return newCFMD.addAllColumnDefinitions(ColumnDefinition.fromThrift(newCFMD, cf_def.column_metadata))
                           .comment(cf_def.comment)
-                          .replicateOnWrite(cf_def.replicate_on_write)
                           .defaultValidator(TypeParser.parse(cf_def.default_validation_class))
                           .compressionParameters(cp)
                           .rebuild();
@@ -1125,7 +1110,6 @@ public final class CFMetaData
         comment = enforceCommentNotNull(cfm.comment);
         readRepairChance = cfm.readRepairChance;
         dcLocalReadRepairChance = cfm.dcLocalReadRepairChance;
-        replicateOnWrite = cfm.replicateOnWrite;
         gcGraceSeconds = cfm.gcGraceSeconds;
         defaultValidator = cfm.defaultValidator;
         keyValidator = cfm.keyValidator;
@@ -1265,7 +1249,6 @@ public final class CFMetaData
         def.setComment(enforceCommentNotNull(comment));
         def.setRead_repair_chance(readRepairChance);
         def.setDclocal_read_repair_chance(dcLocalReadRepairChance);
-        def.setReplicate_on_write(replicateOnWrite);
         def.setPopulate_io_cache_on_flush(populateIoCacheOnFlush);
         def.setGc_grace_seconds(gcGraceSeconds);
         def.setDefault_validation_class(defaultValidator == null ? null : defaultValidator.toString());
@@ -1628,7 +1611,6 @@ public final class CFMetaData
         adder.add("comment", comment);
         adder.add("read_repair_chance", readRepairChance);
         adder.add("local_read_repair_chance", dcLocalReadRepairChance);
-        adder.add("replicate_on_write", replicateOnWrite);
         adder.add("populate_io_cache_on_flush", populateIoCacheOnFlush);
         adder.add("gc_grace_seconds", gcGraceSeconds);
         adder.add("default_validator", defaultValidator.toString());
@@ -1692,7 +1674,6 @@ public final class CFMetaData
 
             cfm.readRepairChance(result.getDouble("read_repair_chance"));
             cfm.dcLocalReadRepairChance(result.getDouble("local_read_repair_chance"));
-            cfm.replicateOnWrite(result.getBoolean("replicate_on_write"));
             cfm.gcGraceSeconds(result.getInt("gc_grace_seconds"));
             cfm.defaultValidator(TypeParser.parse(result.getString("default_validator")));
             cfm.keyValidator(TypeParser.parse(result.getString("key_validator")));
@@ -2172,7 +2153,6 @@ public final class CFMetaData
             .append("comment", comment)
             .append("readRepairChance", readRepairChance)
             .append("dclocalReadRepairChance", dcLocalReadRepairChance)
-            .append("replicateOnWrite", replicateOnWrite)
             .append("gcGraceSeconds", gcGraceSeconds)
             .append("defaultValidator", defaultValidator)
             .append("keyValidator", keyValidator)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/714c4233/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index 2ea8e38..5a944a2 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -58,6 +58,8 @@ public class Config
 
     public volatile Long write_request_timeout_in_ms = 2000L;
 
+    public volatile Long counter_write_request_timeout_in_ms = 5000L;
+
     public volatile Long cas_contention_timeout_in_ms = 1000L;
 
     public volatile Long truncate_request_timeout_in_ms = 60000L;
@@ -70,7 +72,10 @@ public class Config
 
     public Integer concurrent_reads = 32;
     public Integer concurrent_writes = 32;
-    public Integer concurrent_replicates = 32;
+    public Integer concurrent_counter_writes = 32;
+
+    @Deprecated
+    public Integer concurrent_replicates = null;
 
     public Integer memtable_flush_writers = null; // will get set to the length of data dirs in DatabaseDescriptor
     public Integer memtable_total_space_in_mb;
@@ -165,7 +170,12 @@ public class Config
 
     public long row_cache_size_in_mb = 0;
     public volatile int row_cache_save_period = 0;
-    public int row_cache_keys_to_save = Integer.MAX_VALUE;
+    public volatile int row_cache_keys_to_save = Integer.MAX_VALUE;
+
+    public Long counter_cache_size_in_mb = null;
+    public volatile int counter_cache_save_period = 7200;
+    public volatile int counter_cache_keys_to_save = Integer.MAX_VALUE;
+
     public String memory_allocator = NativeAllocator.class.getSimpleName();
     public boolean populate_io_cache_on_flush = false;
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/714c4233/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 4b627c8..eca8881 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -86,6 +86,7 @@ public class DatabaseDescriptor
     private static RequestSchedulerOptions requestSchedulerOptions;
 
     private static long keyCacheSizeInMB;
+    private static long counterCacheSizeInMB;
     private static IAllocator memoryAllocator;
     private static long indexSummaryCapacityInMB;
 
@@ -248,10 +249,11 @@ public class DatabaseDescriptor
             throw new ConfigurationException("concurrent_writes must be at least 2");
         }
 
-        if (conf.concurrent_replicates != null && conf.concurrent_replicates < 2)
-        {
-            throw new ConfigurationException("concurrent_replicates must be at least 2");
-        }
+        if (conf.concurrent_counter_writes != null && conf.concurrent_counter_writes < 2)
+            throw new ConfigurationException("concurrent_counter_writes must be at least 2");
+
+        if (conf.concurrent_replicates != null)
+            logger.warn("concurrent_replicates has been deprecated and should be removed from cassandra.yaml");
 
         if (conf.file_cache_size_in_mb == null)
             conf.file_cache_size_in_mb = Math.min(512, (int) (Runtime.getRuntime().maxMemory() / (4 * 1048576)));
@@ -446,6 +448,22 @@ public class DatabaseDescriptor
                     + conf.key_cache_size_in_mb + "', supported values are <integer> >= 0.");
         }
 
+        try
+        {
+            // if counter_cache_size_in_mb option was set to "auto" then size of the cache should be "min(2.5% of Heap (in MB), 50MB)
+            counterCacheSizeInMB = (conf.counter_cache_size_in_mb == null)
+                    ? Math.min(Math.max(1, (int) (Runtime.getRuntime().totalMemory() * 0.025 / 1024 / 1024)), 50)
+                    : conf.counter_cache_size_in_mb;
+
+            if (counterCacheSizeInMB < 0)
+                throw new NumberFormatException(); // to escape duplicating error message
+        }
+        catch (NumberFormatException e)
+        {
+            throw new ConfigurationException("counter_cache_size_in_mb option was set incorrectly to '"
+                    + conf.counter_cache_size_in_mb + "', supported values are <integer> >= 0.");
+        }
+
         // if set to empty/"auto" then use 5% of Heap size
         indexSummaryCapacityInMB = (conf.index_summary_capacity_in_mb == null)
             ? Math.max(1, (int) (Runtime.getRuntime().totalMemory() * 0.05 / 1024 / 1024))
@@ -780,6 +798,16 @@ public class DatabaseDescriptor
         conf.write_request_timeout_in_ms = timeOutInMillis;
     }
 
+    public static long getCounterWriteRpcTimeout()
+    {
+        return conf.counter_write_request_timeout_in_ms;
+    }
+
+    public static void setCounterWriteRpcTimeout(Long timeOutInMillis)
+    {
+        conf.counter_write_request_timeout_in_ms = timeOutInMillis;
+    }
+
     public static long getCasContentionTimeout()
     {
         return conf.cas_contention_timeout_in_ms;
@@ -818,8 +846,9 @@ public class DatabaseDescriptor
                 return getTruncateRpcTimeout();
             case READ_REPAIR:
             case MUTATION:
-            case COUNTER_MUTATION:
                 return getWriteRpcTimeout();
+            case COUNTER_MUTATION:
+                return getCounterWriteRpcTimeout();
             default:
                 return getRpcTimeout();
         }
@@ -830,7 +859,12 @@ public class DatabaseDescriptor
      */
     public static long getMinRpcTimeout()
     {
-        return Longs.min(getRpcTimeout(), getReadRpcTimeout(), getRangeRpcTimeout(), getWriteRpcTimeout(), getTruncateRpcTimeout());
+        return Longs.min(getRpcTimeout(),
+                         getReadRpcTimeout(),
+                         getRangeRpcTimeout(),
+                         getWriteRpcTimeout(),
+                         getCounterWriteRpcTimeout(),
+                         getTruncateRpcTimeout());
     }
 
     public static double getPhiConvictThreshold()
@@ -853,9 +887,9 @@ public class DatabaseDescriptor
         return conf.concurrent_writes;
     }
 
-    public static int getConcurrentReplicators()
+    public static int getConcurrentCounterWriters()
     {
-        return conf.concurrent_replicates;
+        return conf.concurrent_counter_writes;
     }
 
     public static int getFlushWriters()
@@ -1283,6 +1317,31 @@ public class DatabaseDescriptor
         return conf.row_cache_keys_to_save;
     }
 
+    public static long getCounterCacheSizeInMB()
+    {
+        return counterCacheSizeInMB;
+    }
+
+    public static int getCounterCacheSavePeriod()
+    {
+        return conf.counter_cache_save_period;
+    }
+
+    public static void setCounterCacheSavePeriod(int counterCacheSavePeriod)
+    {
+        conf.counter_cache_save_period = counterCacheSavePeriod;
+    }
+
+    public static int getCounterCacheKeysToSave()
+    {
+        return conf.counter_cache_keys_to_save;
+    }
+
+    public static void setCounterCacheKeysToSave(int counterCacheKeysToSave)
+    {
+        conf.counter_cache_keys_to_save = counterCacheKeysToSave;
+    }
+
     public static IAllocator getoffHeapMemoryAllocator()
     {
         return memoryAllocator;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/714c4233/src/java/org/apache/cassandra/cql/AlterTableStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql/AlterTableStatement.java b/src/java/org/apache/cassandra/cql/AlterTableStatement.java
index 0d767b2..b5ac464 100644
--- a/src/java/org/apache/cassandra/cql/AlterTableStatement.java
+++ b/src/java/org/apache/cassandra/cql/AlterTableStatement.java
@@ -173,7 +173,6 @@ public class AlterTableStatement
         cfm.readRepairChance(cfProps.getPropertyDouble(CFPropDefs.KW_READREPAIRCHANCE, cfm.getReadRepairChance()));
         cfm.dcLocalReadRepairChance(cfProps.getPropertyDouble(CFPropDefs.KW_DCLOCALREADREPAIRCHANCE, cfm.getDcLocalReadRepair()));
         cfm.gcGraceSeconds(cfProps.getPropertyInt(CFPropDefs.KW_GCGRACESECONDS, cfm.getGcGraceSeconds()));
-        cfm.replicateOnWrite(cfProps.getPropertyBoolean(CFPropDefs.KW_REPLICATEONWRITE, cfm.getReplicateOnWrite()));
         int minCompactionThreshold = cfProps.getPropertyInt(CFPropDefs.KW_MINCOMPACTIONTHRESHOLD, cfm.getMinCompactionThreshold());
         int maxCompactionThreshold = cfProps.getPropertyInt(CFPropDefs.KW_MAXCOMPACTIONTHRESHOLD, cfm.getMaxCompactionThreshold());
         if (minCompactionThreshold <= 0 || maxCompactionThreshold <= 0)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/714c4233/src/java/org/apache/cassandra/cql/CFPropDefs.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql/CFPropDefs.java b/src/java/org/apache/cassandra/cql/CFPropDefs.java
index a7d3147..2131d06 100644
--- a/src/java/org/apache/cassandra/cql/CFPropDefs.java
+++ b/src/java/org/apache/cassandra/cql/CFPropDefs.java
@@ -47,7 +47,6 @@ public class CFPropDefs {
     public static final String KW_DEFAULTVALIDATION = "default_validation";
     public static final String KW_MINCOMPACTIONTHRESHOLD = "min_compaction_threshold";
     public static final String KW_MAXCOMPACTIONTHRESHOLD = "max_compaction_threshold";
-    public static final String KW_REPLICATEONWRITE = "replicate_on_write";
     public static final String KW_COMPACTION_STRATEGY_CLASS = "compaction_strategy_class";
     public static final String KW_CACHING = "caching";
     public static final String KW_DEFAULT_TIME_TO_LIVE = "default_time_to_live";
@@ -90,7 +89,6 @@ public class CFPropDefs {
         keywords.add(KW_DEFAULTVALIDATION);
         keywords.add(KW_MINCOMPACTIONTHRESHOLD);
         keywords.add(KW_MAXCOMPACTIONTHRESHOLD);
-        keywords.add(KW_REPLICATEONWRITE);
         keywords.add(KW_COMPACTION_STRATEGY_CLASS);
         keywords.add(KW_CACHING);
         keywords.add(KW_DEFAULT_TIME_TO_LIVE);
@@ -107,6 +105,7 @@ public class CFPropDefs {
         obsoleteKeywords.add("memtable_operations_in_millions");
         obsoleteKeywords.add("memtable_flush_after_mins");
         obsoleteKeywords.add("row_cache_provider");
+        obsoleteKeywords.add("replicate_on_write");
 
         allowedKeywords.addAll(keywords);
         allowedKeywords.addAll(obsoleteKeywords);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/714c4233/src/java/org/apache/cassandra/cql/CreateColumnFamilyStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql/CreateColumnFamilyStatement.java b/src/java/org/apache/cassandra/cql/CreateColumnFamilyStatement.java
index a71707c..7b5cbaf 100644
--- a/src/java/org/apache/cassandra/cql/CreateColumnFamilyStatement.java
+++ b/src/java/org/apache/cassandra/cql/CreateColumnFamilyStatement.java
@@ -188,7 +188,6 @@ public class CreateColumnFamilyStatement
                    .comment(cfProps.getProperty(CFPropDefs.KW_COMMENT))
                    .readRepairChance(getPropertyDouble(CFPropDefs.KW_READREPAIRCHANCE, CFMetaData.DEFAULT_READ_REPAIR_CHANCE))
                    .dcLocalReadRepairChance(getPropertyDouble(CFPropDefs.KW_DCLOCALREADREPAIRCHANCE, CFMetaData.DEFAULT_DCLOCAL_READ_REPAIR_CHANCE))
-                   .replicateOnWrite(getPropertyBoolean(CFPropDefs.KW_REPLICATEONWRITE, CFMetaData.DEFAULT_REPLICATE_ON_WRITE))
                    .gcGraceSeconds(getPropertyInt(CFPropDefs.KW_GCGRACESECONDS, CFMetaData.DEFAULT_GC_GRACE_SECONDS))
                    .defaultValidator(cfProps.getValidator())
                    .minCompactionThreshold(minCompactionThreshold)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/714c4233/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
index 25f61fb..e500359 100644
--- a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
@@ -25,6 +25,7 @@ import org.github.jamm.MemoryMeter;
 import org.apache.cassandra.cql3.*;
 import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.db.IMutation;
+import org.apache.cassandra.db.Mutation;
 import org.apache.cassandra.exceptions.*;
 import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.service.QueryState;
@@ -174,7 +175,11 @@ public class BatchStatement implements CQLStatement, MeasurableForPreparedCache
     public ResultMessage executeInternal(QueryState queryState) throws RequestValidationException, RequestExecutionException
     {
         for (IMutation mutation : getMutations(Collections.<ByteBuffer>emptyList(), true, null, queryState.getTimestamp()))
-            mutation.apply();
+        {
+            // We don't use counters internally.
+            assert mutation instanceof Mutation;
+            ((Mutation) mutation).apply();
+        }
         return null;
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/714c4233/src/java/org/apache/cassandra/cql3/statements/CFPropDefs.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/CFPropDefs.java b/src/java/org/apache/cassandra/cql3/statements/CFPropDefs.java
index 6ce6406..3929f3c 100644
--- a/src/java/org/apache/cassandra/cql3/statements/CFPropDefs.java
+++ b/src/java/org/apache/cassandra/cql3/statements/CFPropDefs.java
@@ -34,7 +34,6 @@ public class CFPropDefs extends PropertyDefinitions
     public static final String KW_GCGRACESECONDS = "gc_grace_seconds";
     public static final String KW_MINCOMPACTIONTHRESHOLD = "min_threshold";
     public static final String KW_MAXCOMPACTIONTHRESHOLD = "max_threshold";
-    public static final String KW_REPLICATEONWRITE = "replicate_on_write";
     public static final String KW_CACHING = "caching";
     public static final String KW_DEFAULT_TIME_TO_LIVE = "default_time_to_live";
     public static final String KW_INDEX_INTERVAL = "index_interval";
@@ -57,7 +56,6 @@ public class CFPropDefs extends PropertyDefinitions
         keywords.add(KW_READREPAIRCHANCE);
         keywords.add(KW_DCLOCALREADREPAIRCHANCE);
         keywords.add(KW_GCGRACESECONDS);
-        keywords.add(KW_REPLICATEONWRITE);
         keywords.add(KW_CACHING);
         keywords.add(KW_DEFAULT_TIME_TO_LIVE);
         keywords.add(KW_INDEX_INTERVAL);
@@ -67,6 +65,8 @@ public class CFPropDefs extends PropertyDefinitions
         keywords.add(KW_COMPACTION);
         keywords.add(KW_COMPRESSION);
         keywords.add(KW_MEMTABLE_FLUSH_PERIOD);
+
+        obsoleteKeywords.add("replicate_on_write");
     }
 
     private Class<? extends AbstractCompactionStrategy> compactionStrategyClass = null;
@@ -146,7 +146,6 @@ public class CFPropDefs extends PropertyDefinitions
         cfm.readRepairChance(getDouble(KW_READREPAIRCHANCE, cfm.getReadRepairChance()));
         cfm.dcLocalReadRepairChance(getDouble(KW_DCLOCALREADREPAIRCHANCE, cfm.getDcLocalReadRepair()));
         cfm.gcGraceSeconds(getInt(KW_GCGRACESECONDS, cfm.getGcGraceSeconds()));
-        cfm.replicateOnWrite(getBoolean(KW_REPLICATEONWRITE, cfm.getReplicateOnWrite()));
         int minCompactionThreshold = toInt(KW_MINCOMPACTIONTHRESHOLD, getCompactionOptions().get(KW_MINCOMPACTIONTHRESHOLD), cfm.getMinCompactionThreshold());
         int maxCompactionThreshold = toInt(KW_MAXCOMPACTIONTHRESHOLD, getCompactionOptions().get(KW_MAXCOMPACTIONTHRESHOLD), cfm.getMaxCompactionThreshold());
         if (minCompactionThreshold <= 0 || maxCompactionThreshold <= 0)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/714c4233/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
index c2a0080..ecbf4e1 100644
--- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
@@ -479,7 +479,11 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
             throw new UnsupportedOperationException();
 
         for (IMutation mutation : getMutations(Collections.<ByteBuffer>emptyList(), true, null, queryState.getTimestamp(), false))
-            mutation.apply();
+        {
+            // We don't use counters internally.
+            assert mutation instanceof Mutation;
+            ((Mutation) mutation).apply();
+        }
         return null;
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/714c4233/src/java/org/apache/cassandra/db/Cell.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Cell.java b/src/java/org/apache/cassandra/db/Cell.java
index 3e04f9b..d3cf085 100644
--- a/src/java/org/apache/cassandra/db/Cell.java
+++ b/src/java/org/apache/cassandra/db/Cell.java
@@ -234,17 +234,13 @@ public class Cell implements OnDiskAtom
     {
         if (this == o)
             return true;
+
         if (o == null || getClass() != o.getClass())
             return false;
 
         Cell cell = (Cell)o;
 
-        if (timestamp != cell.timestamp)
-            return false;
-        if (!name.equals(cell.name))
-            return false;
-
-        return value.equals(cell.value);
+        return timestamp == cell.timestamp && name.equals(cell.name) && value.equals(cell.value);
     }
 
     @Override
@@ -256,11 +252,6 @@ public class Cell implements OnDiskAtom
         return result;
     }
 
-    public Cell localCopy(ColumnFamilyStore cfs)
-    {
-        return localCopy(cfs, HeapAllocator.instance);
-    }
-
     public Cell localCopy(ColumnFamilyStore cfs, Allocator allocator)
     {
         return new Cell(name.copy(allocator), allocator.clone(value), timestamp);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/714c4233/src/java/org/apache/cassandra/db/ClockAndCount.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ClockAndCount.java b/src/java/org/apache/cassandra/db/ClockAndCount.java
new file mode 100644
index 0000000..1678c8c
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/ClockAndCount.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import com.google.common.base.Objects;
+
+import org.apache.cassandra.cache.IMeasurableMemory;
+import org.apache.cassandra.utils.ObjectSizes;
+
+public class ClockAndCount implements IMeasurableMemory
+{
+    public static ClockAndCount BLANK = ClockAndCount.create(0L, 0L);
+
+    public final long clock;
+    public final long count;
+
+    private ClockAndCount(long clock, long count)
+    {
+        this.clock = clock;
+        this.count = count;
+    }
+
+    public static ClockAndCount create(long clock, long count)
+    {
+        return new ClockAndCount(clock, count);
+    }
+
+    public long memorySize()
+    {
+        return ObjectSizes.getFieldSize(TypeSizes.NATIVE.sizeof(clock))
+             + ObjectSizes.getFieldSize(TypeSizes.NATIVE.sizeof(count));
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+        if (this == o)
+            return true;
+
+        if (!(o instanceof ClockAndCount))
+            return false;
+
+        ClockAndCount other = (ClockAndCount) o;
+        return clock == other.clock && count == other.count;
+    }
+
+    @Override
+    public int hashCode()
+    {
+        return Objects.hashCode(clock, count);
+    }
+
+    @Override
+    public String toString()
+    {
+        return String.format("ClockAndCount(%s,%s)", clock, count);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/714c4233/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index fe77d09..2237214 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -25,6 +25,7 @@ import java.util.*;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Lock;
 import java.util.regex.Pattern;
 import javax.management.*;
 
@@ -32,13 +33,12 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Function;
 import com.google.common.collect.*;
 import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.Striped;
 import com.google.common.util.concurrent.Uninterruptibles;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.cache.IRowCacheEntry;
-import org.apache.cassandra.cache.RowCacheKey;
-import org.apache.cassandra.cache.RowCacheSentinel;
+import org.apache.cassandra.cache.*;
 import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
 import org.apache.cassandra.config.*;
 import org.apache.cassandra.config.CFMetaData.SpeculativeRetry;
@@ -46,6 +46,7 @@ import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
 import org.apache.cassandra.db.commitlog.CommitLog;
 import org.apache.cassandra.db.commitlog.ReplayPosition;
 import org.apache.cassandra.db.compaction.*;
+import org.apache.cassandra.db.composites.CellName;
 import org.apache.cassandra.db.filter.*;
 import org.apache.cassandra.db.composites.CellNameType;
 import org.apache.cassandra.db.composites.Composite;
@@ -105,6 +106,8 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
     public final ColumnFamilyMetrics metric;
     public volatile long sampleLatencyNanos;
 
+    private final Striped<Lock> counterLocks = Striped.lazyWeakLock(DatabaseDescriptor.getConcurrentCounterWriters() * 128);
+
     public void reload()
     {
         // metadata object has been mutated directly. make all the members jibe with new settings.
@@ -332,9 +335,18 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         data.unreferenceSSTables();
         indexManager.invalidate();
 
-        for (RowCacheKey key : CacheService.instance.rowCache.getKeySet())
-            if (key.cfId == metadata.cfId)
-                invalidateCachedRow(key);
+        invalidateCaches();
+    }
+
+    /**
+     * Obtain a lock for this CF's part of a counter mutation
+     * @param key the key for the CounterMutation
+     * @return the striped lock instance
+     */
+    public Lock counterLockFor(ByteBuffer key)
+    {
+        assert metadata.isCounter();
+        return counterLocks.get(key);
     }
 
     /**
@@ -562,13 +574,29 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
 
         int cachedRowsRead = CacheService.instance.rowCache.loadSaved(this);
         if (cachedRowsRead > 0)
-            logger.info("completed loading ({} ms; {} keys) row cache for {}.{}",
+            logger.info("Completed loading ({} ms; {} keys) row cache for {}.{}",
                         TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start),
                         cachedRowsRead,
                         keyspace.getName(),
                         name);
     }
 
+    public void initCounterCache()
+    {
+        if (!metadata.isCounter() || CacheService.instance.counterCache.getCapacity() == 0)
+            return;
+
+        long start = System.nanoTime();
+
+        int cachedShardsRead = CacheService.instance.counterCache.loadSaved(this);
+        if (cachedShardsRead > 0)
+            logger.info("Completed loading ({} ms; {} shards) counter cache for {}.{}",
+                        TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start),
+                        cachedShardsRead,
+                        keyspace.getName(),
+                        name);
+    }
+
     /**
      * See #{@code StorageService.loadNewSSTables(String, String)} for more info
      *
@@ -1073,9 +1101,9 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         return maxFile;
     }
 
-    public void forceCleanup(CounterId.OneShotRenewer renewer) throws ExecutionException, InterruptedException
+    public void forceCleanup() throws ExecutionException, InterruptedException
     {
-        CompactionManager.instance.performCleanup(ColumnFamilyStore.this, renewer);
+        CompactionManager.instance.performCleanup(ColumnFamilyStore.this);
     }
 
     public void scrub(boolean disableSnapshot) throws ExecutionException, InterruptedException
@@ -1535,6 +1563,16 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
             if (key.cfId == metadata.cfId && !Range.isInRanges(dk.token, ranges))
                 invalidateCachedRow(dk);
         }
+
+        if (metadata.isCounter())
+        {
+            for (CounterCacheKey key : CacheService.instance.counterCache.getKeySet())
+            {
+                DecoratedKey dk = partitioner.decorateKey(ByteBuffer.wrap(key.partitionKey));
+                if (key.cfId == metadata.cfId && !Range.isInRanges(dk.token, ranges))
+                    CacheService.instance.counterCache.remove(key);
+            }
+        }
     }
 
     public static abstract class AbstractScanIterator extends AbstractIterator<Row> implements CloseableIterator<Row>
@@ -1886,6 +1924,18 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         return cached == null || cached instanceof RowCacheSentinel ? null : (ColumnFamily) cached;
     }
 
+    private void invalidateCaches()
+    {
+        for (RowCacheKey key : CacheService.instance.rowCache.getKeySet())
+            if (key.cfId == metadata.cfId)
+                invalidateCachedRow(key);
+
+        if (metadata.isCounter())
+            for (CounterCacheKey key : CacheService.instance.counterCache.getKeySet())
+                if (key.cfId == metadata.cfId)
+                    CacheService.instance.counterCache.remove(key);
+    }
+
     /**
      * @return true if @param key is contained in the row cache
      */
@@ -1908,6 +1958,20 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         invalidateCachedRow(new RowCacheKey(cfId, key));
     }
 
+    public ClockAndCount getCachedCounter(ByteBuffer partitionKey, CellName cellName)
+    {
+        if (CacheService.instance.counterCache.getCapacity() == 0L) // counter cache disabled.
+            return null;
+        return CacheService.instance.counterCache.get(CounterCacheKey.create(metadata.cfId, partitionKey, cellName));
+    }
+
+    public void putCachedCounter(ByteBuffer partitionKey, CellName cellName, ClockAndCount clockAndCount)
+    {
+        if (CacheService.instance.counterCache.getCapacity() == 0L) // counter cache disabled.
+            return;
+        CacheService.instance.counterCache.put(CounterCacheKey.create(metadata.cfId, partitionKey, cellName), clockAndCount);
+    }
+
     public void forceMajorCompaction() throws InterruptedException, ExecutionException
     {
         CompactionManager.instance.performMaximal(this);
@@ -2017,12 +2081,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
 
                 SystemKeyspace.saveTruncationRecord(ColumnFamilyStore.this, truncatedAt, replayAfter);
 
-                logger.debug("cleaning out row cache");
-                for (RowCacheKey key : CacheService.instance.rowCache.getKeySet())
-                {
-                    if (key.cfId == metadata.cfId)
-                        invalidateCachedRow(key);
-                }
+                invalidateCaches();
             }
         };
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/714c4233/src/java/org/apache/cassandra/db/ConsistencyLevel.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ConsistencyLevel.java b/src/java/org/apache/cassandra/db/ConsistencyLevel.java
index 6d04314..c97e71f 100644
--- a/src/java/org/apache/cassandra/db/ConsistencyLevel.java
+++ b/src/java/org/apache/cassandra/db/ConsistencyLevel.java
@@ -341,17 +341,10 @@ public enum ConsistencyLevel
     public void validateCounterForWrite(CFMetaData metadata) throws InvalidRequestException
     {
         if (this == ConsistencyLevel.ANY)
-        {
             throw new InvalidRequestException("Consistency level ANY is not yet supported for counter columnfamily " + metadata.cfName);
-        }
-        else if (!metadata.getReplicateOnWrite() && !(this == ConsistencyLevel.ONE || this == ConsistencyLevel.LOCAL_ONE))
-        {
-            throw new InvalidRequestException("cannot achieve CL > CL.ONE without replicate_on_write on columnfamily " + metadata.cfName);
-        }
-        else if (isSerialConsistency())
-        {
+
+        if (isSerialConsistency())
             throw new InvalidRequestException("Counter operations are inherently non-serializable");
-        }
     }
 
     private void requireNetworkTopologyStrategy(String keyspaceName) throws InvalidRequestException

http://git-wip-us.apache.org/repos/asf/cassandra/blob/714c4233/src/java/org/apache/cassandra/db/CounterCell.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/CounterCell.java b/src/java/org/apache/cassandra/db/CounterCell.java
index 76949d4..426d876 100644
--- a/src/java/org/apache/cassandra/db/CounterCell.java
+++ b/src/java/org/apache/cassandra/db/CounterCell.java
@@ -25,7 +25,6 @@ import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.db.composites.CellName;
 import org.apache.cassandra.db.composites.CellNameType;
 import org.apache.cassandra.db.context.CounterContext;
-import org.apache.cassandra.db.context.IContext.ContextRelationship;
 import org.apache.cassandra.io.util.DataOutputBuffer;
 import org.apache.cassandra.serializers.MarshalException;
 import org.apache.cassandra.utils.Allocator;
@@ -40,16 +39,6 @@ public class CounterCell extends Cell
 
     private final long timestampOfLastDelete;
 
-    public CounterCell(CellName name, long value, long timestamp)
-    {
-        this(name, contextManager.createLocal(value, HeapAllocator.instance), timestamp);
-    }
-
-    public CounterCell(CellName name, long value, long timestamp, long timestampOfLastDelete)
-    {
-        this(name, contextManager.createLocal(value, HeapAllocator.instance), timestamp, timestampOfLastDelete);
-    }
-
     public CounterCell(CellName name, ByteBuffer value, long timestamp)
     {
         this(name, value, timestamp, Long.MIN_VALUE);
@@ -68,6 +57,12 @@ public class CounterCell extends Cell
         return new CounterCell(name, value, timestamp, timestampOfLastDelete);
     }
 
+    // For use by tests of compatibility with pre-2.1 counter only.
+    public static CounterCell createLocal(CellName name, long value, long timestamp, long timestampOfLastDelete)
+    {
+        return new CounterCell(name, contextManager.createLocal(value, HeapAllocator.instance), timestamp, timestampOfLastDelete);
+    }
+
     @Override
     public Cell withUpdatedName(CellName newName)
     {
@@ -110,12 +105,12 @@ public class CounterCell extends Cell
         // merging a CounterCell with a tombstone never return a tombstone
         // unless that tombstone timestamp is greater that the CounterCell
         // one.
-        assert !(cell instanceof DeletedCell) : "Wrong class type: " + cell.getClass();
+        assert cell instanceof CounterCell : "Wrong class type: " + cell.getClass();
 
         if (timestampOfLastDelete() < ((CounterCell) cell).timestampOfLastDelete())
             return cell;
-        ContextRelationship rel = contextManager.diff(cell.value(), value());
-        if (ContextRelationship.GREATER_THAN == rel || ContextRelationship.DISJOINT == rel)
+        CounterContext.Relationship rel = contextManager.diff(cell.value(), value());
+        if (rel == CounterContext.Relationship.GREATER_THAN || rel == CounterContext.Relationship.DISJOINT)
             return cell;
         return null;
     }
@@ -195,12 +190,6 @@ public class CounterCell extends Cell
     }
 
     @Override
-    public Cell localCopy(ColumnFamilyStore cfs)
-    {
-        return localCopy(cfs, HeapAllocator.instance);
-    }
-
-    @Override
     public Cell localCopy(ColumnFamilyStore cfs, Allocator allocator)
     {
         return new CounterCell(name.copy(allocator), allocator.clone(value), timestamp, timestampOfLastDelete);
@@ -231,14 +220,6 @@ public class CounterCell extends Cell
         contextManager.validateContext(value());
     }
 
-    /**
-     * Check if a given counterId is found in this CounterCell context.
-     */
-    public boolean hasCounterId(CounterId id)
-    {
-        return contextManager.hasCounterId(value(), id);
-    }
-
     public Cell markLocalToBeCleared()
     {
         ByteBuffer marked = contextManager.markLocalToBeCleared(value);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/714c4233/src/java/org/apache/cassandra/db/CounterMutation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/CounterMutation.java b/src/java/org/apache/cassandra/db/CounterMutation.java
index 7dcb05c..6884c80 100644
--- a/src/java/org/apache/cassandra/db/CounterMutation.java
+++ b/src/java/org/apache/cassandra/db/CounterMutation.java
@@ -21,22 +21,21 @@ import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.Collection;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.SortedSet;
-import java.util.TreeSet;
-import java.util.UUID;
-
-import com.google.common.collect.Iterables;
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
 
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.composites.CellName;
+import org.apache.cassandra.db.context.CounterContext;
 import org.apache.cassandra.db.filter.NamesQueryFilter;
+import org.apache.cassandra.exceptions.WriteTimeoutException;
 import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.HeapAllocator;
+import org.apache.cassandra.service.CacheService;
+import org.apache.cassandra.tracing.Tracing;
+import org.apache.cassandra.utils.*;
 
 public class CounterMutation implements IMutation
 {
@@ -76,67 +75,175 @@ public class CounterMutation implements IMutation
         return consistency;
     }
 
-    public Mutation makeReplicationMutation()
+    public MessageOut<CounterMutation> makeMutationMessage()
+    {
+        return new MessageOut<>(MessagingService.Verb.COUNTER_MUTATION, this, serializer);
+    }
+
+    /**
+     * Applies the counter mutation, returns the result Mutation (for replication to other nodes).
+     *
+     * 1. Grabs the striped CF-level lock(s)
+     * 2. Gets the current values of the counters-to-be-modified from the counter cache
+     * 3. Reads the rest of the current values (cache misses) from the CF
+     * 4. Writes the updated counter values
+     * 5. Updates the counter cache
+     * 6. Releases the lock(s)
+     *
+     * See CASSANDRA-4775 and CASSANDRA-6504 for further details.
+     *
+     * @return the applied resulting Mutation
+     */
+    public Mutation apply() throws WriteTimeoutException
     {
-        List<ReadCommand> readCommands = new LinkedList<>();
-        long timestamp = System.currentTimeMillis();
-        for (ColumnFamily columnFamily : mutation.getColumnFamilies())
+        Mutation result = new Mutation(getKeyspaceName(), ByteBufferUtil.clone(key()));
+        Keyspace keyspace = Keyspace.open(getKeyspaceName());
+
+        ArrayList<UUID> cfIds = new ArrayList<>(getColumnFamilyIds());
+        Collections.sort(cfIds); // will lock in the sorted order, to avoid a potential deadlock.
+        ArrayList<Lock> locks = new ArrayList<>(cfIds.size());
+        try
         {
-            if (!columnFamily.metadata().getReplicateOnWrite())
-                continue;
-            addReadCommandFromColumnFamily(mutation.getKeyspaceName(), mutation.key(), columnFamily, timestamp, readCommands);
+            Tracing.trace("Acquiring {} counter locks", cfIds.size());
+            for (UUID cfId : cfIds)
+            {
+                Lock lock = keyspace.getColumnFamilyStore(cfId).counterLockFor(key());
+                if (!lock.tryLock(getTimeout(), TimeUnit.MILLISECONDS))
+                    throw new WriteTimeoutException(WriteType.COUNTER, consistency(), 0, consistency().blockFor(keyspace));
+                locks.add(lock);
+            }
+
+            for (ColumnFamily cf : getColumnFamilies())
+                result.add(processModifications(cf));
+
+            result.apply();
+            updateCounterCache(result, keyspace);
+            return result;
+        }
+        catch (InterruptedException e)
+        {
+            throw new WriteTimeoutException(WriteType.COUNTER, consistency(), 0, consistency().blockFor(keyspace));
+        }
+        finally
+        {
+            for (Lock lock : locks)
+                lock.unlock();
         }
+    }
+
+    // Replaces all the CounterUpdateCell-s with updated regular CounterCell-s
+    private ColumnFamily processModifications(ColumnFamily changesCF)
+    {
+        Allocator allocator = HeapAllocator.instance;
+        ColumnFamilyStore cfs = Keyspace.open(getKeyspaceName()).getColumnFamilyStore(changesCF.id());
 
-        // create a replication Mutation
-        Mutation replicationMutation = new Mutation(mutation.getKeyspaceName(), mutation.key());
-        for (ReadCommand readCommand : readCommands)
+        ColumnFamily resultCF = changesCF.cloneMeShallow();
+
+        List<CounterUpdateCell> counterUpdateCells = new ArrayList<>(changesCF.getColumnCount());
+        for (Cell cell : changesCF)
         {
-            Keyspace keyspace = Keyspace.open(readCommand.ksName);
-            Row row = readCommand.getRow(keyspace);
-            if (row == null || row.cf == null)
-                continue;
+            if (cell instanceof CounterUpdateCell)
+                counterUpdateCells.add((CounterUpdateCell)cell);
+            else
+                resultCF.addColumn(cell.localCopy(cfs, allocator));
+        }
 
-            ColumnFamily cf = row.cf;
-            replicationMutation.add(cf);
+        if (counterUpdateCells.isEmpty())
+            return resultCF; // only DELETEs
+
+        ClockAndCount[] currentValues = getCurrentValues(counterUpdateCells, cfs);
+        for (int i = 0; i < counterUpdateCells.size(); i++)
+        {
+            ClockAndCount currentValue = currentValues[i];
+            CounterUpdateCell update = counterUpdateCells.get(i);
+
+            long clock = currentValue.clock + 1L;
+            long count = currentValue.count + update.delta();
+
+            resultCF.addColumn(new CounterCell(update.name().copy(allocator),
+                                               CounterContext.instance().createGlobal(CounterId.getLocalId(), clock, count, allocator),
+                                               update.timestamp()));
         }
-        return replicationMutation;
+
+        return resultCF;
     }
 
-    private void addReadCommandFromColumnFamily(String keyspaceName, ByteBuffer key, ColumnFamily columnFamily, long timestamp, List<ReadCommand> commands)
+    // Attempt to load the current values(s) from cache. If that fails, read the rest from the cfs.
+    private ClockAndCount[] getCurrentValues(List<CounterUpdateCell> counterUpdateCells, ColumnFamilyStore cfs)
     {
-        SortedSet<CellName> s = new TreeSet<>(columnFamily.metadata().comparator);
-        Iterables.addAll(s, columnFamily.getColumnNames());
-        commands.add(new SliceByNamesReadCommand(keyspaceName, key, columnFamily.metadata().cfName, timestamp, new NamesQueryFilter(s)));
+        ClockAndCount[] currentValues = new ClockAndCount[counterUpdateCells.size()];
+        int remaining = counterUpdateCells.size();
+
+        if (CacheService.instance.counterCache.getCapacity() != 0)
+        {
+            Tracing.trace("Fetching {} counter values from cache", counterUpdateCells.size());
+            remaining = getCurrentValuesFromCache(counterUpdateCells, cfs, currentValues);
+            if (remaining == 0)
+                return currentValues;
+        }
+
+        Tracing.trace("Reading {} counter values from the CF", remaining);
+        getCurrentValuesFromCFS(counterUpdateCells, cfs, currentValues);
+
+        return currentValues;
     }
 
-    public MessageOut<CounterMutation> makeMutationMessage()
+    // Returns the count of cache misses.
+    private int getCurrentValuesFromCache(List<CounterUpdateCell> counterUpdateCells,
+                                          ColumnFamilyStore cfs,
+                                          ClockAndCount[] currentValues)
     {
-        return new MessageOut<>(MessagingService.Verb.COUNTER_MUTATION, this, serializer);
+        int cacheMisses = 0;
+        for (int i = 0; i < counterUpdateCells.size(); i++)
+        {
+            ClockAndCount cached = cfs.getCachedCounter(key(), counterUpdateCells.get(i).name());
+            if (cached != null)
+                currentValues[i] = cached;
+            else
+                cacheMisses++;
+        }
+        return cacheMisses;
     }
 
-    public boolean shouldReplicateOnWrite()
+    // Reads the missing current values from the CFS.
+    private void getCurrentValuesFromCFS(List<CounterUpdateCell> counterUpdateCells,
+                                         ColumnFamilyStore cfs,
+                                         ClockAndCount[] currentValues)
     {
-        for (ColumnFamily cf : mutation.getColumnFamilies())
-            if (cf.metadata().getReplicateOnWrite())
-                return true;
-        return false;
+        SortedSet<CellName> names = new TreeSet<>(cfs.metadata.comparator);
+        for (int i = 0; i < currentValues.length; i++)
+            if (currentValues[i] == null)
+                names.add(counterUpdateCells.get(i).name);
+
+        ReadCommand cmd = new SliceByNamesReadCommand(getKeyspaceName(), key(), cfs.metadata.cfName, Long.MIN_VALUE, new NamesQueryFilter(names));
+        Row row = cmd.getRow(cfs.keyspace);
+        ColumnFamily cf = row == null ? null : row.cf;
+
+        for (int i = 0; i < currentValues.length; i++)
+        {
+            if (currentValues[i] != null)
+                continue;
+
+            Cell cell = cf == null ? null : cf.getColumn(counterUpdateCells.get(i).name());
+            if (cell == null || cell.isMarkedForDelete(Long.MIN_VALUE)) // absent or a tombstone.
+                currentValues[i] = ClockAndCount.BLANK;
+            else
+                currentValues[i] = CounterContext.instance().getLocalClockAndCount(cell.value());
+        }
     }
 
-    public void apply()
+    private void updateCounterCache(Mutation applied, Keyspace keyspace)
     {
-        // transform all CounterUpdateCell to CounterCell: accomplished by localCopy
-        Mutation m = new Mutation(mutation.getKeyspaceName(), ByteBufferUtil.clone(mutation.key()));
-        Keyspace keyspace = Keyspace.open(m.getKeyspaceName());
+        if (CacheService.instance.counterCache.getCapacity() == 0)
+            return;
 
-        for (ColumnFamily cf_ : mutation.getColumnFamilies())
+        for (ColumnFamily cf : applied.getColumnFamilies())
         {
-            ColumnFamily cf = cf_.cloneMeShallow();
             ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(cf.id());
-            for (Cell cell : cf_)
-                cf.addColumn(cell.localCopy(cfs), HeapAllocator.instance);
-            m.add(cf);
+            for (Cell cell : cf)
+                if (cell instanceof CounterCell)
+                    cfs.putCachedCounter(key(), cell.name(), CounterContext.instance().getLocalClockAndCount(cell.value()));
         }
-        m.apply();
     }
 
     public void addAll(IMutation m)
@@ -147,6 +254,11 @@ public class CounterMutation implements IMutation
         mutation.addAll(cm.mutation);
     }
 
+    public long getTimeout()
+    {
+        return DatabaseDescriptor.getCounterWriteRpcTimeout();
+    }
+
     @Override
     public String toString()
     {
@@ -176,7 +288,7 @@ public class CounterMutation implements IMutation
         public long serializedSize(CounterMutation cm, int version)
         {
             return Mutation.serializer.serializedSize(cm.mutation, version)
-                    + TypeSizes.NATIVE.sizeof(cm.consistency.name());
+                 + TypeSizes.NATIVE.sizeof(cm.consistency.name());
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/714c4233/src/java/org/apache/cassandra/db/CounterUpdateCell.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/CounterUpdateCell.java b/src/java/org/apache/cassandra/db/CounterUpdateCell.java
index dd2bf2a..f7a0ef1 100644
--- a/src/java/org/apache/cassandra/db/CounterUpdateCell.java
+++ b/src/java/org/apache/cassandra/db/CounterUpdateCell.java
@@ -21,10 +21,8 @@ import java.nio.ByteBuffer;
 
 import org.apache.cassandra.db.composites.CellName;
 import org.apache.cassandra.db.composites.CellNameType;
-import org.apache.cassandra.db.context.CounterContext;
 import org.apache.cassandra.utils.Allocator;
 import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.HeapAllocator;
 
 /**
  * A counter update while it hasn't been applied yet by the leader replica.
@@ -63,13 +61,12 @@ public class CounterUpdateCell extends Cell
         // The only time this could happen is if a batchAdd ships two
         // increment for the same cell. Hence we simply sums the delta.
 
-        assert (cell instanceof CounterUpdateCell) || (cell instanceof DeletedCell) : "Wrong class type.";
-
         // tombstones take precedence
         if (cell.isMarkedForDelete(Long.MIN_VALUE)) // can't be an expired cell, so the current time is irrelevant
             return timestamp() > cell.timestamp() ? this : cell;
 
         // neither is tombstoned
+        assert cell instanceof CounterUpdateCell : "Wrong class type.";
         CounterUpdateCell c = (CounterUpdateCell) cell;
         return new CounterUpdateCell(name(), delta() + c.delta(), Math.max(timestamp(), c.timestamp()));
     }
@@ -81,21 +78,9 @@ public class CounterUpdateCell extends Cell
     }
 
     @Override
-    public CounterCell localCopy(ColumnFamilyStore cfs)
-    {
-        return new CounterCell(name.copy(HeapAllocator.instance),
-                               CounterContext.instance().createLocal(delta(), HeapAllocator.instance),
-                               timestamp(),
-                               Long.MIN_VALUE);
-    }
-
-    @Override
     public Cell localCopy(ColumnFamilyStore cfs, Allocator allocator)
     {
-        return new CounterCell(name.copy(allocator),
-                               CounterContext.instance().createLocal(delta(), allocator),
-                               timestamp(),
-                               Long.MIN_VALUE);
+        throw new UnsupportedOperationException();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/714c4233/src/java/org/apache/cassandra/db/DeletedCell.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DeletedCell.java b/src/java/org/apache/cassandra/db/DeletedCell.java
index 5b89e1d..13d1358 100644
--- a/src/java/org/apache/cassandra/db/DeletedCell.java
+++ b/src/java/org/apache/cassandra/db/DeletedCell.java
@@ -27,7 +27,6 @@ import org.apache.cassandra.io.util.DataOutputBuffer;
 import org.apache.cassandra.serializers.MarshalException;
 import org.apache.cassandra.utils.Allocator;
 import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.HeapAllocator;
 
 public class DeletedCell extends Cell
 {
@@ -98,12 +97,6 @@ public class DeletedCell extends Cell
     }
 
     @Override
-    public Cell localCopy(ColumnFamilyStore cfs)
-    {
-        return new DeletedCell(name.copy(HeapAllocator.instance), ByteBufferUtil.clone(value), timestamp);
-    }
-
-    @Override
     public Cell localCopy(ColumnFamilyStore cfs, Allocator allocator)
     {
         return new DeletedCell(name.copy(allocator), allocator.clone(value), timestamp);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/714c4233/src/java/org/apache/cassandra/db/ExpiringCell.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ExpiringCell.java b/src/java/org/apache/cassandra/db/ExpiringCell.java
index b15514e..a2f68da 100644
--- a/src/java/org/apache/cassandra/db/ExpiringCell.java
+++ b/src/java/org/apache/cassandra/db/ExpiringCell.java
@@ -27,7 +27,6 @@ import org.apache.cassandra.db.composites.CellNameType;
 import org.apache.cassandra.io.util.DataOutputBuffer;
 import org.apache.cassandra.serializers.MarshalException;
 import org.apache.cassandra.utils.Allocator;
-import org.apache.cassandra.utils.HeapAllocator;
 
 /**
  * Alternative to Cell that have an expiring time.
@@ -133,12 +132,6 @@ public class ExpiringCell extends Cell
     }
 
     @Override
-    public Cell localCopy(ColumnFamilyStore cfs)
-    {
-        return localCopy(cfs, HeapAllocator.instance);
-    }
-
-    @Override
     public Cell localCopy(ColumnFamilyStore cfs, Allocator allocator)
     {
         return new ExpiringCell(name.copy(allocator), allocator.clone(value), timestamp, timeToLive, localExpirationTime);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/714c4233/src/java/org/apache/cassandra/db/IMutation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/IMutation.java b/src/java/org/apache/cassandra/db/IMutation.java
index 70bd79c..44df104 100644
--- a/src/java/org/apache/cassandra/db/IMutation.java
+++ b/src/java/org/apache/cassandra/db/IMutation.java
@@ -26,7 +26,7 @@ public interface IMutation
     public String getKeyspaceName();
     public Collection<UUID> getColumnFamilyIds();
     public ByteBuffer key();
-    public void apply();
+    public long getTimeout();
     public String toString(boolean shallow);
     public void addAll(IMutation m);
     public Collection<ColumnFamily> getColumnFamilies();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/714c4233/src/java/org/apache/cassandra/db/Mutation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Mutation.java b/src/java/org/apache/cassandra/db/Mutation.java
index d70d7f9..31d9503 100644
--- a/src/java/org/apache/cassandra/db/Mutation.java
+++ b/src/java/org/apache/cassandra/db/Mutation.java
@@ -26,6 +26,7 @@ import java.util.*;
 import org.apache.commons.lang3.StringUtils;
 
 import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.db.composites.CellName;
 import org.apache.cassandra.db.composites.Composite;
@@ -219,6 +220,11 @@ public class Mutation implements IMutation
         return new MessageOut<>(verb, this, serializer);
     }
 
+    public long getTimeout()
+    {
+        return DatabaseDescriptor.getWriteRpcTimeout();
+    }
+
     public String toString()
     {
         return toString(false);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/714c4233/src/java/org/apache/cassandra/db/SystemKeyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SystemKeyspace.java b/src/java/org/apache/cassandra/db/SystemKeyspace.java
index 39bdd15..f8637c1 100644
--- a/src/java/org/apache/cassandra/db/SystemKeyspace.java
+++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java
@@ -693,27 +693,6 @@ public class SystemKeyspace
         forceBlockingFlush(COUNTER_ID_CF);
     }
 
-    public static List<CounterId.CounterIdRecord> getOldLocalCounterIds()
-    {
-        List<CounterId.CounterIdRecord> l = new ArrayList<CounterId.CounterIdRecord>();
-
-        Keyspace keyspace = Keyspace.open(Keyspace.SYSTEM_KS);
-        QueryFilter filter = QueryFilter.getIdentityFilter(decorate(ALL_LOCAL_NODE_ID_KEY), COUNTER_ID_CF, System.currentTimeMillis());
-        ColumnFamily cf = keyspace.getColumnFamilyStore(COUNTER_ID_CF).getColumnFamily(filter);
-
-        CounterId previous = null;
-        for (Cell c : cf)
-        {
-            if (previous != null)
-                l.add(new CounterId.CounterIdRecord(previous, c.timestamp()));
-
-            // this will ignore the last column on purpose since it is the
-            // current local node id
-            previous = CounterId.wrap(c.name().toByteBuffer());
-        }
-        return l;
-    }
-
     /**
      * @param cfName The name of the ColumnFamily responsible for part of the schema (keyspace, ColumnFamily, columns)
      * @return CFS responsible to hold low-level serialized schema

http://git-wip-us.apache.org/repos/asf/cassandra/blob/714c4233/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 2a8d68d..1414c3f 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -261,7 +261,7 @@ public class CompactionManager implements CompactionManagerMBean
         });
     }
 
-    public void performCleanup(ColumnFamilyStore cfStore, final CounterId.OneShotRenewer renewer) throws InterruptedException, ExecutionException
+    public void performCleanup(ColumnFamilyStore cfStore) throws InterruptedException, ExecutionException
     {
         performAllSSTableOperation(cfStore, new AllSSTablesOperation()
         {
@@ -272,7 +272,7 @@ public class CompactionManager implements CompactionManagerMBean
                 List<SSTableReader> sortedSSTables = Lists.newArrayList(sstables);
                 Collections.sort(sortedSSTables, new SSTableReader.SizeComparator());
 
-                doCleanupCompaction(store, sortedSSTables, renewer);
+                doCleanupCompaction(store, sortedSSTables);
             }
         });
     }
@@ -508,7 +508,7 @@ public class CompactionManager implements CompactionManagerMBean
      *
      * @throws IOException
      */
-    private void doCleanupCompaction(final ColumnFamilyStore cfs, Collection<SSTableReader> sstables, CounterId.OneShotRenewer renewer) throws IOException
+    private void doCleanupCompaction(final ColumnFamilyStore cfs, Collection<SSTableReader> sstables) throws IOException
     {
         assert !cfs.isIndex();
         Keyspace keyspace = cfs.keyspace;
@@ -520,7 +520,7 @@ public class CompactionManager implements CompactionManagerMBean
         }
 
         boolean hasIndexes = cfs.indexManager.hasIndexes();
-        CleanupStrategy cleanupStrategy = CleanupStrategy.get(cfs, ranges, renewer);
+        CleanupStrategy cleanupStrategy = CleanupStrategy.get(cfs, ranges);
 
         for (SSTableReader sstable : sstables)
         {
@@ -614,12 +614,11 @@ public class CompactionManager implements CompactionManagerMBean
 
     private static abstract class CleanupStrategy
     {
-        public static CleanupStrategy get(ColumnFamilyStore cfs, Collection<Range<Token>> ranges, CounterId.OneShotRenewer renewer)
+        public static CleanupStrategy get(ColumnFamilyStore cfs, Collection<Range<Token>> ranges)
         {
-            if (cfs.indexManager.hasIndexes() || cfs.metadata.isCounter())
-                return new Full(cfs, ranges, renewer);
-
-            return new Bounded(cfs, ranges);
+            return cfs.indexManager.hasIndexes()
+                 ? new Full(cfs, ranges)
+                 : new Bounded(cfs, ranges);
         }
 
         public abstract ICompactionScanner getScanner(SSTableReader sstable, RateLimiter limiter);
@@ -660,14 +659,12 @@ public class CompactionManager implements CompactionManagerMBean
             private final Collection<Range<Token>> ranges;
             private final ColumnFamilyStore cfs;
             private List<Cell> indexedColumnsInRow;
-            private final CounterId.OneShotRenewer renewer;
 
-            public Full(ColumnFamilyStore cfs, Collection<Range<Token>> ranges, CounterId.OneShotRenewer renewer)
+            public Full(ColumnFamilyStore cfs, Collection<Range<Token>> ranges)
             {
                 this.cfs = cfs;
                 this.ranges = ranges;
                 this.indexedColumnsInRow = null;
-                this.renewer = renewer;
             }
 
             @Override
@@ -690,8 +687,6 @@ public class CompactionManager implements CompactionManagerMBean
                 while (row.hasNext())
                 {
                     OnDiskAtom column = row.next();
-                    if (column instanceof CounterCell)
-                        renewer.maybeRenew((CounterCell) column);
 
                     if (column instanceof Cell && cfs.indexManager.indexes((Cell) column))
                     {