You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ja...@apache.org on 2015/05/15 14:54:41 UTC
cassandra git commit: Warn on unlogged batch misuse
Repository: cassandra
Updated Branches:
refs/heads/cassandra-2.1 f5f591237 -> ebd05ddbe
Warn on unlogged batch misuse
Patch by tjake; reviewed by jbellis for CASSANDRA-9282
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/ebd05ddb
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/ebd05ddb
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/ebd05ddb
Branch: refs/heads/cassandra-2.1
Commit: ebd05ddbe1fca8c70e9790628c0cce47327e4708
Parents: f5f5912
Author: T Jake Luciani <ja...@apache.org>
Authored: Mon May 4 14:12:53 2015 -0400
Committer: T Jake Luciani <ja...@apache.org>
Committed: Fri May 15 08:50:21 2015 -0400
----------------------------------------------------------------------
.../cql3/statements/BatchStatement.java | 40 +++++++++++++++++---
1 file changed, 35 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ebd05ddb/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
index c93bf64..6d4d3a1 100644
--- a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
@@ -19,10 +19,13 @@ package org.apache.cassandra.cql3.statements;
import java.nio.ByteBuffer;
import java.util.*;
+import java.util.concurrent.TimeUnit;
import com.google.common.base.Function;
import com.google.common.collect.*;
+
import org.apache.cassandra.config.DatabaseDescriptor;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -35,10 +38,10 @@ import org.apache.cassandra.service.ClientState;
import org.apache.cassandra.service.QueryState;
import org.apache.cassandra.service.StorageProxy;
import org.apache.cassandra.transport.messages.ResultMessage;
+import org.apache.cassandra.utils.NoSpamLogger;
/**
* A <code>BATCH</code> statement parsed from a CQL query.
- *
*/
public class BatchStatement implements CQLStatement
{
@@ -53,14 +56,15 @@ public class BatchStatement implements CQLStatement
private final Attributes attrs;
private final boolean hasConditions;
private static final Logger logger = LoggerFactory.getLogger(BatchStatement.class);
+ private static final String unloggedBatchWarning = "Unlogged batch covering {} partition{} detected against table{} {}. You should use a logged batch for atomicity, or asynchronous writes for performance.";
/**
* Creates a new BatchStatement from a list of statements and a
* Thrift consistency level.
*
- * @param type type of the batch
+ * @param type type of the batch
* @param statements a list of UpdateStatements
- * @param attrs additional attributes for statement (CL, timestamp, timeToLive)
+ * @param attrs additional attributes for statement (CL, timestamp, timeToLive)
*/
public BatchStatement(int boundTerms, Type type, List<ModificationStatement> statements, Attributes attrs)
{
@@ -170,13 +174,16 @@ public class BatchStatement implements CQLStatement
private Collection<? extends IMutation> unzipMutations(Map<String, Map<ByteBuffer, IMutation>> mutations)
{
+
// The case where all statement where on the same keyspace is pretty common
if (mutations.size() == 1)
return mutations.values().iterator().next().values();
+
List<IMutation> ms = new ArrayList<>();
for (Map<ByteBuffer, IMutation> ksMap : mutations.values())
ms.addAll(ksMap.values());
+
return ms;
}
@@ -214,7 +221,7 @@ public class BatchStatement implements CQLStatement
}
else
{
- mut = statement.cfm.isCounter() ? ((CounterMutation)mutation).getMutation() : (Mutation)mutation;
+ mut = statement.cfm.isCounter() ? ((CounterMutation) mutation).getMutation() : (Mutation) mutation;
}
statement.addUpdateForKey(mut.addOrGet(statement.cfm), key, clusteringPrefix, params);
@@ -223,6 +230,7 @@ public class BatchStatement implements CQLStatement
/**
* Checks batch size to ensure threshold is met. If not, a warning is logged.
+ *
* @param cfs ColumnFamilies that will store the batch's mutations.
*/
public static void verifyBatchSize(Iterable<ColumnFamily> cfs)
@@ -237,13 +245,33 @@ public class BatchStatement implements CQLStatement
{
Set<String> ksCfPairs = new HashSet<>();
for (ColumnFamily cf : cfs)
- ksCfPairs.add(cf.metadata().ksName + "." + cf.metadata().cfName);
+ ksCfPairs.add(String.format("%s.%s", cf.metadata().ksName, cf.metadata().cfName));
String format = "Batch of prepared statements for {} is of size {}, exceeding specified threshold of {} by {}.";
logger.warn(format, ksCfPairs, size, warnThreshold, size - warnThreshold);
}
}
+ private void verifyBatchType(Collection<? extends IMutation> mutations)
+ {
+ if (type != Type.LOGGED && mutations.size() > 1)
+ {
+ Set<String> ksCfPairs = new HashSet<>();
+ Set<ByteBuffer> keySet = new HashSet<>();
+
+ for (IMutation im : mutations)
+ {
+ keySet.add(im.key());
+ for (ColumnFamily cf : im.getColumnFamilies())
+ ksCfPairs.add(String.format("%s.%s", cf.metadata().ksName, cf.metadata().cfName));
+ }
+
+ NoSpamLogger.log(logger, NoSpamLogger.Level.WARN, 1, TimeUnit.MINUTES, unloggedBatchWarning,
+ keySet.size(), keySet.size() == 1 ? "" : "s",
+ ksCfPairs.size() == 1 ? "" : "s", ksCfPairs);
+ }
+ }
+
public ResultMessage execute(QueryState queryState, QueryOptions options) throws RequestExecutionException, RequestValidationException
{
return execute(queryState, BatchQueryOptions.withoutPerStatementVariables(options));
@@ -279,7 +307,9 @@ public class BatchStatement implements CQLStatement
return im.getColumnFamilies();
}
}));
+
verifyBatchSize(cfs);
+ verifyBatchType(mutations);
boolean mutateAtomic = (type == Type.LOGGED && mutations.size() > 1);
StorageProxy.mutateWithTriggers(mutations, cl, mutateAtomic);