You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2014/04/22 14:41:49 UTC

[1/6] git commit: Log a warning for large batches patch by Lyuben Todorov; reviewed by Benedict Elliott Smith for CASSANDRA-6487

Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.0 364282a7d -> de720b4aa
  refs/heads/cassandra-2.1 a2e74354c -> 510c82e4e
  refs/heads/trunk 498eb2ab7 -> 8e943b3da


Log a warning for large batches
patch by Lyuben Todorov; reviewed by Benedict Elliott Smith for CASSANDRA-6487


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/de720b4a
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/de720b4a
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/de720b4a

Branch: refs/heads/cassandra-2.0
Commit: de720b4aa31198076abbd76a53644df341577126
Parents: 364282a
Author: Jonathan Ellis <jb...@apache.org>
Authored: Tue Apr 22 07:40:32 2014 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Tue Apr 22 07:40:32 2014 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 conf/cassandra.yaml                             |  5 +++
 .../org/apache/cassandra/config/Config.java     |  1 +
 .../cassandra/config/DatabaseDescriptor.java    |  5 +++
 .../apache/cassandra/cql3/QueryProcessor.java   |  2 +-
 .../cql3/statements/BatchStatement.java         | 42 +++++++++++++++++++-
 6 files changed, 54 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/de720b4a/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 791586c..fffb2a5 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.0.8
+ * Log a warning for large batches (CASSANDRA-6487)
  * Queries on compact tables can return more rows that requested (CASSANDRA-7052)
  * USING TIMESTAMP for batches does not work (CASSANDRA-7053)
 Merged from 1.2:

http://git-wip-us.apache.org/repos/asf/cassandra/blob/de720b4a/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index 2edd498..2de6753 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -429,6 +429,11 @@ tombstone_failure_threshold: 100000
 # that wastefully either.
 column_index_size_in_kb: 64
 
+
+# Log WARN on any batch size exceeding this value. 5kb per batch by default.
+# Caution should be taken on increasing the size of this threshold as it can lead to node instability.
+batch_size_warn_threshold_in_kb: 5
+
 # Size limit for rows being compacted in memory.  Larger rows will spill
 # over to disk and use a slower two-pass compaction process.  A message
 # will be logged specifying the row key.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/de720b4a/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index 5317fb8..7a3185a 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -120,6 +120,7 @@ public class Config
 
     /* if the size of columns or super-columns are more than this, indexing will kick in */
     public Integer column_index_size_in_kb = 64;
+    public Integer batch_size_warn_threshold_in_kb = 5;
     public Integer in_memory_compaction_limit_in_mb = 64;
     public Integer concurrent_compactors = FBUtilities.getAvailableProcessors();
     public volatile Integer compaction_throughput_mb_per_sec = 16;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/de720b4a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 9e06601..6417524 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -658,6 +658,11 @@ public class DatabaseDescriptor
         return conf.column_index_size_in_kb * 1024;
     }
 
+    public static int getBatchSizeWarnThreshold()
+    {
+        return conf.batch_size_warn_threshold_in_kb * 1024;
+    }
+
     public static Collection<String> getInitialTokens()
     {
         return tokensFromString(System.getProperty("cassandra.initial_token", conf.initial_token));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/de720b4a/src/java/org/apache/cassandra/cql3/QueryProcessor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/QueryProcessor.java b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
index ab0ea40..15ee59f 100644
--- a/src/java/org/apache/cassandra/cql3/QueryProcessor.java
+++ b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
@@ -30,13 +30,13 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.cql3.statements.*;
-import org.apache.cassandra.transport.messages.ResultMessage;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.exceptions.*;
 import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.service.QueryState;
 import org.apache.cassandra.thrift.ThriftClientState;
 import org.apache.cassandra.tracing.Tracing;
+import org.apache.cassandra.transport.messages.ResultMessage;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.MD5Digest;
 import org.apache.cassandra.utils.SemanticVersion;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/de720b4a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
index 7f26341..8e61ae5 100644
--- a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
@@ -20,8 +20,12 @@ package org.apache.cassandra.cql3.statements;
 import java.nio.ByteBuffer;
 import java.util.*;
 
-import com.google.common.collect.Iterables;
+import com.google.common.base.Function;
+import com.google.common.collect.*;
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.github.jamm.MemoryMeter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.cql3.*;
 import org.apache.cassandra.db.*;
@@ -47,6 +51,7 @@ public class BatchStatement implements CQLStatement, MeasurableForPreparedCache
     private final List<ModificationStatement> statements;
     private final Attributes attrs;
     private final boolean hasConditions;
+    private static final Logger logger = LoggerFactory.getLogger(BatchStatement.class);
 
     /**
      * Creates a new BatchStatement from a list of statements and a
@@ -177,6 +182,29 @@ public class BatchStatement implements CQLStatement, MeasurableForPreparedCache
         }
     }
 
+    /**
+     * Checks the total data size of the batch against the configured warn threshold. If the threshold is exceeded, a warning is logged.
+     * @param cfs ColumnFamilies that will store the batch's mutations.
+     */
+    private void verifyBatchSize(Iterable<ColumnFamily> cfs)
+    {
+        long size = 0;
+        long warnThreshold = DatabaseDescriptor.getBatchSizeWarnThreshold();
+
+        for (ColumnFamily cf : cfs)
+            size += cf.dataSize();
+
+        if (size > warnThreshold)
+        {
+            Set<String> ksCfPairs = new HashSet<>();
+            for (ColumnFamily cf : cfs)
+                ksCfPairs.add(cf.metadata().ksName + "." + cf.metadata().cfName);
+
+            String format = "Batch of prepared statements for {} is of size {}, exceeding specified threshold of {} by {}.";
+            logger.warn(format, ksCfPairs, size, warnThreshold, size - warnThreshold);
+        }
+    }
+
     public ResultMessage execute(QueryState queryState, QueryOptions options) throws RequestExecutionException, RequestValidationException
     {
         if (options.getConsistency() == null)
@@ -207,10 +235,21 @@ public class BatchStatement implements CQLStatement, MeasurableForPreparedCache
 
     private void executeWithoutConditions(Collection<? extends IMutation> mutations, ConsistencyLevel cl) throws RequestExecutionException, RequestValidationException
     {
+        // Extract each collection of cfs from its IMutation and then lazily concatenate all of them into a single Iterable.
+        Iterable<ColumnFamily> cfs = Iterables.concat(Iterables.transform(mutations, new Function<IMutation, Collection<ColumnFamily>>()
+        {
+            public Collection<ColumnFamily> apply(IMutation im)
+            {
+                return im.getColumnFamilies();
+            }
+        }));
+        verifyBatchSize(cfs);
+
         boolean mutateAtomic = (type == Type.LOGGED && mutations.size() > 1);
         StorageProxy.mutateWithTriggers(mutations, cl, mutateAtomic);
     }
 
+
     private ResultMessage executeWithConditions(BatchVariables variables, ConsistencyLevel cl, ConsistencyLevel serialCf, long now)
     throws RequestExecutionException, RequestValidationException
     {
@@ -259,6 +298,7 @@ public class BatchStatement implements CQLStatement, MeasurableForPreparedCache
             }
         }
 
+        verifyBatchSize(Collections.singleton(updates));
         ColumnFamily result = StorageProxy.cas(ksName, cfName, key, conditions, updates, serialCf, cl);
         return new ResultMessage.Rows(ModificationStatement.buildCasResultSet(ksName, key, cfName, result, columnsWithConditions, true));
     }


[6/6] git commit: Merge branch 'cassandra-2.1' into trunk

Posted by jb...@apache.org.
Merge branch 'cassandra-2.1' into trunk


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/8e943b3d
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/8e943b3d
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/8e943b3d

Branch: refs/heads/trunk
Commit: 8e943b3dac644e6ca6a8848284e126164054634c
Parents: 498eb2a 510c82e
Author: Jonathan Ellis <jb...@apache.org>
Authored: Tue Apr 22 07:41:38 2014 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Tue Apr 22 07:41:38 2014 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 conf/cassandra.yaml                             |  5 +++
 .../org/apache/cassandra/config/Config.java     |  1 +
 .../cassandra/config/DatabaseDescriptor.java    |  5 +++
 .../apache/cassandra/cql3/QueryProcessor.java   |  2 +-
 .../cql3/statements/BatchStatement.java         | 42 +++++++++++++++++++-
 6 files changed, 54 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/8e943b3d/CHANGES.txt
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8e943b3d/src/java/org/apache/cassandra/cql3/QueryProcessor.java
----------------------------------------------------------------------


[3/6] git commit: Log a warning for large batches patch by Lyuben Todorov; reviewed by Benedict Elliott Smith for CASSANDRA-6487

Posted by jb...@apache.org.
Log a warning for large batches
patch by Lyuben Todorov; reviewed by Benedict Elliott Smith for CASSANDRA-6487


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/de720b4a
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/de720b4a
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/de720b4a

Branch: refs/heads/trunk
Commit: de720b4aa31198076abbd76a53644df341577126
Parents: 364282a
Author: Jonathan Ellis <jb...@apache.org>
Authored: Tue Apr 22 07:40:32 2014 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Tue Apr 22 07:40:32 2014 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 conf/cassandra.yaml                             |  5 +++
 .../org/apache/cassandra/config/Config.java     |  1 +
 .../cassandra/config/DatabaseDescriptor.java    |  5 +++
 .../apache/cassandra/cql3/QueryProcessor.java   |  2 +-
 .../cql3/statements/BatchStatement.java         | 42 +++++++++++++++++++-
 6 files changed, 54 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/de720b4a/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 791586c..fffb2a5 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.0.8
+ * Log a warning for large batches (CASSANDRA-6487)
  * Queries on compact tables can return more rows that requested (CASSANDRA-7052)
  * USING TIMESTAMP for batches does not work (CASSANDRA-7053)
 Merged from 1.2:

http://git-wip-us.apache.org/repos/asf/cassandra/blob/de720b4a/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index 2edd498..2de6753 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -429,6 +429,11 @@ tombstone_failure_threshold: 100000
 # that wastefully either.
 column_index_size_in_kb: 64
 
+
+# Log WARN on any batch size exceeding this value. 5kb per batch by default.
+# Caution should be taken on increasing the size of this threshold as it can lead to node instability.
+batch_size_warn_threshold_in_kb: 5
+
 # Size limit for rows being compacted in memory.  Larger rows will spill
 # over to disk and use a slower two-pass compaction process.  A message
 # will be logged specifying the row key.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/de720b4a/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index 5317fb8..7a3185a 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -120,6 +120,7 @@ public class Config
 
     /* if the size of columns or super-columns are more than this, indexing will kick in */
     public Integer column_index_size_in_kb = 64;
+    public Integer batch_size_warn_threshold_in_kb = 5;
     public Integer in_memory_compaction_limit_in_mb = 64;
     public Integer concurrent_compactors = FBUtilities.getAvailableProcessors();
     public volatile Integer compaction_throughput_mb_per_sec = 16;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/de720b4a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 9e06601..6417524 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -658,6 +658,11 @@ public class DatabaseDescriptor
         return conf.column_index_size_in_kb * 1024;
     }
 
+    public static int getBatchSizeWarnThreshold()
+    {
+        return conf.batch_size_warn_threshold_in_kb * 1024;
+    }
+
     public static Collection<String> getInitialTokens()
     {
         return tokensFromString(System.getProperty("cassandra.initial_token", conf.initial_token));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/de720b4a/src/java/org/apache/cassandra/cql3/QueryProcessor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/QueryProcessor.java b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
index ab0ea40..15ee59f 100644
--- a/src/java/org/apache/cassandra/cql3/QueryProcessor.java
+++ b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
@@ -30,13 +30,13 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.cql3.statements.*;
-import org.apache.cassandra.transport.messages.ResultMessage;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.exceptions.*;
 import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.service.QueryState;
 import org.apache.cassandra.thrift.ThriftClientState;
 import org.apache.cassandra.tracing.Tracing;
+import org.apache.cassandra.transport.messages.ResultMessage;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.MD5Digest;
 import org.apache.cassandra.utils.SemanticVersion;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/de720b4a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
index 7f26341..8e61ae5 100644
--- a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
@@ -20,8 +20,12 @@ package org.apache.cassandra.cql3.statements;
 import java.nio.ByteBuffer;
 import java.util.*;
 
-import com.google.common.collect.Iterables;
+import com.google.common.base.Function;
+import com.google.common.collect.*;
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.github.jamm.MemoryMeter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.cql3.*;
 import org.apache.cassandra.db.*;
@@ -47,6 +51,7 @@ public class BatchStatement implements CQLStatement, MeasurableForPreparedCache
     private final List<ModificationStatement> statements;
     private final Attributes attrs;
     private final boolean hasConditions;
+    private static final Logger logger = LoggerFactory.getLogger(BatchStatement.class);
 
     /**
      * Creates a new BatchStatement from a list of statements and a
@@ -177,6 +182,29 @@ public class BatchStatement implements CQLStatement, MeasurableForPreparedCache
         }
     }
 
+    /**
+     * Checks the total data size of the batch against the configured warn threshold. If the threshold is exceeded, a warning is logged.
+     * @param cfs ColumnFamilies that will store the batch's mutations.
+     */
+    private void verifyBatchSize(Iterable<ColumnFamily> cfs)
+    {
+        long size = 0;
+        long warnThreshold = DatabaseDescriptor.getBatchSizeWarnThreshold();
+
+        for (ColumnFamily cf : cfs)
+            size += cf.dataSize();
+
+        if (size > warnThreshold)
+        {
+            Set<String> ksCfPairs = new HashSet<>();
+            for (ColumnFamily cf : cfs)
+                ksCfPairs.add(cf.metadata().ksName + "." + cf.metadata().cfName);
+
+            String format = "Batch of prepared statements for {} is of size {}, exceeding specified threshold of {} by {}.";
+            logger.warn(format, ksCfPairs, size, warnThreshold, size - warnThreshold);
+        }
+    }
+
     public ResultMessage execute(QueryState queryState, QueryOptions options) throws RequestExecutionException, RequestValidationException
     {
         if (options.getConsistency() == null)
@@ -207,10 +235,21 @@ public class BatchStatement implements CQLStatement, MeasurableForPreparedCache
 
     private void executeWithoutConditions(Collection<? extends IMutation> mutations, ConsistencyLevel cl) throws RequestExecutionException, RequestValidationException
     {
+        // Extract each collection of cfs from its IMutation and then lazily concatenate all of them into a single Iterable.
+        Iterable<ColumnFamily> cfs = Iterables.concat(Iterables.transform(mutations, new Function<IMutation, Collection<ColumnFamily>>()
+        {
+            public Collection<ColumnFamily> apply(IMutation im)
+            {
+                return im.getColumnFamilies();
+            }
+        }));
+        verifyBatchSize(cfs);
+
         boolean mutateAtomic = (type == Type.LOGGED && mutations.size() > 1);
         StorageProxy.mutateWithTriggers(mutations, cl, mutateAtomic);
     }
 
+
     private ResultMessage executeWithConditions(BatchVariables variables, ConsistencyLevel cl, ConsistencyLevel serialCf, long now)
     throws RequestExecutionException, RequestValidationException
     {
@@ -259,6 +298,7 @@ public class BatchStatement implements CQLStatement, MeasurableForPreparedCache
             }
         }
 
+        verifyBatchSize(Collections.singleton(updates));
         ColumnFamily result = StorageProxy.cas(ksName, cfName, key, conditions, updates, serialCf, cl);
         return new ResultMessage.Rows(ModificationStatement.buildCasResultSet(ksName, key, cfName, result, columnsWithConditions, true));
     }


[4/6] git commit: merge from 2.0

Posted by jb...@apache.org.
merge from 2.0


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/510c82e4
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/510c82e4
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/510c82e4

Branch: refs/heads/trunk
Commit: 510c82e4e73be07b44b4902b865a9eaeda5113e9
Parents: a2e7435 de720b4
Author: Jonathan Ellis <jb...@apache.org>
Authored: Tue Apr 22 07:41:27 2014 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Tue Apr 22 07:41:27 2014 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 conf/cassandra.yaml                             |  5 +++
 .../org/apache/cassandra/config/Config.java     |  1 +
 .../cassandra/config/DatabaseDescriptor.java    |  5 +++
 .../apache/cassandra/cql3/QueryProcessor.java   |  2 +-
 .../cql3/statements/BatchStatement.java         | 42 +++++++++++++++++++-
 6 files changed, 54 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/510c82e4/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 495dab2,fffb2a5..ab3278e
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,54 -1,14 +1,55 @@@
 -2.0.8
 - * Log a warning for large batches (CASSANDRA-6487)
 - * Queries on compact tables can return more rows that requested (CASSANDRA-7052)
 - * USING TIMESTAMP for batches does not work (CASSANDRA-7053)
 -Merged from 1.2:
 - * Fix batchlog to account for CF truncation records (CASSANDRA-6999)
 - * Fix CQLSH parsing of functions and BLOB literals (CASSANDRA-7018)
 +2.1.0-beta2
 + * Add range tombstones to read repair digests (CASSANDRA-6863)
 + * Fix BTree.clear for large updates (CASSANDRA-6943)
 + * Fail write instead of logging a warning when unable to append to CL
 +   (CASSANDRA-6764)
 + * Eliminate possibility of CL segment appearing twice in active list 
 +   (CASSANDRA-6557)
 + * Apply DONTNEED fadvise to commitlog segments (CASSANDRA-6759)
 + * Switch CRC component to Adler and include it for compressed sstables 
 +   (CASSANDRA-4165)
 + * Allow cassandra-stress to set compaction strategy options (CASSANDRA-6451)
 + * Add broadcast_rpc_address option to cassandra.yaml (CASSANDRA-5899)
 + * Auto reload GossipingPropertyFileSnitch config (CASSANDRA-5897)
 + * Fix overflow of memtable_total_space_in_mb (CASSANDRA-6573)
 + * Fix ABTC NPE and apply update function correctly (CASSANDRA-6692)
 + * Allow nodetool to use a file or prompt for password (CASSANDRA-6660)
 + * Fix AIOOBE when concurrently accessing ABSC (CASSANDRA-6742)
 + * Fix assertion error in ALTER TYPE RENAME (CASSANDRA-6705)
 + * Scrub should not always clear out repaired status (CASSANDRA-5351)
 + * Improve handling of range tombstone for wide partitions (CASSANDRA-6446)
 + * Fix ClassCastException for compact table with composites (CASSANDRA-6738)
 + * Fix potentially repairing with wrong nodes (CASSANDRA-6808)
 + * Change caching option syntax (CASSANDRA-6745)
 + * Fix stress to do proper counter reads (CASSANDRA-6835)
 + * Fix help message for stress counter_write (CASSANDRA-6824)
 + * Fix stress smart Thrift client to pick servers correctly (CASSANDRA-6848)
 + * Add logging levels (minimal, normal or verbose) to stress tool (CASSANDRA-6849)
 + * Fix race condition in Batch CLE (CASSANDRA-6860)
 + * Improve cleanup/scrub/upgradesstables failure handling (CASSANDRA-6774)
 + * ByteBuffer write() methods for serializing sstables (CASSANDRA-6781)
 + * Proper compare function for CollectionType (CASSANDRA-6783)
 + * Update native server to Netty 4 (CASSANDRA-6236)
 + * Fix off-by-one error in stress (CASSANDRA-6883)
 + * Make OpOrder AutoCloseable (CASSANDRA-6901)
 + * Remove sync repair JMX interface (CASSANDRA-6900)
 + * Add multiple memory allocation options for memtables (CASSANDRA-6689)
 + * Remove adjusted op rate from stress output (CASSANDRA-6921)
 + * Add optimized CF.hasColumns() implementations (CASSANDRA-6941)
 + * Serialize batchlog mutations with the version of the target node
 +   (CASSANDRA-6931)
 + * Optimize CounterColumn#reconcile() (CASSANDRA-6953)
 + * Properly remove 1.2 sstable support in 2.1 (CASSANDRA-6869)
 + * Lock counter cells, not partitions (CASSANDRA-6880)
 + * Track presence of legacy counter shards in sstables (CASSANDRA-6888)
 + * Ensure safe resource cleanup when replacing sstables (CASSANDRA-6912)
 + * Add failure handler to async callback (CASSANDRA-6747)
 + * Fix AE when closing SSTable without releasing reference (CASSANDRA-7000)
 + * Clean up IndexInfo on keyspace/table drops (CASSANDRA-6924)
 + * Only snapshot relative SSTables when sequential repair (CASSANDRA-7024)
   * Require nodetool rebuild_index to specify index names (CASSANDRA-7038)
 -
 -
 -2.0.7
 +Merged from 2.0:
++ * Log a warning for large batches (CASSANDRA-6487)
   * Put nodes in hibernate when join_ring is false (CASSANDRA-6961)
   * Avoid early loading of non-system keyspaces before compaction-leftovers 
     cleanup at startup (CASSANDRA-6913)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/510c82e4/conf/cassandra.yaml
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/510c82e4/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/510c82e4/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/510c82e4/src/java/org/apache/cassandra/cql3/QueryProcessor.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/cql3/QueryProcessor.java
index 9dc2ace,15ee59f..e8cee15
--- a/src/java/org/apache/cassandra/cql3/QueryProcessor.java
+++ b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
@@@ -30,9 -30,7 +30,8 @@@ import org.slf4j.Logger
  import org.slf4j.LoggerFactory;
  
  import org.apache.cassandra.cql3.statements.*;
- import org.apache.cassandra.transport.messages.ResultMessage;
  import org.apache.cassandra.db.*;
 +import org.apache.cassandra.db.composites.*;
  import org.apache.cassandra.exceptions.*;
  import org.apache.cassandra.service.ClientState;
  import org.apache.cassandra.service.QueryState;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/510c82e4/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
index e0a81da,8e61ae5..88bb644
--- a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
@@@ -20,13 -20,15 +20,17 @@@ package org.apache.cassandra.cql3.state
  import java.nio.ByteBuffer;
  import java.util.*;
  
- import com.google.common.collect.Iterables;
+ import com.google.common.base.Function;
+ import com.google.common.collect.*;
+ import org.apache.cassandra.config.DatabaseDescriptor;
  import org.github.jamm.MemoryMeter;
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
  
 +import org.apache.cassandra.config.ColumnDefinition;
  import org.apache.cassandra.cql3.*;
  import org.apache.cassandra.db.*;
 +import org.apache.cassandra.db.composites.Composite;
  import org.apache.cassandra.exceptions.*;
  import org.apache.cassandra.service.ClientState;
  import org.apache.cassandra.service.QueryState;


[5/6] git commit: merge from 2.0

Posted by jb...@apache.org.
merge from 2.0


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/510c82e4
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/510c82e4
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/510c82e4

Branch: refs/heads/cassandra-2.1
Commit: 510c82e4e73be07b44b4902b865a9eaeda5113e9
Parents: a2e7435 de720b4
Author: Jonathan Ellis <jb...@apache.org>
Authored: Tue Apr 22 07:41:27 2014 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Tue Apr 22 07:41:27 2014 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 conf/cassandra.yaml                             |  5 +++
 .../org/apache/cassandra/config/Config.java     |  1 +
 .../cassandra/config/DatabaseDescriptor.java    |  5 +++
 .../apache/cassandra/cql3/QueryProcessor.java   |  2 +-
 .../cql3/statements/BatchStatement.java         | 42 +++++++++++++++++++-
 6 files changed, 54 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/510c82e4/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 495dab2,fffb2a5..ab3278e
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,54 -1,14 +1,55 @@@
 -2.0.8
 - * Log a warning for large batches (CASSANDRA-6487)
 - * Queries on compact tables can return more rows that requested (CASSANDRA-7052)
 - * USING TIMESTAMP for batches does not work (CASSANDRA-7053)
 -Merged from 1.2:
 - * Fix batchlog to account for CF truncation records (CASSANDRA-6999)
 - * Fix CQLSH parsing of functions and BLOB literals (CASSANDRA-7018)
 +2.1.0-beta2
 + * Add range tombstones to read repair digests (CASSANDRA-6863)
 + * Fix BTree.clear for large updates (CASSANDRA-6943)
 + * Fail write instead of logging a warning when unable to append to CL
 +   (CASSANDRA-6764)
 + * Eliminate possibility of CL segment appearing twice in active list 
 +   (CASSANDRA-6557)
 + * Apply DONTNEED fadvise to commitlog segments (CASSANDRA-6759)
 + * Switch CRC component to Adler and include it for compressed sstables 
 +   (CASSANDRA-4165)
 + * Allow cassandra-stress to set compaction strategy options (CASSANDRA-6451)
 + * Add broadcast_rpc_address option to cassandra.yaml (CASSANDRA-5899)
 + * Auto reload GossipingPropertyFileSnitch config (CASSANDRA-5897)
 + * Fix overflow of memtable_total_space_in_mb (CASSANDRA-6573)
 + * Fix ABTC NPE and apply update function correctly (CASSANDRA-6692)
 + * Allow nodetool to use a file or prompt for password (CASSANDRA-6660)
 + * Fix AIOOBE when concurrently accessing ABSC (CASSANDRA-6742)
 + * Fix assertion error in ALTER TYPE RENAME (CASSANDRA-6705)
 + * Scrub should not always clear out repaired status (CASSANDRA-5351)
 + * Improve handling of range tombstone for wide partitions (CASSANDRA-6446)
 + * Fix ClassCastException for compact table with composites (CASSANDRA-6738)
 + * Fix potentially repairing with wrong nodes (CASSANDRA-6808)
 + * Change caching option syntax (CASSANDRA-6745)
 + * Fix stress to do proper counter reads (CASSANDRA-6835)
 + * Fix help message for stress counter_write (CASSANDRA-6824)
 + * Fix stress smart Thrift client to pick servers correctly (CASSANDRA-6848)
 + * Add logging levels (minimal, normal or verbose) to stress tool (CASSANDRA-6849)
 + * Fix race condition in Batch CLE (CASSANDRA-6860)
 + * Improve cleanup/scrub/upgradesstables failure handling (CASSANDRA-6774)
 + * ByteBuffer write() methods for serializing sstables (CASSANDRA-6781)
 + * Proper compare function for CollectionType (CASSANDRA-6783)
 + * Update native server to Netty 4 (CASSANDRA-6236)
 + * Fix off-by-one error in stress (CASSANDRA-6883)
 + * Make OpOrder AutoCloseable (CASSANDRA-6901)
 + * Remove sync repair JMX interface (CASSANDRA-6900)
 + * Add multiple memory allocation options for memtables (CASSANDRA-6689)
 + * Remove adjusted op rate from stress output (CASSANDRA-6921)
 + * Add optimized CF.hasColumns() implementations (CASSANDRA-6941)
 + * Serialize batchlog mutations with the version of the target node
 +   (CASSANDRA-6931)
 + * Optimize CounterColumn#reconcile() (CASSANDRA-6953)
 + * Properly remove 1.2 sstable support in 2.1 (CASSANDRA-6869)
 + * Lock counter cells, not partitions (CASSANDRA-6880)
 + * Track presence of legacy counter shards in sstables (CASSANDRA-6888)
 + * Ensure safe resource cleanup when replacing sstables (CASSANDRA-6912)
 + * Add failure handler to async callback (CASSANDRA-6747)
 + * Fix AE when closing SSTable without releasing reference (CASSANDRA-7000)
 + * Clean up IndexInfo on keyspace/table drops (CASSANDRA-6924)
 + * Only snapshot relative SSTables when sequential repair (CASSANDRA-7024)
   * Require nodetool rebuild_index to specify index names (CASSANDRA-7038)
 -
 -
 -2.0.7
 +Merged from 2.0:
++ * Log a warning for large batches (CASSANDRA-6487)
   * Put nodes in hibernate when join_ring is false (CASSANDRA-6961)
   * Avoid early loading of non-system keyspaces before compaction-leftovers 
     cleanup at startup (CASSANDRA-6913)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/510c82e4/conf/cassandra.yaml
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/510c82e4/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/510c82e4/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/510c82e4/src/java/org/apache/cassandra/cql3/QueryProcessor.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/cql3/QueryProcessor.java
index 9dc2ace,15ee59f..e8cee15
--- a/src/java/org/apache/cassandra/cql3/QueryProcessor.java
+++ b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
@@@ -30,9 -30,7 +30,8 @@@ import org.slf4j.Logger
  import org.slf4j.LoggerFactory;
  
  import org.apache.cassandra.cql3.statements.*;
- import org.apache.cassandra.transport.messages.ResultMessage;
  import org.apache.cassandra.db.*;
 +import org.apache.cassandra.db.composites.*;
  import org.apache.cassandra.exceptions.*;
  import org.apache.cassandra.service.ClientState;
  import org.apache.cassandra.service.QueryState;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/510c82e4/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
index e0a81da,8e61ae5..88bb644
--- a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
@@@ -20,13 -20,15 +20,17 @@@ package org.apache.cassandra.cql3.state
  import java.nio.ByteBuffer;
  import java.util.*;
  
- import com.google.common.collect.Iterables;
+ import com.google.common.base.Function;
+ import com.google.common.collect.*;
+ import org.apache.cassandra.config.DatabaseDescriptor;
  import org.github.jamm.MemoryMeter;
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
  
 +import org.apache.cassandra.config.ColumnDefinition;
  import org.apache.cassandra.cql3.*;
  import org.apache.cassandra.db.*;
 +import org.apache.cassandra.db.composites.Composite;
  import org.apache.cassandra.exceptions.*;
  import org.apache.cassandra.service.ClientState;
  import org.apache.cassandra.service.QueryState;


[2/6] git commit: Log a warning for large batches patch by Lyuben Todorov; reviewed by Benedict Elliott Smith for CASSANDRA-6487

Posted by jb...@apache.org.
Log a warning for large batches
patch by Lyuben Todorov; reviewed by Benedict Elliott Smith for CASSANDRA-6487


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/de720b4a
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/de720b4a
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/de720b4a

Branch: refs/heads/cassandra-2.1
Commit: de720b4aa31198076abbd76a53644df341577126
Parents: 364282a
Author: Jonathan Ellis <jb...@apache.org>
Authored: Tue Apr 22 07:40:32 2014 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Tue Apr 22 07:40:32 2014 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 conf/cassandra.yaml                             |  5 +++
 .../org/apache/cassandra/config/Config.java     |  1 +
 .../cassandra/config/DatabaseDescriptor.java    |  5 +++
 .../apache/cassandra/cql3/QueryProcessor.java   |  2 +-
 .../cql3/statements/BatchStatement.java         | 42 +++++++++++++++++++-
 6 files changed, 54 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/de720b4a/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 791586c..fffb2a5 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.0.8
+ * Log a warning for large batches (CASSANDRA-6487)
  * Queries on compact tables can return more rows that requested (CASSANDRA-7052)
  * USING TIMESTAMP for batches does not work (CASSANDRA-7053)
 Merged from 1.2:

http://git-wip-us.apache.org/repos/asf/cassandra/blob/de720b4a/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index 2edd498..2de6753 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -429,6 +429,11 @@ tombstone_failure_threshold: 100000
 # that wastefully either.
 column_index_size_in_kb: 64
 
+
+# Log WARN on any batch size exceeding this value. 5kb per batch by default.
+# Caution should be taken on increasing the size of this threshold as it can lead to node instability.
+batch_size_warn_threshold_in_kb: 5
+
 # Size limit for rows being compacted in memory.  Larger rows will spill
 # over to disk and use a slower two-pass compaction process.  A message
 # will be logged specifying the row key.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/de720b4a/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index 5317fb8..7a3185a 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -120,6 +120,7 @@ public class Config
 
     /* if the size of columns or super-columns are more than this, indexing will kick in */
     public Integer column_index_size_in_kb = 64;
+    public Integer batch_size_warn_threshold_in_kb = 5;
     public Integer in_memory_compaction_limit_in_mb = 64;
     public Integer concurrent_compactors = FBUtilities.getAvailableProcessors();
     public volatile Integer compaction_throughput_mb_per_sec = 16;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/de720b4a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 9e06601..6417524 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -658,6 +658,11 @@ public class DatabaseDescriptor
         return conf.column_index_size_in_kb * 1024;
     }
 
+    public static int getBatchSizeWarnThreshold()
+    {
+        return conf.batch_size_warn_threshold_in_kb * 1024;
+    }
+
     public static Collection<String> getInitialTokens()
     {
         return tokensFromString(System.getProperty("cassandra.initial_token", conf.initial_token));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/de720b4a/src/java/org/apache/cassandra/cql3/QueryProcessor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/QueryProcessor.java b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
index ab0ea40..15ee59f 100644
--- a/src/java/org/apache/cassandra/cql3/QueryProcessor.java
+++ b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
@@ -30,13 +30,13 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.cql3.statements.*;
-import org.apache.cassandra.transport.messages.ResultMessage;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.exceptions.*;
 import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.service.QueryState;
 import org.apache.cassandra.thrift.ThriftClientState;
 import org.apache.cassandra.tracing.Tracing;
+import org.apache.cassandra.transport.messages.ResultMessage;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.MD5Digest;
 import org.apache.cassandra.utils.SemanticVersion;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/de720b4a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
index 7f26341..8e61ae5 100644
--- a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
@@ -20,8 +20,12 @@ package org.apache.cassandra.cql3.statements;
 import java.nio.ByteBuffer;
 import java.util.*;
 
-import com.google.common.collect.Iterables;
+import com.google.common.base.Function;
+import com.google.common.collect.*;
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.github.jamm.MemoryMeter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.cql3.*;
 import org.apache.cassandra.db.*;
@@ -47,6 +51,7 @@ public class BatchStatement implements CQLStatement, MeasurableForPreparedCache
     private final List<ModificationStatement> statements;
     private final Attributes attrs;
     private final boolean hasConditions;
+    private static final Logger logger = LoggerFactory.getLogger(BatchStatement.class);
 
     /**
      * Creates a new BatchStatement from a list of statements and a
@@ -177,6 +182,29 @@ public class BatchStatement implements CQLStatement, MeasurableForPreparedCache
         }
     }
 
+    /**
+     * Checks the total data size of the batch against the configured warn threshold. If the size exceeds it, a warning is logged.
+     * @param cfs ColumnFamilies that will store the batch's mutations.
+     */
+    private void verifyBatchSize(Iterable<ColumnFamily> cfs)
+    {
+        long size = 0;
+        long warnThreshold = DatabaseDescriptor.getBatchSizeWarnThreshold();
+
+        for (ColumnFamily cf : cfs)
+            size += cf.dataSize();
+
+        if (size > warnThreshold)
+        {
+            Set<String> ksCfPairs = new HashSet<>();
+            for (ColumnFamily cf : cfs)
+                ksCfPairs.add(cf.metadata().ksName + "." + cf.metadata().cfName);
+
+            String format = "Batch of prepared statements for {} is of size {}, exceeding specified threshold of {} by {}.";
+            logger.warn(format, ksCfPairs, size, warnThreshold, size - warnThreshold);
+        }
+    }
+
     public ResultMessage execute(QueryState queryState, QueryOptions options) throws RequestExecutionException, RequestValidationException
     {
         if (options.getConsistency() == null)
@@ -207,10 +235,21 @@ public class BatchStatement implements CQLStatement, MeasurableForPreparedCache
 
     private void executeWithoutConditions(Collection<? extends IMutation> mutations, ConsistencyLevel cl) throws RequestExecutionException, RequestValidationException
     {
+        // Extract each collection of cfs from its IMutation and then lazily concatenate all of them into a single Iterable.
+        Iterable<ColumnFamily> cfs = Iterables.concat(Iterables.transform(mutations, new Function<IMutation, Collection<ColumnFamily>>()
+        {
+            public Collection<ColumnFamily> apply(IMutation im)
+            {
+                return im.getColumnFamilies();
+            }
+        }));
+        verifyBatchSize(cfs);
+
         boolean mutateAtomic = (type == Type.LOGGED && mutations.size() > 1);
         StorageProxy.mutateWithTriggers(mutations, cl, mutateAtomic);
     }
 
+
     private ResultMessage executeWithConditions(BatchVariables variables, ConsistencyLevel cl, ConsistencyLevel serialCf, long now)
     throws RequestExecutionException, RequestValidationException
     {
@@ -259,6 +298,7 @@ public class BatchStatement implements CQLStatement, MeasurableForPreparedCache
             }
         }
 
+        verifyBatchSize(Collections.singleton(updates));
         ColumnFamily result = StorageProxy.cas(ksName, cfName, key, conditions, updates, serialCf, cl);
         return new ResultMessage.Rows(ModificationStatement.buildCasResultSet(ksName, key, cfName, result, columnsWithConditions, true));
     }