You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2018/01/09 07:12:08 UTC

[1/2] cassandra git commit: Make PartitionUpdate and Mutation immutable

Repository: cassandra
Updated Branches:
  refs/heads/trunk 45c7c4561 -> de7c24b39


http://git-wip-us.apache.org/repos/asf/cassandra/blob/de7c24b3/src/java/org/apache/cassandra/io/sstable/SSTableSimpleWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleWriter.java
index a663051..530a03b 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleWriter.java
@@ -39,7 +39,7 @@ import org.apache.cassandra.schema.TableMetadataRef;
 class SSTableSimpleWriter extends AbstractSSTableSimpleWriter
 {
     protected DecoratedKey currentKey;
-    protected PartitionUpdate update;
+    protected PartitionUpdate.Builder update;
 
     private SSTableTxnWriter writer;
 
@@ -56,7 +56,7 @@ class SSTableSimpleWriter extends AbstractSSTableSimpleWriter
         return writer;
     }
 
-    PartitionUpdate getUpdateFor(DecoratedKey key) throws IOException
+    PartitionUpdate.Builder getUpdateFor(DecoratedKey key) throws IOException
     {
         assert key != null;
 
@@ -65,9 +65,9 @@ class SSTableSimpleWriter extends AbstractSSTableSimpleWriter
         if (!key.equals(currentKey))
         {
             if (update != null)
-                writePartition(update);
+                writePartition(update.build());
             currentKey = key;
-            update = new PartitionUpdate(metadata.get(), currentKey, columns, 4);
+            update = new PartitionUpdate.Builder(metadata.get(), currentKey, columns, 4);
         }
 
         assert update != null;
@@ -79,7 +79,7 @@ class SSTableSimpleWriter extends AbstractSSTableSimpleWriter
         try
         {
             if (update != null)
-                writePartition(update);
+                writePartition(update.build());
             if (writer != null)
                 writer.finish(false);
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/de7c24b3/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/SchemaKeyspace.java b/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
index cd03a40..9d86feb 100644
--- a/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
+++ b/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
@@ -21,6 +21,7 @@ import java.nio.ByteBuffer;
 import java.nio.charset.CharacterCodingException;
 import java.util.*;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 
 import com.google.common.collect.*;
 import com.google.common.collect.Maps;
@@ -356,15 +357,15 @@ public final class SchemaKeyspace
 
     static Collection<Mutation> convertSchemaToMutations()
     {
-        Map<DecoratedKey, Mutation> mutationMap = new HashMap<>();
+        Map<DecoratedKey, Mutation.PartitionUpdateCollector> mutationMap = new HashMap<>();
 
         for (String table : ALL)
             convertSchemaToMutations(mutationMap, table);
 
-        return mutationMap.values();
+        return mutationMap.values().stream().map(Mutation.PartitionUpdateCollector::build).collect(Collectors.toList());
     }
 
-    private static void convertSchemaToMutations(Map<DecoratedKey, Mutation> mutationMap, String schemaTableName)
+    private static void convertSchemaToMutations(Map<DecoratedKey, Mutation.PartitionUpdateCollector> mutationMap, String schemaTableName)
     {
         ReadCommand cmd = getReadCommandForTableSchema(schemaTableName);
         try (ReadExecutionController executionController = cmd.executionController();
@@ -378,9 +379,8 @@ public final class SchemaKeyspace
                         continue;
 
                     DecoratedKey key = partition.partitionKey();
-                    Mutation mutation = mutationMap.computeIfAbsent(key, k -> new Mutation(SchemaConstants.SCHEMA_KEYSPACE_NAME, key));
-
-                    mutation.add(makeUpdateForSchema(partition, cmd.columnFilter()));
+                    Mutation.PartitionUpdateCollector puCollector = mutationMap.computeIfAbsent(key, k -> new Mutation.PartitionUpdateCollector(SchemaConstants.SCHEMA_KEYSPACE_NAME, key));
+                    puCollector.add(makeUpdateForSchema(partition, cmd.columnFilter()));
                 }
             }
         }
@@ -423,7 +423,7 @@ public final class SchemaKeyspace
     @SuppressWarnings("unchecked")
     private static DecoratedKey decorate(TableMetadata metadata, Object value)
     {
-        return metadata.partitioner.decorateKey(((AbstractType)metadata.partitionKeyType).decompose(value));
+        return metadata.partitioner.decorateKey(((AbstractType) metadata.partitionKeyType).decompose(value));
     }
 
     static Mutation.SimpleBuilder makeCreateKeyspaceMutation(String name, KeyspaceParams params, long timestamp)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/de7c24b3/src/java/org/apache/cassandra/service/DataResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/DataResolver.java b/src/java/org/apache/cassandra/service/DataResolver.java
index 933014f..54f4b0c 100644
--- a/src/java/org/apache/cassandra/service/DataResolver.java
+++ b/src/java/org/apache/cassandra/service/DataResolver.java
@@ -207,7 +207,7 @@ public class DataResolver extends ResponseResolver
             private final DecoratedKey partitionKey;
             private final RegularAndStaticColumns columns;
             private final boolean isReversed;
-            private final PartitionUpdate[] repairs = new PartitionUpdate[sources.length];
+            private final PartitionUpdate.Builder[] repairs = new PartitionUpdate.Builder[sources.length];
 
             private final Row.Builder[] currentRows = new Row.Builder[sources.length];
             private final RowDiffListener diffListener;
@@ -268,10 +268,10 @@ public class DataResolver extends ResponseResolver
                 };
             }
 
-            private PartitionUpdate update(int i)
+            private PartitionUpdate.Builder update(int i)
             {
                 if (repairs[i] == null)
-                    repairs[i] = new PartitionUpdate(command.metadata(), partitionKey, columns, 1);
+                    repairs[i] = new PartitionUpdate.Builder(command.metadata(), partitionKey, columns, 1);
                 return repairs[i];
             }
 
@@ -468,7 +468,7 @@ public class DataResolver extends ResponseResolver
             {
                 for (int i = 0; i < repairs.length; i++)
                     if (null != repairs[i])
-                        sendRepairMutation(repairs[i], sources[i]);
+                        sendRepairMutation(repairs[i].build(), sources[i]);
             }
 
             private void sendRepairMutation(PartitionUpdate partition, InetAddress destination)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/de7c24b3/src/java/org/apache/cassandra/service/paxos/Commit.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/paxos/Commit.java b/src/java/org/apache/cassandra/service/paxos/Commit.java
index 6d672f5..422eaa8 100644
--- a/src/java/org/apache/cassandra/service/paxos/Commit.java
+++ b/src/java/org/apache/cassandra/service/paxos/Commit.java
@@ -59,8 +59,8 @@ public class Commit
 
     public static Commit newProposal(UUID ballot, PartitionUpdate update)
     {
-        update.updateAllTimestamp(UUIDGen.microsTimestamp(ballot));
-        return new Commit(ballot, update);
+        PartitionUpdate withNewTimestamp = new PartitionUpdate.Builder(update, 0).updateAllTimestamp(UUIDGen.microsTimestamp(ballot)).build();
+        return new Commit(ballot, withNewTimestamp);
     }
 
     public static Commit emptyCommit(DecoratedKey key, TableMetadata metadata)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/de7c24b3/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java b/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
index e5c5727..b553a12 100644
--- a/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
+++ b/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
@@ -170,7 +170,9 @@ public class ColumnFamilyStoreTest
         ByteBuffer val = ByteBufferUtil.bytes("val1");
 
         // insert
-        ColumnMetadata newCol = ColumnMetadata.regularColumn(cfs.metadata(), ByteBufferUtil.bytes("val2"), AsciiType.instance);
+        Mutation.SimpleBuilder builder = Mutation.simpleBuilder(keyspaceName, cfs.metadata().partitioner.decorateKey(ByteBufferUtil.bytes("val2")));
+        builder.update(cfName).row("Column1").add("val", "val1").build();
+
         new RowUpdateBuilder(cfs.metadata(), 0, "key1").clustering("Column1").add("val", "val1").build().applyUnsafe();
         new RowUpdateBuilder(cfs.metadata(), 0, "key2").clustering("Column1").add("val", "val1").build().applyUnsafe();
         assertRangeCount(cfs, col, val, 2);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/de7c24b3/test/unit/org/apache/cassandra/db/CounterMutationTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/CounterMutationTest.java b/test/unit/org/apache/cassandra/db/CounterMutationTest.java
index f36deea..9be0960 100644
--- a/test/unit/org/apache/cassandra/db/CounterMutationTest.java
+++ b/test/unit/org/apache/cassandra/db/CounterMutationTest.java
@@ -118,20 +118,20 @@ public class CounterMutationTest
         cfsTwo.truncateBlocking();
 
         // Do the update (+1, -1), (+2, -2)
-        Mutation batch = new Mutation(KEYSPACE1, Util.dk("key1"));
+        Mutation.PartitionUpdateCollector batch = new Mutation.PartitionUpdateCollector(KEYSPACE1, Util.dk("key1"));
         batch.add(new RowUpdateBuilder(cfsOne.metadata(), 5, "key1")
             .clustering("cc")
             .add("val", 1L)
             .add("val2", -1L)
-            .build().get(cfsOne.metadata()));
+            .build().getPartitionUpdate(cfsOne.metadata()));
 
         batch.add(new RowUpdateBuilder(cfsTwo.metadata(), 5, "key1")
             .clustering("cc")
             .add("val", 2L)
             .add("val2", -2L)
-            .build().get(cfsTwo.metadata()));
+            .build().getPartitionUpdate(cfsTwo.metadata()));
 
-        new CounterMutation(batch, ConsistencyLevel.ONE).apply();
+        new CounterMutation(batch.build(), ConsistencyLevel.ONE).apply();
 
         ColumnMetadata c1cfs1 = cfsOne.metadata().getColumn(ByteBufferUtil.bytes("val"));
         ColumnMetadata c2cfs1 = cfsOne.metadata().getColumn(ByteBufferUtil.bytes("val2"));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/de7c24b3/test/unit/org/apache/cassandra/db/DeletePartitionTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/DeletePartitionTest.java b/test/unit/org/apache/cassandra/db/DeletePartitionTest.java
index 95bb5a4..6ed43f7 100644
--- a/test/unit/org/apache/cassandra/db/DeletePartitionTest.java
+++ b/test/unit/org/apache/cassandra/db/DeletePartitionTest.java
@@ -78,8 +78,9 @@ public class DeletePartitionTest
             store.forceBlockingFlush();
 
         // delete the partition
-        new Mutation(KEYSPACE1, key)
+        new Mutation.PartitionUpdateCollector(KEYSPACE1, key)
                 .add(PartitionUpdate.fullPartitionDelete(store.metadata(), key, 0, FBUtilities.nowInSeconds()))
+                .build()
                 .applyUnsafe();
 
         if (flushAfterRemove)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/de7c24b3/test/unit/org/apache/cassandra/db/RowTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/RowTest.java b/test/unit/org/apache/cassandra/db/RowTest.java
index 5134857..ca2765d 100644
--- a/test/unit/org/apache/cassandra/db/RowTest.java
+++ b/test/unit/org/apache/cassandra/db/RowTest.java
@@ -85,13 +85,13 @@ public class RowTest
     @Test
     public void testMergeRangeTombstones() throws InterruptedException
     {
-        PartitionUpdate update1 = new PartitionUpdate(metadata, dk, metadata.regularAndStaticColumns(), 1);
+        PartitionUpdate.Builder update1 = new PartitionUpdate.Builder(metadata, dk, metadata.regularAndStaticColumns(), 1);
         writeRangeTombstone(update1, "1", "11", 123, 123);
         writeRangeTombstone(update1, "2", "22", 123, 123);
         writeRangeTombstone(update1, "3", "31", 123, 123);
         writeRangeTombstone(update1, "4", "41", 123, 123);
 
-        PartitionUpdate update2 = new PartitionUpdate(metadata, dk, metadata.regularAndStaticColumns(), 1);
+        PartitionUpdate.Builder update2 = new PartitionUpdate.Builder(metadata, dk, metadata.regularAndStaticColumns(), 1);
         writeRangeTombstone(update2, "1", "11", 123, 123);
         writeRangeTombstone(update2, "111", "112", 1230, 123);
         writeRangeTombstone(update2, "2", "24", 123, 123);
@@ -99,7 +99,7 @@ public class RowTest
         writeRangeTombstone(update2, "4", "41", 123, 1230);
         writeRangeTombstone(update2, "5", "51", 123, 1230);
 
-        try (UnfilteredRowIterator merged = UnfilteredRowIterators.merge(ImmutableList.of(update1.unfilteredIterator(), update2.unfilteredIterator()), nowInSeconds))
+        try (UnfilteredRowIterator merged = UnfilteredRowIterators.merge(ImmutableList.of(update1.build().unfilteredIterator(), update2.build().unfilteredIterator()), nowInSeconds))
         {
             Object[][] expected = new Object[][]{ { "1", "11", 123l, 123 },
                                                   { "111", "112", 1230l, 123 },
@@ -204,7 +204,7 @@ public class RowTest
         assertEquals(expected[3], deletionTime.localDeletionTime());
     }
 
-    public void writeRangeTombstone(PartitionUpdate update, Object start, Object end, long markedForDeleteAt, int localDeletionTime)
+    public void writeRangeTombstone(PartitionUpdate.Builder update, Object start, Object end, long markedForDeleteAt, int localDeletionTime)
     {
         ClusteringComparator comparator = cfs.getComparator();
         update.add(new RangeTombstone(Slice.make(comparator.make(start), comparator.make(end)), new DeletionTime(markedForDeleteAt, localDeletionTime)));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/de7c24b3/test/unit/org/apache/cassandra/db/RowUpdateBuilder.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/RowUpdateBuilder.java b/test/unit/org/apache/cassandra/db/RowUpdateBuilder.java
index f4eafa3..706b274 100644
--- a/test/unit/org/apache/cassandra/db/RowUpdateBuilder.java
+++ b/test/unit/org/apache/cassandra/db/RowUpdateBuilder.java
@@ -109,26 +109,16 @@ public class RowUpdateBuilder
 
     public PartitionUpdate buildUpdate()
     {
-        PartitionUpdate update = updateBuilder.build();
         for (RangeTombstone rt : rts)
-            update.add(rt);
-        return update;
+            updateBuilder.addRangeTombstone(rt);
+        return updateBuilder.build();
     }
 
-    private static void deleteRow(PartitionUpdate update, long timestamp, int localDeletionTime, Object... clusteringValues)
+    private static void deleteRow(PartitionUpdate.Builder updateBuilder, long timestamp, int localDeletionTime, Object... clusteringValues)
     {
-        assert clusteringValues.length == update.metadata().comparator.size() || (clusteringValues.length == 0 && !update.columns().statics.isEmpty());
-
-        boolean isStatic = clusteringValues.length != update.metadata().comparator.size();
-        Row.Builder builder = BTreeRow.sortedBuilder();
-
-        if (isStatic)
-            builder.newRow(Clustering.STATIC_CLUSTERING);
-        else
-            builder.newRow(clusteringValues.length == 0 ? Clustering.EMPTY : update.metadata().comparator.make(clusteringValues));
-        builder.addRowDeletion(Row.Deletion.regular(new DeletionTime(timestamp, localDeletionTime)));
-
-        update.add(builder.build());
+        SimpleBuilders.RowBuilder b = new SimpleBuilders.RowBuilder(updateBuilder.metadata(), clusteringValues);
+        b.nowInSec(localDeletionTime).timestamp(timestamp).delete();
+        updateBuilder.add(b.build());
     }
 
     public static Mutation deleteRow(TableMetadata metadata, long timestamp, Object key, Object... clusteringValues)
@@ -138,11 +128,9 @@ public class RowUpdateBuilder
 
     public static Mutation deleteRowAt(TableMetadata metadata, long timestamp, int localDeletionTime, Object key, Object... clusteringValues)
     {
-        PartitionUpdate update = new PartitionUpdate(metadata, makeKey(metadata, key), metadata.regularAndStaticColumns(), 0);
+        PartitionUpdate.Builder update = new PartitionUpdate.Builder(metadata, makeKey(metadata, key), metadata.regularAndStaticColumns(), 0);
         deleteRow(update, timestamp, localDeletionTime, clusteringValues);
-        // note that the created mutation may get further update later on, so we don't use the ctor that create a singletonMap
-        // underneath (this class if for convenience, not performance)
-        return new Mutation(update.metadata().keyspace, update.partitionKey()).add(update);
+        return new Mutation.PartitionUpdateCollector(update.metadata().keyspace, update.partitionKey()).add(update.build()).build();
     }
 
     private static DecoratedKey makeKey(TableMetadata metadata, Object... partitionKey)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/de7c24b3/test/unit/org/apache/cassandra/db/commitlog/CommitLogReaderTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/commitlog/CommitLogReaderTest.java b/test/unit/org/apache/cassandra/db/commitlog/CommitLogReaderTest.java
index 6eaf2c8..ca76e45 100644
--- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogReaderTest.java
+++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogReaderTest.java
@@ -170,7 +170,7 @@ public class CommitLogReaderTest extends CQLTester
         int j = 0;
         while (i + j < handler.seenMutationCount())
         {
-            PartitionUpdate pu = handler.seenMutations.get(i + j).get(currentTableMetadata());
+            PartitionUpdate pu = handler.seenMutations.get(i + j).getPartitionUpdate(currentTableMetadata());
             if (pu == null)
             {
                 j++;
@@ -234,7 +234,7 @@ public class CommitLogReaderTest extends CQLTester
 
         public void handleMutation(Mutation m, int size, int entryLocation, CommitLogDescriptor desc)
         {
-            if ((metadata == null) || (metadata != null && m.get(metadata) != null)) {
+            if ((metadata == null) || (metadata != null && m.getPartitionUpdate(metadata) != null)) {
                 seenMutations.add(m);
             }
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/de7c24b3/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java
index 5f0b832..fc193e8 100644
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java
@@ -193,8 +193,9 @@ public class CompactionsPurgeTest
         }
         cfs.forceBlockingFlush();
 
-        new Mutation(KEYSPACE1, dk(key))
+        new Mutation.PartitionUpdateCollector(KEYSPACE1, dk(key))
             .add(PartitionUpdate.fullPartitionDelete(cfs.metadata(), dk(key), Long.MAX_VALUE, FBUtilities.nowInSeconds()))
+            .build()
             .applyUnsafe();
         cfs.forceBlockingFlush();
 
@@ -423,9 +424,9 @@ public class CompactionsPurgeTest
         }
 
         // deletes partition
-        Mutation rm = new Mutation(KEYSPACE_CACHED, dk(key));
+        Mutation.PartitionUpdateCollector rm = new Mutation.PartitionUpdateCollector(KEYSPACE_CACHED, dk(key));
         rm.add(PartitionUpdate.fullPartitionDelete(cfs.metadata(), dk(key), 1, FBUtilities.nowInSeconds()));
-        rm.applyUnsafe();
+        rm.build().applyUnsafe();
 
         // Adds another unrelated partition so that the sstable is not considered fully expired. We do not
         // invalidate the row cache in that latter case.
@@ -463,9 +464,9 @@ public class CompactionsPurgeTest
         }
 
         // deletes partition with timestamp such that not all columns are deleted
-        Mutation rm = new Mutation(KEYSPACE1, dk(key));
+        Mutation.PartitionUpdateCollector rm = new Mutation.PartitionUpdateCollector(KEYSPACE1, dk(key));
         rm.add(PartitionUpdate.fullPartitionDelete(cfs.metadata(), dk(key), 4, FBUtilities.nowInSeconds()));
-        rm.applyUnsafe();
+        rm.build().applyUnsafe();
 
         ImmutableBTreePartition partition = Util.getOnlyPartitionUnfiltered(Util.cmd(cfs, key).build());
         assertFalse(partition.partitionLevelDeletion().isLive());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/de7c24b3/test/unit/org/apache/cassandra/db/partition/PartitionUpdateTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/partition/PartitionUpdateTest.java b/test/unit/org/apache/cassandra/db/partition/PartitionUpdateTest.java
index 0f83dac..f887ede 100644
--- a/test/unit/org/apache/cassandra/db/partition/PartitionUpdateTest.java
+++ b/test/unit/org/apache/cassandra/db/partition/PartitionUpdateTest.java
@@ -47,4 +47,19 @@ public class PartitionUpdateTest extends CQLTester
         builder.newRow(1).add("a", 1);
         Assert.assertEquals(2, builder.build().operationCount());
     }
+
+    @Test
+    public void testUpdateAllTimestamp()
+    {
+        createTable("CREATE TABLE %s (key text, clustering int, a int, b int, c int, s int static, PRIMARY KEY(key, clustering))");
+        TableMetadata cfm = currentTableMetadata();
+
+        long timestamp = FBUtilities.timestampMicros();
+        RowUpdateBuilder rub = new RowUpdateBuilder(cfm, timestamp, "key0").clustering(1).add("a", 1);
+        PartitionUpdate pu = rub.buildUpdate();
+        PartitionUpdate pu2 = new PartitionUpdate.Builder(pu, 0).updateAllTimestamp(0).build();
+
+        Assert.assertTrue(pu.maxTimestamp() > 0);
+        Assert.assertTrue(pu2.maxTimestamp() == 0);
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/de7c24b3/test/unit/org/apache/cassandra/db/rows/RowAndDeletionMergeIteratorTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/rows/RowAndDeletionMergeIteratorTest.java b/test/unit/org/apache/cassandra/db/rows/RowAndDeletionMergeIteratorTest.java
index fb35ead..0d790fc 100644
--- a/test/unit/org/apache/cassandra/db/rows/RowAndDeletionMergeIteratorTest.java
+++ b/test/unit/org/apache/cassandra/db/rows/RowAndDeletionMergeIteratorTest.java
@@ -366,11 +366,11 @@ public class RowAndDeletionMergeIteratorTest
 
     private Iterator<Row> createRowIterator()
     {
-        PartitionUpdate update = new PartitionUpdate(cfm, dk, cfm.regularAndStaticColumns(), 1);
+        PartitionUpdate.Builder update = new PartitionUpdate.Builder(cfm, dk, cfm.regularAndStaticColumns(), 1);
         for (int i = 0; i < 5; i++)
             addRow(update, i, i);
 
-        return update.iterator();
+        return update.build().iterator();
     }
 
     private UnfilteredRowIterator createMergeIterator(Iterator<Row> rows, Iterator<RangeTombstone> tombstones, boolean reversed)
@@ -395,7 +395,7 @@ public class RowAndDeletionMergeIteratorTest
                                                true);
     }
 
-    private void addRow(PartitionUpdate update, int col1, int a)
+    private void addRow(PartitionUpdate.Builder update, int col1, int a)
     {
         update.add(BTreeRow.singleCellRow(update.metadata().comparator.make(col1), makeCell(defA, a, 0)));
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/de7c24b3/test/unit/org/apache/cassandra/index/sasi/SASIIndexTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/index/sasi/SASIIndexTest.java b/test/unit/org/apache/cassandra/index/sasi/SASIIndexTest.java
index 406832a..071f84d 100644
--- a/test/unit/org/apache/cassandra/index/sasi/SASIIndexTest.java
+++ b/test/unit/org/apache/cassandra/index/sasi/SASIIndexTest.java
@@ -767,7 +767,7 @@ public class SASIIndexTest
     {
         ColumnFamilyStore store = Keyspace.open(KS_NAME).getColumnFamilyStore(CF_NAME);
 
-        Mutation rm1 = new Mutation(KS_NAME, decoratedKey(AsciiType.instance.decompose("key1")));
+        Mutation.PartitionUpdateCollector rm1 = new Mutation.PartitionUpdateCollector(KS_NAME, decoratedKey(AsciiType.instance.decompose("key1")));
         rm1.add(PartitionUpdate.singleRowUpdate(store.metadata(),
                                                 rm1.key(),
                                                 buildRow(buildCell(store.metadata(),
@@ -775,7 +775,7 @@ public class SASIIndexTest
                                                                    AsciiType.instance.decompose("jason"),
                                                                    System.currentTimeMillis()))));
 
-        Mutation rm2 = new Mutation(KS_NAME, decoratedKey(AsciiType.instance.decompose("key2")));
+        Mutation.PartitionUpdateCollector rm2 = new Mutation.PartitionUpdateCollector(KS_NAME, decoratedKey(AsciiType.instance.decompose("key2")));
         rm2.add(PartitionUpdate.singleRowUpdate(store.metadata(),
                                                 rm2.key(),
                                                 buildRow(buildCell(store.metadata(),
@@ -783,7 +783,7 @@ public class SASIIndexTest
                                                                    AsciiType.instance.decompose("pavel"),
                                                                    System.currentTimeMillis()))));
 
-        Mutation rm3 = new Mutation(KS_NAME, decoratedKey(AsciiType.instance.decompose("key3")));
+        Mutation.PartitionUpdateCollector rm3 = new Mutation.PartitionUpdateCollector(KS_NAME, decoratedKey(AsciiType.instance.decompose("key3")));
         rm3.add(PartitionUpdate.singleRowUpdate(store.metadata(),
                                                 rm3.key(),
                                                 buildRow(buildCell(store.metadata(),
@@ -791,9 +791,9 @@ public class SASIIndexTest
                                                                    AsciiType.instance.decompose("Aleksey"),
                                                                    System.currentTimeMillis()))));
 
-        rm1.apply();
-        rm2.apply();
-        rm3.apply();
+        rm1.build().apply();
+        rm2.build().apply();
+        rm3.build().apply();
 
         if (forceFlush)
             store.forceBlockingFlush();
@@ -1095,13 +1095,13 @@ public class SASIIndexTest
         final ByteBuffer firstName = UTF8Type.instance.decompose("first_name");
         final ByteBuffer age = UTF8Type.instance.decompose("age");
 
-        Mutation rm = new Mutation(KS_NAME, decoratedKey(AsciiType.instance.decompose("key1")));
+        Mutation.PartitionUpdateCollector rm = new Mutation.PartitionUpdateCollector(KS_NAME, decoratedKey(AsciiType.instance.decompose("key1")));
         update(rm, new ArrayList<Cell>()
         {{
             add(buildCell(age, LongType.instance.decompose(26L), System.currentTimeMillis()));
             add(buildCell(firstName, AsciiType.instance.decompose("pavel"), System.currentTimeMillis()));
         }});
-        rm.apply();
+        rm.build().apply();
 
         store.forceBlockingFlush();
 
@@ -1127,25 +1127,25 @@ public class SASIIndexTest
 
         final ByteBuffer comment = UTF8Type.instance.decompose("comment");
 
-        Mutation rm = new Mutation(KS_NAME, decoratedKey("key1"));
+        Mutation.PartitionUpdateCollector rm = new Mutation.PartitionUpdateCollector(KS_NAME, decoratedKey("key1"));
         update(rm, comment, UTF8Type.instance.decompose("ⓈⓅⒺⒸⒾⒶⓁ ⒞⒣⒜⒭⒮ and normal ones"), System.currentTimeMillis());
-        rm.apply();
+        rm.build().apply();
 
-        rm = new Mutation(KS_NAME, decoratedKey("key2"));
+        rm = new Mutation.PartitionUpdateCollector(KS_NAME, decoratedKey("key2"));
         update(rm, comment, UTF8Type.instance.decompose("龍馭鬱"), System.currentTimeMillis());
-        rm.apply();
+        rm.build().apply();
 
-        rm = new Mutation(KS_NAME, decoratedKey("key3"));
+        rm = new Mutation.PartitionUpdateCollector(KS_NAME, decoratedKey("key3"));
         update(rm, comment, UTF8Type.instance.decompose("インディアナ"), System.currentTimeMillis());
-        rm.apply();
+        rm.build().apply();
 
-        rm = new Mutation(KS_NAME, decoratedKey("key4"));
+        rm = new Mutation.PartitionUpdateCollector(KS_NAME, decoratedKey("key4"));
         update(rm, comment, UTF8Type.instance.decompose("レストラン"), System.currentTimeMillis());
-        rm.apply();
+        rm.build().apply();
 
-        rm = new Mutation(KS_NAME, decoratedKey("key5"));
+        rm = new Mutation.PartitionUpdateCollector(KS_NAME, decoratedKey("key5"));
         update(rm, comment, UTF8Type.instance.decompose("ベンジャミン ウエスト"), System.currentTimeMillis());
-        rm.apply();
+        rm.build().apply();
 
         if (forceFlush)
             store.forceBlockingFlush();
@@ -1203,21 +1203,21 @@ public class SASIIndexTest
 
         final ByteBuffer comment = UTF8Type.instance.decompose("comment_suffix_split");
 
-        Mutation rm = new Mutation(KS_NAME, decoratedKey("key1"));
+        Mutation.PartitionUpdateCollector rm = new Mutation.PartitionUpdateCollector(KS_NAME, decoratedKey("key1"));
         update(rm, comment, UTF8Type.instance.decompose("龍馭鬱"), System.currentTimeMillis());
-        rm.apply();
+        rm.build().apply();
 
-        rm = new Mutation(KS_NAME, decoratedKey("key2"));
+        rm = new Mutation.PartitionUpdateCollector(KS_NAME, decoratedKey("key2"));
         update(rm, comment, UTF8Type.instance.decompose("インディアナ"), System.currentTimeMillis());
-        rm.apply();
+        rm.build().apply();
 
-        rm = new Mutation(KS_NAME, decoratedKey("key3"));
+        rm = new Mutation.PartitionUpdateCollector(KS_NAME, decoratedKey("key3"));
         update(rm, comment, UTF8Type.instance.decompose("レストラン"), System.currentTimeMillis());
-        rm.apply();
+        rm.build().apply();
 
-        rm = new Mutation(KS_NAME, decoratedKey("key4"));
+        rm = new Mutation.PartitionUpdateCollector(KS_NAME, decoratedKey("key4"));
         update(rm, comment, UTF8Type.instance.decompose("ベンジャミン ウエスト"), System.currentTimeMillis());
-        rm.apply();
+        rm.build().apply();
 
         if (forceFlush)
             store.forceBlockingFlush();
@@ -1272,9 +1272,9 @@ public class SASIIndexTest
 
             final ByteBuffer bigValue = UTF8Type.instance.decompose(new String(randomBytes));
 
-            Mutation rm = new Mutation(KS_NAME, decoratedKey("key1"));
+            Mutation.PartitionUpdateCollector rm = new Mutation.PartitionUpdateCollector(KS_NAME, decoratedKey("key1"));
             update(rm, comment, bigValue, System.currentTimeMillis());
-            rm.apply();
+            rm.build().apply();
 
             Set<String> rows;
 
@@ -1352,37 +1352,37 @@ public class SASIIndexTest
 
         final ByteBuffer fullName = UTF8Type.instance.decompose("/output/full-name/");
 
-        Mutation rm = new Mutation(KS_NAME, decoratedKey("key1"));
+        Mutation.PartitionUpdateCollector rm = new Mutation.PartitionUpdateCollector(KS_NAME, decoratedKey("key1"));
         update(rm, fullName, UTF8Type.instance.decompose("美加 八田"), System.currentTimeMillis());
-        rm.apply();
+        rm.build().apply();
 
-        rm = new Mutation(KS_NAME, decoratedKey("key2"));
+        rm = new Mutation.PartitionUpdateCollector(KS_NAME, decoratedKey("key2"));
         update(rm, fullName, UTF8Type.instance.decompose("仁美 瀧澤"), System.currentTimeMillis());
-        rm.apply();
+        rm.build().apply();
 
-        rm = new Mutation(KS_NAME, decoratedKey("key3"));
+        rm = new Mutation.PartitionUpdateCollector(KS_NAME, decoratedKey("key3"));
         update(rm, fullName, UTF8Type.instance.decompose("晃宏 高須"), System.currentTimeMillis());
-        rm.apply();
+        rm.build().apply();
 
-        rm = new Mutation(KS_NAME, decoratedKey("key4"));
+        rm = new Mutation.PartitionUpdateCollector(KS_NAME, decoratedKey("key4"));
         update(rm, fullName, UTF8Type.instance.decompose("弘孝 大竹"), System.currentTimeMillis());
-        rm.apply();
+        rm.build().apply();
 
-        rm = new Mutation(KS_NAME, decoratedKey("key5"));
+        rm = new Mutation.PartitionUpdateCollector(KS_NAME, decoratedKey("key5"));
         update(rm, fullName, UTF8Type.instance.decompose("満枝 榎本"), System.currentTimeMillis());
-        rm.apply();
+        rm.build().apply();
 
-        rm = new Mutation(KS_NAME, decoratedKey("key6"));
+        rm = new Mutation.PartitionUpdateCollector(KS_NAME, decoratedKey("key6"));
         update(rm, fullName, UTF8Type.instance.decompose("飛鳥 上原"), System.currentTimeMillis());
-        rm.apply();
+        rm.build().apply();
 
-        rm = new Mutation(KS_NAME, decoratedKey("key7"));
+        rm = new Mutation.PartitionUpdateCollector(KS_NAME, decoratedKey("key7"));
         update(rm, fullName, UTF8Type.instance.decompose("大輝 鎌田"), System.currentTimeMillis());
-        rm.apply();
+        rm.build().apply();
 
-        rm = new Mutation(KS_NAME, decoratedKey("key8"));
+        rm = new Mutation.PartitionUpdateCollector(KS_NAME, decoratedKey("key8"));
         update(rm, fullName, UTF8Type.instance.decompose("利久 寺地"), System.currentTimeMillis());
-        rm.apply();
+        rm.build().apply();
 
         store.forceBlockingFlush();
 
@@ -1408,17 +1408,17 @@ public class SASIIndexTest
 
         final ByteBuffer comment = UTF8Type.instance.decompose("address");
 
-        Mutation rm = new Mutation(KS_NAME, decoratedKey("key1"));
+        Mutation.PartitionUpdateCollector rm = new Mutation.PartitionUpdateCollector(KS_NAME, decoratedKey("key1"));
         update(rm, comment, UTF8Type.instance.decompose("577 Rogahn Valleys Apt. 178"), System.currentTimeMillis());
-        rm.apply();
+        rm.build().apply();
 
-        rm = new Mutation(KS_NAME, decoratedKey("key2"));
+        rm = new Mutation.PartitionUpdateCollector(KS_NAME, decoratedKey("key2"));
         update(rm, comment, UTF8Type.instance.decompose("89809 Beverly Course Suite 089"), System.currentTimeMillis());
-        rm.apply();
+        rm.build().apply();
 
-        rm = new Mutation(KS_NAME, decoratedKey("key3"));
+        rm = new Mutation.PartitionUpdateCollector(KS_NAME, decoratedKey("key3"));
         update(rm, comment, UTF8Type.instance.decompose("165 clydie oval apt. 399"), System.currentTimeMillis());
-        rm.apply();
+        rm.build().apply();
 
         if (forceFlush)
             store.forceBlockingFlush();
@@ -1483,42 +1483,42 @@ public class SASIIndexTest
 
         final ByteBuffer name = UTF8Type.instance.decompose("first_name_prefix");
 
-        Mutation rm;
+        Mutation.PartitionUpdateCollector rm;
 
-        rm = new Mutation(KS_NAME, decoratedKey("key1"));
+        rm = new Mutation.PartitionUpdateCollector(KS_NAME, decoratedKey("key1"));
         update(rm, name, UTF8Type.instance.decompose("Pavel"), System.currentTimeMillis());
-        rm.apply();
+        rm.build().apply();
 
-        rm = new Mutation(KS_NAME, decoratedKey("key2"));
+        rm = new Mutation.PartitionUpdateCollector(KS_NAME, decoratedKey("key2"));
         update(rm, name, UTF8Type.instance.decompose("Jordan"), System.currentTimeMillis());
-        rm.apply();
+        rm.build().apply();
 
-        rm = new Mutation(KS_NAME, decoratedKey("key3"));
+        rm = new Mutation.PartitionUpdateCollector(KS_NAME, decoratedKey("key3"));
         update(rm, name, UTF8Type.instance.decompose("Mikhail"), System.currentTimeMillis());
-        rm.apply();
+        rm.build().apply();
 
-        rm = new Mutation(KS_NAME, decoratedKey("key4"));
+        rm = new Mutation.PartitionUpdateCollector(KS_NAME, decoratedKey("key4"));
         update(rm, name, UTF8Type.instance.decompose("Michael"), System.currentTimeMillis());
-        rm.apply();
+        rm.build().apply();
 
-        rm = new Mutation(KS_NAME, decoratedKey("key5"));
+        rm = new Mutation.PartitionUpdateCollector(KS_NAME, decoratedKey("key5"));
         update(rm, name, UTF8Type.instance.decompose("Johnny"), System.currentTimeMillis());
-        rm.apply();
+        rm.build().apply();
 
         // first flush would make interval for name - 'johnny' -> 'pavel'
         store.forceBlockingFlush();
 
-        rm = new Mutation(KS_NAME, decoratedKey("key6"));
+        rm = new Mutation.PartitionUpdateCollector(KS_NAME, decoratedKey("key6"));
         update(rm, name, UTF8Type.instance.decompose("Jason"), System.currentTimeMillis());
-        rm.apply();
+        rm.build().apply();
 
-        rm = new Mutation(KS_NAME, decoratedKey("key7"));
+        rm = new Mutation.PartitionUpdateCollector(KS_NAME, decoratedKey("key7"));
         update(rm, name, UTF8Type.instance.decompose("Vijay"), System.currentTimeMillis());
-        rm.apply();
+        rm.build().apply();
 
-        rm = new Mutation(KS_NAME, decoratedKey("key8")); // this name is going to be tokenized
+        rm = new Mutation.PartitionUpdateCollector(KS_NAME, decoratedKey("key8")); // this name is going to be tokenized
         update(rm, name, UTF8Type.instance.decompose("Jean-Claude"), System.currentTimeMillis());
-        rm.apply();
+        rm.build().apply();
 
         // this flush is going to produce range - 'jason' -> 'vijay'
         store.forceBlockingFlush();
@@ -2422,7 +2422,7 @@ public class SASIIndexTest
 
     private static Mutation newMutation(String key, String firstName, String lastName, int age, long timestamp)
     {
-        Mutation rm = new Mutation(KS_NAME, decoratedKey(AsciiType.instance.decompose(key)));
+        Mutation.PartitionUpdateCollector rm = new Mutation.PartitionUpdateCollector(KS_NAME, decoratedKey(AsciiType.instance.decompose(key)));
         List<Cell> cells = new ArrayList<>(3);
 
         if (age >= 0)
@@ -2433,7 +2433,7 @@ public class SASIIndexTest
             cells.add(buildCell(ByteBufferUtil.bytes("last_name"), UTF8Type.instance.decompose(lastName), timestamp));
 
         update(rm, cells);
-        return rm;
+        return rm.build();
     }
 
     private static Set<String> getKeys(final UnfilteredPartitionIterator rows)
@@ -2526,14 +2526,14 @@ public class SASIIndexTest
         return new Expression(name, op, value);
     }
 
-    private static void update(Mutation rm, ByteBuffer name, ByteBuffer value, long timestamp)
+    private static void update(Mutation.PartitionUpdateCollector rm, ByteBuffer name, ByteBuffer value, long timestamp)
     {
         TableMetadata metadata = Keyspace.open(KS_NAME).getColumnFamilyStore(CF_NAME).metadata();
         rm.add(PartitionUpdate.singleRowUpdate(metadata, rm.key(), buildRow(buildCell(metadata, name, value, timestamp))));
     }
 
 
-    private static void update(Mutation rm, List<Cell> cells)
+    private static void update(Mutation.PartitionUpdateCollector rm, List<Cell> cells)
     {
         TableMetadata metadata = Keyspace.open(KS_NAME).getColumnFamilyStore(CF_NAME).metadata();
         rm.add(PartitionUpdate.singleRowUpdate(metadata, rm.key(), buildRow(cells)));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/de7c24b3/test/unit/org/apache/cassandra/net/WriteCallbackInfoTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/net/WriteCallbackInfoTest.java b/test/unit/org/apache/cassandra/net/WriteCallbackInfoTest.java
index d3257d7..2d12baf 100644
--- a/test/unit/org/apache/cassandra/net/WriteCallbackInfoTest.java
+++ b/test/unit/org/apache/cassandra/net/WriteCallbackInfoTest.java
@@ -34,6 +34,7 @@ import org.apache.cassandra.db.partitions.PartitionUpdate;
 import org.apache.cassandra.dht.Murmur3Partitioner;
 import org.apache.cassandra.net.MessagingService.Verb;
 import org.apache.cassandra.schema.MockSchema;
+import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.service.paxos.Commit;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
@@ -59,9 +60,10 @@ public class WriteCallbackInfoTest
 
     private void testShouldHint(Verb verb, ConsistencyLevel cl, boolean allowHints, boolean expectHint) throws Exception
     {
+        TableMetadata metadata = MockSchema.newTableMetadata("", "");
         Object payload = verb == Verb.PAXOS_COMMIT
-                         ? new Commit(UUID.randomUUID(), new PartitionUpdate(MockSchema.newTableMetadata("", ""), ByteBufferUtil.EMPTY_BYTE_BUFFER, RegularAndStaticColumns.NONE, 1))
-                         : new Mutation("", new BufferDecoratedKey(new Murmur3Partitioner.LongToken(0), ByteBufferUtil.EMPTY_BYTE_BUFFER));
+                         ? new Commit(UUID.randomUUID(), new PartitionUpdate.Builder(metadata, ByteBufferUtil.EMPTY_BYTE_BUFFER, RegularAndStaticColumns.NONE, 1).build())
+                         : new Mutation(PartitionUpdate.simpleBuilder(metadata, "").build());
 
         WriteCallbackInfo wcbi = new WriteCallbackInfo(InetAddress.getByName("192.168.1.1"), null, new MessageOut(verb, payload, null), null, cl, allowHints);
         Assert.assertEquals(expectHint, wcbi.shouldHint());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/de7c24b3/test/unit/org/apache/cassandra/triggers/TriggerExecutorTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/triggers/TriggerExecutorTest.java b/test/unit/org/apache/cassandra/triggers/TriggerExecutorTest.java
index aa5ee9b..50183e5 100644
--- a/test/unit/org/apache/cassandra/triggers/TriggerExecutorTest.java
+++ b/test/unit/org/apache/cassandra/triggers/TriggerExecutorTest.java
@@ -107,8 +107,8 @@ public class TriggerExecutorTest
         TableMetadata metadata = makeTableMetadata("ks1", "cf1", TriggerMetadata.create("test", SameKeySameCfTrigger.class.getName()));
         PartitionUpdate cf1 = makeCf(metadata, "k1", "k1v1", null);
         PartitionUpdate cf2 = makeCf(metadata, "k2", "k2v1", null);
-        Mutation rm1 = new Mutation("ks1", cf1.partitionKey()).add(cf1);
-        Mutation rm2 = new Mutation("ks1", cf2.partitionKey()).add(cf2);
+        Mutation rm1 = new Mutation.PartitionUpdateCollector("ks1", cf1.partitionKey()).add(cf1).build();
+        Mutation rm2 = new Mutation.PartitionUpdateCollector("ks1", cf2.partitionKey()).add(cf2).build();
 
         List<? extends IMutation> tmutations = new ArrayList<>(TriggerExecutor.instance.execute(Arrays.asList(rm1, rm2)));
         assertEquals(2, tmutations.size());
@@ -133,8 +133,8 @@ public class TriggerExecutorTest
         TableMetadata metadata = makeTableMetadata("ks1", "cf1", TriggerMetadata.create("test", SameKeySameCfPartialTrigger.class.getName()));
         PartitionUpdate cf1 = makeCf(metadata, "k1", "k1v1", null);
         PartitionUpdate cf2 = makeCf(metadata, "k2", "k2v1", null);
-        Mutation rm1 = new Mutation("ks1", cf1.partitionKey()).add(cf1);
-        Mutation rm2 = new Mutation("ks1", cf2.partitionKey()).add(cf2);
+        Mutation rm1 = new Mutation.PartitionUpdateCollector("ks1", cf1.partitionKey()).add(cf1).build();
+        Mutation rm2 = new Mutation.PartitionUpdateCollector("ks1", cf2.partitionKey()).add(cf2).build();
 
         List<? extends IMutation> tmutations = new ArrayList<>(TriggerExecutor.instance.execute(Arrays.asList(rm1, rm2)));
         assertEquals(2, tmutations.size());
@@ -159,8 +159,8 @@ public class TriggerExecutorTest
         TableMetadata metadata = makeTableMetadata("ks1", "cf1", TriggerMetadata.create("test", SameKeyDifferentCfTrigger.class.getName()));
         PartitionUpdate cf1 = makeCf(metadata, "k1", "k1v1", null);
         PartitionUpdate cf2 = makeCf(metadata, "k2", "k2v1", null);
-        Mutation rm1 = new Mutation("ks1", cf1.partitionKey()).add(cf1);
-        Mutation rm2 = new Mutation("ks1", cf2.partitionKey()).add(cf2);
+        Mutation rm1 = new Mutation.PartitionUpdateCollector("ks1", cf1.partitionKey()).add(cf1).build();
+        Mutation rm2 = new Mutation.PartitionUpdateCollector("ks1", cf2.partitionKey()).add(cf2).build();
 
         List<? extends IMutation> tmutations = new ArrayList<>(TriggerExecutor.instance.execute(Arrays.asList(rm1, rm2)));
         assertEquals(2, tmutations.size());
@@ -210,8 +210,8 @@ public class TriggerExecutorTest
         TableMetadata metadata = makeTableMetadata("ks1", "cf1", TriggerMetadata.create("test", SameKeyDifferentKsTrigger.class.getName()));
         PartitionUpdate cf1 = makeCf(metadata, "k1", "k1v1", null);
         PartitionUpdate cf2 = makeCf(metadata, "k2", "k2v1", null);
-        Mutation rm1 = new Mutation("ks1", cf1.partitionKey()).add(cf1);
-        Mutation rm2 = new Mutation("ks1", cf2.partitionKey()).add(cf2);
+        Mutation rm1 = new Mutation.PartitionUpdateCollector("ks1", cf1.partitionKey()).add(cf1).build();
+        Mutation rm2 = new Mutation.PartitionUpdateCollector("ks1", cf2.partitionKey()).add(cf2).build();
 
         List<? extends IMutation> tmutations = new ArrayList<>(TriggerExecutor.instance.execute(Arrays.asList(rm1, rm2)));
         assertEquals(4, tmutations.size());
@@ -248,7 +248,7 @@ public class TriggerExecutorTest
 
         TableMetadata metadata = makeTableMetadata("ks1", "cf1", TriggerMetadata.create("test", DifferentKeyTrigger.class.getName()));
         PartitionUpdate cf1 = makeCf(metadata, "k1", "v1", null);
-        Mutation rm = new Mutation("ks1", cf1.partitionKey()).add(cf1);
+        Mutation rm = new Mutation.PartitionUpdateCollector("ks1", cf1.partitionKey()).add(cf1).build();
 
         List<? extends IMutation> tmutations = new ArrayList<>(TriggerExecutor.instance.execute(Arrays.asList(rm)));
         assertEquals(2, tmutations.size());


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


[2/2] cassandra git commit: Make PartitionUpdate and Mutation immutable

Posted by ma...@apache.org.
Make PartitionUpdate and Mutation immutable

Patch by marcuse; reviewed by Aleksey Yeschenko for CASSANDRA-13867


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/de7c24b3
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/de7c24b3
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/de7c24b3

Branch: refs/heads/trunk
Commit: de7c24b395265ff619c622ed6be8d88453f158ac
Parents: 45c7c45
Author: Marcus Eriksson <ma...@apache.org>
Authored: Wed Sep 13 15:57:50 2017 +0200
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Tue Jan 9 08:10:23 2018 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../cql3/statements/BatchStatement.java         |   4 +-
 .../cql3/statements/BatchUpdatesCollector.java  | 231 ++++++++++
 .../cql3/statements/CQL3CasRequest.java         |  26 +-
 .../cql3/statements/DeleteStatement.java        |  18 +-
 .../cql3/statements/ModificationStatement.java  |  33 +-
 .../statements/SingleTableUpdatesCollector.java | 101 +++++
 .../cql3/statements/UpdateStatement.java        |  12 +-
 .../cql3/statements/UpdatesCollector.java       | 127 +-----
 .../apache/cassandra/db/CounterMutation.java    |   7 +-
 src/java/org/apache/cassandra/db/IMutation.java |   6 +
 src/java/org/apache/cassandra/db/Mutation.java  | 137 +++---
 .../org/apache/cassandra/db/SimpleBuilders.java |  25 +-
 .../org/apache/cassandra/db/SystemKeyspace.java |   9 +-
 .../db/commitlog/CommitLogReplayer.java         |  20 +-
 .../db/partitions/PartitionUpdate.java          | 448 +++++++++----------
 .../apache/cassandra/db/view/TableViews.java    |  16 +-
 .../cassandra/db/view/ViewUpdateGenerator.java  |  20 +-
 .../io/sstable/AbstractSSTableSimpleWriter.java |   4 +-
 .../io/sstable/SSTableSimpleUnsortedWriter.java |  22 +-
 .../io/sstable/SSTableSimpleWriter.java         |  10 +-
 .../apache/cassandra/schema/SchemaKeyspace.java |  14 +-
 .../apache/cassandra/service/DataResolver.java  |   8 +-
 .../apache/cassandra/service/paxos/Commit.java  |   4 +-
 .../cassandra/db/ColumnFamilyStoreTest.java     |   4 +-
 .../cassandra/db/CounterMutationTest.java       |   8 +-
 .../cassandra/db/DeletePartitionTest.java       |   3 +-
 test/unit/org/apache/cassandra/db/RowTest.java  |   8 +-
 .../apache/cassandra/db/RowUpdateBuilder.java   |  28 +-
 .../db/commitlog/CommitLogReaderTest.java       |   4 +-
 .../db/compaction/CompactionsPurgeTest.java     |  11 +-
 .../db/partition/PartitionUpdateTest.java       |  15 +
 .../rows/RowAndDeletionMergeIteratorTest.java   |   6 +-
 .../cassandra/index/sasi/SASIIndexTest.java     | 142 +++---
 .../cassandra/net/WriteCallbackInfoTest.java    |   6 +-
 .../cassandra/triggers/TriggerExecutorTest.java |  18 +-
 36 files changed, 912 insertions(+), 644 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/de7c24b3/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index ba0f842..623bf8a 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0
+ * Make PartitionUpdate and Mutation immutable (CASSANDRA-13867)
  * Fix CommitLogReplayer exception for CDC data (CASSANDRA-14066)
  * Fix cassandra-stress startup failure (CASSANDRA-14106)
  * Remove initialDirectories from CFS (CASSANDRA-13928)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/de7c24b3/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
index 7497f47..b54e3a0 100644
--- a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
@@ -224,7 +224,7 @@ public class BatchStatement implements CQLStatement
     throws RequestExecutionException, RequestValidationException
     {
         Set<String> tablesWithZeroGcGs = null;
-        UpdatesCollector collector = new UpdatesCollector(updatedColumns, updatedRows());
+        BatchUpdatesCollector collector = new BatchUpdatesCollector(updatedColumns, updatedRows());
         for (int i = 0; i < statements.size(); i++)
         {
             ModificationStatement statement = statements.get(i);
@@ -247,8 +247,6 @@ public class BatchStatement implements CQLStatement
             ClientWarn.instance.warn(MessageFormatter.arrayFormat(LOGGED_BATCH_LOW_GCGS_WARNING, new Object[] { suffix, tablesWithZeroGcGs })
                                                      .getMessage());
         }
-
-        collector.validateIndexedColumns();
         return collector.toMutations();
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/de7c24b3/src/java/org/apache/cassandra/cql3/statements/BatchUpdatesCollector.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/BatchUpdatesCollector.java b/src/java/org/apache/cassandra/cql3/statements/BatchUpdatesCollector.java
new file mode 100644
index 0000000..9671b02
--- /dev/null
+++ b/src/java/org/apache/cassandra/cql3/statements/BatchUpdatesCollector.java
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.cql3.statements;
+
+import java.nio.ByteBuffer;
+import java.util.*;
+import java.util.stream.Collectors;
+
+import com.google.common.collect.ImmutableMap;
+
+import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
+
+/**
+ * Utility class to collect updates.
+ *
+ * <p>In a batch statement we don't want to recreate mutations every time as this is particularly inefficient when
+ * applying multiple batches to the same partition (see #6737). </p>
+ *
+ */
+final class BatchUpdatesCollector implements UpdatesCollector
+{
+    /**
+     * The columns that will be updated for each table (keyed by the table ID).
+     */
+    private final Map<TableId, RegularAndStaticColumns> updatedColumns;
+
+    /**
+     * The estimated number of updated rows.
+     */
+    private final int updatedRows;
+
+    /**
+     * The mutation builders per keyspace, keyed by partition key.
+     */
+    private final Map<String, Map<ByteBuffer, IMutationBuilder>> mutationBuilders = new HashMap<>();
+
+    BatchUpdatesCollector(Map<TableId, RegularAndStaticColumns> updatedColumns, int updatedRows)
+    {
+        super();
+        this.updatedColumns = updatedColumns;
+        this.updatedRows = updatedRows;
+    }
+
+    /**
+     * Gets the <code>PartitionUpdate.Builder</code> for the specified column family and key. If the builder does not
+     * exist it will be created.
+     *
+     * @param metadata the column family meta data
+     * @param dk the partition key
+     * @param consistency the consistency level
+     * @return the <code>PartitionUpdate.Builder</code> for the specified column family and key
+     */
+    public PartitionUpdate.Builder getPartitionUpdateBuilder(TableMetadata metadata, DecoratedKey dk, ConsistencyLevel consistency)
+    {
+        IMutationBuilder mut = getMutationBuilder(metadata, dk, consistency);
+        PartitionUpdate.Builder upd = mut.get(metadata.id);
+        if (upd == null)
+        {
+            RegularAndStaticColumns columns = updatedColumns.get(metadata.id);
+            assert columns != null;
+            upd = new PartitionUpdate.Builder(metadata, dk, columns, updatedRows);
+            mut.add(upd);
+        }
+        return upd;
+    }
+
+    private IMutationBuilder getMutationBuilder(TableMetadata metadata, DecoratedKey dk, ConsistencyLevel consistency)
+    {
+        String ksName = metadata.keyspace;
+        IMutationBuilder mutationBuilder = keyspaceMap(ksName).get(dk.getKey());
+        if (mutationBuilder == null)
+        {
+            MutationBuilder builder = new MutationBuilder(ksName, dk);
+            mutationBuilder = metadata.isCounter() ? new CounterMutationBuilder(builder, consistency) : builder;
+            keyspaceMap(ksName).put(dk.getKey(), mutationBuilder);
+        }
+        return mutationBuilder;
+    }
+
+    /**
+     * Returns a collection containing all the mutations.
+     * @return a collection containing all the mutations.
+     */
+    public Collection<IMutation> toMutations()
+    {
+        //TODO: The case where all statements were on the same keyspace is pretty common, optimize for that?
+        List<IMutation> ms = new ArrayList<>();
+        for (Map<ByteBuffer, IMutationBuilder> ksMap : mutationBuilders.values())
+        {
+            for (IMutationBuilder builder : ksMap.values())
+            {
+                IMutation mutation = builder.build();
+                mutation.validateIndexedColumns();
+                ms.add(mutation);
+            }
+        }
+        return ms;
+    }
+
+    /**
+     * Returns the key-to-mutation-builder mappings for the specified keyspace.
+     *
+     * @param ksName the keyspace name
+     * @return the key-to-mutation-builder mappings for the specified keyspace.
+     */
+    private Map<ByteBuffer, IMutationBuilder> keyspaceMap(String ksName)
+    {
+        return mutationBuilders.computeIfAbsent(ksName, k -> new HashMap<>());
+    }
+
+    private interface IMutationBuilder
+    {
+        /**
+         * Add a new PartitionUpdate builder to this mutation builder
+         * @param builder the builder to add
+         * @return this
+         */
+        IMutationBuilder add(PartitionUpdate.Builder builder);
+
+        /**
+         * Build the immutable mutation
+         */
+        IMutation build();
+
+        /**
+         * Get the builder for the given tableId
+         */
+        PartitionUpdate.Builder get(TableId tableId);
+    }
+
+    private static class MutationBuilder implements IMutationBuilder
+    {
+        private final HashMap<TableId, PartitionUpdate.Builder> modifications = new HashMap<>();
+        private final DecoratedKey key;
+        private final String keyspaceName;
+        private final long createdAt = System.currentTimeMillis();
+
+        private MutationBuilder(String keyspaceName, DecoratedKey key)
+        {
+            this.keyspaceName = keyspaceName;
+            this.key = key;
+        }
+
+        public MutationBuilder add(PartitionUpdate.Builder updateBuilder)
+        {
+            assert updateBuilder != null;
+            assert updateBuilder.partitionKey().getPartitioner() == key.getPartitioner();
+            PartitionUpdate.Builder prev = modifications.put(updateBuilder.metadata().id, updateBuilder);
+            if (prev != null)
+                // developer error
+                throw new IllegalArgumentException("Table " + updateBuilder.metadata().name + " already has modifications in this mutation: " + prev);
+            return this;
+        }
+
+        public Mutation build()
+        {
+            ImmutableMap.Builder<TableId, PartitionUpdate> updates = new ImmutableMap.Builder<>();
+            for (Map.Entry<TableId, PartitionUpdate.Builder> updateEntry : modifications.entrySet())
+            {
+                PartitionUpdate update = updateEntry.getValue().build();
+                updates.put(updateEntry.getKey(), update);
+            }
+            return new Mutation(keyspaceName, key, updates.build(), createdAt);
+        }
+
+        public PartitionUpdate.Builder get(TableId tableId)
+        {
+            return modifications.get(tableId);
+        }
+
+        public DecoratedKey key()
+        {
+            return key;
+        }
+
+        public boolean isEmpty()
+        {
+            return modifications.isEmpty();
+        }
+
+        public String getKeyspaceName()
+        {
+            return keyspaceName;
+        }
+    }
+
+    private static class CounterMutationBuilder implements IMutationBuilder
+    {
+        private final MutationBuilder mutationBuilder;
+        private final ConsistencyLevel cl;
+
+        private CounterMutationBuilder(MutationBuilder mutationBuilder, ConsistencyLevel cl)
+        {
+            this.mutationBuilder = mutationBuilder;
+            this.cl = cl;
+        }
+
+        public IMutationBuilder add(PartitionUpdate.Builder builder)
+        {
+            return mutationBuilder.add(builder);
+        }
+
+        public IMutation build()
+        {
+            return new CounterMutation(mutationBuilder.build(), cl);
+        }
+
+        public PartitionUpdate.Builder get(TableId id)
+        {
+            return mutationBuilder.get(id);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/de7c24b3/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java b/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java
index ec11df3..8619945 100644
--- a/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java
+++ b/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java
@@ -235,14 +235,16 @@ public class CQL3CasRequest implements CASRequest
 
     public PartitionUpdate makeUpdates(FilteredPartition current) throws InvalidRequestException
     {
-        PartitionUpdate update = new PartitionUpdate(metadata, key, updatedColumns(), conditions.size());
+        PartitionUpdate.Builder updateBuilder = new PartitionUpdate.Builder(metadata, key, updatedColumns(), conditions.size());
         for (RowUpdate upd : updates)
-            upd.applyUpdates(current, update);
+            upd.applyUpdates(current, updateBuilder);
         for (RangeDeletion upd : rangeDeletions)
-            upd.applyUpdates(current, update);
+            upd.applyUpdates(current, updateBuilder);
 
-        Keyspace.openAndGetStore(metadata).indexManager.validate(update);
-        return update;
+        PartitionUpdate partitionUpdate = updateBuilder.build();
+        Keyspace.openAndGetStore(metadata).indexManager.validate(partitionUpdate);
+
+        return partitionUpdate;
     }
 
     /**
@@ -266,11 +268,11 @@ public class CQL3CasRequest implements CASRequest
             this.timestamp = timestamp;
         }
 
-        public void applyUpdates(FilteredPartition current, PartitionUpdate updates) throws InvalidRequestException
+        public void applyUpdates(FilteredPartition current, PartitionUpdate.Builder updateBuilder) throws InvalidRequestException
         {
-            Map<DecoratedKey, Partition> map = stmt.requiresRead() ? Collections.<DecoratedKey, Partition>singletonMap(key, current) : null;
-            UpdateParameters params = new UpdateParameters(metadata, updates.columns(), options, timestamp, stmt.getTimeToLive(options), map);
-            stmt.addUpdateForKey(updates, clustering, params);
+            Map<DecoratedKey, Partition> map = stmt.requiresRead() ? Collections.singletonMap(key, current) : null;
+            UpdateParameters params = new UpdateParameters(metadata, updateBuilder.columns(), options, timestamp, stmt.getTimeToLive(options), map);
+            stmt.addUpdateForKey(updateBuilder, clustering, params);
         }
     }
 
@@ -289,12 +291,12 @@ public class CQL3CasRequest implements CASRequest
             this.timestamp = timestamp;
         }
 
-        public void applyUpdates(FilteredPartition current, PartitionUpdate updates) throws InvalidRequestException
+        public void applyUpdates(FilteredPartition current, PartitionUpdate.Builder updateBuilder) throws InvalidRequestException
         {
             // No slice statements currently require a read, but this maintains consistency with RowUpdate, and future proofs us
             Map<DecoratedKey, Partition> map = stmt.requiresRead() ? Collections.<DecoratedKey, Partition>singletonMap(key, current) : null;
-            UpdateParameters params = new UpdateParameters(metadata, updates.columns(), options, timestamp, stmt.getTimeToLive(options), map);
-            stmt.addUpdateForKey(updates, slice, params);
+            UpdateParameters params = new UpdateParameters(metadata, updateBuilder.columns(), options, timestamp, stmt.getTimeToLive(options), map);
+            stmt.addUpdateForKey(updateBuilder, slice, params);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/de7c24b3/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java b/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java
index e880bf8..8f3349e 100644
--- a/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java
@@ -50,7 +50,7 @@ public class DeleteStatement extends ModificationStatement
     }
 
     @Override
-    public void addUpdateForKey(PartitionUpdate update, Clustering clustering, UpdateParameters params)
+    public void addUpdateForKey(PartitionUpdate.Builder updateBuilder, Clustering clustering, UpdateParameters params)
     throws InvalidRequestException
     {
         TableMetadata metadata = metadata();
@@ -63,19 +63,19 @@ public class DeleteStatement extends ModificationStatement
             // We're not deleting any specific columns so it's either a full partition deletion ....
             if (clustering.size() == 0)
             {
-                update.addPartitionDeletion(params.deletionTime());
+                updateBuilder.addPartitionDeletion(params.deletionTime());
             }
             // ... or a row deletion ...
             else if (clustering.size() == metadata.clusteringColumns().size())
             {
                 params.newRow(clustering);
                 params.addRowDeletion();
-                update.add(params.buildRow());
+                updateBuilder.add(params.buildRow());
             }
             // ... or a range of rows deletion.
             else
             {
-                update.add(params.makeRangeTombstone(metadata.comparator, clustering));
+                updateBuilder.add(params.makeRangeTombstone(metadata.comparator, clustering));
             }
         }
         else
@@ -91,22 +91,22 @@ public class DeleteStatement extends ModificationStatement
                 params.newRow(clustering);
 
                 for (Operation op : regularDeletions)
-                    op.execute(update.partitionKey(), params);
-                update.add(params.buildRow());
+                    op.execute(updateBuilder.partitionKey(), params);
+                updateBuilder.add(params.buildRow());
             }
 
             if (!staticDeletions.isEmpty())
             {
                 params.newRow(Clustering.STATIC_CLUSTERING);
                 for (Operation op : staticDeletions)
-                    op.execute(update.partitionKey(), params);
-                update.add(params.buildRow());
+                    op.execute(updateBuilder.partitionKey(), params);
+                updateBuilder.add(params.buildRow());
             }
         }
     }
 
     @Override
-    public void addUpdateForKey(PartitionUpdate update, Slice slice, UpdateParameters params)
+    public void addUpdateForKey(PartitionUpdate.Builder update, Slice slice, UpdateParameters params)
     {
         List<Operation> regularDeletions = getRegularOperations();
         List<Operation> staticDeletions = getStaticOperations();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/de7c24b3/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
index decb99f..f0cfd0d 100644
--- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
@@ -174,9 +174,9 @@ public abstract class ModificationStatement implements CQLStatement
         return restrictions;
     }
 
-    public abstract void addUpdateForKey(PartitionUpdate update, Clustering clustering, UpdateParameters params);
+    public abstract void addUpdateForKey(PartitionUpdate.Builder updateBuilder, Clustering clustering, UpdateParameters params);
 
-    public abstract void addUpdateForKey(PartitionUpdate update, Slice slice, UpdateParameters params);
+    public abstract void addUpdateForKey(PartitionUpdate.Builder updateBuilder, Slice slice, UpdateParameters params);
 
     public int getBoundTerms()
     {
@@ -643,10 +643,8 @@ public abstract class ModificationStatement implements CQLStatement
      */
     private Collection<? extends IMutation> getMutations(QueryOptions options, boolean local, long now, long queryStartNanoTime)
     {
-        UpdatesCollector collector = new UpdatesCollector(Collections.singletonMap(metadata.id, updatedColumns), 1);
+        UpdatesCollector collector = new SingleTableUpdatesCollector(metadata, updatedColumns, 1);
         addUpdates(collector, options, local, now, queryStartNanoTime);
-        collector.validateIndexedColumns();
-
         return collector.toMutations();
     }
 
@@ -678,10 +676,10 @@ public abstract class ModificationStatement implements CQLStatement
                 Validation.validateKey(metadata(), key);
                 DecoratedKey dk = metadata().partitioner.decorateKey(key);
 
-                PartitionUpdate upd = collector.getPartitionUpdate(metadata(), dk, options.getConsistency());
+                PartitionUpdate.Builder updateBuilder = collector.getPartitionUpdateBuilder(metadata(), dk, options.getConsistency());
 
                 for (Slice slice : slices)
-                    addUpdateForKey(upd, slice, params);
+                    addUpdateForKey(updateBuilder, slice, params);
             }
         }
         else
@@ -699,25 +697,24 @@ public abstract class ModificationStatement implements CQLStatement
                 Validation.validateKey(metadata(), key);
                 DecoratedKey dk = metadata().partitioner.decorateKey(key);
 
-                PartitionUpdate upd = collector.getPartitionUpdate(metadata, dk, options.getConsistency());
+                PartitionUpdate.Builder updateBuilder = collector.getPartitionUpdateBuilder(metadata(), dk, options.getConsistency());
 
                 if (!restrictions.hasClusteringColumnsRestrictions())
                 {
-                    addUpdateForKey(upd, Clustering.EMPTY, params);
+                    addUpdateForKey(updateBuilder, Clustering.EMPTY, params);
                 }
                 else
                 {
                     for (Clustering clustering : clusterings)
                     {
-                       for (ByteBuffer c : clustering.getRawValues())
-                       {
-                           if (c != null && c.remaining() > FBUtilities.MAX_UNSIGNED_SHORT)
-                               throw new InvalidRequestException(String.format("Key length of %d is longer than maximum of %d",
-                                                                               clustering.dataSize(),
-                                                                               FBUtilities.MAX_UNSIGNED_SHORT));
-                       }
-
-                        addUpdateForKey(upd, clustering, params);
+                        for (ByteBuffer c : clustering.getRawValues())
+                        {
+                            if (c != null && c.remaining() > FBUtilities.MAX_UNSIGNED_SHORT)
+                                throw new InvalidRequestException(String.format("Key length of %d is longer than maximum of %d",
+                                                                                clustering.dataSize(),
+                                                                                FBUtilities.MAX_UNSIGNED_SHORT));
+                        }
+                        addUpdateForKey(updateBuilder, clustering, params);
                     }
                 }
             }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/de7c24b3/src/java/org/apache/cassandra/cql3/statements/SingleTableUpdatesCollector.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/SingleTableUpdatesCollector.java b/src/java/org/apache/cassandra/cql3/statements/SingleTableUpdatesCollector.java
new file mode 100644
index 0000000..9eaf897
--- /dev/null
+++ b/src/java/org/apache/cassandra/cql3/statements/SingleTableUpdatesCollector.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.cql3.statements;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.CounterMutation;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.IMutation;
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.db.RegularAndStaticColumns;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.schema.TableMetadata;
+
+/**
+ * Utility class to collect the partition updates made to a single table.
+ */
+final class SingleTableUpdatesCollector implements UpdatesCollector
+{
+    /**
+     * the table to be updated
+     */
+    private final TableMetadata metadata;
+
+    /**
+     * the columns to update
+     */
+    private final RegularAndStaticColumns updatedColumns;
+
+    /**
+     * The estimated number of updated rows.
+     */
+    private final int updatedRows;
+
+    /**
+     * the partition update builders per key
+     */
+    private final Map<ByteBuffer, PartitionUpdate.Builder> puBuilders = new HashMap<>();
+
+    /**
+     * the consistency level used for the counter mutations; only set if the table is a counter table
+     */
+    private ConsistencyLevel counterConsistencyLevel = null;
+
+    SingleTableUpdatesCollector(TableMetadata metadata, RegularAndStaticColumns updatedColumns, int updatedRows)
+    {
+        this.metadata = metadata;
+        this.updatedColumns = updatedColumns;
+        this.updatedRows = updatedRows;
+    }
+
+    public PartitionUpdate.Builder getPartitionUpdateBuilder(TableMetadata metadata, DecoratedKey dk, ConsistencyLevel consistency)
+    {
+        if (metadata.isCounter())
+            counterConsistencyLevel = consistency;
+        return puBuilders.computeIfAbsent(dk.getKey(), (k) -> new PartitionUpdate.Builder(metadata, dk, updatedColumns, updatedRows));
+    }
+
+    /**
+     * Returns a collection containing all the mutations.
+     * @return a collection containing all the mutations.
+     */
+    public Collection<IMutation> toMutations()
+    {
+        List<IMutation> ms = new ArrayList<>();
+        for (PartitionUpdate.Builder builder : puBuilders.values())
+        {
+            IMutation mutation = null;
+
+            if (metadata.isCounter())
+                mutation = new CounterMutation(new Mutation(builder.build()), counterConsistencyLevel);
+            else
+                mutation = new Mutation(builder.build());
+            mutation.validateIndexedColumns();
+            ms.add(mutation);
+        }
+
+        return ms;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/de7c24b3/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java b/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
index 7a2a1ba..a373ba1 100644
--- a/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
@@ -63,7 +63,7 @@ public class UpdateStatement extends ModificationStatement
     }
 
     @Override
-    public void addUpdateForKey(PartitionUpdate update, Clustering clustering, UpdateParameters params)
+    public void addUpdateForKey(PartitionUpdate.Builder updateBuilder, Clustering clustering, UpdateParameters params)
     {
         if (updatesRegularRows())
         {
@@ -90,22 +90,22 @@ public class UpdateStatement extends ModificationStatement
             }
 
             for (Operation op : updates)
-                op.execute(update.partitionKey(), params);
+                op.execute(updateBuilder.partitionKey(), params);
 
-            update.add(params.buildRow());
+            updateBuilder.add(params.buildRow());
         }
 
         if (updatesStaticRow())
         {
             params.newRow(Clustering.STATIC_CLUSTERING);
             for (Operation op : getStaticOperations())
-                op.execute(update.partitionKey(), params);
-            update.add(params.buildRow());
+                op.execute(updateBuilder.partitionKey(), params);
+            updateBuilder.add(params.buildRow());
         }
     }
 
     @Override
-    public void addUpdateForKey(PartitionUpdate update, Slice slice, UpdateParameters params)
+    public void addUpdateForKey(PartitionUpdate.Builder update, Slice slice, UpdateParameters params)
     {
         throw new UnsupportedOperationException();
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/de7c24b3/src/java/org/apache/cassandra/cql3/statements/UpdatesCollector.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/UpdatesCollector.java b/src/java/org/apache/cassandra/cql3/statements/UpdatesCollector.java
index f16c619..30db7ca 100644
--- a/src/java/org/apache/cassandra/cql3/statements/UpdatesCollector.java
+++ b/src/java/org/apache/cassandra/cql3/statements/UpdatesCollector.java
@@ -15,127 +15,20 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.cassandra.cql3.statements;
 
-import java.nio.ByteBuffer;
-import java.util.*;
+import java.util.Collection;
 
-import org.apache.cassandra.schema.TableId;
-import org.apache.cassandra.schema.TableMetadata;
-import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.IMutation;
+import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.schema.TableMetadata;
 
-/**
- * Utility class to collect updates.
- *
- * <p>In a batch statement we don't want to recreate mutations every time as this is particularly inefficient when
- * applying multiple batch to the same partition (see #6737). </p>
- *
- */
-final class UpdatesCollector
+public interface UpdatesCollector
 {
-    /**
-     * The columns that will be updated for each table (keyed by the table ID).
-     */
-    private final Map<TableId, RegularAndStaticColumns> updatedColumns;
-
-    /**
-     * The estimated number of updated row.
-     */
-    private final int updatedRows;
-
-    /**
-     * The mutations per keyspace.
-     */
-    private final Map<String, Map<ByteBuffer, IMutation>> mutations = new HashMap<>();
-
-    public UpdatesCollector(Map<TableId, RegularAndStaticColumns> updatedColumns, int updatedRows)
-    {
-        super();
-        this.updatedColumns = updatedColumns;
-        this.updatedRows = updatedRows;
-    }
-
-    /**
-     * Gets the <code>PartitionUpdate</code> for the specified column family and key. If the update does not
-     * exist it will be created.
-     *
-     * @param metadata the column family meta data
-     * @param dk the partition key
-     * @param consistency the consistency level
-     * @return the <code>PartitionUpdate</code> for the specified column family and key
-     */
-    public PartitionUpdate getPartitionUpdate(TableMetadata metadata, DecoratedKey dk, ConsistencyLevel consistency)
-    {
-        Mutation mut = getMutation(metadata, dk, consistency);
-        PartitionUpdate upd = mut.get(metadata);
-        if (upd == null)
-        {
-            RegularAndStaticColumns columns = updatedColumns.get(metadata.id);
-            assert columns != null;
-            upd = new PartitionUpdate(metadata, dk, columns, updatedRows);
-            mut.add(upd);
-        }
-        return upd;
-    }
-
-    /**
-     * Check all partition updates contain only valid values for any
-     * indexed columns.
-     */
-    public void validateIndexedColumns()
-    {
-        for (Map<ByteBuffer, IMutation> perKsMutations : mutations.values())
-            for (IMutation mutation : perKsMutations.values())
-                for (PartitionUpdate update : mutation.getPartitionUpdates())
-                    Keyspace.openAndGetStore(update.metadata()).indexManager.validate(update);
-    }
-
-    private Mutation getMutation(TableMetadata metadata, DecoratedKey dk, ConsistencyLevel consistency)
-    {
-        String ksName = metadata.keyspace;
-        IMutation mutation = keyspaceMap(ksName).get(dk.getKey());
-        if (mutation == null)
-        {
-            Mutation mut = new Mutation(ksName, dk);
-            mutation = metadata.isCounter() ? new CounterMutation(mut, consistency) : mut;
-            keyspaceMap(ksName).put(dk.getKey(), mutation);
-            return mut;
-        }
-        return metadata.isCounter() ? ((CounterMutation) mutation).getMutation() : (Mutation) mutation;
-    }
-
-    /**
-     * Returns a collection containing all the mutations.
-     * @return a collection containing all the mutations.
-     */
-    public Collection<IMutation> toMutations()
-    {
-        // The case where all statement where on the same keyspace is pretty common
-        if (mutations.size() == 1)
-            return mutations.values().iterator().next().values();
-
-        List<IMutation> ms = new ArrayList<>();
-        for (Map<ByteBuffer, IMutation> ksMap : mutations.values())
-            ms.addAll(ksMap.values());
-
-        return ms;
-    }
-
-    /**
-     * Returns the key-mutation mappings for the specified keyspace.
-     *
-     * @param ksName the keyspace name
-     * @return the key-mutation mappings for the specified keyspace.
-     */
-    private Map<ByteBuffer, IMutation> keyspaceMap(String ksName)
-    {
-        Map<ByteBuffer, IMutation> ksMap = mutations.get(ksName);
-        if (ksMap == null)
-        {
-            ksMap = new HashMap<>();
-            mutations.put(ksName, ksMap);
-        }
-        return ksMap;
-    }
+    PartitionUpdate.Builder getPartitionUpdateBuilder(TableMetadata metadata, DecoratedKey dk, ConsistencyLevel consistency);
+    Collection<IMutation> toMutations();
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/de7c24b3/src/java/org/apache/cassandra/db/CounterMutation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/CounterMutation.java b/src/java/org/apache/cassandra/db/CounterMutation.java
index 0f1ad06..d04ddd8 100644
--- a/src/java/org/apache/cassandra/db/CounterMutation.java
+++ b/src/java/org/apache/cassandra/db/CounterMutation.java
@@ -24,6 +24,7 @@ import java.util.concurrent.locks.Lock;
 
 import com.google.common.base.Function;
 import com.google.common.base.Objects;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.PeekingIterator;
@@ -112,7 +113,7 @@ public class CounterMutation implements IMutation
      */
     public Mutation applyCounterMutation() throws WriteTimeoutException
     {
-        Mutation result = new Mutation(getKeyspaceName(), key());
+        Mutation.PartitionUpdateCollector resultBuilder = new Mutation.PartitionUpdateCollector(getKeyspaceName(), key());
         Keyspace keyspace = Keyspace.open(getKeyspaceName());
 
         List<Lock> locks = new ArrayList<>();
@@ -121,7 +122,9 @@ public class CounterMutation implements IMutation
         {
             grabCounterLocks(keyspace, locks);
             for (PartitionUpdate upd : getPartitionUpdates())
-                result.add(processModifications(upd));
+                resultBuilder.add(processModifications(upd));
+
+            Mutation result = resultBuilder.build();
             result.apply();
             return result;
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/de7c24b3/src/java/org/apache/cassandra/db/IMutation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/IMutation.java b/src/java/org/apache/cassandra/db/IMutation.java
index 3d4b1b2..9eaf19b 100644
--- a/src/java/org/apache/cassandra/db/IMutation.java
+++ b/src/java/org/apache/cassandra/db/IMutation.java
@@ -32,6 +32,12 @@ public interface IMutation
     public String toString(boolean shallow);
     public Collection<PartitionUpdate> getPartitionUpdates();
 
+    public default void validateIndexedColumns()
+    {
+        for (PartitionUpdate pu : getPartitionUpdates())
+            pu.validateIndexedColumns();
+    }
+
     /**
      * Computes the total data size of the specified mutations.
      * @param mutations the mutations

http://git-wip-us.apache.org/repos/asf/cassandra/blob/de7c24b3/src/java/org/apache/cassandra/db/Mutation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Mutation.java b/src/java/org/apache/cassandra/db/Mutation.java
index 062e1fe..a6a920c 100644
--- a/src/java/org/apache/cassandra/db/Mutation.java
+++ b/src/java/org/apache/cassandra/db/Mutation.java
@@ -22,6 +22,8 @@ import java.util.*;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicLong;
 
+import com.google.common.collect.ImmutableCollection;
+import com.google.common.collect.ImmutableMap;
 import org.apache.commons.lang3.StringUtils;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
@@ -37,8 +39,6 @@ import org.apache.cassandra.schema.TableId;
 import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
-// TODO convert this to a Builder pattern instead of encouraging M.add directly,
-// which is less-efficient since we have to keep a mutable HashMap around
 public class Mutation implements IMutation
 {
     public static final MutationSerializer serializer = new MutationSerializer();
@@ -52,37 +52,31 @@ public class Mutation implements IMutation
 
     private final DecoratedKey key;
     // map of column family id to mutations for that column family.
-    private final Map<TableId, PartitionUpdate> modifications;
+    private final ImmutableMap<TableId, PartitionUpdate> modifications;
 
-    // Time at which this mutation was instantiated
-    public final long createdAt = System.currentTimeMillis();
+    // Time at which this mutation or the builder that built it was instantiated
+    final long createdAt;
     // keep track of when mutation has started waiting for a MV partition lock
-    public final AtomicLong viewLockAcquireStart = new AtomicLong(0);
+    final AtomicLong viewLockAcquireStart = new AtomicLong(0);
 
-    private boolean cdcEnabled = false;
-
-    public Mutation(String keyspaceName, DecoratedKey key)
-    {
-        this(keyspaceName, key, new HashMap<>());
-    }
+    private final boolean cdcEnabled;
 
     public Mutation(PartitionUpdate update)
     {
-        this(update.metadata().keyspace, update.partitionKey(), Collections.singletonMap(update.metadata().id, update));
+        this(update.metadata().keyspace, update.partitionKey(), ImmutableMap.of(update.metadata().id, update), System.currentTimeMillis());
     }
 
-    protected Mutation(String keyspaceName, DecoratedKey key, Map<TableId, PartitionUpdate> modifications)
+    public Mutation(String keyspaceName, DecoratedKey key, ImmutableMap<TableId, PartitionUpdate> modifications, long createdAt)
     {
         this.keyspaceName = keyspaceName;
         this.key = key;
         this.modifications = modifications;
-        for (PartitionUpdate pu : modifications.values())
-            cdcEnabled |= pu.metadata().params.cdc;
-    }
 
-    public Mutation copy()
-    {
-        return new Mutation(keyspaceName, key, new HashMap<>(modifications));
+        boolean cdc = false;
+        for (PartitionUpdate pu : modifications.values())
+            cdc |= pu.metadata().params.cdc;
+        this.cdcEnabled = cdc;
+        this.createdAt = createdAt;
     }
 
     public Mutation without(Set<TableId> tableIds)
@@ -90,15 +84,16 @@ public class Mutation implements IMutation
         if (tableIds.isEmpty())
             return this;
 
-        Mutation copy = copy();
-
-        copy.modifications.keySet().removeAll(tableIds);
-
-        copy.cdcEnabled = false;
-        for (PartitionUpdate pu : modifications.values())
-            copy.cdcEnabled |= pu.metadata().params.cdc;
+        ImmutableMap.Builder<TableId, PartitionUpdate> builder = new ImmutableMap.Builder<>();
+        for (Map.Entry<TableId, PartitionUpdate> update : modifications.entrySet())
+        {
+            if (!tableIds.contains(update.getKey()))
+            {
+                builder.put(update);
+            }
+        }
 
-        return copy;
+        return new Mutation(keyspaceName, key, builder.build(), createdAt);
     }
 
     public Mutation without(TableId tableId)
@@ -121,7 +116,7 @@ public class Mutation implements IMutation
         return key;
     }
 
-    public Collection<PartitionUpdate> getPartitionUpdates()
+    public ImmutableCollection<PartitionUpdate> getPartitionUpdates()
     {
         return modifications.values();
     }
@@ -131,33 +126,6 @@ public class Mutation implements IMutation
         return table == null ? null : modifications.get(table.id);
     }
 
-    /**
-     * Adds PartitionUpdate to the local set of modifications.
-     * Assumes no updates for the Table this PartitionUpdate impacts.
-     *
-     * @param update PartitionUpdate to append to Modifications list
-     * @return Mutation this mutation
-     * @throws IllegalArgumentException If PartitionUpdate for duplicate table is passed as argument
-     */
-    public Mutation add(PartitionUpdate update)
-    {
-        assert update != null;
-        assert update.partitionKey().getPartitioner() == key.getPartitioner();
-
-        cdcEnabled |= update.metadata().params.cdc;
-
-        PartitionUpdate prev = modifications.put(update.metadata().id, update);
-        if (prev != null)
-            // developer error
-            throw new IllegalArgumentException("Table " + update.metadata().name + " already has modifications in this mutation: " + prev);
-        return this;
-    }
-
-    public PartitionUpdate get(TableMetadata metadata)
-    {
-        return modifications.get(metadata.id);
-    }
-
     public boolean isEmpty()
     {
         return modifications.isEmpty();
@@ -196,7 +164,7 @@ public class Mutation implements IMutation
         }
 
         List<PartitionUpdate> updates = new ArrayList<>(mutations.size());
-        Map<TableId, PartitionUpdate> modifications = new HashMap<>(updatedTables.size());
+        ImmutableMap.Builder<TableId, PartitionUpdate> modifications = new ImmutableMap.Builder<>();
         for (TableId table : updatedTables)
         {
             for (Mutation mutation : mutations)
@@ -212,7 +180,7 @@ public class Mutation implements IMutation
             modifications.put(table, updates.size() == 1 ? updates.get(0) : PartitionUpdate.merge(updates));
             updates.clear();
         }
-        return new Mutation(ks, key, modifications);
+        return new Mutation(ks, key, modifications.build(), System.currentTimeMillis());
     }
 
     public CompletableFuture<?> applyFuture()
@@ -389,7 +357,7 @@ public class Mutation implements IMutation
             if (size == 1)
                 return new Mutation(update);
 
-            Map<TableId, PartitionUpdate> modifications = new HashMap<>(size);
+            ImmutableMap.Builder<TableId, PartitionUpdate> modifications = new ImmutableMap.Builder<>();
             DecoratedKey dk = update.partitionKey();
 
             modifications.put(update.metadata().id, update);
@@ -398,8 +366,7 @@ public class Mutation implements IMutation
                 update = PartitionUpdate.serializer.deserialize(in, version, flag);
                 modifications.put(update.metadata().id, update);
             }
-
-            return new Mutation(update.metadata().keyspace, dk, modifications);
+            return new Mutation(update.metadata().keyspace, dk, modifications.build(), System.currentTimeMillis());
         }
 
         public Mutation deserialize(DataInputPlus in, int version) throws IOException
@@ -416,4 +383,52 @@ public class Mutation implements IMutation
             return size;
         }
     }
+
+    /**
+     * Collects finalized partition updates
+     */
+    public static class PartitionUpdateCollector
+    {
+        private final ImmutableMap.Builder<TableId, PartitionUpdate> modifications = new ImmutableMap.Builder<>();
+        private final String keyspaceName;
+        private final DecoratedKey key;
+        private final long createdAt = System.currentTimeMillis();
+        private boolean empty = true;
+
+        public PartitionUpdateCollector(String keyspaceName, DecoratedKey key)
+        {
+            this.keyspaceName = keyspaceName;
+            this.key = key;
+        }
+
+        public PartitionUpdateCollector add(PartitionUpdate partitionUpdate)
+        {
+            assert partitionUpdate != null;
+            assert partitionUpdate.partitionKey().getPartitioner() == key.getPartitioner();
+            // note that ImmutableMap.Builder only allows each key to be put once; otherwise build() below will fail
+            modifications.put(partitionUpdate.metadata().id, partitionUpdate);
+            empty = false;
+            return this;
+        }
+
+        public DecoratedKey key()
+        {
+            return key;
+        }
+
+        public String getKeyspaceName()
+        {
+            return keyspaceName;
+        }
+
+        public boolean isEmpty()
+        {
+            return empty;
+        }
+
+        public Mutation build()
+        {
+            return new Mutation(keyspaceName, key, modifications.build(), createdAt);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/de7c24b3/src/java/org/apache/cassandra/db/SimpleBuilders.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SimpleBuilders.java b/src/java/org/apache/cassandra/db/SimpleBuilders.java
index a212834..3520d97 100644
--- a/src/java/org/apache/cassandra/db/SimpleBuilders.java
+++ b/src/java/org/apache/cassandra/db/SimpleBuilders.java
@@ -146,10 +146,10 @@ public abstract class SimpleBuilders
             if (updateBuilders.size() == 1)
                 return new Mutation(updateBuilders.values().iterator().next().build());
 
-            Mutation mutation = new Mutation(keyspaceName, key);
+            Mutation.PartitionUpdateCollector mutationBuilder = new Mutation.PartitionUpdateCollector(keyspaceName, key);
             for (PartitionUpdateBuilder builder : updateBuilders.values())
-                mutation.add(builder.build());
-            return mutation;
+                mutationBuilder.add(builder.build());
+            return mutationBuilder.build();
         }
     }
 
@@ -159,6 +159,7 @@ public abstract class SimpleBuilders
         private final DecoratedKey key;
         private final Map<Clustering, RowBuilder> rowBuilders = new HashMap<>();
         private List<RTBuilder> rangeBuilders = null; // We use that rarely, so create lazily
+        private List<RangeTombstone> rangeTombstones = null;
 
         private DeletionTime partitionDeletion = DeletionTime.LIVE;
 
@@ -204,6 +205,14 @@ public abstract class SimpleBuilders
             return builder;
         }
 
+        public PartitionUpdate.SimpleBuilder addRangeTombstone(RangeTombstone rt)
+        {
+            if (rangeTombstones == null)
+                rangeTombstones = new ArrayList<>();
+            rangeTombstones.add(rt);
+            return this;
+        }
+
         public PartitionUpdate build()
         {
             // Collect all updated columns
@@ -213,7 +222,7 @@ public abstract class SimpleBuilders
 
             // Note that rowBuilders.size() could include the static column so could be 1 off the really need capacity
             // of the final PartitionUpdate, but as that's just a sizing hint, we'll live.
-            PartitionUpdate update = new PartitionUpdate(metadata, key, columns.build(), rowBuilders.size());
+            PartitionUpdate.Builder update = new PartitionUpdate.Builder(metadata, key, columns.build(), rowBuilders.size());
 
             update.addPartitionDeletion(partitionDeletion);
             if (rangeBuilders != null)
@@ -222,10 +231,16 @@ public abstract class SimpleBuilders
                     update.add(builder.build());
             }
 
+            if (rangeTombstones != null)
+            {
+                for (RangeTombstone rt : rangeTombstones)
+                    update.add(rt);
+            }
+
             for (RowBuilder builder : rowBuilders.values())
                 update.add(builder.build());
 
-            return update;
+            return update.build();
         }
 
         public Mutation buildAsMutation()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/de7c24b3/src/java/org/apache/cassandra/db/SystemKeyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SystemKeyspace.java b/src/java/org/apache/cassandra/db/SystemKeyspace.java
index 9da0f6b..4469384 100644
--- a/src/java/org/apache/cassandra/db/SystemKeyspace.java
+++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java
@@ -1025,7 +1025,7 @@ public final class SystemKeyspace
         UntypedResultSet.Row row = results.one();
 
         Commit promised = row.has("in_progress_ballot")
-                        ? new Commit(row.getUUID("in_progress_ballot"), new PartitionUpdate(metadata, key, metadata.regularAndStaticColumns(), 1))
+                        ? new Commit(row.getUUID("in_progress_ballot"), new PartitionUpdate.Builder(metadata, key, metadata.regularAndStaticColumns(), 1).build())
                         : Commit.emptyCommit(key, metadata);
         // either we have both a recently accepted ballot and update or we have neither
         Commit accepted = row.has("proposal_version") && row.has("proposal")
@@ -1135,9 +1135,7 @@ public final class SystemKeyspace
     public static void updateSizeEstimates(String keyspace, String table, Map<Range<Token>, Pair<Long, Long>> estimates)
     {
         long timestamp = FBUtilities.timestampMicros();
-        PartitionUpdate update = new PartitionUpdate(SizeEstimates, UTF8Type.instance.decompose(keyspace), SizeEstimates.regularAndStaticColumns(), estimates.size());
-        Mutation mutation = new Mutation(update);
-
+        PartitionUpdate.Builder update = new PartitionUpdate.Builder(SizeEstimates, UTF8Type.instance.decompose(keyspace), SizeEstimates.regularAndStaticColumns(), estimates.size());
         // delete all previous values with a single range tombstone.
         int nowInSec = FBUtilities.nowInSeconds();
         update.add(new RangeTombstone(Slice.make(SizeEstimates.comparator, table), new DeletionTime(timestamp - 1, nowInSec)));
@@ -1153,8 +1151,7 @@ public final class SystemKeyspace
                            .add("mean_partition_size", values.right)
                            .build());
         }
-
-        mutation.apply();
+        new Mutation(update.build()).apply();
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/de7c24b3/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
index ce185b6..2947222 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
@@ -26,7 +26,11 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Predicate;
-import com.google.common.collect.*;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Ordering;
+
 import org.apache.commons.lang3.StringUtils;
 import org.cliffc.high_scale_lib.NonBlockingHashSet;
 import org.slf4j.Logger;
@@ -249,7 +253,7 @@ public class CommitLogReplayer implements CommitLogReadHandler
                     //    b) have already been flushed,
                     // or c) are part of a cf that was dropped.
                     // Keep in mind that the cf.name() is suspect. do every thing based on the cfid instead.
-                    Mutation newMutation = null;
+                    Mutation.PartitionUpdateCollector newPUCollector = null;
                     for (PartitionUpdate update : commitLogReplayer.replayFilter.filter(mutation))
                     {
                         if (Schema.instance.getTableMetadata(update.metadata().id) == null)
@@ -259,17 +263,17 @@ public class CommitLogReplayer implements CommitLogReadHandler
                         // if it is the last known segment, if we are after the commit log segment position
                         if (commitLogReplayer.shouldReplay(update.metadata().id, new CommitLogPosition(segmentId, entryLocation)))
                         {
-                            if (newMutation == null)
-                                newMutation = new Mutation(mutation.getKeyspaceName(), mutation.key());
-                            newMutation.add(update);
+                            if (newPUCollector == null)
+                                newPUCollector = new Mutation.PartitionUpdateCollector(mutation.getKeyspaceName(), mutation.key());
+                            newPUCollector.add(update);
                             commitLogReplayer.replayedCount.incrementAndGet();
                         }
                     }
-                    if (newMutation != null)
+                    if (newPUCollector != null)
                     {
-                        assert !newMutation.isEmpty();
+                        assert !newPUCollector.isEmpty();
 
-                        Keyspace.open(newMutation.getKeyspaceName()).apply(newMutation, false, true, false);
+                        Keyspace.open(newPUCollector.getKeyspaceName()).apply(newPUCollector.build(), false, true, false);
                         commitLogReplayer.keyspacesReplayed.add(keyspace);
                     }
                 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/de7c24b3/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java b/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
index 7a0cefe..a549458 100644
--- a/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
+++ b/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.db.partitions;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Iterator;
 import java.util.List;
 
@@ -60,37 +61,11 @@ public class PartitionUpdate extends AbstractBTreePartition
 
     public static final PartitionUpdateSerializer serializer = new PartitionUpdateSerializer();
 
-    private final int createdAtInSec = FBUtilities.nowInSeconds();
-
-    // Records whether this update is "built", i.e. if the build() method has been called, which
-    // happens when the update is read. Further writing is then rejected though a manual call
-    // to allowNewUpdates() allow new writes. We could make that more implicit but only triggers
-    // really requires that so we keep it simple for now).
-    private volatile boolean isBuilt;
-    private boolean canReOpen = true;
-
-    private Holder holder;
-    private BTree.Builder<Row> rowBuilder;
-    private MutableDeletionInfo deletionInfo;
-
-    private final boolean canHaveShadowedData;
-
+    private final Holder holder;
+    private final DeletionInfo deletionInfo;
     private final TableMetadata metadata;
 
-    private PartitionUpdate(TableMetadata metadata,
-                            DecoratedKey key,
-                            RegularAndStaticColumns columns,
-                            MutableDeletionInfo deletionInfo,
-                            int initialRowCapacity,
-                            boolean canHaveShadowedData)
-    {
-        super(key);
-        this.metadata = metadata;
-        this.deletionInfo = deletionInfo;
-        this.holder = new Holder(columns, BTree.empty(), deletionInfo, Rows.EMPTY_STATIC_ROW, EncodingStats.NO_STATS);
-        this.canHaveShadowedData = canHaveShadowedData;
-        rowBuilder = builder(initialRowCapacity);
-    }
+    private final boolean canHaveShadowedData;
 
     private PartitionUpdate(TableMetadata metadata,
                             DecoratedKey key,
@@ -102,29 +77,9 @@ public class PartitionUpdate extends AbstractBTreePartition
         this.metadata = metadata;
         this.holder = holder;
         this.deletionInfo = deletionInfo;
-        this.isBuilt = true;
         this.canHaveShadowedData = canHaveShadowedData;
     }
 
-    public PartitionUpdate(TableMetadata metadata,
-                           DecoratedKey key,
-                           RegularAndStaticColumns columns,
-                           int initialRowCapacity)
-    {
-        this(metadata, key, columns, MutableDeletionInfo.live(), initialRowCapacity, true);
-    }
-
-    public PartitionUpdate(TableMetadata metadata,
-                           ByteBuffer key,
-                           RegularAndStaticColumns columns,
-                           int initialRowCapacity)
-    {
-        this(metadata,
-             metadata.partitioner.decorateKey(key),
-             columns,
-             initialRowCapacity);
-    }
-
     /**
      * Creates a empty immutable partition update.
      *
@@ -329,30 +284,6 @@ public class PartitionUpdate extends AbstractBTreePartition
     }
 
     /**
-     * Modify this update to set every timestamp for live data to {@code newTimestamp} and
-     * every deletion timestamp to {@code newTimestamp - 1}.
-     *
-     * There is no reason to use that expect on the Paxos code path, where we need ensure that
-     * anything inserted use the ballot timestamp (to respect the order of update decided by
-     * the Paxos algorithm). We use {@code newTimestamp - 1} for deletions because tombstones
-     * always win on timestamp equality and we don't want to delete our own insertions
-     * (typically, when we overwrite a collection, we first set a complex deletion to delete the
-     * previous collection before adding new elements. If we were to set that complex deletion
-     * to the same timestamp that the new elements, it would delete those elements). And since
-     * tombstones always wins on timestamp equality, using -1 guarantees our deletion will still
-     * delete anything from a previous update.
-     */
-    public void updateAllTimestamp(long newTimestamp)
-    {
-        Holder holder = holder();
-        deletionInfo.updateAllTimestamp(newTimestamp - 1);
-        Object[] tree = BTree.<Row>transformAndFilter(holder.tree, (x) -> x.updateAllTimestamp(newTimestamp));
-        Row staticRow = holder.staticRow.updateAllTimestamp(newTimestamp);
-        EncodingStats newStats = EncodingStats.Collector.collect(staticRow, BTree.<Row>iterator(tree), deletionInfo);
-        this.holder = new Holder(holder.columns, tree, deletionInfo, staticRow, newStats);
-    }
-
-    /**
      * The number of "operations" contained in the update.
      * <p>
      * This is used by {@code Memtable} to approximate how much work this update does. In practice, this
@@ -401,7 +332,6 @@ public class PartitionUpdate extends AbstractBTreePartition
 
     protected Holder holder()
     {
-        maybeBuild();
         return holder;
     }
 
@@ -411,50 +341,6 @@ public class PartitionUpdate extends AbstractBTreePartition
     }
 
     /**
-     * If a partition update has been read (and is thus unmodifiable), a call to this method
-     * makes the update modifiable again.
-     * <p>
-     * Please note that calling this method won't result in optimal behavior in the sense that
-     * even if very little is added to the update after this call, the whole update will be sorted
-     * again on read. This should thus be used sparingly (and if it turns that we end up using
-     * this often, we should consider optimizing the behavior).
-     */
-    public synchronized void allowNewUpdates()
-    {
-        if (!canReOpen)
-            throw new IllegalStateException("You cannot do more updates on collectCounterMarks has been called");
-
-        // This is synchronized to make extra sure things work properly even if this is
-        // called concurrently with sort() (which should be avoided in the first place, but
-        // better safe than sorry).
-        isBuilt = false;
-        if (rowBuilder == null)
-            rowBuilder = builder(16);
-    }
-
-    private BTree.Builder<Row> builder(int initialCapacity)
-    {
-        return BTree.<Row>builder(metadata().comparator, initialCapacity)
-                    .setQuickResolver((a, b) ->
-                                      Rows.merge(a, b, createdAtInSec));
-    }
-
-    /**
-     * Returns an iterator that iterates over the rows of this update in clustering order.
-     * <p>
-     * Note that this might trigger a sorting of the update, and as such the update will not
-     * be modifiable anymore after this call.
-     *
-     * @return an iterator over the rows of this update.
-     */
-    @Override
-    public Iterator<Row> iterator()
-    {
-        maybeBuild();
-        return super.iterator();
-    }
-
-    /**
      * Validates the data contained in this update.
      *
      * @throws org.apache.cassandra.serializers.MarshalException if some of the data contained in this update is corrupted.
@@ -476,8 +362,6 @@ public class PartitionUpdate extends AbstractBTreePartition
      */
     public long maxTimestamp()
     {
-        maybeBuild();
-
         long maxTimestamp = deletionInfo.maxTimestamp();
         for (Row row : this)
         {
@@ -509,11 +393,8 @@ public class PartitionUpdate extends AbstractBTreePartition
     public List<CounterMark> collectCounterMarks()
     {
         assert metadata().isCounter();
-        maybeBuild();
         // We will take aliases on the rows of this update, and update them in-place. So we should be sure the
         // update is now immutable for all intent and purposes.
-        canReOpen = false;
-
         List<CounterMark> marks = new ArrayList<>();
         addMarksForRow(staticRow(), marks);
         for (Row row : this)
@@ -521,7 +402,7 @@ public class PartitionUpdate extends AbstractBTreePartition
         return marks;
     }
 
-    private void addMarksForRow(Row row, List<CounterMark> marks)
+    private static void addMarksForRow(Row row, List<CounterMark> marks)
     {
         for (Cell cell : row.cells())
         {
@@ -530,109 +411,6 @@ public class PartitionUpdate extends AbstractBTreePartition
         }
     }
 
-    private void assertNotBuilt()
-    {
-        if (isBuilt)
-            throw new IllegalStateException("An update should not be written again once it has been read");
-    }
-
-    public void addPartitionDeletion(DeletionTime deletionTime)
-    {
-        assertNotBuilt();
-        deletionInfo.add(deletionTime);
-    }
-
-    public void add(RangeTombstone range)
-    {
-        assertNotBuilt();
-        deletionInfo.add(range, metadata().comparator);
-    }
-
-    /**
-     * Adds a row to this update.
-     *
-     * There is no particular assumption made on the order of row added to a partition update. It is further
-     * allowed to add the same row (more precisely, multiple row objects for the same clustering).
-     *
-     * Note however that the columns contained in the added row must be a subset of the columns used when
-     * creating this update.
-     *
-     * @param row the row to add.
-     */
-    public void add(Row row)
-    {
-        if (row.isEmpty())
-            return;
-
-        assertNotBuilt();
-
-        if (row.isStatic())
-        {
-            // this assert is expensive, and possibly of limited value; we should consider removing it
-            // or introducing a new class of assertions for test purposes
-            assert columns().statics.containsAll(row.columns()) : columns().statics + " is not superset of " + row.columns();
-            Row staticRow = holder.staticRow.isEmpty()
-                      ? row
-                      : Rows.merge(holder.staticRow, row, createdAtInSec);
-            holder = new Holder(holder.columns, holder.tree, holder.deletionInfo, staticRow, holder.stats);
-        }
-        else
-        {
-            // this assert is expensive, and possibly of limited value; we should consider removing it
-            // or introducing a new class of assertions for test purposes
-            assert columns().regulars.containsAll(row.columns()) : columns().regulars + " is not superset of " + row.columns();
-            rowBuilder.add(row);
-        }
-    }
-
-    private void maybeBuild()
-    {
-        if (isBuilt)
-            return;
-
-        build();
-    }
-
-    private synchronized void build()
-    {
-        if (isBuilt)
-            return;
-
-        Holder holder = this.holder;
-        Object[] cur = holder.tree;
-        Object[] add = rowBuilder.build();
-        Object[] merged = BTree.<Row>merge(cur, add, metadata().comparator,
-                                           UpdateFunction.Simple.of((a, b) -> Rows.merge(a, b, createdAtInSec)));
-
-        assert deletionInfo == holder.deletionInfo;
-        EncodingStats newStats = EncodingStats.Collector.collect(holder.staticRow, BTree.<Row>iterator(merged), deletionInfo);
-
-        this.holder = new Holder(holder.columns, merged, holder.deletionInfo, holder.staticRow, newStats);
-        rowBuilder = null;
-        isBuilt = true;
-    }
-
-    @Override
-    public String toString()
-    {
-        if (isBuilt)
-            return super.toString();
-
-        // We intentionally override AbstractBTreePartition#toString() to avoid iterating over the rows in the
-        // partition, which can result in build() being triggered and lead to errors if the PartitionUpdate is later
-        // modified.
-
-        StringBuilder sb = new StringBuilder();
-        sb.append(String.format("[%s] key=%s columns=%s",
-                                metadata.toString(),
-                                metadata.partitionKeyType.getString(partitionKey().getKey()),
-                                columns()));
-
-        sb.append("\n    deletionInfo=").append(deletionInfo);
-        sb.append(" (not built)");
-        return sb.toString();
-    }
-
     /**
      * Creates a new simple partition update builder.
      *
@@ -647,6 +425,11 @@ public class PartitionUpdate extends AbstractBTreePartition
         return new SimpleBuilders.PartitionUpdateBuilder(metadata, partitionKeyValues);
     }
 
+    public void validateIndexedColumns()
+    {
+        Keyspace.openAndGetStore(metadata()).indexManager.validate(this);
+    }
+
     /**
      * Interface for building partition updates geared towards human.
      * <p>
@@ -712,6 +495,13 @@ public class PartitionUpdate extends AbstractBTreePartition
         public RangeTombstoneBuilder addRangeTombstone();
 
         /**
+         * Adds a new range tombstone to this update
+         *
+         * @return this builder
+         */
+        public SimpleBuilder addRangeTombstone(RangeTombstone rt);
+
+        /**
          * Build the update represented by this builder.
          *
          * @return the built update.
@@ -892,4 +682,208 @@ public class PartitionUpdate extends AbstractBTreePartition
             ((BTreeRow)row).setValue(column, path, value);
         }
     }
+
+    /**
+     * Builder for PartitionUpdates
+     *
+     * This class is not thread safe, but the PartitionUpdate it produces is (since it is immutable).
+     */
+    public static class Builder
+    {
+        private final TableMetadata metadata;
+        private final DecoratedKey key;
+        private final MutableDeletionInfo deletionInfo;
+        private final boolean canHaveShadowedData;
+        private Object[] tree = BTree.empty();
+        private final BTree.Builder<Row> rowBuilder;
+        private final int createdAtInSec = FBUtilities.nowInSeconds();
+        private Row staticRow = Rows.EMPTY_STATIC_ROW;
+        private final RegularAndStaticColumns columns;
+        private boolean isBuilt = false;
+
+        public Builder(TableMetadata metadata,
+                       DecoratedKey key,
+                       RegularAndStaticColumns columns,
+                       int initialRowCapacity,
+                       boolean canHaveShadowedData)
+        {
+            this(metadata, key, columns, initialRowCapacity, canHaveShadowedData, Rows.EMPTY_STATIC_ROW, MutableDeletionInfo.live(), BTree.empty());
+        }
+
+        private Builder(TableMetadata metadata,
+                       DecoratedKey key,
+                       RegularAndStaticColumns columns,
+                       int initialRowCapacity,
+                       boolean canHaveShadowedData,
+                       Holder holder)
+        {
+            this(metadata, key, columns, initialRowCapacity, canHaveShadowedData, holder.staticRow, holder.deletionInfo, holder.tree);
+        }
+
+        private Builder(TableMetadata metadata,
+                        DecoratedKey key,
+                        RegularAndStaticColumns columns,
+                        int initialRowCapacity,
+                        boolean canHaveShadowedData,
+                        Row staticRow,
+                        DeletionInfo deletionInfo,
+                        Object[] tree)
+        {
+            this.metadata = metadata;
+            this.key = key;
+            this.columns = columns;
+            this.rowBuilder = rowBuilder(initialRowCapacity);
+            this.canHaveShadowedData = canHaveShadowedData;
+            this.deletionInfo = deletionInfo.mutableCopy();
+            this.staticRow = staticRow;
+            this.tree = tree;
+        }
+
+        public Builder(TableMetadata metadata, DecoratedKey key, RegularAndStaticColumns columnDefinitions, int size)
+        {
+            this(metadata, key, columnDefinitions, size, true);
+        }
+
+        public Builder(PartitionUpdate base, int initialRowCapacity)
+        {
+            this(base.metadata, base.partitionKey, base.columns(), initialRowCapacity, base.canHaveShadowedData, base.holder);
+        }
+
+        public Builder(TableMetadata metadata,
+                        ByteBuffer key,
+                        RegularAndStaticColumns columns,
+                        int initialRowCapacity)
+        {
+            this(metadata, metadata.partitioner.decorateKey(key), columns, initialRowCapacity, true);
+        }
+
+        /**
+         * Adds a row to this update.
+         *
+         * There is no particular assumption made on the order of row added to a partition update. It is further
+         * allowed to add the same row (more precisely, multiple row objects for the same clustering).
+         *
+         * Note however that the columns contained in the added row must be a subset of the columns used when
+         * creating this update.
+         *
+         * @param row the row to add.
+         */
+        public void add(Row row)
+        {
+            if (row.isEmpty())
+                return;
+
+            if (row.isStatic())
+            {
+                // this assert is expensive, and possibly of limited value; we should consider removing it
+                // or introducing a new class of assertions for test purposes
+                assert columns().statics.containsAll(row.columns()) : columns().statics + " is not superset of " + row.columns();
+                staticRow = staticRow.isEmpty()
+                            ? row
+                            : Rows.merge(staticRow, row, createdAtInSec);
+            }
+            else
+            {
+                // this assert is expensive, and possibly of limited value; we should consider removing it
+                // or introducing a new class of assertions for test purposes
+                assert columns().regulars.containsAll(row.columns()) : columns().regulars + " is not superset of " + row.columns();
+                rowBuilder.add(row);
+            }
+        }
+
+        public void addPartitionDeletion(DeletionTime deletionTime)
+        {
+            deletionInfo.add(deletionTime);
+        }
+
+        public void add(RangeTombstone range)
+        {
+            deletionInfo.add(range, metadata.comparator);
+        }
+
+        public DecoratedKey partitionKey()
+        {
+            return key;
+        }
+
+        public TableMetadata metadata()
+        {
+            return metadata;
+        }
+
+        public PartitionUpdate build()
+        {
+            // assert that we are not calling build() several times
+            assert !isBuilt : "A PartitionUpdate.Builder should only get built once";
+            Object[] add = rowBuilder.build();
+            Object[] merged = BTree.<Row>merge(tree, add, metadata.comparator,
+                                               UpdateFunction.Simple.of((a, b) -> Rows.merge(a, b, createdAtInSec)));
+
+            EncodingStats newStats = EncodingStats.Collector.collect(staticRow, BTree.iterator(merged), deletionInfo);
+
+            isBuilt = true;
+            return new PartitionUpdate(metadata,
+                                       partitionKey(),
+                                       new Holder(columns,
+                                                  merged,
+                                                  deletionInfo,
+                                                  staticRow,
+                                                  newStats),
+                                       deletionInfo,
+                                       canHaveShadowedData);
+        }
+
+        public RegularAndStaticColumns columns()
+        {
+            return columns;
+        }
+
+        public DeletionTime partitionLevelDeletion()
+        {
+            return deletionInfo.getPartitionDeletion();
+        }
+
+        private BTree.Builder<Row> rowBuilder(int initialCapacity)
+        {
+            return BTree.<Row>builder(metadata.comparator, initialCapacity)
+                   .setQuickResolver((a, b) ->
+                                     Rows.merge(a, b, createdAtInSec));
+        }
+        /**
+         * Modify this update to set every timestamp for live data to {@code newTimestamp} and
+         * every deletion timestamp to {@code newTimestamp - 1}.
+         *
+         * There is no reason to use this except on the Paxos code path, where we need to ensure that
+         * anything inserted uses the ballot timestamp (to respect the order of updates decided by
+         * the Paxos algorithm). We use {@code newTimestamp - 1} for deletions because tombstones
+         * always win on timestamp equality and we don't want to delete our own insertions
+         * (typically, when we overwrite a collection, we first set a complex deletion to delete the
+         * previous collection before adding new elements. If we were to set that complex deletion
+         * to the same timestamp as the new elements, it would delete those elements). And since
+         * tombstones always win on timestamp equality, using -1 guarantees our deletion will still
+         * delete anything from a previous update.
+         */
+        public Builder updateAllTimestamp(long newTimestamp)
+        {
+            deletionInfo.updateAllTimestamp(newTimestamp - 1);
+            tree = BTree.<Row>transformAndFilter(tree, (x) -> x.updateAllTimestamp(newTimestamp));
+            staticRow = this.staticRow.updateAllTimestamp(newTimestamp);
+            return this;
+        }
+
+        @Override
+        public String toString()
+        {
+            return "Builder{" +
+                   "metadata=" + metadata +
+                   ", key=" + key +
+                   ", deletionInfo=" + deletionInfo +
+                   ", canHaveShadowedData=" + canHaveShadowedData +
+                   ", createdAtInSec=" + createdAtInSec +
+                   ", staticRow=" + staticRow +
+                   ", columns=" + columns +
+                   ", isBuilt=" + isBuilt +
+                   '}';
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/de7c24b3/src/java/org/apache/cassandra/db/view/TableViews.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/view/TableViews.java b/src/java/org/apache/cassandra/db/view/TableViews.java
index 7a1373c..298fcfd 100644
--- a/src/java/org/apache/cassandra/db/view/TableViews.java
+++ b/src/java/org/apache/cassandra/db/view/TableViews.java
@@ -21,7 +21,9 @@ import java.util.*;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
 
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.PeekingIterator;
@@ -510,23 +512,23 @@ public class TableViews extends AbstractCollection<View>
             return mutations;
         }
 
-        Map<DecoratedKey, Mutation> mutations = new HashMap<>();
+        Map<DecoratedKey, Mutation.PartitionUpdateCollector> mutations = new HashMap<>();
         for (ViewUpdateGenerator generator : generators)
         {
             for (PartitionUpdate update : generator.generateViewUpdates())
             {
                 DecoratedKey key = update.partitionKey();
-                Mutation mutation = mutations.get(key);
-                if (mutation == null)
+                Mutation.PartitionUpdateCollector collector = mutations.get(key);
+                if (collector == null)
                 {
-                    mutation = new Mutation(baseTableMetadata.keyspace, key);
-                    mutations.put(key, mutation);
+                    collector = new Mutation.PartitionUpdateCollector(baseTableMetadata.keyspace, key);
+                    mutations.put(key, collector);
                 }
-                mutation.add(update);
+                collector.add(update);
             }
             generator.clear();
         }
-        return mutations.values();
+        return mutations.values().stream().map(Mutation.PartitionUpdateCollector::build).collect(Collectors.toList());
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/de7c24b3/src/java/org/apache/cassandra/db/view/ViewUpdateGenerator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/view/ViewUpdateGenerator.java b/src/java/org/apache/cassandra/db/view/ViewUpdateGenerator.java
index 794a6b7..73ca240 100644
--- a/src/java/org/apache/cassandra/db/view/ViewUpdateGenerator.java
+++ b/src/java/org/apache/cassandra/db/view/ViewUpdateGenerator.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.db.view;
 
 import java.nio.ByteBuffer;
 import java.util.*;
+import java.util.stream.Collectors;
 
 import com.google.common.collect.Iterators;
 import com.google.common.collect.PeekingIterator;
@@ -53,7 +54,7 @@ public class ViewUpdateGenerator
     private final TableMetadata viewMetadata;
     private final boolean baseEnforceStrictLiveness;
 
-    private final Map<DecoratedKey, PartitionUpdate> updates = new HashMap<>();
+    private final Map<DecoratedKey, PartitionUpdate.Builder> updates = new HashMap<>();
 
     // Reused internally to build a new entry
     private final ByteBuffer[] currentViewEntryPartitionKey;
@@ -143,7 +144,7 @@ public class ViewUpdateGenerator
      */
     public Collection<PartitionUpdate> generateViewUpdates()
     {
-        return updates.values();
+        return updates.values().stream().map(PartitionUpdate.Builder::build).collect(Collectors.toList());
     }
 
     /**
@@ -566,14 +567,13 @@ public class ViewUpdateGenerator
             return;
 
         DecoratedKey partitionKey = makeCurrentPartitionKey();
-        PartitionUpdate update = updates.get(partitionKey);
-        if (update == null)
-        {
-            // We can't really know which columns of the view will be updated nor how many row will be updated for this key
-            // so we rely on hopefully sane defaults.
-            update = new PartitionUpdate(viewMetadata, partitionKey, viewMetadata.regularAndStaticColumns(), 4);
-            updates.put(partitionKey, update);
-        }
+        // We can't really know which columns of the view will be updated nor how many rows will be updated for this key,
+        // so we rely on hopefully sane defaults.
+        PartitionUpdate.Builder update = updates.computeIfAbsent(partitionKey,
+                                                                 k -> new PartitionUpdate.Builder(viewMetadata,
+                                                                                                  partitionKey,
+                                                                                                  viewMetadata.regularAndStaticColumns(),
+                                                                                                  4));
         update.add(row);
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/de7c24b3/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java b/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
index 1fa5d8e..044a00b 100644
--- a/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
@@ -115,7 +115,7 @@ abstract class AbstractSSTableSimpleWriter implements Closeable
         return maxGen;
     }
 
-    PartitionUpdate getUpdateFor(ByteBuffer key) throws IOException
+    PartitionUpdate.Builder getUpdateFor(ByteBuffer key) throws IOException
     {
         return getUpdateFor(metadata.get().partitioner.decorateKey(key));
     }
@@ -126,6 +126,6 @@ abstract class AbstractSSTableSimpleWriter implements Closeable
      * @param key they partition key for which the returned update will be.
      * @return an update on partition {@code key} that is tied to this writer.
      */
-    abstract PartitionUpdate getUpdateFor(DecoratedKey key) throws IOException;
+    abstract PartitionUpdate.Builder getUpdateFor(DecoratedKey key) throws IOException;
 }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/de7c24b3/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java
index afb4461..369be12 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java
@@ -68,16 +68,16 @@ class SSTableSimpleUnsortedWriter extends AbstractSSTableSimpleWriter
         diskWriter.start();
     }
 
-    PartitionUpdate getUpdateFor(DecoratedKey key)
+    PartitionUpdate.Builder getUpdateFor(DecoratedKey key)
     {
         assert key != null;
-
-        PartitionUpdate previous = buffer.get(key);
+        PartitionUpdate.Builder previous = buffer.get(key);
         if (previous == null)
         {
-            previous = createPartitionUpdate(key);
-            currentSize += PartitionUpdate.serializer.serializedSize(previous, formatType.info.getLatestVersion().correspondingMessagingVersion());
-            previous.allowNewUpdates();
+            // todo: inefficient - we build a throwaway PartitionUpdate just to compute its serialized size, then create the builder again
+            // todo: either allow PartitionUpdate.Builder to have .build() called several times or pre-calculate the size
+            currentSize += PartitionUpdate.serializer.serializedSize(createPartitionUpdateBuilder(key).build(), formatType.info.getLatestVersion().correspondingMessagingVersion());
+            previous = createPartitionUpdateBuilder(key);
             buffer.put(key, previous);
         }
         return previous;
@@ -108,9 +108,9 @@ class SSTableSimpleUnsortedWriter extends AbstractSSTableSimpleWriter
         }
     }
 
-    private PartitionUpdate createPartitionUpdate(DecoratedKey key)
+    private PartitionUpdate.Builder createPartitionUpdateBuilder(DecoratedKey key)
     {
-        return new PartitionUpdate(metadata.get(), key, columns, 4)
+        return new PartitionUpdate.Builder(metadata.get(), key, columns, 4)
         {
             @Override
             public void add(Row row)
@@ -188,7 +188,7 @@ class SSTableSimpleUnsortedWriter extends AbstractSSTableSimpleWriter
     }
 
     //// typedef
-    static class Buffer extends TreeMap<DecoratedKey, PartitionUpdate> {}
+    static class Buffer extends TreeMap<DecoratedKey, PartitionUpdate.Builder> {}
 
     private class DiskWriter extends FastThreadLocalThread
     {
@@ -206,8 +206,8 @@ class SSTableSimpleUnsortedWriter extends AbstractSSTableSimpleWriter
 
                         try (SSTableTxnWriter writer = createWriter())
                     {
-                        for (Map.Entry<DecoratedKey, PartitionUpdate> entry : b.entrySet())
-                            writer.append(entry.getValue().unfilteredIterator());
+                        for (Map.Entry<DecoratedKey, PartitionUpdate.Builder> entry : b.entrySet())
+                            writer.append(entry.getValue().build().unfilteredIterator());
                         writer.finish(false);
                     }
                 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org