You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2014/04/22 14:41:51 UTC
[3/6] git commit: Log a warning for large batches patch by Lyuben
Todorov; reviewed by Benedict Elliott Smith for CASSANDRA-6487
Log a warning for large batches
patch by Lyuben Todorov; reviewed by Benedict Elliott Smith for CASSANDRA-6487
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/de720b4a
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/de720b4a
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/de720b4a
Branch: refs/heads/trunk
Commit: de720b4aa31198076abbd76a53644df341577126
Parents: 364282a
Author: Jonathan Ellis <jb...@apache.org>
Authored: Tue Apr 22 07:40:32 2014 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Tue Apr 22 07:40:32 2014 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 +
conf/cassandra.yaml | 5 +++
.../org/apache/cassandra/config/Config.java | 1 +
.../cassandra/config/DatabaseDescriptor.java | 5 +++
.../apache/cassandra/cql3/QueryProcessor.java | 2 +-
.../cql3/statements/BatchStatement.java | 42 +++++++++++++++++++-
6 files changed, 54 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/de720b4a/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 791586c..fffb2a5 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.0.8
+ * Log a warning for large batches (CASSANDRA-6487)
* Queries on compact tables can return more rows than requested (CASSANDRA-7052)
* USING TIMESTAMP for batches does not work (CASSANDRA-7053)
Merged from 1.2:
http://git-wip-us.apache.org/repos/asf/cassandra/blob/de720b4a/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index 2edd498..2de6753 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -429,6 +429,11 @@ tombstone_failure_threshold: 100000
# that wastefully either.
column_index_size_in_kb: 64
+
+# Log a warning on any batch size exceeding this value. 5KB per batch by default.
+# Caution should be taken when increasing the size of this threshold as it can lead to node instability.
+batch_size_warn_threshold_in_kb: 5
+
# Size limit for rows being compacted in memory. Larger rows will spill
# over to disk and use a slower two-pass compaction process. A message
# will be logged specifying the row key.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/de720b4a/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index 5317fb8..7a3185a 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -120,6 +120,7 @@ public class Config
/* if the size of columns or super-columns are more than this, indexing will kick in */
public Integer column_index_size_in_kb = 64;
+ public Integer batch_size_warn_threshold_in_kb = 5;
public Integer in_memory_compaction_limit_in_mb = 64;
public Integer concurrent_compactors = FBUtilities.getAvailableProcessors();
public volatile Integer compaction_throughput_mb_per_sec = 16;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/de720b4a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 9e06601..6417524 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -658,6 +658,11 @@ public class DatabaseDescriptor
return conf.column_index_size_in_kb * 1024;
}
+ public static int getBatchSizeWarnThreshold()
+ {
+ return conf.batch_size_warn_threshold_in_kb * 1024;
+ }
+
public static Collection<String> getInitialTokens()
{
return tokensFromString(System.getProperty("cassandra.initial_token", conf.initial_token));
http://git-wip-us.apache.org/repos/asf/cassandra/blob/de720b4a/src/java/org/apache/cassandra/cql3/QueryProcessor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/QueryProcessor.java b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
index ab0ea40..15ee59f 100644
--- a/src/java/org/apache/cassandra/cql3/QueryProcessor.java
+++ b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
@@ -30,13 +30,13 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.cql3.statements.*;
-import org.apache.cassandra.transport.messages.ResultMessage;
import org.apache.cassandra.db.*;
import org.apache.cassandra.exceptions.*;
import org.apache.cassandra.service.ClientState;
import org.apache.cassandra.service.QueryState;
import org.apache.cassandra.thrift.ThriftClientState;
import org.apache.cassandra.tracing.Tracing;
+import org.apache.cassandra.transport.messages.ResultMessage;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.MD5Digest;
import org.apache.cassandra.utils.SemanticVersion;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/de720b4a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
index 7f26341..8e61ae5 100644
--- a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
@@ -20,8 +20,12 @@ package org.apache.cassandra.cql3.statements;
import java.nio.ByteBuffer;
import java.util.*;
-import com.google.common.collect.Iterables;
+import com.google.common.base.Function;
+import com.google.common.collect.*;
+import org.apache.cassandra.config.DatabaseDescriptor;
import org.github.jamm.MemoryMeter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.apache.cassandra.cql3.*;
import org.apache.cassandra.db.*;
@@ -47,6 +51,7 @@ public class BatchStatement implements CQLStatement, MeasurableForPreparedCache
private final List<ModificationStatement> statements;
private final Attributes attrs;
private final boolean hasConditions;
+ private static final Logger logger = LoggerFactory.getLogger(BatchStatement.class);
/**
* Creates a new BatchStatement from a list of statements and a
@@ -177,6 +182,29 @@ public class BatchStatement implements CQLStatement, MeasurableForPreparedCache
}
}
+ /**
+ * Checks the batch's total size against the warning threshold; logs a warning if it is exceeded.
+ * @param cfs ColumnFamilies that will store the batch's mutations.
+ */
+ private void verifyBatchSize(Iterable<ColumnFamily> cfs)
+ {
+ long size = 0;
+ long warnThreshold = DatabaseDescriptor.getBatchSizeWarnThreshold();
+
+ for (ColumnFamily cf : cfs)
+ size += cf.dataSize();
+
+ if (size > warnThreshold)
+ {
+ Set<String> ksCfPairs = new HashSet<>();
+ for (ColumnFamily cf : cfs)
+ ksCfPairs.add(cf.metadata().ksName + "." + cf.metadata().cfName);
+
+ String format = "Batch of prepared statements for {} is of size {}, exceeding specified threshold of {} by {}.";
+ logger.warn(format, ksCfPairs, size, warnThreshold, size - warnThreshold);
+ }
+ }
+
public ResultMessage execute(QueryState queryState, QueryOptions options) throws RequestExecutionException, RequestValidationException
{
if (options.getConsistency() == null)
@@ -207,10 +235,21 @@ public class BatchStatement implements CQLStatement, MeasurableForPreparedCache
private void executeWithoutConditions(Collection<? extends IMutation> mutations, ConsistencyLevel cl) throws RequestExecutionException, RequestValidationException
{
+ // Extract each collection of cfs from its IMutation and then lazily concatenate all of them into a single Iterable.
+ Iterable<ColumnFamily> cfs = Iterables.concat(Iterables.transform(mutations, new Function<IMutation, Collection<ColumnFamily>>()
+ {
+ public Collection<ColumnFamily> apply(IMutation im)
+ {
+ return im.getColumnFamilies();
+ }
+ }));
+ verifyBatchSize(cfs);
+
boolean mutateAtomic = (type == Type.LOGGED && mutations.size() > 1);
StorageProxy.mutateWithTriggers(mutations, cl, mutateAtomic);
}
+
private ResultMessage executeWithConditions(BatchVariables variables, ConsistencyLevel cl, ConsistencyLevel serialCf, long now)
throws RequestExecutionException, RequestValidationException
{
@@ -259,6 +298,7 @@ public class BatchStatement implements CQLStatement, MeasurableForPreparedCache
}
}
+ verifyBatchSize(Collections.singleton(updates));
ColumnFamily result = StorageProxy.cas(ksName, cfName, key, conditions, updates, serialCf, cl);
return new ResultMessage.Rows(ModificationStatement.buildCasResultSet(ksName, key, cfName, result, columnsWithConditions, true));
}