You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ja...@apache.org on 2015/05/15 14:54:41 UTC

cassandra git commit: Warn on unlogged batch misuse

Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.1 f5f591237 -> ebd05ddbe


Warn on unlogged batch misuse

Patch by tjake; reviewed by jbellis for CASSANDRA-9282


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/ebd05ddb
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/ebd05ddb
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/ebd05ddb

Branch: refs/heads/cassandra-2.1
Commit: ebd05ddbe1fca8c70e9790628c0cce47327e4708
Parents: f5f5912
Author: T Jake Luciani <ja...@apache.org>
Authored: Mon May 4 14:12:53 2015 -0400
Committer: T Jake Luciani <ja...@apache.org>
Committed: Fri May 15 08:50:21 2015 -0400

----------------------------------------------------------------------
 .../cql3/statements/BatchStatement.java         | 40 +++++++++++++++++---
 1 file changed, 35 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/ebd05ddb/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
index c93bf64..6d4d3a1 100644
--- a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
@@ -19,10 +19,13 @@ package org.apache.cassandra.cql3.statements;
 
 import java.nio.ByteBuffer;
 import java.util.*;
+import java.util.concurrent.TimeUnit;
 
 import com.google.common.base.Function;
 import com.google.common.collect.*;
+
 import org.apache.cassandra.config.DatabaseDescriptor;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -35,10 +38,10 @@ import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.service.QueryState;
 import org.apache.cassandra.service.StorageProxy;
 import org.apache.cassandra.transport.messages.ResultMessage;
+import org.apache.cassandra.utils.NoSpamLogger;
 
 /**
  * A <code>BATCH</code> statement parsed from a CQL query.
- *
  */
 public class BatchStatement implements CQLStatement
 {
@@ -53,14 +56,15 @@ public class BatchStatement implements CQLStatement
     private final Attributes attrs;
     private final boolean hasConditions;
     private static final Logger logger = LoggerFactory.getLogger(BatchStatement.class);
+    private static final String unloggedBatchWarning = "Unlogged batch covering {} partition{} detected against table{} {}. You should use a logged batch for atomicity, or asynchronous writes for performance.";
 
     /**
      * Creates a new BatchStatement from a list of statements and a
      * Thrift consistency level.
      *
-     * @param type type of the batch
+     * @param type       type of the batch
      * @param statements a list of UpdateStatements
-     * @param attrs additional attributes for statement (CL, timestamp, timeToLive)
+     * @param attrs      additional attributes for statement (CL, timestamp, timeToLive)
      */
     public BatchStatement(int boundTerms, Type type, List<ModificationStatement> statements, Attributes attrs)
     {
@@ -170,13 +174,16 @@ public class BatchStatement implements CQLStatement
 
     private Collection<? extends IMutation> unzipMutations(Map<String, Map<ByteBuffer, IMutation>> mutations)
     {
+
         // The case where all statement where on the same keyspace is pretty common
         if (mutations.size() == 1)
             return mutations.values().iterator().next().values();
 
+
         List<IMutation> ms = new ArrayList<>();
         for (Map<ByteBuffer, IMutation> ksMap : mutations.values())
             ms.addAll(ksMap.values());
+
         return ms;
     }
 
@@ -214,7 +221,7 @@ public class BatchStatement implements CQLStatement
             }
             else
             {
-                mut = statement.cfm.isCounter() ? ((CounterMutation)mutation).getMutation() : (Mutation)mutation;
+                mut = statement.cfm.isCounter() ? ((CounterMutation) mutation).getMutation() : (Mutation) mutation;
             }
 
             statement.addUpdateForKey(mut.addOrGet(statement.cfm), key, clusteringPrefix, params);
@@ -223,6 +230,7 @@ public class BatchStatement implements CQLStatement
 
     /**
      * Checks batch size to ensure threshold is met. If not, a warning is logged.
+     *
      * @param cfs ColumnFamilies that will store the batch's mutations.
      */
     public static void verifyBatchSize(Iterable<ColumnFamily> cfs)
@@ -237,13 +245,33 @@ public class BatchStatement implements CQLStatement
         {
             Set<String> ksCfPairs = new HashSet<>();
             for (ColumnFamily cf : cfs)
-                ksCfPairs.add(cf.metadata().ksName + "." + cf.metadata().cfName);
+                ksCfPairs.add(String.format("%s.%s", cf.metadata().ksName, cf.metadata().cfName));
 
             String format = "Batch of prepared statements for {} is of size {}, exceeding specified threshold of {} by {}.";
             logger.warn(format, ksCfPairs, size, warnThreshold, size - warnThreshold);
         }
     }
 
+    private void verifyBatchType(Collection<? extends IMutation> mutations)
+    {
+        if (type != Type.LOGGED && mutations.size() > 1)
+        {
+            Set<String> ksCfPairs = new HashSet<>();
+            Set<ByteBuffer> keySet = new HashSet<>();
+
+            for (IMutation im : mutations)
+            {
+                keySet.add(im.key());
+                for (ColumnFamily cf : im.getColumnFamilies())
+                    ksCfPairs.add(String.format("%s.%s", cf.metadata().ksName, cf.metadata().cfName));
+            }
+
+            NoSpamLogger.log(logger, NoSpamLogger.Level.WARN, 1, TimeUnit.MINUTES, unloggedBatchWarning,
+                             keySet.size(), keySet.size() == 1 ? "" : "s",
+                             ksCfPairs.size() == 1 ? "" : "s", ksCfPairs);
+        }
+    }
+
     public ResultMessage execute(QueryState queryState, QueryOptions options) throws RequestExecutionException, RequestValidationException
     {
         return execute(queryState, BatchQueryOptions.withoutPerStatementVariables(options));
@@ -279,7 +307,9 @@ public class BatchStatement implements CQLStatement
                 return im.getColumnFamilies();
             }
         }));
+
         verifyBatchSize(cfs);
+        verifyBatchType(mutations);
 
         boolean mutateAtomic = (type == Type.LOGGED && mutations.size() > 1);
         StorageProxy.mutateWithTriggers(mutations, cl, mutateAtomic);