You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2014/02/21 11:07:09 UTC

[1/3] git commit: Optimize single partition batch statements

Repository: cassandra
Updated Branches:
  refs/heads/trunk 63e0eb4fa -> 5d4ca6eb0


Optimize single partition batch statements

patch by slebresne; reviewed by benedict for CASSANDRA-6737


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/54a7e003
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/54a7e003
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/54a7e003

Branch: refs/heads/trunk
Commit: 54a7e0034148f451ff493f9f5363c26f10a21f20
Parents: edf16c9
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Wed Feb 19 19:10:09 2014 +0100
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Fri Feb 21 10:18:02 2014 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../cql3/statements/BatchStatement.java         | 57 ++++++++++++++------
 .../cql3/statements/DeleteStatement.java        |  8 ---
 .../cql3/statements/ModificationStatement.java  | 40 +++++++-------
 .../cql3/statements/UpdateStatement.java        |  8 ---
 .../org/apache/cassandra/db/RowMutation.java    |  7 ++-
 6 files changed, 67 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/54a7e003/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index bbacc4d..a5e1016 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -19,6 +19,7 @@
  * Improve repair tasks(snapshot, differencing) concurrency (CASSANDRA-6566)
  * Fix replaying pre-2.0 commit logs (CASSANDRA-6714)
  * Add static columns to CQL3 (CASSANDRA-6561)
+ * Optimize single partition batch statements (CASSANDRA-6737)
 Merged from 1.2:
  * Catch memtable flush exceptions during shutdown (CASSANDRA-6735)
  * Fix broken streams when replacing with same IP (CASSANDRA-6622)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/54a7e003/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
index d4acbae..b1dbb31 100644
--- a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
@@ -30,7 +30,6 @@ import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.service.QueryState;
 import org.apache.cassandra.service.StorageProxy;
 import org.apache.cassandra.transport.messages.ResultMessage;
-import org.apache.cassandra.utils.Pair;
 
 /**
  * A <code>BATCH</code> statement parsed from a CQL query.
@@ -113,14 +112,26 @@ public class BatchStatement implements CQLStatement, MeasurableForPreparedCache
     private Collection<? extends IMutation> getMutations(BatchVariables variables, boolean local, ConsistencyLevel cl, long now)
     throws RequestExecutionException, RequestValidationException
     {
-        Map<Pair<String, ByteBuffer>, IMutation> mutations = new HashMap<Pair<String, ByteBuffer>, IMutation>();
+        Map<String, Map<ByteBuffer, IMutation>> mutations = new HashMap<>();
         for (int i = 0; i < statements.size(); i++)
         {
             ModificationStatement statement = statements.get(i);
             List<ByteBuffer> statementVariables = variables.getVariablesForStatement(i);
             addStatementMutations(statement, statementVariables, local, cl, now, mutations);
         }
-        return mutations.values();
+        return unzipMutations(mutations);
+    }
+
+    private Collection<? extends IMutation> unzipMutations(Map<String, Map<ByteBuffer, IMutation>> mutations)
+    {
+        // The case where all statements are on the same keyspace is pretty common
+        if (mutations.size() == 1)
+            return mutations.values().iterator().next().values();
+
+        List<IMutation> ms = new ArrayList<>();
+        for (Map<ByteBuffer, IMutation> ksMap : mutations.values())
+            ms.addAll(ksMap.values());
+        return ms;
     }
 
     private void addStatementMutations(ModificationStatement statement,
@@ -128,23 +139,40 @@ public class BatchStatement implements CQLStatement, MeasurableForPreparedCache
                                        boolean local,
                                        ConsistencyLevel cl,
                                        long now,
-                                       Map<Pair<String, ByteBuffer>, IMutation> mutations)
+                                       Map<String, Map<ByteBuffer, IMutation>> mutations)
     throws RequestExecutionException, RequestValidationException
     {
-        // Group mutation together, otherwise they won't get applied atomically
-        for (IMutation m : statement.getMutations(variables, local, cl, attrs.getTimestamp(now, variables), true))
+        String ksName = statement.keyspace();
+        Map<ByteBuffer, IMutation> ksMap = mutations.get(ksName);
+        if (ksMap == null)
         {
-            Pair<String, ByteBuffer> key = Pair.create(m.getKeyspaceName(), m.key());
-            IMutation existing = mutations.get(key);
+            ksMap = new HashMap<>();
+            mutations.put(ksName, ksMap);
+        }
 
-            if (existing == null)
+        // The following does the same as statement.getMutations(), but we inline it here because
+        // we don't want to recreate mutations every time as this is particularly inefficient when applying
+        // multiple batch statements to the same partition (see #6737).
+        List<ByteBuffer> keys = statement.buildPartitionKeyNames(variables);
+        ColumnNameBuilder clusteringPrefix = statement.createClusteringPrefixBuilder(variables);
+        UpdateParameters params = statement.makeUpdateParameters(keys, clusteringPrefix, variables, local, cl, now);
+
+        for (ByteBuffer key : keys)
+        {
+            IMutation mutation = ksMap.get(key);
+            RowMutation rm;
+            if (mutation == null)
             {
-                mutations.put(key, m);
+                rm = new RowMutation(ksName, key);
+                mutation = type == Type.COUNTER ? new CounterMutation(rm, cl) : rm;
+                ksMap.put(key, mutation);
             }
             else
             {
-                existing.addAll(m);
+                rm = type == Type.COUNTER ? ((CounterMutation)mutation).rowMutation() : (RowMutation)mutation;
             }
+
+            statement.addUpdateForKey(rm.addOrGet(statement.cfm, UnsortedColumns.factory), key, clusteringPrefix, params);
         }
     }
 
@@ -213,9 +241,9 @@ public class BatchStatement implements CQLStatement, MeasurableForPreparedCache
                 throw new InvalidRequestException("Batch with conditions cannot span multiple partitions");
             }
 
+            ColumnNameBuilder clusteringPrefix = statement.createClusteringPrefixBuilder(statementVariables);
             if (statement.hasConditions())
             {
-                ColumnNameBuilder clusteringPrefix = statement.createClusteringPrefixBuilder(statementVariables);
                 statement.addUpdatesAndConditions(key, clusteringPrefix, updates, conditions, statementVariables, timestamp);
                 // As soon as we have a ifNotExists, we set columnsWithConditions to null so that everything is in the resultSet
                 if (statement.hasIfNotExistCondition())
@@ -225,9 +253,8 @@ public class BatchStatement implements CQLStatement, MeasurableForPreparedCache
             }
             else
             {
-                // getPartitionKey will already have thrown if there is more than one key involved
-                IMutation mut = statement.getMutations(statementVariables, false, cl, timestamp, true).iterator().next();
-                updates.resolve(mut.getColumnFamilies().iterator().next());
+                UpdateParameters params = statement.makeUpdateParameters(Collections.singleton(key), clusteringPrefix, statementVariables, false, cl, now);
+                statement.addUpdateForKey(updates, key, clusteringPrefix, params);
             }
         }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/54a7e003/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java b/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java
index cd5f2a2..6efe100 100644
--- a/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java
@@ -41,14 +41,6 @@ public class DeleteStatement extends ModificationStatement
         return false;
     }
 
-    public ColumnFamily updateForKey(ByteBuffer key, ColumnNameBuilder builder, UpdateParameters params)
-    throws InvalidRequestException
-    {
-        ColumnFamily cf = TreeMapBackedSortedColumns.factory.create(cfm);
-        addUpdateForKey(cf, key, builder, params);
-        return cf;
-    }
-
     public void addUpdateForKey(ColumnFamily cf, ByteBuffer key, ColumnNameBuilder builder, UpdateParameters params)
     throws InvalidRequestException
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/54a7e003/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
index ac8d2e1..ecefcb9 100644
--- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
@@ -415,7 +415,7 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
         return null;
     }
 
-    protected Map<ByteBuffer, ColumnGroupMap> readRequiredRows(List<ByteBuffer> partitionKeys, ColumnNameBuilder clusteringPrefix, boolean local, ConsistencyLevel cl)
+    protected Map<ByteBuffer, ColumnGroupMap> readRequiredRows(Collection<ByteBuffer> partitionKeys, ColumnNameBuilder clusteringPrefix, boolean local, ConsistencyLevel cl)
     throws RequestExecutionException, RequestValidationException
     {
         // Lists SET operation incurs a read.
@@ -433,7 +433,7 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
         return toRead == null ? null : readRows(partitionKeys, clusteringPrefix, toRead, (CompositeType)cfm.comparator, local, cl);
     }
 
-    private Map<ByteBuffer, ColumnGroupMap> readRows(List<ByteBuffer> partitionKeys, ColumnNameBuilder clusteringPrefix, Set<ByteBuffer> toRead, CompositeType composite, boolean local, ConsistencyLevel cl)
+    private Map<ByteBuffer, ColumnGroupMap> readRows(Collection<ByteBuffer> partitionKeys, ColumnNameBuilder clusteringPrefix, Set<ByteBuffer> toRead, CompositeType composite, boolean local, ConsistencyLevel cl)
     throws RequestExecutionException, RequestValidationException
     {
         try
@@ -516,7 +516,7 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
         else
             cl.validateForWrite(cfm.ksName);
 
-        Collection<? extends IMutation> mutations = getMutations(options.getValues(), false, cl, queryState.getTimestamp(), false);
+        Collection<? extends IMutation> mutations = getMutations(options.getValues(), false, cl, queryState.getTimestamp());
         if (!mutations.isEmpty())
             StorageProxy.mutateWithTriggers(mutations, cl, false);
 
@@ -651,7 +651,7 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
         if (hasConditions())
             throw new UnsupportedOperationException();
 
-        for (IMutation mutation : getMutations(Collections.<ByteBuffer>emptyList(), true, null, queryState.getTimestamp(), false))
+        for (IMutation mutation : getMutations(Collections.<ByteBuffer>emptyList(), true, null, queryState.getTimestamp()))
             mutation.apply();
         return null;
     }
@@ -667,15 +667,13 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
      * @return list of the mutations
      * @throws InvalidRequestException on invalid requests
      */
-    public Collection<? extends IMutation> getMutations(List<ByteBuffer> variables, boolean local, ConsistencyLevel cl, long now, boolean isBatch)
+    private Collection<? extends IMutation> getMutations(List<ByteBuffer> variables, boolean local, ConsistencyLevel cl, long now)
     throws RequestExecutionException, RequestValidationException
     {
         List<ByteBuffer> keys = buildPartitionKeyNames(variables);
         ColumnNameBuilder clusteringPrefix = createClusteringPrefixBuilder(variables);
 
-        // Some lists operation requires reading
-        Map<ByteBuffer, ColumnGroupMap> rows = readRequiredRows(keys, clusteringPrefix, local, cl);
-        UpdateParameters params = new UpdateParameters(cfm, variables, getTimestamp(now, variables), getTimeToLive(variables), rows);
+        UpdateParameters params = makeUpdateParameters(keys, clusteringPrefix, variables, local, cl, now);
 
         Collection<IMutation> mutations = new ArrayList<IMutation>();
         for (ByteBuffer key: keys)
@@ -683,25 +681,23 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
             ThriftValidation.validateKey(cfm, key);
             ColumnFamily cf = UnsortedColumns.factory.create(cfm);
             addUpdateForKey(cf, key, clusteringPrefix, params);
-            mutations.add(makeMutation(key, cf, cl, isBatch));
+            RowMutation rm = new RowMutation(cfm.ksName, key, cf);
+            mutations.add(isCounter() ? new CounterMutation(rm, cl) : rm);
         }
         return mutations;
     }
 
-    private IMutation makeMutation(ByteBuffer key, ColumnFamily cf, ConsistencyLevel cl, boolean isBatch)
+    public UpdateParameters makeUpdateParameters(Collection<ByteBuffer> keys,
+                                                 ColumnNameBuilder prefix,
+                                                 List<ByteBuffer> variables,
+                                                 boolean local,
+                                                 ConsistencyLevel cl,
+                                                 long now)
+    throws RequestExecutionException, RequestValidationException
     {
-        RowMutation rm;
-        if (isBatch)
-        {
-            // we might group other mutations together with this one later, so make it mutable
-            rm = new RowMutation(cfm.ksName, key);
-            rm.add(cf);
-        }
-        else
-        {
-            rm = new RowMutation(cfm.ksName, key, cf);
-        }
-        return isCounter() ? new CounterMutation(rm, cl) : rm;
+        // Some list operations require reading
+        Map<ByteBuffer, ColumnGroupMap> rows = readRequiredRows(keys, prefix, local, cl);
+        return new UpdateParameters(cfm, variables, getTimestamp(now, variables), getTimeToLive(variables), rows);
     }
 
     public static abstract class Parsed extends CFStatement

http://git-wip-us.apache.org/repos/asf/cassandra/blob/54a7e003/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java b/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
index 0e6481b..dcf22ef 100644
--- a/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
@@ -98,14 +98,6 @@ public class UpdateStatement extends ModificationStatement
         }
     }
 
-    public ColumnFamily updateForKey(ByteBuffer key, ColumnNameBuilder builder, UpdateParameters params)
-    throws InvalidRequestException
-    {
-        ColumnFamily cf = UnsortedColumns.factory.create(cfm);
-        addUpdateForKey(cf, key, builder, params);
-        return cf;
-    }
-
     public static class ParsedInsert extends ModificationStatement.Parsed
     {
         private final List<ColumnIdentifier> columnNames;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/54a7e003/src/java/org/apache/cassandra/db/RowMutation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RowMutation.java b/src/java/org/apache/cassandra/db/RowMutation.java
index e9d177b..49ee2c5 100644
--- a/src/java/org/apache/cassandra/db/RowMutation.java
+++ b/src/java/org/apache/cassandra/db/RowMutation.java
@@ -125,10 +125,15 @@ public class RowMutation implements IMutation
 
     public ColumnFamily addOrGet(CFMetaData cfm)
     {
+        return addOrGet(cfm, TreeMapBackedSortedColumns.factory);
+    }
+
+    public ColumnFamily addOrGet(CFMetaData cfm, ColumnFamily.Factory factory)
+    {
         ColumnFamily cf = modifications.get(cfm.cfId);
         if (cf == null)
         {
-            cf = TreeMapBackedSortedColumns.factory.create(cfm);
+            cf = factory.create(cfm);
             modifications.put(cfm.cfId, cf);
         }
         return cf;


[3/3] git commit: Merge branch 'cassandra-2.1' into trunk

Posted by sl...@apache.org.
Merge branch 'cassandra-2.1' into trunk


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/5d4ca6eb
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/5d4ca6eb
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/5d4ca6eb

Branch: refs/heads/trunk
Commit: 5d4ca6eb0346c716d822a6983a28fc11939470a4
Parents: 63e0eb4 3b4084b
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Fri Feb 21 11:07:01 2014 +0100
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Fri Feb 21 11:07:01 2014 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../cql3/statements/BatchStatement.java         | 57 ++++++++++++++------
 .../cql3/statements/ModificationStatement.java  | 40 +++++++-------
 .../cql3/statements/UpdateStatement.java        |  8 ---
 .../apache/cassandra/db/CounterMutation.java    |  5 ++
 5 files changed, 66 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/5d4ca6eb/CHANGES.txt
----------------------------------------------------------------------


[2/3] git commit: Merge branch 'cassandra-2.0' into cassandra-2.1

Posted by sl...@apache.org.
Merge branch 'cassandra-2.0' into cassandra-2.1

Conflicts:
	src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
	src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java
	src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
	src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
	src/java/org/apache/cassandra/db/Mutation.java


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/3b4084b6
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/3b4084b6
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/3b4084b6

Branch: refs/heads/trunk
Commit: 3b4084b6c9d7330889de23ee27c3483777054e55
Parents: 13b753b 54a7e00
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Fri Feb 21 11:03:52 2014 +0100
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Fri Feb 21 11:03:52 2014 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../cql3/statements/BatchStatement.java         | 57 ++++++++++++++------
 .../cql3/statements/ModificationStatement.java  | 40 +++++++-------
 .../cql3/statements/UpdateStatement.java        |  8 ---
 .../apache/cassandra/db/CounterMutation.java    |  5 ++
 5 files changed, 66 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/3b4084b6/CHANGES.txt
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3b4084b6/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
index ab2b4bc,b1dbb31..21e60f8
--- a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
@@@ -130,23 -139,40 +141,40 @@@ public class BatchStatement implements 
                                         boolean local,
                                         ConsistencyLevel cl,
                                         long now,
-                                        Map<Pair<String, ByteBuffer>, IMutation> mutations)
+                                        Map<String, Map<ByteBuffer, IMutation>> mutations)
      throws RequestExecutionException, RequestValidationException
      {
-         // Group mutation together, otherwise they won't get applied atomically
-         for (IMutation m : statement.getMutations(variables, local, cl, attrs.getTimestamp(now, variables), true))
+         String ksName = statement.keyspace();
+         Map<ByteBuffer, IMutation> ksMap = mutations.get(ksName);
+         if (ksMap == null)
          {
-             Pair<String, ByteBuffer> key = Pair.create(m.getKeyspaceName(), m.key());
-             IMutation existing = mutations.get(key);
+             ksMap = new HashMap<>();
+             mutations.put(ksName, ksMap);
+         }
+ 
+         // The following does the same as statement.getMutations(), but we inline it here because
+         // we don't want to recreate mutations every time as this is particularly inefficient when applying
+         // multiple batch statements to the same partition (see #6737).
+         List<ByteBuffer> keys = statement.buildPartitionKeyNames(variables);
 -        ColumnNameBuilder clusteringPrefix = statement.createClusteringPrefixBuilder(variables);
++        Composite clusteringPrefix = statement.createClusteringPrefix(variables);
+         UpdateParameters params = statement.makeUpdateParameters(keys, clusteringPrefix, variables, local, cl, now);
  
-             if (existing == null)
+         for (ByteBuffer key : keys)
+         {
+             IMutation mutation = ksMap.get(key);
 -            RowMutation rm;
++            Mutation mut;
+             if (mutation == null)
              {
-                 mutations.put(key, m);
 -                rm = new RowMutation(ksName, key);
 -                mutation = type == Type.COUNTER ? new CounterMutation(rm, cl) : rm;
++                mut = new Mutation(ksName, key);
++                mutation = type == Type.COUNTER ? new CounterMutation(mut, cl) : mut;
+                 ksMap.put(key, mutation);
              }
              else
              {
-                 existing.addAll(m);
 -                rm = type == Type.COUNTER ? ((CounterMutation)mutation).rowMutation() : (RowMutation)mutation;
++                mut = type == Type.COUNTER ? ((CounterMutation)mutation).getMutation() : (Mutation)mutation;
              }
+ 
 -            statement.addUpdateForKey(rm.addOrGet(statement.cfm, UnsortedColumns.factory), key, clusteringPrefix, params);
++            statement.addUpdateForKey(mut.addOrGet(statement.cfm), key, clusteringPrefix, params);
          }
      }
  
@@@ -215,9 -241,9 +243,9 @@@
                  throw new InvalidRequestException("Batch with conditions cannot span multiple partitions");
              }
  
 -            ColumnNameBuilder clusteringPrefix = statement.createClusteringPrefixBuilder(statementVariables);
++            Composite clusteringPrefix = statement.createClusteringPrefix(statementVariables);
              if (statement.hasConditions())
              {
-                 Composite clusteringPrefix = statement.createClusteringPrefix(statementVariables);
                  statement.addUpdatesAndConditions(key, clusteringPrefix, updates, conditions, statementVariables, timestamp);
                  // As soon as we have a ifNotExists, we set columnsWithConditions to null so that everything is in the resultSet
                  if (statement.hasIfNotExistCondition())

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3b4084b6/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
index db0b7a9,ecefcb9..f90293b
--- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
@@@ -380,7 -415,7 +380,7 @@@ public abstract class ModificationState
          return null;
      }
  
-     protected Map<ByteBuffer, CQL3Row> readRequiredRows(List<ByteBuffer> partitionKeys, Composite clusteringPrefix, boolean local, ConsistencyLevel cl)
 -    protected Map<ByteBuffer, ColumnGroupMap> readRequiredRows(Collection<ByteBuffer> partitionKeys, ColumnNameBuilder clusteringPrefix, boolean local, ConsistencyLevel cl)
++    protected Map<ByteBuffer, CQL3Row> readRequiredRows(Collection<ByteBuffer> partitionKeys, Composite clusteringPrefix, boolean local, ConsistencyLevel cl)
      throws RequestExecutionException, RequestValidationException
      {
          // Lists SET operation incurs a read.
@@@ -394,10 -430,10 +394,10 @@@
              }
          }
  
 -        return toRead == null ? null : readRows(partitionKeys, clusteringPrefix, toRead, (CompositeType)cfm.comparator, local, cl);
 +        return requiresRead ? readRows(partitionKeys, clusteringPrefix, cfm, local, cl) : null;
      }
  
-     protected Map<ByteBuffer, CQL3Row> readRows(List<ByteBuffer> partitionKeys, Composite rowPrefix, CFMetaData cfm, boolean local, ConsistencyLevel cl)
 -    private Map<ByteBuffer, ColumnGroupMap> readRows(Collection<ByteBuffer> partitionKeys, ColumnNameBuilder clusteringPrefix, Set<ByteBuffer> toRead, CompositeType composite, boolean local, ConsistencyLevel cl)
++    protected Map<ByteBuffer, CQL3Row> readRows(Collection<ByteBuffer> partitionKeys, Composite rowPrefix, CFMetaData cfm, boolean local, ConsistencyLevel cl)
      throws RequestExecutionException, RequestValidationException
      {
          try
@@@ -604,12 -651,8 +604,12 @@@
          if (hasConditions())
              throw new UnsupportedOperationException();
  
-         for (IMutation mutation : getMutations(Collections.<ByteBuffer>emptyList(), true, null, queryState.getTimestamp(), false))
+         for (IMutation mutation : getMutations(Collections.<ByteBuffer>emptyList(), true, null, queryState.getTimestamp()))
 -            mutation.apply();
 +        {
 +            // We don't use counters internally.
 +            assert mutation instanceof Mutation;
 +            ((Mutation) mutation).apply();
 +        }
          return null;
      }
  
@@@ -628,37 -671,33 +628,33 @@@
      throws RequestExecutionException, RequestValidationException
      {
          List<ByteBuffer> keys = buildPartitionKeyNames(variables);
 -        ColumnNameBuilder clusteringPrefix = createClusteringPrefixBuilder(variables);
 +        Composite clusteringPrefix = createClusteringPrefix(variables);
  
-         // Some lists operation requires reading
-         Map<ByteBuffer, CQL3Row> rows = readRequiredRows(keys, clusteringPrefix, local, cl);
-         UpdateParameters params = new UpdateParameters(cfm, variables, getTimestamp(now, variables), getTimeToLive(variables), rows);
+         UpdateParameters params = makeUpdateParameters(keys, clusteringPrefix, variables, local, cl, now);
  
          Collection<IMutation> mutations = new ArrayList<IMutation>();
          for (ByteBuffer key: keys)
          {
              ThriftValidation.validateKey(cfm, key);
 -            ColumnFamily cf = UnsortedColumns.factory.create(cfm);
 +            ColumnFamily cf = ArrayBackedSortedColumns.factory.create(cfm);
              addUpdateForKey(cf, key, clusteringPrefix, params);
-             mutations.add(makeMutation(key, cf, cl, isBatch));
 -            RowMutation rm = new RowMutation(cfm.ksName, key, cf);
 -            mutations.add(isCounter() ? new CounterMutation(rm, cl) : rm);
++            Mutation mut = new Mutation(cfm.ksName, key, cf);
++            mutations.add(isCounter() ? new CounterMutation(mut, cl) : mut);
          }
          return mutations;
      }
  
-     private IMutation makeMutation(ByteBuffer key, ColumnFamily cf, ConsistencyLevel cl, boolean isBatch)
+     public UpdateParameters makeUpdateParameters(Collection<ByteBuffer> keys,
 -                                                 ColumnNameBuilder prefix,
++                                                 Composite prefix,
+                                                  List<ByteBuffer> variables,
+                                                  boolean local,
+                                                  ConsistencyLevel cl,
+                                                  long now)
+     throws RequestExecutionException, RequestValidationException
      {
-         Mutation mutation;
-         if (isBatch)
-         {
-             // we might group other mutations together with this one later, so make it mutable
-             mutation = new Mutation(cfm.ksName, key);
-             mutation.add(cf);
-         }
-         else
-         {
-             mutation = new Mutation(cfm.ksName, key, cf);
-         }
-         return isCounter() ? new CounterMutation(mutation, cl) : mutation;
+         // Some list operations require reading
 -        Map<ByteBuffer, ColumnGroupMap> rows = readRequiredRows(keys, prefix, local, cl);
++        Map<ByteBuffer, CQL3Row> rows = readRequiredRows(keys, prefix, local, cl);
+         return new UpdateParameters(cfm, variables, getTimestamp(now, variables), getTimeToLive(variables), rows);
      }
  
      public static abstract class Parsed extends CFStatement

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3b4084b6/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3b4084b6/src/java/org/apache/cassandra/db/CounterMutation.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/CounterMutation.java
index 5d96c70,fb363c2..41187ac
--- a/src/java/org/apache/cassandra/db/CounterMutation.java
+++ b/src/java/org/apache/cassandra/db/CounterMutation.java
@@@ -65,12 -63,17 +65,17 @@@ public class CounterMutation implement
  
      public Collection<ColumnFamily> getColumnFamilies()
      {
 -        return rowMutation.getColumnFamilies();
 +        return mutation.getColumnFamilies();
      }
  
 -    public ByteBuffer key()
++    public Mutation getMutation()
+     {
 -        return rowMutation.key();
++        return mutation;
+     }
+ 
 -    public RowMutation rowMutation()
 +    public ByteBuffer key()
      {
 -        return rowMutation;
 +        return mutation.key();
      }
  
      public ConsistencyLevel consistency()