You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2014/04/22 14:41:51 UTC

[3/6] git commit: Log a warning for large batches patch by Lyuben Todorov; reviewed by Benedict Elliott Smith for CASSANDRA-6487

Log a warning for large batches
patch by Lyuben Todorov; reviewed by Benedict Elliott Smith for CASSANDRA-6487


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/de720b4a
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/de720b4a
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/de720b4a

Branch: refs/heads/trunk
Commit: de720b4aa31198076abbd76a53644df341577126
Parents: 364282a
Author: Jonathan Ellis <jb...@apache.org>
Authored: Tue Apr 22 07:40:32 2014 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Tue Apr 22 07:40:32 2014 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 conf/cassandra.yaml                             |  5 +++
 .../org/apache/cassandra/config/Config.java     |  1 +
 .../cassandra/config/DatabaseDescriptor.java    |  5 +++
 .../apache/cassandra/cql3/QueryProcessor.java   |  2 +-
 .../cql3/statements/BatchStatement.java         | 42 +++++++++++++++++++-
 6 files changed, 54 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/de720b4a/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 791586c..fffb2a5 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.0.8
+ * Log a warning for large batches (CASSANDRA-6487)
  * Queries on compact tables can return more rows that requested (CASSANDRA-7052)
  * USING TIMESTAMP for batches does not work (CASSANDRA-7053)
 Merged from 1.2:

http://git-wip-us.apache.org/repos/asf/cassandra/blob/de720b4a/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index 2edd498..2de6753 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -429,6 +429,11 @@ tombstone_failure_threshold: 100000
 # that wastefully either.
 column_index_size_in_kb: 64
 
+
+# Log a WARN message for any batch whose size exceeds this value. Defaults to 5 KB per batch.
+# Use caution when increasing this threshold, as doing so can lead to node instability.
+batch_size_warn_threshold_in_kb: 5
+
 # Size limit for rows being compacted in memory.  Larger rows will spill
 # over to disk and use a slower two-pass compaction process.  A message
 # will be logged specifying the row key.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/de720b4a/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index 5317fb8..7a3185a 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -120,6 +120,7 @@ public class Config
 
     /* if the size of columns or super-columns are more than this, indexing will kick in */
     public Integer column_index_size_in_kb = 64;
+    public Integer batch_size_warn_threshold_in_kb = 5;
     public Integer in_memory_compaction_limit_in_mb = 64;
     public Integer concurrent_compactors = FBUtilities.getAvailableProcessors();
     public volatile Integer compaction_throughput_mb_per_sec = 16;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/de720b4a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 9e06601..6417524 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -658,6 +658,11 @@ public class DatabaseDescriptor
         return conf.column_index_size_in_kb * 1024;
     }
 
+    public static int getBatchSizeWarnThreshold()
+    {
+        return conf.batch_size_warn_threshold_in_kb * 1024; // KB -> bytes; NOTE(review): int multiply overflows if the configured value exceeds Integer.MAX_VALUE / 1024 KB
+    }
+
     public static Collection<String> getInitialTokens()
     {
         return tokensFromString(System.getProperty("cassandra.initial_token", conf.initial_token));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/de720b4a/src/java/org/apache/cassandra/cql3/QueryProcessor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/QueryProcessor.java b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
index ab0ea40..15ee59f 100644
--- a/src/java/org/apache/cassandra/cql3/QueryProcessor.java
+++ b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
@@ -30,13 +30,13 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.cql3.statements.*;
-import org.apache.cassandra.transport.messages.ResultMessage;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.exceptions.*;
 import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.service.QueryState;
 import org.apache.cassandra.thrift.ThriftClientState;
 import org.apache.cassandra.tracing.Tracing;
+import org.apache.cassandra.transport.messages.ResultMessage;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.MD5Digest;
 import org.apache.cassandra.utils.SemanticVersion;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/de720b4a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
index 7f26341..8e61ae5 100644
--- a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
@@ -20,8 +20,12 @@ package org.apache.cassandra.cql3.statements;
 import java.nio.ByteBuffer;
 import java.util.*;
 
-import com.google.common.collect.Iterables;
+import com.google.common.base.Function;
+import com.google.common.collect.*;
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.github.jamm.MemoryMeter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.cql3.*;
 import org.apache.cassandra.db.*;
@@ -47,6 +51,7 @@ public class BatchStatement implements CQLStatement, MeasurableForPreparedCache
     private final List<ModificationStatement> statements;
     private final Attributes attrs;
     private final boolean hasConditions;
+    private static final Logger logger = LoggerFactory.getLogger(BatchStatement.class);
 
     /**
      * Creates a new BatchStatement from a list of statements and a
@@ -177,6 +182,29 @@ public class BatchStatement implements CQLStatement, MeasurableForPreparedCache
         }
     }
 
+    /**
+     * Logs a WARN when the summed dataSize() of the given ColumnFamilies exceeds the configured batch size warning threshold; it only logs and never rejects the batch.
+     * @param cfs ColumnFamilies that will store the batch's mutations; iterated once to sum sizes, and a second time (to collect keyspace.table names) only when the threshold is exceeded.
+     */
+    private void verifyBatchSize(Iterable<ColumnFamily> cfs)
+    {
+        long size = 0;
+        long warnThreshold = DatabaseDescriptor.getBatchSizeWarnThreshold(); // bytes: batch_size_warn_threshold_in_kb * 1024
+
+        for (ColumnFamily cf : cfs)
+            size += cf.dataSize(); // NOTE(review): assumes dataSize() is in bytes, matching warnThreshold — confirm
+
+        if (size > warnThreshold)
+        {
+            Set<String> ksCfPairs = new HashSet<>(); // de-duplicated "keyspace.table" names for the log message
+            for (ColumnFamily cf : cfs)
+                ksCfPairs.add(cf.metadata().ksName + "." + cf.metadata().cfName);
+
+            String format = "Batch of prepared statements for {} is of size {}, exceeding specified threshold of {} by {}.";
+            logger.warn(format, ksCfPairs, size, warnThreshold, size - warnThreshold);
+        }
+    }
+
     public ResultMessage execute(QueryState queryState, QueryOptions options) throws RequestExecutionException, RequestValidationException
     {
         if (options.getConsistency() == null)
@@ -207,10 +235,21 @@ public class BatchStatement implements CQLStatement, MeasurableForPreparedCache
 
     private void executeWithoutConditions(Collection<? extends IMutation> mutations, ConsistencyLevel cl) throws RequestExecutionException, RequestValidationException
     {
+        // Extract each collection of cfs from its IMutation and then lazily concatenate all of them into a single Iterable.
+        Iterable<ColumnFamily> cfs = Iterables.concat(Iterables.transform(mutations, new Function<IMutation, Collection<ColumnFamily>>()
+        {
+            public Collection<ColumnFamily> apply(IMutation im)
+            {
+                return im.getColumnFamilies();
+            }
+        }));
+        verifyBatchSize(cfs); // only logs a warning for oversized batches; execution continues regardless
+
         boolean mutateAtomic = (type == Type.LOGGED && mutations.size() > 1);
         StorageProxy.mutateWithTriggers(mutations, cl, mutateAtomic);
     }
 
+
     private ResultMessage executeWithConditions(BatchVariables variables, ConsistencyLevel cl, ConsistencyLevel serialCf, long now)
     throws RequestExecutionException, RequestValidationException
     {
@@ -259,6 +298,7 @@ public class BatchStatement implements CQLStatement, MeasurableForPreparedCache
             }
         }
 
+        verifyBatchSize(Collections.singleton(updates));
         ColumnFamily result = StorageProxy.cas(ksName, cfName, key, conditions, updates, serialCf, cl);
         return new ResultMessage.Rows(ModificationStatement.buildCasResultSet(ksName, key, cfName, result, columnsWithConditions, true));
     }