You are viewing a plain-text version of this content; the hyperlink to its canonical version was not preserved in this copy.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2014/02/21 10:45:02 UTC
git commit: Optimize single partition batch statements
Repository: cassandra
Updated Branches:
refs/heads/cassandra-2.0 edf16c950 -> 54a7e0034
Optimize single partition batch statements
patch by slebresne; reviewed by benedict for CASSANDRA-6737
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/54a7e003
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/54a7e003
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/54a7e003
Branch: refs/heads/cassandra-2.0
Commit: 54a7e0034148f451ff493f9f5363c26f10a21f20
Parents: edf16c9
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Wed Feb 19 19:10:09 2014 +0100
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Fri Feb 21 10:18:02 2014 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cql3/statements/BatchStatement.java | 57 ++++++++++++++------
.../cql3/statements/DeleteStatement.java | 8 ---
.../cql3/statements/ModificationStatement.java | 40 +++++++-------
.../cql3/statements/UpdateStatement.java | 8 ---
.../org/apache/cassandra/db/RowMutation.java | 7 ++-
6 files changed, 67 insertions(+), 54 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/54a7e003/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index bbacc4d..a5e1016 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -19,6 +19,7 @@
* Improve repair tasks(snapshot, differencing) concurrency (CASSANDRA-6566)
* Fix replaying pre-2.0 commit logs (CASSANDRA-6714)
* Add static columns to CQL3 (CASSANDRA-6561)
+ * Optimize single partition batch statements (CASSANDRA-6737)
Merged from 1.2:
* Catch memtable flush exceptions during shutdown (CASSANDRA-6735)
* Fix broken streams when replacing with same IP (CASSANDRA-6622)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/54a7e003/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
index d4acbae..b1dbb31 100644
--- a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
@@ -30,7 +30,6 @@ import org.apache.cassandra.service.ClientState;
import org.apache.cassandra.service.QueryState;
import org.apache.cassandra.service.StorageProxy;
import org.apache.cassandra.transport.messages.ResultMessage;
-import org.apache.cassandra.utils.Pair;
/**
* A <code>BATCH</code> statement parsed from a CQL query.
@@ -113,14 +112,26 @@ public class BatchStatement implements CQLStatement, MeasurableForPreparedCache
private Collection<? extends IMutation> getMutations(BatchVariables variables, boolean local, ConsistencyLevel cl, long now)
throws RequestExecutionException, RequestValidationException
{
- Map<Pair<String, ByteBuffer>, IMutation> mutations = new HashMap<Pair<String, ByteBuffer>, IMutation>();
+ Map<String, Map<ByteBuffer, IMutation>> mutations = new HashMap<>();
for (int i = 0; i < statements.size(); i++)
{
ModificationStatement statement = statements.get(i);
List<ByteBuffer> statementVariables = variables.getVariablesForStatement(i);
addStatementMutations(statement, statementVariables, local, cl, now, mutations);
}
- return mutations.values();
+ return unzipMutations(mutations);
+ }
+
+ private Collection<? extends IMutation> unzipMutations(Map<String, Map<ByteBuffer, IMutation>> mutations)
+ {
+ // The case where all statement where on the same keyspace is pretty common
+ if (mutations.size() == 1)
+ return mutations.values().iterator().next().values();
+
+ List<IMutation> ms = new ArrayList<>();
+ for (Map<ByteBuffer, IMutation> ksMap : mutations.values())
+ ms.addAll(ksMap.values());
+ return ms;
}
private void addStatementMutations(ModificationStatement statement,
@@ -128,23 +139,40 @@ public class BatchStatement implements CQLStatement, MeasurableForPreparedCache
boolean local,
ConsistencyLevel cl,
long now,
- Map<Pair<String, ByteBuffer>, IMutation> mutations)
+ Map<String, Map<ByteBuffer, IMutation>> mutations)
throws RequestExecutionException, RequestValidationException
{
- // Group mutation together, otherwise they won't get applied atomically
- for (IMutation m : statement.getMutations(variables, local, cl, attrs.getTimestamp(now, variables), true))
+ String ksName = statement.keyspace();
+ Map<ByteBuffer, IMutation> ksMap = mutations.get(ksName);
+ if (ksMap == null)
{
- Pair<String, ByteBuffer> key = Pair.create(m.getKeyspaceName(), m.key());
- IMutation existing = mutations.get(key);
+ ksMap = new HashMap<>();
+ mutations.put(ksName, ksMap);
+ }
- if (existing == null)
+ // The following does the same than statement.getMutations(), but we inline it here because
+ // we don't want to recreate mutations every time as this is particularly inefficient when applying
+ // multiple batch to the same partition (see #6737).
+ List<ByteBuffer> keys = statement.buildPartitionKeyNames(variables);
+ ColumnNameBuilder clusteringPrefix = statement.createClusteringPrefixBuilder(variables);
+ UpdateParameters params = statement.makeUpdateParameters(keys, clusteringPrefix, variables, local, cl, now);
+
+ for (ByteBuffer key : keys)
+ {
+ IMutation mutation = ksMap.get(key);
+ RowMutation rm;
+ if (mutation == null)
{
- mutations.put(key, m);
+ rm = new RowMutation(ksName, key);
+ mutation = type == Type.COUNTER ? new CounterMutation(rm, cl) : rm;
+ ksMap.put(key, mutation);
}
else
{
- existing.addAll(m);
+ rm = type == Type.COUNTER ? ((CounterMutation)mutation).rowMutation() : (RowMutation)mutation;
}
+
+ statement.addUpdateForKey(rm.addOrGet(statement.cfm, UnsortedColumns.factory), key, clusteringPrefix, params);
}
}
@@ -213,9 +241,9 @@ public class BatchStatement implements CQLStatement, MeasurableForPreparedCache
throw new InvalidRequestException("Batch with conditions cannot span multiple partitions");
}
+ ColumnNameBuilder clusteringPrefix = statement.createClusteringPrefixBuilder(statementVariables);
if (statement.hasConditions())
{
- ColumnNameBuilder clusteringPrefix = statement.createClusteringPrefixBuilder(statementVariables);
statement.addUpdatesAndConditions(key, clusteringPrefix, updates, conditions, statementVariables, timestamp);
// As soon as we have a ifNotExists, we set columnsWithConditions to null so that everything is in the resultSet
if (statement.hasIfNotExistCondition())
@@ -225,9 +253,8 @@ public class BatchStatement implements CQLStatement, MeasurableForPreparedCache
}
else
{
- // getPartitionKey will already have thrown if there is more than one key involved
- IMutation mut = statement.getMutations(statementVariables, false, cl, timestamp, true).iterator().next();
- updates.resolve(mut.getColumnFamilies().iterator().next());
+ UpdateParameters params = statement.makeUpdateParameters(Collections.singleton(key), clusteringPrefix, statementVariables, false, cl, now);
+ statement.addUpdateForKey(updates, key, clusteringPrefix, params);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/54a7e003/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java b/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java
index cd5f2a2..6efe100 100644
--- a/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java
@@ -41,14 +41,6 @@ public class DeleteStatement extends ModificationStatement
return false;
}
- public ColumnFamily updateForKey(ByteBuffer key, ColumnNameBuilder builder, UpdateParameters params)
- throws InvalidRequestException
- {
- ColumnFamily cf = TreeMapBackedSortedColumns.factory.create(cfm);
- addUpdateForKey(cf, key, builder, params);
- return cf;
- }
-
public void addUpdateForKey(ColumnFamily cf, ByteBuffer key, ColumnNameBuilder builder, UpdateParameters params)
throws InvalidRequestException
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/54a7e003/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
index ac8d2e1..ecefcb9 100644
--- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
@@ -415,7 +415,7 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
return null;
}
- protected Map<ByteBuffer, ColumnGroupMap> readRequiredRows(List<ByteBuffer> partitionKeys, ColumnNameBuilder clusteringPrefix, boolean local, ConsistencyLevel cl)
+ protected Map<ByteBuffer, ColumnGroupMap> readRequiredRows(Collection<ByteBuffer> partitionKeys, ColumnNameBuilder clusteringPrefix, boolean local, ConsistencyLevel cl)
throws RequestExecutionException, RequestValidationException
{
// Lists SET operation incurs a read.
@@ -433,7 +433,7 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
return toRead == null ? null : readRows(partitionKeys, clusteringPrefix, toRead, (CompositeType)cfm.comparator, local, cl);
}
- private Map<ByteBuffer, ColumnGroupMap> readRows(List<ByteBuffer> partitionKeys, ColumnNameBuilder clusteringPrefix, Set<ByteBuffer> toRead, CompositeType composite, boolean local, ConsistencyLevel cl)
+ private Map<ByteBuffer, ColumnGroupMap> readRows(Collection<ByteBuffer> partitionKeys, ColumnNameBuilder clusteringPrefix, Set<ByteBuffer> toRead, CompositeType composite, boolean local, ConsistencyLevel cl)
throws RequestExecutionException, RequestValidationException
{
try
@@ -516,7 +516,7 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
else
cl.validateForWrite(cfm.ksName);
- Collection<? extends IMutation> mutations = getMutations(options.getValues(), false, cl, queryState.getTimestamp(), false);
+ Collection<? extends IMutation> mutations = getMutations(options.getValues(), false, cl, queryState.getTimestamp());
if (!mutations.isEmpty())
StorageProxy.mutateWithTriggers(mutations, cl, false);
@@ -651,7 +651,7 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
if (hasConditions())
throw new UnsupportedOperationException();
- for (IMutation mutation : getMutations(Collections.<ByteBuffer>emptyList(), true, null, queryState.getTimestamp(), false))
+ for (IMutation mutation : getMutations(Collections.<ByteBuffer>emptyList(), true, null, queryState.getTimestamp()))
mutation.apply();
return null;
}
@@ -667,15 +667,13 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
* @return list of the mutations
* @throws InvalidRequestException on invalid requests
*/
- public Collection<? extends IMutation> getMutations(List<ByteBuffer> variables, boolean local, ConsistencyLevel cl, long now, boolean isBatch)
+ private Collection<? extends IMutation> getMutations(List<ByteBuffer> variables, boolean local, ConsistencyLevel cl, long now)
throws RequestExecutionException, RequestValidationException
{
List<ByteBuffer> keys = buildPartitionKeyNames(variables);
ColumnNameBuilder clusteringPrefix = createClusteringPrefixBuilder(variables);
- // Some lists operation requires reading
- Map<ByteBuffer, ColumnGroupMap> rows = readRequiredRows(keys, clusteringPrefix, local, cl);
- UpdateParameters params = new UpdateParameters(cfm, variables, getTimestamp(now, variables), getTimeToLive(variables), rows);
+ UpdateParameters params = makeUpdateParameters(keys, clusteringPrefix, variables, local, cl, now);
Collection<IMutation> mutations = new ArrayList<IMutation>();
for (ByteBuffer key: keys)
@@ -683,25 +681,23 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
ThriftValidation.validateKey(cfm, key);
ColumnFamily cf = UnsortedColumns.factory.create(cfm);
addUpdateForKey(cf, key, clusteringPrefix, params);
- mutations.add(makeMutation(key, cf, cl, isBatch));
+ RowMutation rm = new RowMutation(cfm.ksName, key, cf);
+ mutations.add(isCounter() ? new CounterMutation(rm, cl) : rm);
}
return mutations;
}
- private IMutation makeMutation(ByteBuffer key, ColumnFamily cf, ConsistencyLevel cl, boolean isBatch)
+ public UpdateParameters makeUpdateParameters(Collection<ByteBuffer> keys,
+ ColumnNameBuilder prefix,
+ List<ByteBuffer> variables,
+ boolean local,
+ ConsistencyLevel cl,
+ long now)
+ throws RequestExecutionException, RequestValidationException
{
- RowMutation rm;
- if (isBatch)
- {
- // we might group other mutations together with this one later, so make it mutable
- rm = new RowMutation(cfm.ksName, key);
- rm.add(cf);
- }
- else
- {
- rm = new RowMutation(cfm.ksName, key, cf);
- }
- return isCounter() ? new CounterMutation(rm, cl) : rm;
+ // Some lists operation requires reading
+ Map<ByteBuffer, ColumnGroupMap> rows = readRequiredRows(keys, prefix, local, cl);
+ return new UpdateParameters(cfm, variables, getTimestamp(now, variables), getTimeToLive(variables), rows);
}
public static abstract class Parsed extends CFStatement
http://git-wip-us.apache.org/repos/asf/cassandra/blob/54a7e003/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java b/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
index 0e6481b..dcf22ef 100644
--- a/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
@@ -98,14 +98,6 @@ public class UpdateStatement extends ModificationStatement
}
}
- public ColumnFamily updateForKey(ByteBuffer key, ColumnNameBuilder builder, UpdateParameters params)
- throws InvalidRequestException
- {
- ColumnFamily cf = UnsortedColumns.factory.create(cfm);
- addUpdateForKey(cf, key, builder, params);
- return cf;
- }
-
public static class ParsedInsert extends ModificationStatement.Parsed
{
private final List<ColumnIdentifier> columnNames;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/54a7e003/src/java/org/apache/cassandra/db/RowMutation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RowMutation.java b/src/java/org/apache/cassandra/db/RowMutation.java
index e9d177b..49ee2c5 100644
--- a/src/java/org/apache/cassandra/db/RowMutation.java
+++ b/src/java/org/apache/cassandra/db/RowMutation.java
@@ -125,10 +125,15 @@ public class RowMutation implements IMutation
public ColumnFamily addOrGet(CFMetaData cfm)
{
+ return addOrGet(cfm, TreeMapBackedSortedColumns.factory);
+ }
+
+ public ColumnFamily addOrGet(CFMetaData cfm, ColumnFamily.Factory factory)
+ {
ColumnFamily cf = modifications.get(cfm.cfId);
if (cf == null)
{
- cf = TreeMapBackedSortedColumns.factory.create(cfm);
+ cf = factory.create(cfm);
modifications.put(cfm.cfId, cf);
}
return cf;