You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2013/12/21 22:40:53 UTC

[4/4] git commit: Rename RowMutation->Mutation in preparation for Row->Partition

Rename RowMutation->Mutation in preparation for Row->Partition


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/6bbb13b9
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/6bbb13b9
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/6bbb13b9

Branch: refs/heads/trunk
Commit: 6bbb13b9b0fe62de2a2140055af2ea6968c73ccc
Parents: d753661
Author: Aleksey Yeschenko <al...@apache.org>
Authored: Sun Dec 22 00:37:43 2013 +0300
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Sun Dec 22 00:37:43 2013 +0300

----------------------------------------------------------------------
 .../cassandra/triggers/InvertedIndex.java       |  10 +-
 .../org/apache/cassandra/config/CFMetaData.java |  52 +--
 .../cassandra/config/ColumnDefinition.java      |  14 +-
 .../org/apache/cassandra/config/KSMetaData.java |  27 +-
 .../cassandra/config/TriggerDefinition.java     |  16 +-
 .../org/apache/cassandra/config/UTMetaData.java |  16 +-
 .../apache/cassandra/cql/DeleteStatement.java   |  22 +-
 .../apache/cassandra/cql/UpdateStatement.java   |  35 +-
 .../cql3/statements/ModificationStatement.java  |  10 +-
 .../apache/cassandra/db/BatchlogManager.java    |  22 +-
 .../cassandra/db/CollationController.java       |   4 +-
 .../org/apache/cassandra/db/CounterCell.java    |   6 +-
 .../apache/cassandra/db/CounterMutation.java    |  79 ++---
 .../db/DefinitionsUpdateVerbHandler.java        |   6 +-
 .../org/apache/cassandra/db/DefsTables.java     |   8 +-
 .../cassandra/db/HintedHandOffManager.java      |  36 +-
 src/java/org/apache/cassandra/db/Keyspace.java  |   4 +-
 .../db/MigrationRequestVerbHandler.java         |   8 +-
 src/java/org/apache/cassandra/db/Mutation.java  | 336 +++++++++++++++++++
 .../cassandra/db/MutationVerbHandler.java       |  85 +++++
 .../cassandra/db/ReadRepairVerbHandler.java     |   7 +-
 .../org/apache/cassandra/db/RowMutation.java    | 335 ------------------
 .../cassandra/db/RowMutationVerbHandler.java    |  87 -----
 .../org/apache/cassandra/db/SystemKeyspace.java |  33 +-
 .../cassandra/db/commitlog/CommitLog.java       |  13 +-
 .../db/commitlog/CommitLogReplayer.java         |  52 +--
 .../db/commitlog/CommitLogSegment.java          |  14 +-
 .../db/commitlog/CommitLogSegmentManager.java   |   9 +-
 .../apache/cassandra/net/MessagingService.java  |   8 +-
 .../cassandra/service/MigrationManager.java     |  32 +-
 .../apache/cassandra/service/MigrationTask.java |   6 +-
 .../cassandra/service/RowDataResolver.java      |   9 +-
 .../apache/cassandra/service/StorageProxy.java  |  58 ++--
 .../cassandra/service/StorageService.java       |   2 +-
 .../apache/cassandra/service/paxos/Commit.java  |  11 +-
 .../cassandra/service/paxos/PaxosState.java     |   6 +-
 .../org/apache/cassandra/sink/IRequestSink.java |   4 +-
 .../cassandra/thrift/CassandraServer.java       | 114 +++----
 .../apache/cassandra/tracing/TraceState.java    |   4 +-
 .../org/apache/cassandra/tracing/Tracing.java   |   6 +-
 .../org/apache/cassandra/triggers/ITrigger.java |   8 +-
 .../cassandra/triggers/TriggerExecutor.java     |  21 +-
 .../org/apache/cassandra/cli/CliHelp.yaml       |   4 +-
 .../apache/cassandra/db/LongKeyspaceTest.java   |   4 +-
 .../apache/cassandra/db/MeteredFlusherTest.java |   2 +-
 .../cassandra/db/commitlog/ComitLogStress.java  |   4 +-
 .../db/compaction/LongCompactionsTest.java      |   2 +-
 .../LongLeveledCompactionStrategyTest.java      |   8 +-
 .../unit/org/apache/cassandra/SchemaLoader.java |   7 +-
 test/unit/org/apache/cassandra/Util.java        |   4 +-
 .../apache/cassandra/config/CFMetaDataTest.java |   3 +-
 .../org/apache/cassandra/config/DefsTest.java   |  18 +-
 .../org/apache/cassandra/db/CleanupTest.java    |   4 +-
 .../cassandra/db/CollationControllerTest.java   |  19 +-
 .../cassandra/db/ColumnFamilyStoreTest.java     | 108 +++---
 .../org/apache/cassandra/db/CommitLogTest.java  |  10 +-
 .../cassandra/db/CounterMutationTest.java       |   8 +-
 .../apache/cassandra/db/HintedHandOffTest.java  |   2 +-
 .../org/apache/cassandra/db/KeyCacheTest.java   |   7 +-
 .../apache/cassandra/db/KeyCollisionTest.java   |   4 +-
 .../org/apache/cassandra/db/KeyspaceTest.java   |  30 +-
 .../org/apache/cassandra/db/MultitableTest.java |   6 +-
 .../org/apache/cassandra/db/NameSortTest.java   |   6 +-
 .../apache/cassandra/db/RangeTombstoneTest.java |  40 +--
 .../apache/cassandra/db/ReadMessageTest.java    |   8 +-
 .../cassandra/db/RecoveryManager2Test.java      |   2 +-
 .../cassandra/db/RecoveryManager3Test.java      |   6 +-
 .../cassandra/db/RecoveryManagerTest.java       |  12 +-
 .../db/RecoveryManagerTruncateTest.java         |   4 +-
 .../org/apache/cassandra/db/RemoveCellTest.java |   6 +-
 .../cassandra/db/RemoveColumnFamilyTest.java    |   6 +-
 .../db/RemoveColumnFamilyWithFlush1Test.java    |   6 +-
 .../db/RemoveColumnFamilyWithFlush2Test.java    |   6 +-
 .../apache/cassandra/db/RemoveSubCellTest.java  |  16 +-
 .../apache/cassandra/db/RowIterationTest.java   |   9 +-
 .../unit/org/apache/cassandra/db/ScrubTest.java |   4 +-
 .../apache/cassandra/db/SerializationsTest.java |  48 +--
 .../org/apache/cassandra/db/TimeSortTest.java   |  12 +-
 .../compaction/BlacklistingCompactionsTest.java |   7 +-
 .../db/compaction/CompactionsPurgeTest.java     |  54 ++-
 .../db/compaction/CompactionsTest.java          |  20 +-
 .../LeveledCompactionStrategyTest.java          |  14 +-
 .../db/compaction/OneCompactionTest.java        |   8 +-
 .../SizeTieredCompactionStrategyTest.java       |  10 +-
 .../cassandra/db/compaction/TTLExpiryTest.java  |  25 +-
 .../db/index/PerRowSecondaryIndexTest.java      |  14 +-
 .../cassandra/db/marshal/CompositeTypeTest.java |   4 +-
 .../db/marshal/DynamicCompositeTypeTest.java    |   4 +-
 .../io/sstable/IndexSummaryManagerTest.java     |   6 +-
 .../io/sstable/SSTableMetadataTest.java         |  21 +-
 .../cassandra/io/sstable/SSTableReaderTest.java |  20 +-
 .../io/sstable/SSTableScannerTest.java          |   2 +-
 .../service/AntiEntropyServiceCounterTest.java  |   2 +-
 .../service/AntiEntropyServiceStandardTest.java |   4 +-
 .../cassandra/service/QueryPagerTest.java       |   2 +-
 .../streaming/StreamingTransferTest.java        |   6 +-
 96 files changed, 1114 insertions(+), 1189 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/6bbb13b9/examples/triggers/src/org/apache/cassandra/triggers/InvertedIndex.java
----------------------------------------------------------------------
diff --git a/examples/triggers/src/org/apache/cassandra/triggers/InvertedIndex.java b/examples/triggers/src/org/apache/cassandra/triggers/InvertedIndex.java
index 2e1d4cc..8ebc46e 100644
--- a/examples/triggers/src/org/apache/cassandra/triggers/InvertedIndex.java
+++ b/examples/triggers/src/org/apache/cassandra/triggers/InvertedIndex.java
@@ -24,12 +24,12 @@ import java.util.Collection;
 import java.util.List;
 import java.util.Properties;
 
-import org.apache.cassandra.db.Cell;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.db.Cell;
 import org.apache.cassandra.db.ColumnFamily;
-import org.apache.cassandra.db.RowMutation;
+import org.apache.cassandra.db.Mutation;
 import org.apache.cassandra.io.util.FileUtils;
 
 public class InvertedIndex implements ITrigger
@@ -37,12 +37,12 @@ public class InvertedIndex implements ITrigger
     private static final Logger logger = LoggerFactory.getLogger(InvertedIndex.class);
     private Properties properties = loadProperties();
 
-    public Collection<RowMutation> augment(ByteBuffer key, ColumnFamily update)
+    public Collection<Mutation> augment(ByteBuffer key, ColumnFamily update)
     {
-        List<RowMutation> mutations = new ArrayList<>();
+        List<Mutation> mutations = new ArrayList<>();
         for (Cell cell : update)
         {
-            RowMutation mutation = new RowMutation(properties.getProperty("keyspace"), cell.value());
+            Mutation mutation = new Mutation(properties.getProperty("keyspace"), cell.value());
             mutation.add(properties.getProperty("columnfamily"), cell.name(), key, System.currentTimeMillis());
             mutations.add(mutation);
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6bbb13b9/src/java/org/apache/cassandra/config/CFMetaData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/CFMetaData.java b/src/java/org/apache/cassandra/config/CFMetaData.java
index 097f150..ddc839c 100644
--- a/src/java/org/apache/cassandra/config/CFMetaData.java
+++ b/src/java/org/apache/cassandra/config/CFMetaData.java
@@ -1022,7 +1022,7 @@ public final class CFMetaData
     /**
      * Create CFMetaData from thrift {@link CqlRow} that contains columns from schema_columnfamilies.
      *
-     * @param row CqlRow containing columns from schema_columnfamilies.
+     * @param cf CqlRow containing columns from schema_columnfamilies.
      * @return CFMetaData derived from CqlRow
      */
     public static CFMetaData fromThriftCqlRow(CqlRow cf, CqlResult columnsRes)
@@ -1476,11 +1476,11 @@ public final class CFMetaData
      *
      * @return Difference between attributes in form of schema mutation
      */
-    public RowMutation toSchemaUpdate(CFMetaData newState, long modificationTimestamp, boolean fromThrift)
+    public Mutation toSchemaUpdate(CFMetaData newState, long modificationTimestamp, boolean fromThrift)
     {
-        RowMutation rm = new RowMutation(Keyspace.SYSTEM_KS, SystemKeyspace.getSchemaKSKey(ksName));
+        Mutation mutation = new Mutation(Keyspace.SYSTEM_KS, SystemKeyspace.getSchemaKSKey(ksName));
 
-        newState.toSchemaNoColumnsNoTriggers(rm, modificationTimestamp);
+        newState.toSchemaNoColumnsNoTriggers(mutation, modificationTimestamp);
 
         MapDifference<ByteBuffer, ColumnDefinition> columnDiff = Maps.difference(columnMetadata, newState.columnMetadata);
 
@@ -1492,31 +1492,31 @@ public final class CFMetaData
             if (fromThrift && cd.kind != ColumnDefinition.Kind.REGULAR)
                 continue;
 
-            cd.deleteFromSchema(rm, modificationTimestamp);
+            cd.deleteFromSchema(mutation, modificationTimestamp);
         }
 
         // newly added columns
         for (ColumnDefinition cd : columnDiff.entriesOnlyOnRight().values())
-            cd.toSchema(rm, modificationTimestamp);
+            cd.toSchema(mutation, modificationTimestamp);
 
         // old columns with updated attributes
         for (ByteBuffer name : columnDiff.entriesDiffering().keySet())
         {
             ColumnDefinition cd = newState.columnMetadata.get(name);
-            cd.toSchema(rm, modificationTimestamp);
+            cd.toSchema(mutation, modificationTimestamp);
         }
 
         MapDifference<String, TriggerDefinition> triggerDiff = Maps.difference(triggers, newState.triggers);
 
         // dropped triggers
         for (TriggerDefinition td : triggerDiff.entriesOnlyOnLeft().values())
-            td.deleteFromSchema(rm, cfName, modificationTimestamp);
+            td.deleteFromSchema(mutation, cfName, modificationTimestamp);
 
         // newly created triggers
         for (TriggerDefinition td : triggerDiff.entriesOnlyOnRight().values())
-            td.toSchema(rm, cfName, modificationTimestamp);
+            td.toSchema(mutation, cfName, modificationTimestamp);
 
-        return rm;
+        return mutation;
     }
 
     /**
@@ -1524,24 +1524,24 @@ public final class CFMetaData
      *
      * @param timestamp Timestamp to use
      *
-     * @return RowMutation to use to completely remove cf from schema
+     * @return Mutation to use to completely remove cf from schema
      */
-    public RowMutation dropFromSchema(long timestamp)
+    public Mutation dropFromSchema(long timestamp)
     {
-        RowMutation rm = new RowMutation(Keyspace.SYSTEM_KS, SystemKeyspace.getSchemaKSKey(ksName));
-        ColumnFamily cf = rm.addOrGet(SchemaColumnFamiliesCf);
+        Mutation mutation = new Mutation(Keyspace.SYSTEM_KS, SystemKeyspace.getSchemaKSKey(ksName));
+        ColumnFamily cf = mutation.addOrGet(SchemaColumnFamiliesCf);
         int ldt = (int) (System.currentTimeMillis() / 1000);
 
         Composite prefix = SchemaColumnFamiliesCf.comparator.make(cfName);
         cf.addAtom(new RangeTombstone(prefix, prefix.end(), timestamp, ldt));
 
         for (ColumnDefinition cd : allColumns())
-            cd.deleteFromSchema(rm, timestamp);
+            cd.deleteFromSchema(mutation, timestamp);
 
         for (TriggerDefinition td : triggers.values())
-            td.deleteFromSchema(rm, cfName, timestamp);
+            td.deleteFromSchema(mutation, cfName, timestamp);
 
-        return rm;
+        return mutation;
     }
 
     public boolean isPurged()
@@ -1554,19 +1554,19 @@ public final class CFMetaData
         isPurged = true;
     }
 
-    public void toSchema(RowMutation rm, long timestamp)
+    public void toSchema(Mutation mutation, long timestamp)
     {
-        toSchemaNoColumnsNoTriggers(rm, timestamp);
+        toSchemaNoColumnsNoTriggers(mutation, timestamp);
 
         for (ColumnDefinition cd : allColumns())
-            cd.toSchema(rm, timestamp);
+            cd.toSchema(mutation, timestamp);
     }
 
-    private void toSchemaNoColumnsNoTriggers(RowMutation rm, long timestamp)
+    private void toSchemaNoColumnsNoTriggers(Mutation mutation, long timestamp)
     {
         // For property that can be null (and can be changed), we insert tombstones, to make sure
         // we don't keep a property the user has removed
-        ColumnFamily cf = rm.addOrGet(SchemaColumnFamiliesCf);
+        ColumnFamily cf = mutation.addOrGet(SchemaColumnFamiliesCf);
         Composite prefix = SchemaColumnFamiliesCf.comparator.make(cfName);
         CFRowAdder adder = new CFRowAdder(cf, prefix, timestamp);
 
@@ -1790,11 +1790,11 @@ public final class CFMetaData
      *
      * @throws ConfigurationException if any of the attributes didn't pass validation
      */
-    public RowMutation toSchema(long timestamp) throws ConfigurationException
+    public Mutation toSchema(long timestamp) throws ConfigurationException
     {
-        RowMutation rm = new RowMutation(Keyspace.SYSTEM_KS, SystemKeyspace.getSchemaKSKey(ksName));
-        toSchema(rm, timestamp);
-        return rm;
+        Mutation mutation = new Mutation(Keyspace.SYSTEM_KS, SystemKeyspace.getSchemaKSKey(ksName));
+        toSchema(mutation, timestamp);
+        return mutation;
     }
 
     // The comparator to validate the definition name.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6bbb13b9/src/java/org/apache/cassandra/config/ColumnDefinition.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/ColumnDefinition.java b/src/java/org/apache/cassandra/config/ColumnDefinition.java
index f16a0a6..79f50bf 100644
--- a/src/java/org/apache/cassandra/config/ColumnDefinition.java
+++ b/src/java/org/apache/cassandra/config/ColumnDefinition.java
@@ -279,14 +279,14 @@ public class ColumnDefinition extends ColumnSpecification
     }
 
     /**
-     * Drop specified column from the schema using given row.
+     * Drop specified column from the schema using given mutation.
      *
-     * @param rm         The schema row mutation
-     * @param timestamp  The timestamp to use for column modification
+     * @param mutation  The schema mutation
+     * @param timestamp The timestamp to use for column modification
      */
-    public void deleteFromSchema(RowMutation rm, long timestamp)
+    public void deleteFromSchema(Mutation mutation, long timestamp)
     {
-        ColumnFamily cf = rm.addOrGet(CFMetaData.SchemaColumnsCf);
+        ColumnFamily cf = mutation.addOrGet(CFMetaData.SchemaColumnsCf);
         int ldt = (int) (System.currentTimeMillis() / 1000);
 
         // Note: we do want to use name.toString(), not name.bytes directly for backward compatibility (For CQL3, this won't make a difference).
@@ -294,9 +294,9 @@ public class ColumnDefinition extends ColumnSpecification
         cf.addAtom(new RangeTombstone(prefix, prefix.end(), timestamp, ldt));
     }
 
-    public void toSchema(RowMutation rm, long timestamp)
+    public void toSchema(Mutation mutation, long timestamp)
     {
-        ColumnFamily cf = rm.addOrGet(CFMetaData.SchemaColumnsCf);
+        ColumnFamily cf = mutation.addOrGet(CFMetaData.SchemaColumnsCf);
         Composite prefix = CFMetaData.SchemaColumnsCf.comparator.make(cfName, name.toString());
         CFRowAdder adder = new CFRowAdder(cf, prefix, timestamp);
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6bbb13b9/src/java/org/apache/cassandra/config/KSMetaData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/KSMetaData.java b/src/java/org/apache/cassandra/config/KSMetaData.java
index 3dfea03..c3fe641 100644
--- a/src/java/org/apache/cassandra/config/KSMetaData.java
+++ b/src/java/org/apache/cassandra/config/KSMetaData.java
@@ -194,7 +194,7 @@ public final class KSMetaData
         return ksdef;
     }
 
-    public RowMutation toSchemaUpdate(KSMetaData newState, long modificationTimestamp)
+    public Mutation toSchemaUpdate(KSMetaData newState, long modificationTimestamp)
     {
         return newState.toSchema(modificationTimestamp);
     }
@@ -226,21 +226,22 @@ public final class KSMetaData
         return fromSchema(ksDefRow, Collections.<CFMetaData>emptyList());
     }
 
-    public RowMutation dropFromSchema(long timestamp)
+    public Mutation dropFromSchema(long timestamp)
     {
-        RowMutation rm = new RowMutation(Keyspace.SYSTEM_KS, SystemKeyspace.getSchemaKSKey(name));
-        rm.delete(SystemKeyspace.SCHEMA_KEYSPACES_CF, timestamp);
-        rm.delete(SystemKeyspace.SCHEMA_COLUMNFAMILIES_CF, timestamp);
-        rm.delete(SystemKeyspace.SCHEMA_COLUMNS_CF, timestamp);
-        rm.delete(SystemKeyspace.SCHEMA_TRIGGERS_CF, timestamp);
+        Mutation mutation = new Mutation(Keyspace.SYSTEM_KS, SystemKeyspace.getSchemaKSKey(name));
 
-        return rm;
+        mutation.delete(SystemKeyspace.SCHEMA_KEYSPACES_CF, timestamp);
+        mutation.delete(SystemKeyspace.SCHEMA_COLUMNFAMILIES_CF, timestamp);
+        mutation.delete(SystemKeyspace.SCHEMA_COLUMNS_CF, timestamp);
+        mutation.delete(SystemKeyspace.SCHEMA_TRIGGERS_CF, timestamp);
+
+        return mutation;
     }
 
-    public RowMutation toSchema(long timestamp)
+    public Mutation toSchema(long timestamp)
     {
-        RowMutation rm = new RowMutation(Keyspace.SYSTEM_KS, SystemKeyspace.getSchemaKSKey(name));
-        ColumnFamily cf = rm.addOrGet(CFMetaData.SchemaKeyspacesCf);
+        Mutation mutation = new Mutation(Keyspace.SYSTEM_KS, SystemKeyspace.getSchemaKSKey(name));
+        ColumnFamily cf = mutation.addOrGet(CFMetaData.SchemaKeyspacesCf);
         CFRowAdder adder = new CFRowAdder(cf, CFMetaData.SchemaKeyspacesCf.comparator.builder().build(), timestamp);
 
         adder.add("durable_writes", durableWrites);
@@ -248,9 +249,9 @@ public final class KSMetaData
         adder.add("strategy_options", json(strategyOptions));
 
         for (CFMetaData cfm : cfMetaData.values())
-            cfm.toSchema(rm, timestamp);
+            cfm.toSchema(mutation, timestamp);
 
-        return rm;
+        return mutation;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6bbb13b9/src/java/org/apache/cassandra/config/TriggerDefinition.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/TriggerDefinition.java b/src/java/org/apache/cassandra/config/TriggerDefinition.java
index e1809eb..aaaf631 100644
--- a/src/java/org/apache/cassandra/config/TriggerDefinition.java
+++ b/src/java/org/apache/cassandra/config/TriggerDefinition.java
@@ -72,15 +72,15 @@ public class TriggerDefinition
     }
 
     /**
-     * Add specified trigger to the schema using given row.
+     * Add specified trigger to the schema using given mutation.
      *
-     * @param rm        The schema row mutation
+     * @param mutation  The schema mutation
      * @param cfName    The name of the parent ColumnFamily
      * @param timestamp The timestamp to use for the columns
      */
-    public void toSchema(RowMutation rm, String cfName, long timestamp)
+    public void toSchema(Mutation mutation, String cfName, long timestamp)
     {
-        ColumnFamily cf = rm.addOrGet(SystemKeyspace.SCHEMA_TRIGGERS_CF);
+        ColumnFamily cf = mutation.addOrGet(SystemKeyspace.SCHEMA_TRIGGERS_CF);
 
         CFMetaData cfm = CFMetaData.SchemaTriggersCf;
         Composite prefix = cfm.comparator.make(cfName, name);
@@ -90,15 +90,15 @@ public class TriggerDefinition
     }
 
     /**
-     * Drop specified trigger from the schema using given row.
+     * Drop specified trigger from the schema using given mutation.
      *
-     * @param rm        The schema row mutation
+     * @param mutation  The schema mutation
      * @param cfName    The name of the parent ColumnFamily
      * @param timestamp The timestamp to use for the tombstone
      */
-    public void deleteFromSchema(RowMutation rm, String cfName, long timestamp)
+    public void deleteFromSchema(Mutation mutation, String cfName, long timestamp)
     {
-        ColumnFamily cf = rm.addOrGet(SystemKeyspace.SCHEMA_TRIGGERS_CF);
+        ColumnFamily cf = mutation.addOrGet(SystemKeyspace.SCHEMA_TRIGGERS_CF);
         int ldt = (int) (System.currentTimeMillis() / 1000);
 
         Composite prefix = CFMetaData.SchemaTriggersCf.comparator.make(cfName, name);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6bbb13b9/src/java/org/apache/cassandra/config/UTMetaData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/UTMetaData.java b/src/java/org/apache/cassandra/config/UTMetaData.java
index f78e645..76f3999 100644
--- a/src/java/org/apache/cassandra/config/UTMetaData.java
+++ b/src/java/org/apache/cassandra/config/UTMetaData.java
@@ -83,10 +83,10 @@ public final class UTMetaData
         return fromSchema(result);
     }
 
-    public static RowMutation toSchema(UserType newType, long timestamp)
+    public static Mutation toSchema(UserType newType, long timestamp)
     {
-        RowMutation rm = new RowMutation(Keyspace.SYSTEM_KS, newType.name);
-        ColumnFamily cf = rm.addOrGet(SystemKeyspace.SCHEMA_USER_TYPES_CF);
+        Mutation mutation = new Mutation(Keyspace.SYSTEM_KS, newType.name);
+        ColumnFamily cf = mutation.addOrGet(SystemKeyspace.SCHEMA_USER_TYPES_CF);
 
         CFMetaData cfm = CFMetaData.SchemaUserTypesCf;
         UpdateParameters params = new UpdateParameters(cfm, Collections.<ByteBuffer>emptyList(), timestamp, 0, null);
@@ -106,14 +106,14 @@ public final class UTMetaData
             throw new AssertionError();
         }
 
-        return rm;
+        return mutation;
     }
 
-    public static RowMutation dropFromSchema(UserType droppedType, long timestamp)
+    public static Mutation dropFromSchema(UserType droppedType, long timestamp)
     {
-        RowMutation rm = new RowMutation(Keyspace.SYSTEM_KS, droppedType.name);
-        rm.delete(SystemKeyspace.SCHEMA_USER_TYPES_CF, timestamp);
-        return rm;
+        Mutation mutation = new Mutation(Keyspace.SYSTEM_KS, droppedType.name);
+        mutation.delete(SystemKeyspace.SCHEMA_USER_TYPES_CF, timestamp);
+        return mutation;
     }
 
     public void addAll(UTMetaData types)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6bbb13b9/src/java/org/apache/cassandra/cql/DeleteStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql/DeleteStatement.java b/src/java/org/apache/cassandra/cql/DeleteStatement.java
index bcc63e1..71942e4 100644
--- a/src/java/org/apache/cassandra/cql/DeleteStatement.java
+++ b/src/java/org/apache/cassandra/cql/DeleteStatement.java
@@ -24,9 +24,9 @@ import java.util.List;
 import org.apache.cassandra.auth.Permission;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.db.Mutation;
 import org.apache.cassandra.db.composites.CellName;
 import org.apache.cassandra.db.IMutation;
-import org.apache.cassandra.db.RowMutation;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.exceptions.InvalidRequestException;
 import org.apache.cassandra.exceptions.UnauthorizedException;
@@ -76,27 +76,25 @@ public class DeleteStatement extends AbstractModification
         clientState.hasColumnFamilyAccess(keyspace, columnFamily, Permission.MODIFY);
         AbstractType<?> keyType = Schema.instance.getCFMetaData(keyspace, columnFamily).getKeyValidator();
 
-        List<IMutation> rowMutations = new ArrayList<IMutation>(keys.size());
+        List<IMutation> mutations = new ArrayList<IMutation>(keys.size());
 
         for (Term key : keys)
-        {
-            rowMutations.add(mutationForKey(key.getByteBuffer(keyType, variables), keyspace, timestamp, clientState, variables, metadata));
-        }
+            mutations.add(mutationForKey(key.getByteBuffer(keyType, variables), keyspace, timestamp, clientState, variables, metadata));
 
-        return rowMutations;
+        return mutations;
     }
 
-    public RowMutation mutationForKey(ByteBuffer key, String keyspace, Long timestamp, ThriftClientState clientState, List<ByteBuffer> variables, CFMetaData metadata)
+    public Mutation mutationForKey(ByteBuffer key, String keyspace, Long timestamp, ThriftClientState clientState, List<ByteBuffer> variables, CFMetaData metadata)
     throws InvalidRequestException
     {
-        RowMutation rm = new RowMutation(keyspace, key);
+        Mutation mutation = new Mutation(keyspace, key);
 
         QueryProcessor.validateKeyAlias(metadata, keyName);
 
         if (columns.size() < 1)
         {
-            // No columns, delete the row
-            rm.delete(columnFamily, (timestamp == null) ? getTimestamp(clientState) : timestamp);
+            // No columns, delete the partition
+            mutation.delete(columnFamily, (timestamp == null) ? getTimestamp(clientState) : timestamp);
         }
         else
         {
@@ -106,11 +104,11 @@ public class DeleteStatement extends AbstractModification
             {
                 CellName columnName = metadata.comparator.cellFromByteBuffer(column.getByteBuffer(at, variables));
                 validateColumnName(columnName);
-                rm.delete(columnFamily, columnName, (timestamp == null) ? getTimestamp(clientState) : timestamp);
+                mutation.delete(columnFamily, columnName, (timestamp == null) ? getTimestamp(clientState) : timestamp);
             }
         }
 
-        return rm;
+        return mutation;
     }
 
     public String toString()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6bbb13b9/src/java/org/apache/cassandra/cql/UpdateStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql/UpdateStatement.java b/src/java/org/apache/cassandra/cql/UpdateStatement.java
index 446c5a2..16a0d76 100644
--- a/src/java/org/apache/cassandra/cql/UpdateStatement.java
+++ b/src/java/org/apache/cassandra/cql/UpdateStatement.java
@@ -23,12 +23,9 @@ import java.util.*;
 import org.apache.cassandra.auth.Permission;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.composites.CellName;
 import org.apache.cassandra.db.composites.CellNameType;
-import org.apache.cassandra.db.CounterMutation;
-import org.apache.cassandra.db.ConsistencyLevel;
-import org.apache.cassandra.db.IMutation;
-import org.apache.cassandra.db.RowMutation;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.exceptions.InvalidRequestException;
 import org.apache.cassandra.exceptions.UnauthorizedException;
@@ -151,18 +148,16 @@ public class UpdateStatement extends AbstractModification
 
         clientState.hasColumnFamilyAccess(keyspace, columnFamily, Permission.MODIFY);
 
-        List<IMutation> rowMutations = new LinkedList<IMutation>();
+        List<IMutation> mutations = new LinkedList<>();
 
         for (Term key: keys)
-        {
-            rowMutations.add(mutationForKey(keyspace, key.getByteBuffer(getKeyType(keyspace),variables), metadata, timestamp, clientState, variables));
-        }
+            mutations.add(mutationForKey(keyspace, key.getByteBuffer(getKeyType(keyspace),variables), metadata, timestamp, clientState, variables));
 
-        return rowMutations;
+        return mutations;
     }
 
     /**
-     * Compute a row mutation for a single key
+     * Compute a mutation for a single key
      *
      *
      * @param keyspace working keyspace
@@ -171,7 +166,7 @@ public class UpdateStatement extends AbstractModification
      * @param timestamp global timestamp to use for every key mutation
      *
      * @param clientState
-     * @return row mutation
+     * @return mutation
      *
      * @throws InvalidRequestException on the wrong request
      */
@@ -182,9 +177,9 @@ public class UpdateStatement extends AbstractModification
         CellNameType comparator = metadata.comparator;
         AbstractType<?> at = comparator.asAbstractType();
 
-        // if true we need to wrap RowMutation into CounterMutation
+        // if true we need to wrap Mutation into CounterMutation
         boolean hasCounterColumn = false;
-        RowMutation rm = new RowMutation(keyspace, key);
+        Mutation mutation = new Mutation(keyspace, key);
 
         for (Map.Entry<Term, Operation> column : getColumns().entrySet())
         {
@@ -199,11 +194,11 @@ public class UpdateStatement extends AbstractModification
                 ByteBuffer colValue = op.a.getByteBuffer(metadata.getValueValidator(colName),variables);
 
                 validateColumn(metadata, colName, colValue);
-                rm.add(columnFamily,
-                       colName,
-                       colValue,
-                       (timestamp == null) ? getTimestamp(clientState) : timestamp,
-                       getTimeToLive());
+                mutation.add(columnFamily,
+                             colName,
+                             colValue,
+                             (timestamp == null) ? getTimestamp(clientState) : timestamp,
+                             getTimeToLive());
             }
             else
             {
@@ -224,11 +219,11 @@ public class UpdateStatement extends AbstractModification
                                                       op.b.getText()));
                 }
 
-                rm.addCounter(columnFamily, colName, value);
+                mutation.addCounter(columnFamily, colName, value);
             }
         }
 
-        return (hasCounterColumn) ? new CounterMutation(rm, getConsistencyLevel()) : rm;
+        return (hasCounterColumn) ? new CounterMutation(mutation, getConsistencyLevel()) : mutation;
     }
 
     public String getColumnFamily()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6bbb13b9/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
index a396ef4..d164816 100644
--- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
@@ -516,18 +516,18 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
 
     private IMutation makeMutation(ByteBuffer key, ColumnFamily cf, ConsistencyLevel cl, boolean isBatch)
     {
-        RowMutation rm;
+        Mutation mutation;
         if (isBatch)
         {
             // we might group other mutations together with this one later, so make it mutable
-            rm = new RowMutation(cfm.ksName, key);
-            rm.add(cf);
+            mutation = new Mutation(cfm.ksName, key);
+            mutation.add(cf);
         }
         else
         {
-            rm = new RowMutation(cfm.ksName, key, cf);
+            mutation = new Mutation(cfm.ksName, key, cf);
         }
-        return isCounter() ? new CounterMutation(rm, cl) : rm;
+        return isCounter() ? new CounterMutation(mutation, cl) : mutation;
     }
 
     private ColumnFamily buildConditions(ByteBuffer key, Composite clusteringPrefix, UpdateParameters params)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6bbb13b9/src/java/org/apache/cassandra/db/BatchlogManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/BatchlogManager.java b/src/java/org/apache/cassandra/db/BatchlogManager.java
index 24fb0c5..b103b69 100644
--- a/src/java/org/apache/cassandra/db/BatchlogManager.java
+++ b/src/java/org/apache/cassandra/db/BatchlogManager.java
@@ -35,7 +35,6 @@ import javax.management.ObjectName;
 
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
-import org.apache.cassandra.db.composites.CellName;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -44,6 +43,7 @@ import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.cql3.QueryProcessor;
 import org.apache.cassandra.cql3.UntypedResultSet;
+import org.apache.cassandra.db.composites.CellName;
 import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.db.marshal.LongType;
 import org.apache.cassandra.db.marshal.UUIDType;
@@ -121,21 +121,21 @@ public class BatchlogManager implements BatchlogManagerMBean
         batchlogTasks.execute(runnable);
     }
 
-    public static RowMutation getBatchlogMutationFor(Collection<RowMutation> mutations, UUID uuid)
+    public static Mutation getBatchlogMutationFor(Collection<Mutation> mutations, UUID uuid)
     {
         long timestamp = FBUtilities.timestampMicros();
         ByteBuffer writtenAt = LongType.instance.decompose(timestamp / 1000);
-        ByteBuffer data = serializeRowMutations(mutations);
+        ByteBuffer data = serializeMutations(mutations);
 
         ColumnFamily cf = ArrayBackedSortedColumns.factory.create(CFMetaData.BatchlogCf);
         cf.addColumn(new Cell(cellName(""), ByteBufferUtil.EMPTY_BYTE_BUFFER, timestamp));
         cf.addColumn(new Cell(cellName("data"), data, timestamp));
         cf.addColumn(new Cell(cellName("written_at"), writtenAt, timestamp));
 
-        return new RowMutation(Keyspace.SYSTEM_KS, UUIDType.instance.decompose(uuid), cf);
+        return new Mutation(Keyspace.SYSTEM_KS, UUIDType.instance.decompose(uuid), cf);
     }
 
-    private static ByteBuffer serializeRowMutations(Collection<RowMutation> mutations)
+    private static ByteBuffer serializeMutations(Collection<Mutation> mutations)
     {
         FastByteArrayOutputStream bos = new FastByteArrayOutputStream();
         DataOutputStream out = new DataOutputStream(bos);
@@ -143,8 +143,8 @@ public class BatchlogManager implements BatchlogManagerMBean
         try
         {
             out.writeInt(mutations.size());
-            for (RowMutation rm : mutations)
-                RowMutation.serializer.serialize(rm, out, VERSION);
+            for (Mutation mutation : mutations)
+                Mutation.serializer.serialize(mutation, out, VERSION);
         }
         catch (IOException e)
         {
@@ -204,14 +204,14 @@ public class BatchlogManager implements BatchlogManagerMBean
         DataInputStream in = new DataInputStream(ByteBufferUtil.inputStream(data));
         int size = in.readInt();
         for (int i = 0; i < size; i++)
-            replaySerializedMutation(RowMutation.serializer.deserialize(in, VERSION), writtenAt);
+            replaySerializedMutation(Mutation.serializer.deserialize(in, VERSION), writtenAt);
     }
 
     /*
      * We try to deliver the mutations to the replicas ourselves if they are alive and only resort to writing hints
      * when a replica is down or a write request times out.
      */
-    private void replaySerializedMutation(RowMutation mutation, long writtenAt)
+    private void replaySerializedMutation(Mutation mutation, long writtenAt)
     {
         int ttl = calculateHintTTL(mutation, writtenAt);
         if (ttl <= 0)
@@ -235,7 +235,7 @@ public class BatchlogManager implements BatchlogManagerMBean
             attemptDirectDelivery(mutation, writtenAt, liveEndpoints);
     }
 
-    private void attemptDirectDelivery(RowMutation mutation, long writtenAt, Set<InetAddress> endpoints)
+    private void attemptDirectDelivery(Mutation mutation, long writtenAt, Set<InetAddress> endpoints)
     {
         List<WriteResponseHandler> handlers = Lists.newArrayList();
         final CopyOnWriteArraySet<InetAddress> undelivered = new CopyOnWriteArraySet<InetAddress>(endpoints);
@@ -277,7 +277,7 @@ public class BatchlogManager implements BatchlogManagerMBean
 
     // calculate ttl for the mutation's hint (and reduce ttl by the time the mutation spent in the batchlog).
     // this ensures that deletes aren't "undone" by an old batch replay.
-    private int calculateHintTTL(RowMutation mutation, long writtenAt)
+    private int calculateHintTTL(Mutation mutation, long writtenAt)
     {
         return (int) ((HintedHandOffManager.calculateHintTTL(mutation) * 1000 - (System.currentTimeMillis() - writtenAt)) / 1000);
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6bbb13b9/src/java/org/apache/cassandra/db/CollationController.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/CollationController.java b/src/java/org/apache/cassandra/db/CollationController.java
index c7c09bf..1f164a9 100644
--- a/src/java/org/apache/cassandra/db/CollationController.java
+++ b/src/java/org/apache/cassandra/db/CollationController.java
@@ -149,9 +149,9 @@ public class CollationController
                 && cfs.getCompactionStrategy() instanceof SizeTieredCompactionStrategy)
             {
                 Tracing.trace("Defragmenting requested data");
-                RowMutation rm = new RowMutation(cfs.keyspace.getName(), filter.key.key, returnCF.cloneMe());
+                Mutation mutation = new Mutation(cfs.keyspace.getName(), filter.key.key, returnCF.cloneMe());
                 // skipping commitlog and index updates is fine since we're just de-fragmenting existing data
-                Keyspace.open(rm.getKeyspaceName()).apply(rm, false, false);
+                Keyspace.open(mutation.getKeyspaceName()).apply(mutation, false, false);
             }
 
             // Caller is responsible for final removeDeletedCF.  This is important for cacheRow to work correctly:

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6bbb13b9/src/java/org/apache/cassandra/db/CounterCell.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/CounterCell.java b/src/java/org/apache/cassandra/db/CounterCell.java
index 0391eb8..297ec73 100644
--- a/src/java/org/apache/cassandra/db/CounterCell.java
+++ b/src/java/org/apache/cassandra/db/CounterCell.java
@@ -349,12 +349,12 @@ public class CounterCell extends Cell
 
     private static void sendToOtherReplica(DecoratedKey key, ColumnFamily cf) throws RequestExecutionException
     {
-        RowMutation rm = new RowMutation(cf.metadata().ksName, key.key, cf);
+        Mutation mutation = new Mutation(cf.metadata().ksName, key.key, cf);
 
         final InetAddress local = FBUtilities.getBroadcastAddress();
         String localDataCenter = DatabaseDescriptor.getEndpointSnitch().getDatacenter(local);
 
-        StorageProxy.performWrite(rm, ConsistencyLevel.ANY, localDataCenter, new StorageProxy.WritePerformer()
+        StorageProxy.performWrite(mutation, ConsistencyLevel.ANY, localDataCenter, new StorageProxy.WritePerformer()
         {
             public void apply(IMutation mutation, Iterable<InetAddress> targets, AbstractWriteResponseHandler responseHandler, String localDataCenter, ConsistencyLevel consistency_level)
             throws OverloadedException
@@ -363,7 +363,7 @@ public class CounterCell extends Cell
                 Set<InetAddress> remotes = Sets.difference(ImmutableSet.copyOf(targets), ImmutableSet.of(local));
                 // Fake local response to be a good lad but we won't wait on the responseHandler
                 responseHandler.response(null);
-                StorageProxy.sendToHintedEndpoints((RowMutation) mutation, remotes, responseHandler, localDataCenter);
+                StorageProxy.sendToHintedEndpoints((Mutation) mutation, remotes, responseHandler, localDataCenter);
             }
         }, null, WriteType.SIMPLE);
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6bbb13b9/src/java/org/apache/cassandra/db/CounterMutation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/CounterMutation.java b/src/java/org/apache/cassandra/db/CounterMutation.java
index f0942e2..a07dd9b 100644
--- a/src/java/org/apache/cassandra/db/CounterMutation.java
+++ b/src/java/org/apache/cassandra/db/CounterMutation.java
@@ -42,38 +42,33 @@ public class CounterMutation implements IMutation
 {
     public static final CounterMutationSerializer serializer = new CounterMutationSerializer();
 
-    private final RowMutation rowMutation;
+    private final Mutation mutation;
     private final ConsistencyLevel consistency;
 
-    public CounterMutation(RowMutation rowMutation, ConsistencyLevel consistency)
+    public CounterMutation(Mutation mutation, ConsistencyLevel consistency)
     {
-        this.rowMutation = rowMutation;
+        this.mutation = mutation;
         this.consistency = consistency;
     }
 
     public String getKeyspaceName()
     {
-        return rowMutation.getKeyspaceName();
+        return mutation.getKeyspaceName();
     }
 
     public Collection<UUID> getColumnFamilyIds()
     {
-        return rowMutation.getColumnFamilyIds();
+        return mutation.getColumnFamilyIds();
     }
 
     public Collection<ColumnFamily> getColumnFamilies()
     {
-        return rowMutation.getColumnFamilies();
+        return mutation.getColumnFamilies();
     }
 
     public ByteBuffer key()
     {
-        return rowMutation.key();
-    }
-
-    public RowMutation rowMutation()
-    {
-        return rowMutation;
+        return mutation.key();
     }
 
     public ConsistencyLevel consistency()
@@ -81,19 +76,19 @@ public class CounterMutation implements IMutation
         return consistency;
     }
 
-    public RowMutation makeReplicationMutation()
+    public Mutation makeReplicationMutation()
     {
         List<ReadCommand> readCommands = new LinkedList<ReadCommand>();
         long timestamp = System.currentTimeMillis();
-        for (ColumnFamily columnFamily : rowMutation.getColumnFamilies())
+        for (ColumnFamily columnFamily : mutation.getColumnFamilies())
         {
             if (!columnFamily.metadata().getReplicateOnWrite())
                 continue;
-            addReadCommandFromColumnFamily(rowMutation.getKeyspaceName(), rowMutation.key(), columnFamily, timestamp, readCommands);
+            addReadCommandFromColumnFamily(mutation.getKeyspaceName(), mutation.key(), columnFamily, timestamp, readCommands);
         }
 
-        // create a replication RowMutation
-        RowMutation replicationMutation = new RowMutation(rowMutation.getKeyspaceName(), rowMutation.key());
+        // create a replication Mutation
+        Mutation replicationMutation = new Mutation(mutation.getKeyspaceName(), mutation.key());
         for (ReadCommand readCommand : readCommands)
         {
             Keyspace keyspace = Keyspace.open(readCommand.ksName);
@@ -121,7 +116,7 @@ public class CounterMutation implements IMutation
 
     public boolean shouldReplicateOnWrite()
     {
-        for (ColumnFamily cf : rowMutation.getColumnFamilies())
+        for (ColumnFamily cf : mutation.getColumnFamilies())
             if (cf.metadata().getReplicateOnWrite())
                 return true;
         return false;
@@ -130,10 +125,10 @@ public class CounterMutation implements IMutation
     public void apply()
     {
         // transform all CounterUpdateCell to CounterCell: accomplished by localCopy
-        RowMutation rm = new RowMutation(rowMutation.getKeyspaceName(), ByteBufferUtil.clone(rowMutation.key()));
-        Keyspace keyspace = Keyspace.open(rm.getKeyspaceName());
+        Mutation m = new Mutation(mutation.getKeyspaceName(), ByteBufferUtil.clone(mutation.key()));
+        Keyspace keyspace = Keyspace.open(m.getKeyspaceName());
 
-        for (ColumnFamily cf_ : rowMutation.getColumnFamilies())
+        for (ColumnFamily cf_ : mutation.getColumnFamilies())
         {
             ColumnFamily cf = cf_.cloneMeShallow();
             ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(cf.id());
@@ -141,9 +136,9 @@ public class CounterMutation implements IMutation
             {
                 cf.addColumn(cell.localCopy(cfs), HeapAllocator.instance);
             }
-            rm.add(cf);
+            m.add(cf);
         }
-        rm.apply();
+        m.apply();
     }
 
     public void addAll(IMutation m)
@@ -152,7 +147,7 @@ public class CounterMutation implements IMutation
             throw new IllegalArgumentException();
 
         CounterMutation cm = (CounterMutation)m;
-        rowMutation.addAll(cm.rowMutation);
+        mutation.addAll(cm.mutation);
     }
 
     @Override
@@ -164,30 +159,30 @@ public class CounterMutation implements IMutation
     public String toString(boolean shallow)
     {
         StringBuilder buff = new StringBuilder("CounterMutation(");
-        buff.append(rowMutation.toString(shallow));
+        buff.append(mutation.toString(shallow));
         buff.append(", ").append(consistency.toString());
         return buff.append(")").toString();
     }
-}
 
-class CounterMutationSerializer implements IVersionedSerializer<CounterMutation>
-{
-    public void serialize(CounterMutation cm, DataOutput out, int version) throws IOException
+    public static class CounterMutationSerializer implements IVersionedSerializer<CounterMutation>
     {
-        RowMutation.serializer.serialize(cm.rowMutation(), out, version);
-        out.writeUTF(cm.consistency().name());
-    }
+        public void serialize(CounterMutation cm, DataOutput out, int version) throws IOException
+        {
+            Mutation.serializer.serialize(cm.mutation, out, version);
+            out.writeUTF(cm.consistency.name());
+        }
 
-    public CounterMutation deserialize(DataInput in, int version) throws IOException
-    {
-        RowMutation rm = RowMutation.serializer.deserialize(in, version);
-        ConsistencyLevel consistency = Enum.valueOf(ConsistencyLevel.class, in.readUTF());
-        return new CounterMutation(rm, consistency);
-    }
+        public CounterMutation deserialize(DataInput in, int version) throws IOException
+        {
+            Mutation m = Mutation.serializer.deserialize(in, version);
+            ConsistencyLevel consistency = Enum.valueOf(ConsistencyLevel.class, in.readUTF());
+            return new CounterMutation(m, consistency);
+        }
 
-    public long serializedSize(CounterMutation cm, int version)
-    {
-        return RowMutation.serializer.serializedSize(cm.rowMutation(), version)
-             + TypeSizes.NATIVE.sizeof(cm.consistency().name());
+        public long serializedSize(CounterMutation cm, int version)
+        {
+            return Mutation.serializer.serializedSize(cm.mutation, version)
+                    + TypeSizes.NATIVE.sizeof(cm.consistency.name());
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6bbb13b9/src/java/org/apache/cassandra/db/DefinitionsUpdateVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DefinitionsUpdateVerbHandler.java b/src/java/org/apache/cassandra/db/DefinitionsUpdateVerbHandler.java
index cab094d..5cb62ed 100644
--- a/src/java/org/apache/cassandra/db/DefinitionsUpdateVerbHandler.java
+++ b/src/java/org/apache/cassandra/db/DefinitionsUpdateVerbHandler.java
@@ -32,13 +32,13 @@ import org.apache.cassandra.utils.WrappedRunnable;
  * Called when node receives updated schema state from the schema migration coordinator node.
  * Such happens when user makes local schema migration on one of the nodes in the ring
  * (which is going to act as coordinator) and that node sends (pushes) it's updated schema state
- * (in form of row mutations) to all the alive nodes in the cluster.
+ * (in form of mutations) to all the alive nodes in the cluster.
  */
-public class DefinitionsUpdateVerbHandler implements IVerbHandler<Collection<RowMutation>>
+public class DefinitionsUpdateVerbHandler implements IVerbHandler<Collection<Mutation>>
 {
     private static final Logger logger = LoggerFactory.getLogger(DefinitionsUpdateVerbHandler.class);
 
-    public void doVerb(final MessageIn<Collection<RowMutation>> message, int id)
+    public void doVerb(final MessageIn<Collection<Mutation>> message, int id)
     {
         logger.debug("Received schema mutation push from {}", message.from);
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6bbb13b9/src/java/org/apache/cassandra/db/DefsTables.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DefsTables.java b/src/java/org/apache/cassandra/db/DefsTables.java
index 693ef97..a9e1c74 100644
--- a/src/java/org/apache/cassandra/db/DefsTables.java
+++ b/src/java/org/apache/cassandra/db/DefsTables.java
@@ -37,11 +37,9 @@ import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.db.filter.QueryFilter;
 import org.apache.cassandra.db.marshal.AsciiType;
 import org.apache.cassandra.db.marshal.UserType;
-import org.apache.cassandra.db.marshal.UTF8Type;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.service.MigrationManager;
 import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.utils.ByteBufferUtil;
 
 /**
  * SCHEMA_{KEYSPACES, COLUMNFAMILIES, COLUMNS}_CF are used to store Keyspace/ColumnFamily attributes to make schema
@@ -145,7 +143,7 @@ public class DefsTables
     }
 
     /**
-     * Merge remote schema in form of row mutations with local and mutate ks/cf metadata objects
+     * Merge remote schema in form of mutations with local and mutate ks/cf metadata objects
      * (which also involves fs operations on add/drop ks/cf)
      *
      * @param mutations the schema changes to apply
@@ -153,14 +151,14 @@ public class DefsTables
      * @throws ConfigurationException If one of metadata attributes has invalid value
      * @throws IOException If data was corrupted during transportation or failed to apply fs operations
      */
-    public static synchronized void mergeSchema(Collection<RowMutation> mutations) throws ConfigurationException, IOException
+    public static synchronized void mergeSchema(Collection<Mutation> mutations) throws ConfigurationException, IOException
     {
         // current state of the schema
         Map<DecoratedKey, ColumnFamily> oldKeyspaces = SystemKeyspace.getSchema(SystemKeyspace.SCHEMA_KEYSPACES_CF);
         Map<DecoratedKey, ColumnFamily> oldColumnFamilies = SystemKeyspace.getSchema(SystemKeyspace.SCHEMA_COLUMNFAMILIES_CF);
         List<Row> oldTypes = SystemKeyspace.serializedSchema(SystemKeyspace.SCHEMA_USER_TYPES_CF);
 
-        for (RowMutation mutation : mutations)
+        for (Mutation mutation : mutations)
             mutation.apply();
 
         if (!StorageService.instance.isClientMode())

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6bbb13b9/src/java/org/apache/cassandra/db/HintedHandOffManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/HintedHandOffManager.java b/src/java/org/apache/cassandra/db/HintedHandOffManager.java
index 9b470e4..51a1822 100644
--- a/src/java/org/apache/cassandra/db/HintedHandOffManager.java
+++ b/src/java/org/apache/cassandra/db/HintedHandOffManager.java
@@ -121,7 +121,7 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
      * Returns a mutation representing a Hint to be sent to <code>targetId</code>
      * as soon as it becomes available again.
      */
-    public RowMutation hintFor(RowMutation mutation, int ttl, UUID targetId)
+    public Mutation hintFor(Mutation mutation, int ttl, UUID targetId)
     {
         assert ttl > 0;
 
@@ -135,18 +135,18 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
         UUID hintId = UUIDGen.getTimeUUID();
         // serialize the hint with id and version as a composite column name
         CellName name = CFMetaData.HintsCf.comparator.makeCellName(hintId, MessagingService.current_version);
-        ByteBuffer value = ByteBuffer.wrap(FBUtilities.serialize(mutation, RowMutation.serializer, MessagingService.current_version));
+        ByteBuffer value = ByteBuffer.wrap(FBUtilities.serialize(mutation, Mutation.serializer, MessagingService.current_version));
         ColumnFamily cf = ArrayBackedSortedColumns.factory.create(Schema.instance.getCFMetaData(Keyspace.SYSTEM_KS, SystemKeyspace.HINTS_CF));
         cf.addColumn(name, value, System.currentTimeMillis(), ttl);
-        return new RowMutation(Keyspace.SYSTEM_KS, UUIDType.instance.decompose(targetId), cf);
+        return new Mutation(Keyspace.SYSTEM_KS, UUIDType.instance.decompose(targetId), cf);
     }
 
     /*
-     * determine the TTL for the hint RowMutation
+     * determine the TTL for the hint Mutation
      * this is set at the smallest GCGraceSeconds for any of the CFs in the RM
      * this ensures that deletes aren't "undone" by delivery of an old hint
      */
-    public static int calculateHintTTL(RowMutation mutation)
+    public static int calculateHintTTL(Mutation mutation)
     {
         int ttl = maxHintTTL;
         for (ColumnFamily cf : mutation.getColumnFamilies())
@@ -181,9 +181,9 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
 
     private static void deleteHint(ByteBuffer tokenBytes, CellName columnName, long timestamp)
     {
-        RowMutation rm = new RowMutation(Keyspace.SYSTEM_KS, tokenBytes);
-        rm.delete(SystemKeyspace.HINTS_CF, columnName, timestamp);
-        rm.applyUnsafe(); // don't bother with commitlog since we're going to flush as soon as we're done with delivery
+        Mutation mutation = new Mutation(Keyspace.SYSTEM_KS, tokenBytes);
+        mutation.delete(SystemKeyspace.HINTS_CF, columnName, timestamp);
+        mutation.applyUnsafe(); // don't bother with commitlog since we're going to flush as soon as we're done with delivery
     }
 
     public void deleteHintsForEndpoint(final String ipOrHostname)
@@ -206,8 +206,8 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
             return;
         UUID hostId = StorageService.instance.getTokenMetadata().getHostId(endpoint);
         ByteBuffer hostIdBytes = ByteBuffer.wrap(UUIDGen.decompose(hostId));
-        final RowMutation rm = new RowMutation(Keyspace.SYSTEM_KS, hostIdBytes);
-        rm.delete(SystemKeyspace.HINTS_CF, System.currentTimeMillis());
+        final Mutation mutation = new Mutation(Keyspace.SYSTEM_KS, hostIdBytes);
+        mutation.delete(SystemKeyspace.HINTS_CF, System.currentTimeMillis());
 
         // execute asynchronously to avoid blocking caller (which may be processing gossip)
         Runnable runnable = new Runnable()
@@ -217,7 +217,7 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
                 try
                 {
                     logger.info("Deleting any stored hints for {}", endpoint);
-                    rm.apply();
+                    mutation.apply();
                     compact();
                 }
                 catch (Exception e)
@@ -384,10 +384,10 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
 
                 int version = Int32Type.instance.compose(hint.name().get(1));
                 DataInputStream in = new DataInputStream(ByteBufferUtil.inputStream(hint.value()));
-                RowMutation rm;
+                Mutation mutation;
                 try
                 {
-                    rm = RowMutation.serializer.deserialize(in, version);
+                    mutation = Mutation.serializer.deserialize(in, version);
                 }
                 catch (UnknownColumnFamilyException e)
                 {
@@ -401,12 +401,12 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
                 }
 
                 truncationTimesCache.clear();
-                for (UUID cfId : ImmutableSet.copyOf((rm.getColumnFamilyIds())))
+                for (UUID cfId : ImmutableSet.copyOf((mutation.getColumnFamilyIds())))
                 {
                     Long truncatedAt = truncationTimesCache.get(cfId);
                     if (truncatedAt == null)
                     {
-                        ColumnFamilyStore cfs = Keyspace.open(rm.getKeyspaceName()).getColumnFamilyStore(cfId);
+                        ColumnFamilyStore cfs = Keyspace.open(mutation.getKeyspaceName()).getColumnFamilyStore(cfId);
                         truncatedAt = cfs.getTruncationTime();
                         truncationTimesCache.put(cfId, truncatedAt);
                     }
@@ -414,17 +414,17 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
                     if (hint.maxTimestamp() < truncatedAt)
                     {
                         logger.debug("Skipping delivery of hint for truncated columnfamily {}", cfId);
-                        rm = rm.without(cfId);
+                        mutation = mutation.without(cfId);
                     }
                 }
 
-                if (rm.isEmpty())
+                if (mutation.isEmpty())
                 {
                     deleteHint(hostIdBytes, hint.name(), hint.maxTimestamp());
                     continue;
                 }
 
-                MessageOut<RowMutation> message = rm.createMessage();
+                MessageOut<Mutation> message = mutation.createMessage();
                 rateLimiter.acquire(message.serializedSize(MessagingService.current_version));
                 Runnable callback = new Runnable()
                 {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6bbb13b9/src/java/org/apache/cassandra/db/Keyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Keyspace.java b/src/java/org/apache/cassandra/db/Keyspace.java
index 14bcca1..c380e45 100644
--- a/src/java/org/apache/cassandra/db/Keyspace.java
+++ b/src/java/org/apache/cassandra/db/Keyspace.java
@@ -328,7 +328,7 @@ public class Keyspace
         return new Row(filter.key, columnFamily);
     }
 
-    public void apply(RowMutation mutation, boolean writeCommitLog)
+    public void apply(Mutation mutation, boolean writeCommitLog)
     {
         apply(mutation, writeCommitLog, true);
     }
@@ -341,7 +341,7 @@ public class Keyspace
      * @param writeCommitLog false to disable commitlog append entirely
      * @param updateIndexes  false to disable index updates (used by CollationController "defragmenting")
      */
-    public void apply(RowMutation mutation, boolean writeCommitLog, boolean updateIndexes)
+    public void apply(Mutation mutation, boolean writeCommitLog, boolean updateIndexes)
     {
         // write the mutation to the commitlog and memtables
         Tracing.trace("Acquiring switchLock read lock");

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6bbb13b9/src/java/org/apache/cassandra/db/MigrationRequestVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/MigrationRequestVerbHandler.java b/src/java/org/apache/cassandra/db/MigrationRequestVerbHandler.java
index 31a64a9..d4503ba 100644
--- a/src/java/org/apache/cassandra/db/MigrationRequestVerbHandler.java
+++ b/src/java/org/apache/cassandra/db/MigrationRequestVerbHandler.java
@@ -29,7 +29,7 @@ import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.service.MigrationManager;
 
 /**
- * Sends it's current schema state in form of row mutations in reply to the remote node's request.
+ * Sends it's current schema state in form of mutations in reply to the remote node's request.
  * Such a request is made when one of the nodes, by means of Gossip, detects schema disagreement in the ring.
  */
 public class MigrationRequestVerbHandler implements IVerbHandler
@@ -39,9 +39,9 @@ public class MigrationRequestVerbHandler implements IVerbHandler
     public void doVerb(MessageIn message, int id)
     {
         logger.debug("Received migration request from {}.", message.from);
-        MessageOut<Collection<RowMutation>> response = new MessageOut<>(MessagingService.Verb.INTERNAL_RESPONSE,
-                                                                        SystemKeyspace.serializeSchema(),
-                                                                        MigrationManager.MigrationsSerializer.instance);
+        MessageOut<Collection<Mutation>> response = new MessageOut<>(MessagingService.Verb.INTERNAL_RESPONSE,
+                                                                     SystemKeyspace.serializeSchema(),
+                                                                     MigrationManager.MigrationsSerializer.instance);
         MessagingService.instance().sendReply(response, id, message.from);
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6bbb13b9/src/java/org/apache/cassandra/db/Mutation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Mutation.java b/src/java/org/apache/cassandra/db/Mutation.java
new file mode 100644
index 0000000..d70d7f9
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/Mutation.java
@@ -0,0 +1,336 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.*;
+
+import org.apache.commons.lang3.StringUtils;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.db.composites.CellName;
+import org.apache.cassandra.db.composites.Composite;
+import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.net.MessageOut;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+// TODO convert this to a Builder pattern instead of encouraging M.add directly,
+// which is less-efficient since we have to keep a mutable HashMap around
+public class Mutation implements IMutation
+{
+    public static final MutationSerializer serializer = new MutationSerializer();
+
+    public static final String FORWARD_TO = "FWD_TO";
+    public static final String FORWARD_FROM = "FWD_FRM";
+
+    // todo this is redundant
+    // when we remove it, also restore SerializationsTest.testMutationRead to not regenerate new Mutations each test
+    private final String keyspaceName;
+
+    private final ByteBuffer key;
+    // map of column family id to mutations for that column family.
+    private final Map<UUID, ColumnFamily> modifications;
+
+    public Mutation(String keyspaceName, ByteBuffer key)
+    {
+        this(keyspaceName, key, new HashMap<UUID, ColumnFamily>());
+    }
+
+    public Mutation(String keyspaceName, ByteBuffer key, ColumnFamily cf)
+    {
+        this(keyspaceName, key, Collections.singletonMap(cf.id(), cf));
+    }
+
+    public Mutation(String keyspaceName, Row row)
+    {
+        this(keyspaceName, row.key.key, row.cf);
+    }
+
+    protected Mutation(String keyspaceName, ByteBuffer key, Map<UUID, ColumnFamily> modifications)
+    {
+        this.keyspaceName = keyspaceName;
+        this.key = key;
+        this.modifications = modifications;
+    }
+
+    public Mutation(ByteBuffer key, ColumnFamily cf)
+    {
+        this(cf.metadata().ksName, key, cf);
+    }
+
+    public String getKeyspaceName()
+    {
+        return keyspaceName;
+    }
+
+    public Collection<UUID> getColumnFamilyIds()
+    {
+        return modifications.keySet();
+    }
+
+    public ByteBuffer key()
+    {
+        return key;
+    }
+
+    public Collection<ColumnFamily> getColumnFamilies()
+    {
+        return modifications.values();
+    }
+
+    public ColumnFamily getColumnFamily(UUID cfId)
+    {
+        return modifications.get(cfId);
+    }
+
+    /*
+     * Specify a column family name and the corresponding column
+     * family object.
+     * param @ cf - column family name
+     * param @ columnFamily - the column family.
+     */
+    public void add(ColumnFamily columnFamily)
+    {
+        assert columnFamily != null;
+        ColumnFamily prev = modifications.put(columnFamily.id(), columnFamily);
+        if (prev != null)
+            // developer error
+            throw new IllegalArgumentException("ColumnFamily " + columnFamily + " already has modifications in this mutation: " + prev);
+    }
+
+    /**
+     * @return the ColumnFamily in this Mutation corresponding to @param cfName, creating an empty one if necessary.
+     */
+    public ColumnFamily addOrGet(String cfName)
+    {
+        return addOrGet(Schema.instance.getCFMetaData(keyspaceName, cfName));
+    }
+
+    public ColumnFamily addOrGet(CFMetaData cfm)
+    {
+        ColumnFamily cf = modifications.get(cfm.cfId);
+        if (cf == null)
+        {
+            cf = TreeMapBackedSortedColumns.factory.create(cfm);
+            modifications.put(cfm.cfId, cf);
+        }
+        return cf;
+    }
+
+    public boolean isEmpty()
+    {
+        return modifications.isEmpty();
+    }
+
+    public void add(String cfName, CellName name, ByteBuffer value, long timestamp, int timeToLive)
+    {
+        addOrGet(cfName).addColumn(name, value, timestamp, timeToLive);
+    }
+
+    public void addCounter(String cfName, CellName name, long value)
+    {
+        addOrGet(cfName).addCounter(name, value);
+    }
+
+    public void add(String cfName, CellName name, ByteBuffer value, long timestamp)
+    {
+        add(cfName, name, value, timestamp, 0);
+    }
+
+    public void delete(String cfName, long timestamp)
+    {
+        int localDeleteTime = (int) (System.currentTimeMillis() / 1000);
+        addOrGet(cfName).delete(new DeletionInfo(timestamp, localDeleteTime));
+    }
+
+    public void delete(String cfName, CellName name, long timestamp)
+    {
+        int localDeleteTime = (int) (System.currentTimeMillis() / 1000);
+        addOrGet(cfName).addTombstone(name, localDeleteTime, timestamp);
+    }
+
+    public void deleteRange(String cfName, Composite start, Composite end, long timestamp)
+    {
+        int localDeleteTime = (int) (System.currentTimeMillis() / 1000);
+        addOrGet(cfName).addAtom(new RangeTombstone(start, end, timestamp, localDeleteTime));
+    }
+
+    public void addAll(IMutation m)
+    {
+        if (!(m instanceof Mutation))
+            throw new IllegalArgumentException();
+
+        Mutation mutation = (Mutation)m;
+        if (!keyspaceName.equals(mutation.keyspaceName) || !key.equals(mutation.key))
+            throw new IllegalArgumentException();
+
+        for (Map.Entry<UUID, ColumnFamily> entry : mutation.modifications.entrySet())
+        {
+            // It's slighty faster to assume the key wasn't present and fix if
+            // not in the case where it wasn't there indeed.
+            ColumnFamily cf = modifications.put(entry.getKey(), entry.getValue());
+            if (cf != null)
+                entry.getValue().resolve(cf);
+        }
+    }
+
+    /*
+     * This is equivalent to calling commit. Applies the changes to
+     * to the keyspace that is obtained by calling Keyspace.open().
+     */
+    public void apply()
+    {
+        Keyspace ks = Keyspace.open(keyspaceName);
+        ks.apply(this, ks.metadata.durableWrites);
+    }
+
+    public void applyUnsafe()
+    {
+        Keyspace.open(keyspaceName).apply(this, false);
+    }
+
+    public MessageOut<Mutation> createMessage()
+    {
+        return createMessage(MessagingService.Verb.MUTATION);
+    }
+
+    public MessageOut<Mutation> createMessage(MessagingService.Verb verb)
+    {
+        return new MessageOut<>(verb, this, serializer);
+    }
+
+    public String toString()
+    {
+        return toString(false);
+    }
+
+    public String toString(boolean shallow)
+    {
+        StringBuilder buff = new StringBuilder("Mutation(");
+        buff.append("keyspace='").append(keyspaceName).append('\'');
+        buff.append(", key='").append(ByteBufferUtil.bytesToHex(key)).append('\'');
+        buff.append(", modifications=[");
+        if (shallow)
+        {
+            List<String> cfnames = new ArrayList<String>(modifications.size());
+            for (UUID cfid : modifications.keySet())
+            {
+                CFMetaData cfm = Schema.instance.getCFMetaData(cfid);
+                cfnames.add(cfm == null ? "-dropped-" : cfm.cfName);
+            }
+            buff.append(StringUtils.join(cfnames, ", "));
+        }
+        else
+            buff.append(StringUtils.join(modifications.values(), ", "));
+        return buff.append("])").toString();
+    }
+
+    public Mutation without(UUID cfId)
+    {
+        Mutation mutation = new Mutation(keyspaceName, key);
+        for (Map.Entry<UUID, ColumnFamily> entry : modifications.entrySet())
+            if (!entry.getKey().equals(cfId))
+                mutation.add(entry.getValue());
+        return mutation;
+    }
+
+    public static class MutationSerializer implements IVersionedSerializer<Mutation>
+    {
+        public void serialize(Mutation mutation, DataOutput out, int version) throws IOException
+        {
+            if (version < MessagingService.VERSION_20)
+                out.writeUTF(mutation.getKeyspaceName());
+
+            ByteBufferUtil.writeWithShortLength(mutation.key(), out);
+
+            /* serialize the modifications in the mutation */
+            int size = mutation.modifications.size();
+            out.writeInt(size);
+            assert size > 0;
+            for (Map.Entry<UUID, ColumnFamily> entry : mutation.modifications.entrySet())
+                ColumnFamily.serializer.serialize(entry.getValue(), out, version);
+        }
+
+        public Mutation deserialize(DataInput in, int version, ColumnSerializer.Flag flag) throws IOException
+        {
+            String keyspaceName = null; // will always be set from cf.metadata but javac isn't smart enough to see that
+            if (version < MessagingService.VERSION_20)
+                keyspaceName = in.readUTF();
+
+            ByteBuffer key = ByteBufferUtil.readWithShortLength(in);
+            int size = in.readInt();
+            assert size > 0;
+
+            Map<UUID, ColumnFamily> modifications;
+            if (size == 1)
+            {
+                ColumnFamily cf = deserializeOneCf(in, version, flag);
+                modifications = Collections.singletonMap(cf.id(), cf);
+                keyspaceName = cf.metadata().ksName;
+            }
+            else
+            {
+                modifications = new HashMap<UUID, ColumnFamily>();
+                for (int i = 0; i < size; ++i)
+                {
+                    ColumnFamily cf = deserializeOneCf(in, version, flag);
+                    modifications.put(cf.id(), cf);
+                    keyspaceName = cf.metadata().ksName;
+                }
+            }
+
+            return new Mutation(keyspaceName, key, modifications);
+        }
+
+        private ColumnFamily deserializeOneCf(DataInput in, int version, ColumnSerializer.Flag flag) throws IOException
+        {
+            ColumnFamily cf = ColumnFamily.serializer.deserialize(in, UnsortedColumns.factory, flag, version);
+            // We don't allow Mutation with null column family, so we should never get null back.
+            assert cf != null;
+            return cf;
+        }
+
+        public Mutation deserialize(DataInput in, int version) throws IOException
+        {
+            return deserialize(in, version, ColumnSerializer.Flag.FROM_REMOTE);
+        }
+
+        public long serializedSize(Mutation mutation, int version)
+        {
+            TypeSizes sizes = TypeSizes.NATIVE;
+            int size = 0;
+
+            if (version < MessagingService.VERSION_20)
+                size += sizes.sizeof(mutation.getKeyspaceName());
+
+            int keySize = mutation.key().remaining();
+            size += sizes.sizeof((short) keySize) + keySize;
+
+            size += sizes.sizeof(mutation.modifications.size());
+            for (Map.Entry<UUID,ColumnFamily> entry : mutation.modifications.entrySet())
+                size += ColumnFamily.serializer.serializedSize(entry.getValue(), TypeSizes.NATIVE, version);
+
+            return size;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6bbb13b9/src/java/org/apache/cassandra/db/MutationVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/MutationVerbHandler.java b/src/java/org/apache/cassandra/db/MutationVerbHandler.java
new file mode 100644
index 0000000..43ffeae
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/MutationVerbHandler.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.net.InetAddress;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.io.util.FastByteArrayInputStream;
+import org.apache.cassandra.net.*;
+import org.apache.cassandra.tracing.Tracing;
+
+public class MutationVerbHandler implements IVerbHandler<Mutation>
+{
+    private static final Logger logger = LoggerFactory.getLogger(MutationVerbHandler.class);
+
+    public void doVerb(MessageIn<Mutation> message, int id)
+    {
+        try
+        {
+            // Check if there were any forwarding headers in this message
+            byte[] from = message.parameters.get(Mutation.FORWARD_FROM);
+            InetAddress replyTo;
+            if (from == null)
+            {
+                replyTo = message.from;
+                byte[] forwardBytes = message.parameters.get(Mutation.FORWARD_TO);
+                if (forwardBytes != null)
+                    forwardToLocalNodes(message.payload, message.verb, forwardBytes, message.from);
+            }
+            else
+            {
+                replyTo = InetAddress.getByAddress(from);
+            }
+
+            message.payload.apply();
+            WriteResponse response = new WriteResponse();
+            Tracing.trace("Enqueuing response to {}", replyTo);
+            MessagingService.instance().sendReply(response.createMessage(), id, replyTo);
+        }
+        catch (IOException e)
+        {
+            logger.error("Error in mutation", e);
+        }
+    }
+
+    /**
+     * Older version (< 1.0) will not send this message at all, hence we don't
+     * need to check the version of the data.
+     */
+    private void forwardToLocalNodes(Mutation mutation, MessagingService.Verb verb, byte[] forwardBytes, InetAddress from) throws IOException
+    {
+        DataInputStream in = new DataInputStream(new FastByteArrayInputStream(forwardBytes));
+        int size = in.readInt();
+
+        // tell the recipients who to send their ack to
+        MessageOut<Mutation> message = new MessageOut<>(verb, mutation, Mutation.serializer).withParameter(Mutation.FORWARD_FROM, from.getAddress());
+        // Send a message to each of the addresses on our Forward List
+        for (int i = 0; i < size; i++)
+        {
+            InetAddress address = CompactEndpointSerializationHelper.deserialize(in);
+            int id = in.readInt();
+            Tracing.trace("Enqueuing forwarded write to {}", address);
+            MessagingService.instance().sendOneWay(message, id, address);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6bbb13b9/src/java/org/apache/cassandra/db/ReadRepairVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadRepairVerbHandler.java b/src/java/org/apache/cassandra/db/ReadRepairVerbHandler.java
index fca4938..849ac70 100644
--- a/src/java/org/apache/cassandra/db/ReadRepairVerbHandler.java
+++ b/src/java/org/apache/cassandra/db/ReadRepairVerbHandler.java
@@ -21,12 +21,11 @@ import org.apache.cassandra.net.IVerbHandler;
 import org.apache.cassandra.net.MessageIn;
 import org.apache.cassandra.net.MessagingService;
 
-public class ReadRepairVerbHandler implements IVerbHandler<RowMutation>
+public class ReadRepairVerbHandler implements IVerbHandler<Mutation>
 {
-    public void doVerb(MessageIn<RowMutation> message, int id)
+    public void doVerb(MessageIn<Mutation> message, int id)
     {
-        RowMutation rm = message.payload;
-        rm.apply();
+        message.payload.apply();
         WriteResponse response = new WriteResponse();
         MessagingService.instance().sendReply(response.createMessage(), id, message.from);
     }