You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2016/03/15 10:27:41 UTC
cassandra git commit: Don't warn on big batches if everything is in the same partition
Repository: cassandra
Updated Branches:
refs/heads/trunk b12413d4e -> f8b3a1588
Don't warn on big batches if everything is in the same partition
patch by slebresne; reviewed by iamaleksey for CASSANDRA-10876
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f8b3a158
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f8b3a158
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f8b3a158
Branch: refs/heads/trunk
Commit: f8b3a15881c411ff766425084776e2339fe6a17b
Parents: b12413d
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Thu Feb 25 14:20:29 2016 +0100
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Tue Mar 15 10:21:29 2016 +0100
----------------------------------------------------------------------
.../cql3/statements/BatchStatement.java | 62 +++++++++++---------
.../cql3/statements/CQL3CasRequest.java | 4 --
2 files changed, 33 insertions(+), 33 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f8b3a158/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
index 9faf73c..058969b 100644
--- a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
@@ -262,22 +262,32 @@ public class BatchStatement implements CQLStatement
*
* @param updates - the batch mutations.
*/
- public static void verifyBatchSize(Iterable<PartitionUpdate> updates) throws InvalidRequestException
+ private static void verifyBatchSize(Collection<? extends IMutation> mutations) throws InvalidRequestException
{
+ // We only warn for batch spanning multiple mutations (#10876)
+ if (mutations.size() <= 1)
+ return;
+
long size = 0;
long warnThreshold = DatabaseDescriptor.getBatchSizeWarnThreshold();
long failThreshold = DatabaseDescriptor.getBatchSizeFailThreshold();
- for (PartitionUpdate update : updates)
- size += update.dataSize();
+ for (IMutation mutation : mutations)
+ {
+ for (PartitionUpdate update : mutation.getPartitionUpdates())
+ size += update.dataSize();
+ }
if (size > warnThreshold)
{
Set<String> tableNames = new HashSet<>();
- for (PartitionUpdate update : updates)
- tableNames.add(String.format("%s.%s", update.metadata().ksName, update.metadata().cfName));
+ for (IMutation mutation : mutations)
+ {
+ for (PartitionUpdate update : mutation.getPartitionUpdates())
+ tableNames.add(String.format("%s.%s", update.metadata().ksName, update.metadata().cfName));
+ }
- String format = "Batch of prepared statements for {} is of size {}, exceeding specified threshold of {} by {}.{}";
+ String format = "Batch for {} is of size {}, exceeding specified threshold of {} by {}.{}";
if (size > failThreshold)
{
Tracing.trace(format, tableNames, size, failThreshold, size - failThreshold, " (see batch_size_fail_threshold_in_kb)");
@@ -292,29 +302,31 @@ public class BatchStatement implements CQLStatement
}
}
- private void verifyBatchType(Iterable<PartitionUpdate> updates)
+ private void verifyBatchType(Collection<? extends IMutation> mutations)
{
- if (!isLogged() && Iterables.size(updates) > 1)
+ if (!isLogged() && mutations.size() > 1)
{
Set<DecoratedKey> keySet = new HashSet<>();
Set<String> tableNames = new HashSet<>();
Map<String, Collection<Range<Token>>> localTokensByKs = new HashMap<>();
boolean localPartitionsOnly = true;
- for (PartitionUpdate update : updates)
+ for (IMutation mutation : mutations)
{
- keySet.add(update.partitionKey());
- tableNames.add(String.format("%s.%s", update.metadata().ksName, update.metadata().cfName));
+ for (PartitionUpdate update : mutation.getPartitionUpdates())
+ {
+ keySet.add(update.partitionKey());
+ tableNames.add(String.format("%s.%s", update.metadata().ksName, update.metadata().cfName));
+ }
if (localPartitionsOnly)
- localPartitionsOnly &= isPartitionLocal(localTokensByKs, update);
+ localPartitionsOnly &= isPartitionLocal(localTokensByKs, mutation);
}
// CASSANDRA-9303: If we only have local mutations we do not warn
if (localPartitionsOnly)
return;
-
NoSpamLogger.log(logger, NoSpamLogger.Level.WARN, 1, TimeUnit.MINUTES, UNLOGGED_BATCH_WARNING,
keySet.size(), keySet.size() == 1 ? "" : "s",
tableNames.size() == 1 ? "" : "s", tableNames);
@@ -324,16 +336,17 @@ public class BatchStatement implements CQLStatement
}
}
- private boolean isPartitionLocal(Map<String, Collection<Range<Token>>> localTokensByKs, PartitionUpdate update)
+ private boolean isPartitionLocal(Map<String, Collection<Range<Token>>> localTokensByKs, IMutation mutation)
{
- Collection<Range<Token>> localRanges = localTokensByKs.get(update.metadata().ksName);
+ String ksName = mutation.getKeyspaceName();
+ Collection<Range<Token>> localRanges = localTokensByKs.get(ksName);
if (localRanges == null)
{
- localRanges = StorageService.instance.getLocalRanges(update.metadata().ksName);
- localTokensByKs.put(update.metadata().ksName, localRanges);
+ localRanges = StorageService.instance.getLocalRanges(ksName);
+ localTokensByKs.put(ksName, localRanges);
}
- return Range.isInRanges(update.partitionKey().getToken(), localRanges);
+ return Range.isInRanges(mutation.key().getToken(), localRanges);
}
public ResultMessage execute(QueryState queryState, QueryOptions options) throws RequestExecutionException, RequestValidationException
@@ -366,17 +379,8 @@ public class BatchStatement implements CQLStatement
if (mutations.isEmpty())
return;
- // Extract each collection of updates from it's IMutation and then lazily concatenate all of them into a single Iterable.
- Iterable<PartitionUpdate> updates = Iterables.concat(Iterables.transform(mutations, new Function<IMutation, Collection<PartitionUpdate>>()
- {
- public Collection<PartitionUpdate> apply(IMutation im)
- {
- return im.getPartitionUpdates();
- }
- }));
-
- verifyBatchSize(updates);
- verifyBatchType(updates);
+ verifyBatchSize(mutations);
+ verifyBatchType(mutations);
boolean mutateAtomic = (isLogged() && mutations.size() > 1);
StorageProxy.mutateWithTriggers(mutations, cl, mutateAtomic);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f8b3a158/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java b/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java
index 9564005..93844b3 100644
--- a/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java
+++ b/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java
@@ -175,10 +175,6 @@ public class CQL3CasRequest implements CASRequest
upd.applyUpdates(current, update);
Keyspace.openAndGetStore(cfm).indexManager.validate(update);
-
- if (isBatch)
- BatchStatement.verifyBatchSize(Collections.singleton(update));
-
return update;
}