You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2014/02/21 11:04:01 UTC
[1/2] git commit: Optimize single partition batch statements
Repository: cassandra
Updated Branches:
refs/heads/cassandra-2.1 13b753b01 -> 3b4084b6c
Optimize single partition batch statements
patch by slebresne; reviewed by benedict for CASSANDRA-6737
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/54a7e003
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/54a7e003
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/54a7e003
Branch: refs/heads/cassandra-2.1
Commit: 54a7e0034148f451ff493f9f5363c26f10a21f20
Parents: edf16c9
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Wed Feb 19 19:10:09 2014 +0100
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Fri Feb 21 10:18:02 2014 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cql3/statements/BatchStatement.java | 57 ++++++++++++++------
.../cql3/statements/DeleteStatement.java | 8 ---
.../cql3/statements/ModificationStatement.java | 40 +++++++-------
.../cql3/statements/UpdateStatement.java | 8 ---
.../org/apache/cassandra/db/RowMutation.java | 7 ++-
6 files changed, 67 insertions(+), 54 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/54a7e003/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index bbacc4d..a5e1016 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -19,6 +19,7 @@
* Improve repair tasks(snapshot, differencing) concurrency (CASSANDRA-6566)
* Fix replaying pre-2.0 commit logs (CASSANDRA-6714)
* Add static columns to CQL3 (CASSANDRA-6561)
+ * Optimize single partition batch statements (CASSANDRA-6737)
Merged from 1.2:
* Catch memtable flush exceptions during shutdown (CASSANDRA-6735)
* Fix broken streams when replacing with same IP (CASSANDRA-6622)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/54a7e003/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
index d4acbae..b1dbb31 100644
--- a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
@@ -30,7 +30,6 @@ import org.apache.cassandra.service.ClientState;
import org.apache.cassandra.service.QueryState;
import org.apache.cassandra.service.StorageProxy;
import org.apache.cassandra.transport.messages.ResultMessage;
-import org.apache.cassandra.utils.Pair;
/**
* A <code>BATCH</code> statement parsed from a CQL query.
@@ -113,14 +112,26 @@ public class BatchStatement implements CQLStatement, MeasurableForPreparedCache
private Collection<? extends IMutation> getMutations(BatchVariables variables, boolean local, ConsistencyLevel cl, long now)
throws RequestExecutionException, RequestValidationException
{
- Map<Pair<String, ByteBuffer>, IMutation> mutations = new HashMap<Pair<String, ByteBuffer>, IMutation>();
+ Map<String, Map<ByteBuffer, IMutation>> mutations = new HashMap<>();
for (int i = 0; i < statements.size(); i++)
{
ModificationStatement statement = statements.get(i);
List<ByteBuffer> statementVariables = variables.getVariablesForStatement(i);
addStatementMutations(statement, statementVariables, local, cl, now, mutations);
}
- return mutations.values();
+ return unzipMutations(mutations);
+ }
+
+ private Collection<? extends IMutation> unzipMutations(Map<String, Map<ByteBuffer, IMutation>> mutations)
+ {
+ // The case where all statements were on the same keyspace is pretty common
+ if (mutations.size() == 1)
+ return mutations.values().iterator().next().values();
+
+ List<IMutation> ms = new ArrayList<>();
+ for (Map<ByteBuffer, IMutation> ksMap : mutations.values())
+ ms.addAll(ksMap.values());
+ return ms;
}
private void addStatementMutations(ModificationStatement statement,
@@ -128,23 +139,40 @@ public class BatchStatement implements CQLStatement, MeasurableForPreparedCache
boolean local,
ConsistencyLevel cl,
long now,
- Map<Pair<String, ByteBuffer>, IMutation> mutations)
+ Map<String, Map<ByteBuffer, IMutation>> mutations)
throws RequestExecutionException, RequestValidationException
{
- // Group mutation together, otherwise they won't get applied atomically
- for (IMutation m : statement.getMutations(variables, local, cl, attrs.getTimestamp(now, variables), true))
+ String ksName = statement.keyspace();
+ Map<ByteBuffer, IMutation> ksMap = mutations.get(ksName);
+ if (ksMap == null)
{
- Pair<String, ByteBuffer> key = Pair.create(m.getKeyspaceName(), m.key());
- IMutation existing = mutations.get(key);
+ ksMap = new HashMap<>();
+ mutations.put(ksName, ksMap);
+ }
- if (existing == null)
+ // The following does the same as statement.getMutations(), but we inline it here because
+ // we don't want to recreate mutations every time as this is particularly inefficient when applying
+ // multiple batch to the same partition (see #6737).
+ List<ByteBuffer> keys = statement.buildPartitionKeyNames(variables);
+ ColumnNameBuilder clusteringPrefix = statement.createClusteringPrefixBuilder(variables);
+ UpdateParameters params = statement.makeUpdateParameters(keys, clusteringPrefix, variables, local, cl, now);
+
+ for (ByteBuffer key : keys)
+ {
+ IMutation mutation = ksMap.get(key);
+ RowMutation rm;
+ if (mutation == null)
{
- mutations.put(key, m);
+ rm = new RowMutation(ksName, key);
+ mutation = type == Type.COUNTER ? new CounterMutation(rm, cl) : rm;
+ ksMap.put(key, mutation);
}
else
{
- existing.addAll(m);
+ rm = type == Type.COUNTER ? ((CounterMutation)mutation).rowMutation() : (RowMutation)mutation;
}
+
+ statement.addUpdateForKey(rm.addOrGet(statement.cfm, UnsortedColumns.factory), key, clusteringPrefix, params);
}
}
@@ -213,9 +241,9 @@ public class BatchStatement implements CQLStatement, MeasurableForPreparedCache
throw new InvalidRequestException("Batch with conditions cannot span multiple partitions");
}
+ ColumnNameBuilder clusteringPrefix = statement.createClusteringPrefixBuilder(statementVariables);
if (statement.hasConditions())
{
- ColumnNameBuilder clusteringPrefix = statement.createClusteringPrefixBuilder(statementVariables);
statement.addUpdatesAndConditions(key, clusteringPrefix, updates, conditions, statementVariables, timestamp);
// As soon as we have a ifNotExists, we set columnsWithConditions to null so that everything is in the resultSet
if (statement.hasIfNotExistCondition())
@@ -225,9 +253,8 @@ public class BatchStatement implements CQLStatement, MeasurableForPreparedCache
}
else
{
- // getPartitionKey will already have thrown if there is more than one key involved
- IMutation mut = statement.getMutations(statementVariables, false, cl, timestamp, true).iterator().next();
- updates.resolve(mut.getColumnFamilies().iterator().next());
+ UpdateParameters params = statement.makeUpdateParameters(Collections.singleton(key), clusteringPrefix, statementVariables, false, cl, now);
+ statement.addUpdateForKey(updates, key, clusteringPrefix, params);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/54a7e003/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java b/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java
index cd5f2a2..6efe100 100644
--- a/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java
@@ -41,14 +41,6 @@ public class DeleteStatement extends ModificationStatement
return false;
}
- public ColumnFamily updateForKey(ByteBuffer key, ColumnNameBuilder builder, UpdateParameters params)
- throws InvalidRequestException
- {
- ColumnFamily cf = TreeMapBackedSortedColumns.factory.create(cfm);
- addUpdateForKey(cf, key, builder, params);
- return cf;
- }
-
public void addUpdateForKey(ColumnFamily cf, ByteBuffer key, ColumnNameBuilder builder, UpdateParameters params)
throws InvalidRequestException
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/54a7e003/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
index ac8d2e1..ecefcb9 100644
--- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
@@ -415,7 +415,7 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
return null;
}
- protected Map<ByteBuffer, ColumnGroupMap> readRequiredRows(List<ByteBuffer> partitionKeys, ColumnNameBuilder clusteringPrefix, boolean local, ConsistencyLevel cl)
+ protected Map<ByteBuffer, ColumnGroupMap> readRequiredRows(Collection<ByteBuffer> partitionKeys, ColumnNameBuilder clusteringPrefix, boolean local, ConsistencyLevel cl)
throws RequestExecutionException, RequestValidationException
{
// Lists SET operation incurs a read.
@@ -433,7 +433,7 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
return toRead == null ? null : readRows(partitionKeys, clusteringPrefix, toRead, (CompositeType)cfm.comparator, local, cl);
}
- private Map<ByteBuffer, ColumnGroupMap> readRows(List<ByteBuffer> partitionKeys, ColumnNameBuilder clusteringPrefix, Set<ByteBuffer> toRead, CompositeType composite, boolean local, ConsistencyLevel cl)
+ private Map<ByteBuffer, ColumnGroupMap> readRows(Collection<ByteBuffer> partitionKeys, ColumnNameBuilder clusteringPrefix, Set<ByteBuffer> toRead, CompositeType composite, boolean local, ConsistencyLevel cl)
throws RequestExecutionException, RequestValidationException
{
try
@@ -516,7 +516,7 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
else
cl.validateForWrite(cfm.ksName);
- Collection<? extends IMutation> mutations = getMutations(options.getValues(), false, cl, queryState.getTimestamp(), false);
+ Collection<? extends IMutation> mutations = getMutations(options.getValues(), false, cl, queryState.getTimestamp());
if (!mutations.isEmpty())
StorageProxy.mutateWithTriggers(mutations, cl, false);
@@ -651,7 +651,7 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
if (hasConditions())
throw new UnsupportedOperationException();
- for (IMutation mutation : getMutations(Collections.<ByteBuffer>emptyList(), true, null, queryState.getTimestamp(), false))
+ for (IMutation mutation : getMutations(Collections.<ByteBuffer>emptyList(), true, null, queryState.getTimestamp()))
mutation.apply();
return null;
}
@@ -667,15 +667,13 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
* @return list of the mutations
* @throws InvalidRequestException on invalid requests
*/
- public Collection<? extends IMutation> getMutations(List<ByteBuffer> variables, boolean local, ConsistencyLevel cl, long now, boolean isBatch)
+ private Collection<? extends IMutation> getMutations(List<ByteBuffer> variables, boolean local, ConsistencyLevel cl, long now)
throws RequestExecutionException, RequestValidationException
{
List<ByteBuffer> keys = buildPartitionKeyNames(variables);
ColumnNameBuilder clusteringPrefix = createClusteringPrefixBuilder(variables);
- // Some lists operation requires reading
- Map<ByteBuffer, ColumnGroupMap> rows = readRequiredRows(keys, clusteringPrefix, local, cl);
- UpdateParameters params = new UpdateParameters(cfm, variables, getTimestamp(now, variables), getTimeToLive(variables), rows);
+ UpdateParameters params = makeUpdateParameters(keys, clusteringPrefix, variables, local, cl, now);
Collection<IMutation> mutations = new ArrayList<IMutation>();
for (ByteBuffer key: keys)
@@ -683,25 +681,23 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
ThriftValidation.validateKey(cfm, key);
ColumnFamily cf = UnsortedColumns.factory.create(cfm);
addUpdateForKey(cf, key, clusteringPrefix, params);
- mutations.add(makeMutation(key, cf, cl, isBatch));
+ RowMutation rm = new RowMutation(cfm.ksName, key, cf);
+ mutations.add(isCounter() ? new CounterMutation(rm, cl) : rm);
}
return mutations;
}
- private IMutation makeMutation(ByteBuffer key, ColumnFamily cf, ConsistencyLevel cl, boolean isBatch)
+ public UpdateParameters makeUpdateParameters(Collection<ByteBuffer> keys,
+ ColumnNameBuilder prefix,
+ List<ByteBuffer> variables,
+ boolean local,
+ ConsistencyLevel cl,
+ long now)
+ throws RequestExecutionException, RequestValidationException
{
- RowMutation rm;
- if (isBatch)
- {
- // we might group other mutations together with this one later, so make it mutable
- rm = new RowMutation(cfm.ksName, key);
- rm.add(cf);
- }
- else
- {
- rm = new RowMutation(cfm.ksName, key, cf);
- }
- return isCounter() ? new CounterMutation(rm, cl) : rm;
+ // Some list operations require reading
+ Map<ByteBuffer, ColumnGroupMap> rows = readRequiredRows(keys, prefix, local, cl);
+ return new UpdateParameters(cfm, variables, getTimestamp(now, variables), getTimeToLive(variables), rows);
}
public static abstract class Parsed extends CFStatement
http://git-wip-us.apache.org/repos/asf/cassandra/blob/54a7e003/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java b/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
index 0e6481b..dcf22ef 100644
--- a/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
@@ -98,14 +98,6 @@ public class UpdateStatement extends ModificationStatement
}
}
- public ColumnFamily updateForKey(ByteBuffer key, ColumnNameBuilder builder, UpdateParameters params)
- throws InvalidRequestException
- {
- ColumnFamily cf = UnsortedColumns.factory.create(cfm);
- addUpdateForKey(cf, key, builder, params);
- return cf;
- }
-
public static class ParsedInsert extends ModificationStatement.Parsed
{
private final List<ColumnIdentifier> columnNames;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/54a7e003/src/java/org/apache/cassandra/db/RowMutation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RowMutation.java b/src/java/org/apache/cassandra/db/RowMutation.java
index e9d177b..49ee2c5 100644
--- a/src/java/org/apache/cassandra/db/RowMutation.java
+++ b/src/java/org/apache/cassandra/db/RowMutation.java
@@ -125,10 +125,15 @@ public class RowMutation implements IMutation
public ColumnFamily addOrGet(CFMetaData cfm)
{
+ return addOrGet(cfm, TreeMapBackedSortedColumns.factory);
+ }
+
+ public ColumnFamily addOrGet(CFMetaData cfm, ColumnFamily.Factory factory)
+ {
ColumnFamily cf = modifications.get(cfm.cfId);
if (cf == null)
{
- cf = TreeMapBackedSortedColumns.factory.create(cfm);
+ cf = factory.create(cfm);
modifications.put(cfm.cfId, cf);
}
return cf;
[2/2] git commit: Merge branch 'cassandra-2.0' into cassandra-2.1
Posted by sl...@apache.org.
Merge branch 'cassandra-2.0' into cassandra-2.1
Conflicts:
src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java
src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
src/java/org/apache/cassandra/db/Mutation.java
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/3b4084b6
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/3b4084b6
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/3b4084b6
Branch: refs/heads/cassandra-2.1
Commit: 3b4084b6c9d7330889de23ee27c3483777054e55
Parents: 13b753b 54a7e00
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Fri Feb 21 11:03:52 2014 +0100
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Fri Feb 21 11:03:52 2014 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cql3/statements/BatchStatement.java | 57 ++++++++++++++------
.../cql3/statements/ModificationStatement.java | 40 +++++++-------
.../cql3/statements/UpdateStatement.java | 8 ---
.../apache/cassandra/db/CounterMutation.java | 5 ++
5 files changed, 66 insertions(+), 45 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3b4084b6/CHANGES.txt
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3b4084b6/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
index ab2b4bc,b1dbb31..21e60f8
--- a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
@@@ -130,23 -139,40 +141,40 @@@ public class BatchStatement implements
boolean local,
ConsistencyLevel cl,
long now,
- Map<Pair<String, ByteBuffer>, IMutation> mutations)
+ Map<String, Map<ByteBuffer, IMutation>> mutations)
throws RequestExecutionException, RequestValidationException
{
- // Group mutation together, otherwise they won't get applied atomically
- for (IMutation m : statement.getMutations(variables, local, cl, attrs.getTimestamp(now, variables), true))
+ String ksName = statement.keyspace();
+ Map<ByteBuffer, IMutation> ksMap = mutations.get(ksName);
+ if (ksMap == null)
{
- Pair<String, ByteBuffer> key = Pair.create(m.getKeyspaceName(), m.key());
- IMutation existing = mutations.get(key);
+ ksMap = new HashMap<>();
+ mutations.put(ksName, ksMap);
+ }
+
+ // The following does the same as statement.getMutations(), but we inline it here because
+ // we don't want to recreate mutations every time as this is particularly inefficient when applying
+ // multiple batch to the same partition (see #6737).
+ List<ByteBuffer> keys = statement.buildPartitionKeyNames(variables);
- ColumnNameBuilder clusteringPrefix = statement.createClusteringPrefixBuilder(variables);
++ Composite clusteringPrefix = statement.createClusteringPrefix(variables);
+ UpdateParameters params = statement.makeUpdateParameters(keys, clusteringPrefix, variables, local, cl, now);
- if (existing == null)
+ for (ByteBuffer key : keys)
+ {
+ IMutation mutation = ksMap.get(key);
- RowMutation rm;
++ Mutation mut;
+ if (mutation == null)
{
- mutations.put(key, m);
- rm = new RowMutation(ksName, key);
- mutation = type == Type.COUNTER ? new CounterMutation(rm, cl) : rm;
++ mut = new Mutation(ksName, key);
++ mutation = type == Type.COUNTER ? new CounterMutation(mut, cl) : mut;
+ ksMap.put(key, mutation);
}
else
{
- existing.addAll(m);
- rm = type == Type.COUNTER ? ((CounterMutation)mutation).rowMutation() : (RowMutation)mutation;
++ mut = type == Type.COUNTER ? ((CounterMutation)mutation).getMutation() : (Mutation)mutation;
}
+
- statement.addUpdateForKey(rm.addOrGet(statement.cfm, UnsortedColumns.factory), key, clusteringPrefix, params);
++ statement.addUpdateForKey(mut.addOrGet(statement.cfm), key, clusteringPrefix, params);
}
}
@@@ -215,9 -241,9 +243,9 @@@
throw new InvalidRequestException("Batch with conditions cannot span multiple partitions");
}
- ColumnNameBuilder clusteringPrefix = statement.createClusteringPrefixBuilder(statementVariables);
++ Composite clusteringPrefix = statement.createClusteringPrefix(statementVariables);
if (statement.hasConditions())
{
- Composite clusteringPrefix = statement.createClusteringPrefix(statementVariables);
statement.addUpdatesAndConditions(key, clusteringPrefix, updates, conditions, statementVariables, timestamp);
// As soon as we have a ifNotExists, we set columnsWithConditions to null so that everything is in the resultSet
if (statement.hasIfNotExistCondition())
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3b4084b6/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
index db0b7a9,ecefcb9..f90293b
--- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
@@@ -380,7 -415,7 +380,7 @@@ public abstract class ModificationState
return null;
}
- protected Map<ByteBuffer, CQL3Row> readRequiredRows(List<ByteBuffer> partitionKeys, Composite clusteringPrefix, boolean local, ConsistencyLevel cl)
- protected Map<ByteBuffer, ColumnGroupMap> readRequiredRows(Collection<ByteBuffer> partitionKeys, ColumnNameBuilder clusteringPrefix, boolean local, ConsistencyLevel cl)
++ protected Map<ByteBuffer, CQL3Row> readRequiredRows(Collection<ByteBuffer> partitionKeys, Composite clusteringPrefix, boolean local, ConsistencyLevel cl)
throws RequestExecutionException, RequestValidationException
{
// Lists SET operation incurs a read.
@@@ -394,10 -430,10 +394,10 @@@
}
}
- return toRead == null ? null : readRows(partitionKeys, clusteringPrefix, toRead, (CompositeType)cfm.comparator, local, cl);
+ return requiresRead ? readRows(partitionKeys, clusteringPrefix, cfm, local, cl) : null;
}
- protected Map<ByteBuffer, CQL3Row> readRows(List<ByteBuffer> partitionKeys, Composite rowPrefix, CFMetaData cfm, boolean local, ConsistencyLevel cl)
- private Map<ByteBuffer, ColumnGroupMap> readRows(Collection<ByteBuffer> partitionKeys, ColumnNameBuilder clusteringPrefix, Set<ByteBuffer> toRead, CompositeType composite, boolean local, ConsistencyLevel cl)
++ protected Map<ByteBuffer, CQL3Row> readRows(Collection<ByteBuffer> partitionKeys, Composite rowPrefix, CFMetaData cfm, boolean local, ConsistencyLevel cl)
throws RequestExecutionException, RequestValidationException
{
try
@@@ -604,12 -651,8 +604,12 @@@
if (hasConditions())
throw new UnsupportedOperationException();
- for (IMutation mutation : getMutations(Collections.<ByteBuffer>emptyList(), true, null, queryState.getTimestamp(), false))
+ for (IMutation mutation : getMutations(Collections.<ByteBuffer>emptyList(), true, null, queryState.getTimestamp()))
- mutation.apply();
+ {
+ // We don't use counters internally.
+ assert mutation instanceof Mutation;
+ ((Mutation) mutation).apply();
+ }
return null;
}
@@@ -628,37 -671,33 +628,33 @@@
throws RequestExecutionException, RequestValidationException
{
List<ByteBuffer> keys = buildPartitionKeyNames(variables);
- ColumnNameBuilder clusteringPrefix = createClusteringPrefixBuilder(variables);
+ Composite clusteringPrefix = createClusteringPrefix(variables);
- // Some lists operation requires reading
- Map<ByteBuffer, CQL3Row> rows = readRequiredRows(keys, clusteringPrefix, local, cl);
- UpdateParameters params = new UpdateParameters(cfm, variables, getTimestamp(now, variables), getTimeToLive(variables), rows);
+ UpdateParameters params = makeUpdateParameters(keys, clusteringPrefix, variables, local, cl, now);
Collection<IMutation> mutations = new ArrayList<IMutation>();
for (ByteBuffer key: keys)
{
ThriftValidation.validateKey(cfm, key);
- ColumnFamily cf = UnsortedColumns.factory.create(cfm);
+ ColumnFamily cf = ArrayBackedSortedColumns.factory.create(cfm);
addUpdateForKey(cf, key, clusteringPrefix, params);
- mutations.add(makeMutation(key, cf, cl, isBatch));
- RowMutation rm = new RowMutation(cfm.ksName, key, cf);
- mutations.add(isCounter() ? new CounterMutation(rm, cl) : rm);
++ Mutation mut = new Mutation(cfm.ksName, key, cf);
++ mutations.add(isCounter() ? new CounterMutation(mut, cl) : mut);
}
return mutations;
}
- private IMutation makeMutation(ByteBuffer key, ColumnFamily cf, ConsistencyLevel cl, boolean isBatch)
+ public UpdateParameters makeUpdateParameters(Collection<ByteBuffer> keys,
- ColumnNameBuilder prefix,
++ Composite prefix,
+ List<ByteBuffer> variables,
+ boolean local,
+ ConsistencyLevel cl,
+ long now)
+ throws RequestExecutionException, RequestValidationException
{
- Mutation mutation;
- if (isBatch)
- {
- // we might group other mutations together with this one later, so make it mutable
- mutation = new Mutation(cfm.ksName, key);
- mutation.add(cf);
- }
- else
- {
- mutation = new Mutation(cfm.ksName, key, cf);
- }
- return isCounter() ? new CounterMutation(mutation, cl) : mutation;
+ // Some list operations require reading
- Map<ByteBuffer, ColumnGroupMap> rows = readRequiredRows(keys, prefix, local, cl);
++ Map<ByteBuffer, CQL3Row> rows = readRequiredRows(keys, prefix, local, cl);
+ return new UpdateParameters(cfm, variables, getTimestamp(now, variables), getTimeToLive(variables), rows);
}
public static abstract class Parsed extends CFStatement
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3b4084b6/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3b4084b6/src/java/org/apache/cassandra/db/CounterMutation.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/CounterMutation.java
index 5d96c70,fb363c2..41187ac
--- a/src/java/org/apache/cassandra/db/CounterMutation.java
+++ b/src/java/org/apache/cassandra/db/CounterMutation.java
@@@ -65,12 -63,17 +65,17 @@@ public class CounterMutation implement
public Collection<ColumnFamily> getColumnFamilies()
{
- return rowMutation.getColumnFamilies();
+ return mutation.getColumnFamilies();
}
- public ByteBuffer key()
++ public Mutation getMutation()
+ {
- return rowMutation.key();
++ return mutation;
+ }
+
- public RowMutation rowMutation()
+ public ByteBuffer key()
{
- return rowMutation;
+ return mutation.key();
}
public ConsistencyLevel consistency()