You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by bl...@apache.org on 2016/12/12 13:10:47 UTC
[1/2] cassandra git commit: Add mutation size and batch metrics
Repository: cassandra
Updated Branches:
refs/heads/trunk a8acb2a1b -> 31501cc8b
Add mutation size and batch metrics
patch by Alwyn Davis; reviewed by Benjamin Lerer for CASSANDRA-12649
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/3b84de4d
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/3b84de4d
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/3b84de4d
Branch: refs/heads/trunk
Commit: 3b84de4da4243eaf5d1353c7154fb22c866eff7b
Parents: 8883554
Author: Alwyn Davis <al...@instaclustr.com>
Authored: Mon Dec 12 14:05:39 2016 +0100
Committer: Benjamin Lerer <b....@gmail.com>
Committed: Mon Dec 12 14:05:39 2016 +0100
----------------------------------------------------------------------
CHANGES.txt | 3 +-
doc/source/operating/metrics.rst | 25 ++++
.../cql3/statements/BatchStatement.java | 35 ++++--
src/java/org/apache/cassandra/db/IMutation.java | 16 +++
.../apache/cassandra/metrics/BatchMetrics.java | 38 ++++++
.../metrics/CASClientRequestMetrics.java | 5 -
.../metrics/CASClientWriteRequestMetrics.java | 52 ++++++++
.../metrics/ClientWriteRequestMetrics.java | 47 ++++++++
.../apache/cassandra/service/StorageProxy.java | 16 ++-
.../cassandra/metrics/BatchMetricsTest.java | 119 +++++++++++++++++++
10 files changed, 336 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3b84de4d/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 6e41b2b..dfb849d 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,6 @@
3.12
- * add method to get size of endpoints to TokenMetadata (CASSANDRA-12999)
+ * Add mutation size and batch metrics (CASSANDRA-12649)
+ * Add method to get size of endpoints to TokenMetadata (CASSANDRA-12999)
* Fix primary index calculation for SASI (CASSANDRA-12910)
* Expose time spent waiting in thread pool queue (CASSANDRA-8398)
* Conditionally update index built status to avoid unnecessary flushes (CASSANDRA-12969)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3b84de4d/doc/source/operating/metrics.rst
----------------------------------------------------------------------
diff --git a/doc/source/operating/metrics.rst b/doc/source/operating/metrics.rst
index 546d9c2..ef43128 100644
--- a/doc/source/operating/metrics.rst
+++ b/doc/source/operating/metrics.rst
@@ -249,6 +249,7 @@ Reported name format:
UnfinishedCommit Counter Number of transactions that were committed on write.
ConditionNotMet Counter Number of transaction preconditions did not match current values.
ContentionHistogram Histogram How many contended writes were encountered
+ MutationSizeHistogram Histogram Total size in bytes of the requests mutations.
===================== ============== =============================================================
@@ -286,6 +287,7 @@ Reported name format:
Failures Counter Number of write failures encountered.
|nbsp| Latency Write latency.
Unavailables Counter Number of unavailable exceptions encountered.
+ MutationSizeHistogram Histogram Total size in bytes of the requests mutations.
===================== ============== =============================================================
@@ -585,6 +587,29 @@ connectedNativeClients Counter Number of clients connected to this n
connectedThriftClients Counter Number of clients connected to this nodes thrift protocol server
=========================== ============== ===========
+
+Batch Metrics
+^^^^^^^^^^^^^
+
+Metrics specifc to batch statements.
+
+Reported name format:
+
+**Metric Name**
+ ``org.apache.cassandra.metrics.Batch.<MetricName>``
+
+**JMX MBean**
+ ``org.apache.cassandra.metrics:type=Batch name=<MetricName>``
+
+=========================== ============== ===========
+Name Type Description
+=========================== ============== ===========
+PartitionsPerCounterBatch Histogram Distribution of the number of partitions processed per counter batch
+PartitionsPerLoggedBatch Histogram Distribution of the number of partitions processed per logged batch
+PartitionsPerUnloggedBatch Histogram Distribution of the number of partitions processed per unlogged batch
+=========================== ============== ===========
+
+
JVM Metrics
^^^^^^^^^^^
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3b84de4d/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
index 83a8324..60a8df5 100644
--- a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
@@ -35,6 +35,7 @@ import org.apache.cassandra.db.*;
import org.apache.cassandra.db.partitions.PartitionUpdate;
import org.apache.cassandra.db.rows.RowIterator;
import org.apache.cassandra.exceptions.*;
+import org.apache.cassandra.metrics.BatchMetrics;
import org.apache.cassandra.service.*;
import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.transport.messages.ResultMessage;
@@ -79,6 +80,8 @@ public class BatchStatement implements CQLStatement
"tables involved in an atomic batch might cause batchlog " +
"entries to expire before being replayed.";
+ public static final BatchMetrics metrics = new BatchMetrics();
+
/**
* Creates a new BatchStatement from a list of statements and a
* Thrift consistency level.
@@ -259,7 +262,7 @@ public class BatchStatement implements CQLStatement
/**
* Checks batch size to ensure threshold is met. If not, a warning is logged.
*
- * @param updates - the batch mutations.
+ * @param mutations - the batch mutations.
*/
private static void verifyBatchSize(Collection<? extends IMutation> mutations) throws InvalidRequestException
{
@@ -267,14 +270,8 @@ public class BatchStatement implements CQLStatement
if (mutations.size() <= 1)
return;
- long size = 0;
long warnThreshold = DatabaseDescriptor.getBatchSizeWarnThreshold();
-
- for (IMutation mutation : mutations)
- {
- for (PartitionUpdate update : mutation.getPartitionUpdates())
- size += update.dataSize();
- }
+ long size = IMutation.dataSize(mutations);
if (size > warnThreshold)
{
@@ -369,10 +366,23 @@ public class BatchStatement implements CQLStatement
verifyBatchSize(mutations);
verifyBatchType(mutations);
+ updatePartitionsPerBatchMetrics(mutations.size());
+
boolean mutateAtomic = (isLogged() && mutations.size() > 1);
StorageProxy.mutateWithTriggers(mutations, cl, mutateAtomic, queryStartNanoTime);
}
+ private void updatePartitionsPerBatchMetrics(int updatedPartitions)
+ {
+ if (isLogged()) {
+ metrics.partitionsPerLoggedBatch.update(updatedPartitions);
+ } else if (isCounter()) {
+ metrics.partitionsPerCounterBatch.update(updatedPartitions);
+ } else {
+ metrics.partitionsPerUnloggedBatch.update(updatedPartitions);
+ }
+ }
+
private ResultMessage executeWithConditions(BatchQueryOptions options, QueryState state, long queryStartNanoTime)
throws RequestExecutionException, RequestValidationException
{
@@ -392,11 +402,16 @@ public class BatchStatement implements CQLStatement
state.getClientState(),
queryStartNanoTime))
{
- return new ResultMessage.Rows(ModificationStatement.buildCasResultSet(ksName, tableName, result, columnsWithConditions, true, options.forStatement(0)));
+
+ return new ResultMessage.Rows(ModificationStatement.buildCasResultSet(ksName,
+ tableName,
+ result,
+ columnsWithConditions,
+ true,
+ options.forStatement(0)));
}
}
-
private Pair<CQL3CasRequest,Set<ColumnDefinition>> makeCasRequest(BatchQueryOptions options, QueryState state)
{
long now = state.getTimestamp();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3b84de4d/src/java/org/apache/cassandra/db/IMutation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/IMutation.java b/src/java/org/apache/cassandra/db/IMutation.java
index c734e16..0ac89f7 100644
--- a/src/java/org/apache/cassandra/db/IMutation.java
+++ b/src/java/org/apache/cassandra/db/IMutation.java
@@ -31,4 +31,20 @@ public interface IMutation
public long getTimeout();
public String toString(boolean shallow);
public Collection<PartitionUpdate> getPartitionUpdates();
+
+ /**
+ * Computes the total data size of the specified mutations.
+ * @param mutations the mutations
+ * @return the total data size of the specified mutations
+ */
+ public static long dataSize(Collection<? extends IMutation> mutations)
+ {
+ long size = 0;
+ for (IMutation mutation : mutations)
+ {
+ for (PartitionUpdate update : mutation.getPartitionUpdates())
+ size += update.dataSize();
+ }
+ return size;
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3b84de4d/src/java/org/apache/cassandra/metrics/BatchMetrics.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/metrics/BatchMetrics.java b/src/java/org/apache/cassandra/metrics/BatchMetrics.java
new file mode 100644
index 0000000..9bea162
--- /dev/null
+++ b/src/java/org/apache/cassandra/metrics/BatchMetrics.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.metrics;
+
+import com.codahale.metrics.Histogram;
+
+import static org.apache.cassandra.metrics.CassandraMetricsRegistry.Metrics;
+
+public class BatchMetrics
+{
+ private static final MetricNameFactory factory = new DefaultNameFactory("Batch");
+
+ public final Histogram partitionsPerLoggedBatch;
+ public final Histogram partitionsPerUnloggedBatch;
+ public final Histogram partitionsPerCounterBatch;
+
+ public BatchMetrics()
+ {
+ partitionsPerLoggedBatch = Metrics.histogram(factory.createMetricName("PartitionsPerLoggedBatch"), false);
+ partitionsPerUnloggedBatch = Metrics.histogram(factory.createMetricName("PartitionsPerUnloggedBatch"), false);
+ partitionsPerCounterBatch = Metrics.histogram(factory.createMetricName("PartitionsPerCounterBatch"), false);
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3b84de4d/src/java/org/apache/cassandra/metrics/CASClientRequestMetrics.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/metrics/CASClientRequestMetrics.java b/src/java/org/apache/cassandra/metrics/CASClientRequestMetrics.java
index f3f1f64..9884ff1 100644
--- a/src/java/org/apache/cassandra/metrics/CASClientRequestMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/CASClientRequestMetrics.java
@@ -23,12 +23,9 @@ import com.codahale.metrics.Histogram;
import static org.apache.cassandra.metrics.CassandraMetricsRegistry.Metrics;
-
public class CASClientRequestMetrics extends ClientRequestMetrics
{
public final Histogram contention;
- /* Used only for write */
- public final Counter conditionNotMet;
public final Counter unfinishedCommit;
@@ -36,7 +33,6 @@ public class CASClientRequestMetrics extends ClientRequestMetrics
{
super(scope);
contention = Metrics.histogram(factory.createMetricName("ContentionHistogram"), false);
- conditionNotMet = Metrics.counter(factory.createMetricName("ConditionNotMet"));
unfinishedCommit = Metrics.counter(factory.createMetricName("UnfinishedCommit"));
}
@@ -44,7 +40,6 @@ public class CASClientRequestMetrics extends ClientRequestMetrics
{
super.release();
Metrics.remove(factory.createMetricName("ContentionHistogram"));
- Metrics.remove(factory.createMetricName("ConditionNotMet"));
Metrics.remove(factory.createMetricName("UnfinishedCommit"));
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3b84de4d/src/java/org/apache/cassandra/metrics/CASClientWriteRequestMetrics.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/metrics/CASClientWriteRequestMetrics.java b/src/java/org/apache/cassandra/metrics/CASClientWriteRequestMetrics.java
new file mode 100644
index 0000000..5971074
--- /dev/null
+++ b/src/java/org/apache/cassandra/metrics/CASClientWriteRequestMetrics.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cassandra.metrics;
+
+import com.codahale.metrics.Counter;
+import com.codahale.metrics.Histogram;
+
+import static org.apache.cassandra.metrics.CassandraMetricsRegistry.Metrics;
+
+/**
+ * Metrics for tracking information about CAS write requests.
+ *
+ */
+public class CASClientWriteRequestMetrics extends CASClientRequestMetrics
+{
+ /**
+ * Metric for tracking the mutation sizes in bytes.
+ */
+ public final Histogram mutationSize;
+
+ public final Counter conditionNotMet;
+
+ public CASClientWriteRequestMetrics(String scope)
+ {
+ super(scope);
+ mutationSize = Metrics.histogram(factory.createMetricName("MutationSizeHistogram"), false);
+ conditionNotMet = Metrics.counter(factory.createMetricName("ConditionNotMet"));
+ }
+
+ public void release()
+ {
+ super.release();
+ Metrics.remove(factory.createMetricName("ConditionNotMet"));
+ Metrics.remove(factory.createMetricName("MutationSizeHistogram"));
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3b84de4d/src/java/org/apache/cassandra/metrics/ClientWriteRequestMetrics.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/metrics/ClientWriteRequestMetrics.java b/src/java/org/apache/cassandra/metrics/ClientWriteRequestMetrics.java
new file mode 100644
index 0000000..50427af
--- /dev/null
+++ b/src/java/org/apache/cassandra/metrics/ClientWriteRequestMetrics.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cassandra.metrics;
+
+import com.codahale.metrics.Histogram;
+
+import static org.apache.cassandra.metrics.CassandraMetricsRegistry.Metrics;
+
+/**
+ * Metrics for tracking information about write requests.
+ *
+ */
+public class ClientWriteRequestMetrics extends ClientRequestMetrics
+{
+ /**
+ * Metric for tracking the mutation sizes in bytes.
+ */
+ public final Histogram mutationSize;
+
+ public ClientWriteRequestMetrics(String scope)
+ {
+ super(scope);
+ mutationSize = Metrics.histogram(factory.createMetricName("MutationSizeHistogram"), false);
+ }
+
+ public void release()
+ {
+ super.release();
+ Metrics.remove(factory.createMetricName("MutationSizeHistogram"));
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3b84de4d/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index e0be68c..7d77bd4 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -99,12 +99,12 @@ public class StorageProxy implements StorageProxyMBean
};
private static final ClientRequestMetrics readMetrics = new ClientRequestMetrics("Read");
private static final ClientRequestMetrics rangeMetrics = new ClientRequestMetrics("RangeSlice");
- private static final ClientRequestMetrics writeMetrics = new ClientRequestMetrics("Write");
- private static final CASClientRequestMetrics casWriteMetrics = new CASClientRequestMetrics("CASWrite");
+ private static final ClientWriteRequestMetrics writeMetrics = new ClientWriteRequestMetrics("Write");
+ private static final CASClientWriteRequestMetrics casWriteMetrics = new CASClientWriteRequestMetrics("CASWrite");
private static final CASClientRequestMetrics casReadMetrics = new CASClientRequestMetrics("CASRead");
private static final ViewWriteMetrics viewWriteMetrics = new ViewWriteMetrics("ViewWrite");
private static final Map<ConsistencyLevel, ClientRequestMetrics> readMetricsMap = new EnumMap<>(ConsistencyLevel.class);
- private static final Map<ConsistencyLevel, ClientRequestMetrics> writeMetricsMap = new EnumMap<>(ConsistencyLevel.class);
+ private static final Map<ConsistencyLevel, ClientWriteRequestMetrics> writeMetricsMap = new EnumMap<>(ConsistencyLevel.class);
private static final double CONCURRENT_SUBREQUESTS_MARGIN = 0.10;
@@ -175,7 +175,7 @@ public class StorageProxy implements StorageProxyMBean
for(ConsistencyLevel level : ConsistencyLevel.values())
{
readMetricsMap.put(level, new ClientRequestMetrics("Read-" + level.name()));
- writeMetricsMap.put(level, new ClientRequestMetrics("Write-" + level.name()));
+ writeMetricsMap.put(level, new ClientWriteRequestMetrics("Write-" + level.name()));
}
}
@@ -273,6 +273,10 @@ public class StorageProxy implements StorageProxyMBean
// TODO turn null updates into delete?
PartitionUpdate updates = request.makeUpdates(current);
+ long size = updates.dataSize();
+ casWriteMetrics.mutationSize.update(size);
+ writeMetricsMap.get(consistencyForPaxos).mutationSize.update(size);
+
// Apply triggers to cas updates. A consideration here is that
// triggers emit Mutations, and so a given trigger implementation
// may generate mutations for partitions other than the one this
@@ -859,6 +863,10 @@ public class StorageProxy implements StorageProxyMBean
.viewManager
.updatesAffectView(mutations, true);
+ long size = IMutation.dataSize(mutations);
+ writeMetrics.mutationSize.update(size);
+ writeMetricsMap.get(consistencyLevel).mutationSize.update(size);
+
if (augmented != null)
mutateAtomically(augmented, consistencyLevel, updatesView, queryStartNanoTime);
else
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3b84de4d/test/unit/org/apache/cassandra/metrics/BatchMetricsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/metrics/BatchMetricsTest.java b/test/unit/org/apache/cassandra/metrics/BatchMetricsTest.java
new file mode 100644
index 0000000..60ee725
--- /dev/null
+++ b/test/unit/org/apache/cassandra/metrics/BatchMetricsTest.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.metrics;
+
+import java.io.IOException;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import com.datastax.driver.core.BatchStatement;
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.Session;
+import org.apache.cassandra.OrderedJUnit4ClassRunner;
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.service.EmbeddedCassandraService;
+
+import static junit.framework.Assert.assertEquals;
+import static junit.framework.Assert.assertTrue;
+import static org.apache.cassandra.cql3.statements.BatchStatement.metrics;
+
+@RunWith(OrderedJUnit4ClassRunner.class)
+public class BatchMetricsTest extends SchemaLoader
+{
+ private static EmbeddedCassandraService cassandra;
+
+ private static Cluster cluster;
+ private static Session session;
+
+ private static String KEYSPACE = "junit";
+ private static final String TABLE = "batchmetricstest";
+
+ private static PreparedStatement ps;
+
+ @BeforeClass()
+ public static void setup() throws ConfigurationException, IOException
+ {
+ Schema.instance.clear();
+
+ cassandra = new EmbeddedCassandraService();
+ cassandra.start();
+
+ cluster = Cluster.builder().addContactPoint("127.0.0.1").withPort(DatabaseDescriptor.getNativeTransportPort()).build();
+ session = cluster.connect();
+
+ session.execute("CREATE KEYSPACE IF NOT EXISTS " + KEYSPACE + " WITH replication = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };");
+ session.execute("USE " + KEYSPACE);
+ session.execute("CREATE TABLE IF NOT EXISTS " + TABLE + " (id int PRIMARY KEY, val text);");
+
+ ps = session.prepare("INSERT INTO " + KEYSPACE + '.' + TABLE + " (id, val) VALUES (?, ?);");
+ }
+
+ private void executeBatch(boolean isLogged, int distinctPartitions, int statementsPerPartition)
+ {
+ BatchStatement.Type batchType;
+
+ if (isLogged) {
+ batchType = BatchStatement.Type.LOGGED;
+ } else {
+ batchType = BatchStatement.Type.UNLOGGED;
+ }
+
+ BatchStatement batch = new BatchStatement(batchType);
+
+ for (int i=0; i<distinctPartitions; i++) {
+ for (int j=0; j<statementsPerPartition; j++) {
+ batch.add(ps.bind(i, "aaaaaaaa"));
+ }
+ }
+
+ session.execute(batch);
+ }
+
+ @Test
+ public void testLoggedPartitionsPerBatch() {
+ int partitionsPerBatchCountPre = (int) metrics.partitionsPerLoggedBatch.getCount();
+ executeBatch(true, 10, 2);
+ assertEquals(partitionsPerBatchCountPre+1, metrics.partitionsPerLoggedBatch.getCount());
+ assertTrue(partitionsPerBatchCountPre <= metrics.partitionsPerLoggedBatch.getSnapshot().getMax()); // decayingBuckets may not have exact value
+
+ partitionsPerBatchCountPre = (int) metrics.partitionsPerLoggedBatch.getCount();
+ executeBatch(true, 21, 2);
+ assertEquals(partitionsPerBatchCountPre+1, metrics.partitionsPerLoggedBatch.getCount());
+ assertTrue(partitionsPerBatchCountPre <= metrics.partitionsPerLoggedBatch.getSnapshot().getMax());
+ }
+
+ @Test
+ public void testUnloggedPartitionsPerBatch() {
+ int partitionsPerBatchCountPre = (int) metrics.partitionsPerUnloggedBatch.getCount();
+ executeBatch(false, 7, 2);
+ assertEquals(partitionsPerBatchCountPre+1, metrics.partitionsPerUnloggedBatch.getCount());
+ assertTrue(partitionsPerBatchCountPre <= metrics.partitionsPerUnloggedBatch.getSnapshot().getMax());
+
+ partitionsPerBatchCountPre = (int) metrics.partitionsPerUnloggedBatch.getCount();
+ executeBatch(false, 25, 2);
+ assertEquals(partitionsPerBatchCountPre+1, metrics.partitionsPerUnloggedBatch.getCount());
+ assertTrue(partitionsPerBatchCountPre <= metrics.partitionsPerUnloggedBatch.getSnapshot().getMax());
+ }
+}
[2/2] cassandra git commit: Merge branch cassandra-3.X into trunk
Posted by bl...@apache.org.
Merge branch cassandra-3.X into trunk
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/31501cc8
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/31501cc8
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/31501cc8
Branch: refs/heads/trunk
Commit: 31501cc8b1f2b66dce4abc59891984ba790896b0
Parents: a8acb2a 3b84de4
Author: Benjamin Lerer <b....@gmail.com>
Authored: Mon Dec 12 14:09:46 2016 +0100
Committer: Benjamin Lerer <b....@gmail.com>
Committed: Mon Dec 12 14:10:13 2016 +0100
----------------------------------------------------------------------
CHANGES.txt | 3 +-
doc/source/operating/metrics.rst | 25 ++++
.../cql3/statements/BatchStatement.java | 35 ++++--
src/java/org/apache/cassandra/db/IMutation.java | 16 +++
.../apache/cassandra/metrics/BatchMetrics.java | 38 ++++++
.../metrics/CASClientRequestMetrics.java | 5 -
.../metrics/CASClientWriteRequestMetrics.java | 52 ++++++++
.../metrics/ClientWriteRequestMetrics.java | 47 ++++++++
.../apache/cassandra/service/StorageProxy.java | 16 ++-
.../cassandra/metrics/BatchMetricsTest.java | 119 +++++++++++++++++++
10 files changed, 336 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/31501cc8/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 18e0833,dfb849d..fa2bf93
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,14 -1,6 +1,15 @@@
+4.0
+ * Remove pre-3.0 compatibility code for 4.0 (CASSANDRA-12716)
+ * Add column definition kind to dropped columns in schema (CASSANDRA-12705)
+ * Add (automate) Nodetool Documentation (CASSANDRA-12672)
+ * Update bundled cqlsh python driver to 3.7.0 (CASSANDRA-12736)
+ * Reject invalid replication settings when creating or altering a keyspace (CASSANDRA-12681)
+ * Clean up the SSTableReader#getScanner API wrt removal of RateLimiter (CASSANDRA-12422)
+
+
3.12
- * add method to get size of endpoints to TokenMetadata (CASSANDRA-12999)
+ * Add mutation size and batch metrics (CASSANDRA-12649)
+ * Add method to get size of endpoints to TokenMetadata (CASSANDRA-12999)
* Fix primary index calculation for SASI (CASSANDRA-12910)
* Expose time spent waiting in thread pool queue (CASSANDRA-8398)
* Conditionally update index built status to avoid unnecessary flushes (CASSANDRA-12969)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/31501cc8/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------