You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by bd...@apache.org on 2020/02/14 21:45:50 UTC
[cassandra] branch trunk updated:
updateCoordinatorWriteLatencyTableMetric can produce misleading metrics
This is an automated email from the ASF dual-hosted git repository.
bdeggleston pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new 247502c updateCoordinatorWriteLatencyTableMetric can produce misleading metrics
247502c is described below
commit 247502c5d19c181bbe0a224da3ad6ebd0156f607
Author: David Capwell <dc...@gmail.com>
AuthorDate: Tue Feb 11 18:41:21 2020 -0800
updateCoordinatorWriteLatencyTableMetric can produce misleading metrics
Patch by David Capwell; Reviewed by Blake Eggleston for CASSANDRA-15569
---
CHANGES.txt | 1 +
.../org/apache/cassandra/service/StorageProxy.java | 129 ++++++++++++++++-----
.../apache/cassandra/metrics/TableMetricsTest.java | 108 +++++++++++++----
3 files changed, 188 insertions(+), 50 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index c4481ae..5fe958c 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
4.0-alpha4
+ * updateCoordinatorWriteLatencyTableMetric can produce misleading metrics (CASSANDRA-15569)
* Added documentation for read repair and an example of full repair (CASSANDRA-15485)
* Make cqlsh and cqlshlib Python 2 & 3 compatible (CASSANDRA-10190)
* Added documentation for Full Query Logging (CASSANDRA-15475)
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 9fc6b52..0caee86 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -19,19 +19,37 @@ package org.apache.cassandra.service;
import java.nio.ByteBuffer;
import java.nio.file.Paths;
-import java.util.*;
-import java.util.concurrent.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.EnumMap;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import com.google.common.base.Preconditions;
import com.google.common.cache.CacheLoader;
-import com.google.common.collect.*;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.PeekingIterator;
import com.google.common.primitives.Ints;
import com.google.common.util.concurrent.Uninterruptibles;
-
import org.apache.commons.lang3.StringUtils;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -40,38 +58,97 @@ import org.apache.cassandra.audit.FullQueryLoggerOptions;
import org.apache.cassandra.batchlog.Batch;
import org.apache.cassandra.batchlog.BatchlogManager;
import org.apache.cassandra.concurrent.Stage;
-import org.apache.cassandra.service.reads.AbstractReadExecutor;
-import org.apache.cassandra.service.reads.DataResolver;
-import org.apache.cassandra.service.reads.ReadCallback;
-import org.apache.cassandra.service.reads.repair.ReadRepair;
-import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.schema.Schema;
-import org.apache.cassandra.schema.SchemaConstants;
-import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.CounterMutation;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.HintedHandOffManager;
+import org.apache.cassandra.db.IMutation;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.db.PartitionPosition;
+import org.apache.cassandra.db.PartitionRangeReadCommand;
+import org.apache.cassandra.db.ReadCommand;
+import org.apache.cassandra.db.ReadExecutionController;
+import org.apache.cassandra.db.ReadResponse;
+import org.apache.cassandra.db.SinglePartitionReadCommand;
+import org.apache.cassandra.db.TruncateRequest;
+import org.apache.cassandra.db.WriteType;
import org.apache.cassandra.db.filter.DataLimits;
import org.apache.cassandra.db.filter.TombstoneOverwhelmingException;
-import org.apache.cassandra.db.partitions.*;
+import org.apache.cassandra.db.partitions.FilteredPartition;
+import org.apache.cassandra.db.partitions.PartitionIterator;
+import org.apache.cassandra.db.partitions.PartitionIterators;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
import org.apache.cassandra.db.rows.RowIterator;
import org.apache.cassandra.db.view.ViewUtils;
-import org.apache.cassandra.dht.*;
-import org.apache.cassandra.exceptions.*;
+import org.apache.cassandra.dht.AbstractBounds;
+import org.apache.cassandra.dht.Bounds;
+import org.apache.cassandra.dht.RingPosition;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.exceptions.CasWriteTimeoutException;
+import org.apache.cassandra.exceptions.CasWriteUnknownResultException;
+import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.exceptions.IsBootstrappingException;
+import org.apache.cassandra.exceptions.OverloadedException;
+import org.apache.cassandra.exceptions.ReadFailureException;
+import org.apache.cassandra.exceptions.ReadTimeoutException;
+import org.apache.cassandra.exceptions.RequestFailureException;
+import org.apache.cassandra.exceptions.RequestFailureReason;
+import org.apache.cassandra.exceptions.RequestTimeoutException;
+import org.apache.cassandra.exceptions.UnavailableException;
+import org.apache.cassandra.exceptions.WriteFailureException;
+import org.apache.cassandra.exceptions.WriteTimeoutException;
import org.apache.cassandra.gms.Gossiper;
import org.apache.cassandra.hints.Hint;
import org.apache.cassandra.hints.HintsService;
import org.apache.cassandra.index.Index;
-import org.apache.cassandra.locator.*;
-import org.apache.cassandra.metrics.*;
-import org.apache.cassandra.net.*;
+import org.apache.cassandra.locator.AbstractReplicationStrategy;
+import org.apache.cassandra.locator.EndpointsForRange;
+import org.apache.cassandra.locator.EndpointsForToken;
+import org.apache.cassandra.locator.IEndpointSnitch;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.LocalStrategy;
+import org.apache.cassandra.locator.Replica;
+import org.apache.cassandra.locator.ReplicaLayout;
+import org.apache.cassandra.locator.ReplicaPlan;
+import org.apache.cassandra.locator.ReplicaPlans;
+import org.apache.cassandra.locator.Replicas;
+import org.apache.cassandra.locator.TokenMetadata;
+import org.apache.cassandra.metrics.CASClientRequestMetrics;
+import org.apache.cassandra.metrics.CASClientWriteRequestMetrics;
+import org.apache.cassandra.metrics.ClientRequestMetrics;
+import org.apache.cassandra.metrics.ClientWriteRequestMetrics;
+import org.apache.cassandra.metrics.ReadRepairMetrics;
+import org.apache.cassandra.metrics.StorageMetrics;
+import org.apache.cassandra.metrics.ViewWriteMetrics;
+import org.apache.cassandra.net.ForwardingInfo;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessageFlag;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.net.RequestCallback;
+import org.apache.cassandra.net.Verb;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.schema.SchemaConstants;
+import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.service.paxos.Commit;
import org.apache.cassandra.service.paxos.PaxosState;
import org.apache.cassandra.service.paxos.PrepareCallback;
import org.apache.cassandra.service.paxos.ProposeCallback;
-import org.apache.cassandra.net.Verb;
+import org.apache.cassandra.service.reads.AbstractReadExecutor;
+import org.apache.cassandra.service.reads.DataResolver;
+import org.apache.cassandra.service.reads.ReadCallback;
+import org.apache.cassandra.service.reads.repair.ReadRepair;
import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.triggers.TriggerExecutor;
-import org.apache.cassandra.utils.*;
import org.apache.cassandra.utils.AbstractIterator;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.MBeanWrapper;
+import org.apache.cassandra.utils.MonotonicClock;
+import org.apache.cassandra.utils.Pair;
+import org.apache.cassandra.utils.UUIDGen;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
@@ -980,14 +1057,12 @@ public class StorageProxy implements StorageProxyMBean
try
{
- //TODO: Avoid giving same latency number for each CF in each mutation in a given set of mutations
//We could potentially pass a callback into performWrite. And add callback provision for mutateCounter or mutateAtomically (sendToHintedEndPoints)
//However, Trade off between write metric per CF accuracy vs performance hit due to callbacks. Similar issue exists with CoordinatorReadLatency metric.
- mutations.forEach(mutation -> {
- mutation.getTableIds().forEach(tableId -> {
- Keyspace.open(mutation.getKeyspaceName()).getColumnFamilyStore(tableId).metric.coordinatorWriteLatency.update(latency, TimeUnit.NANOSECONDS);
- });
- });
+ mutations.stream()
+ .flatMap(m -> m.getTableIds().stream().map(tableId -> Keyspace.open(m.getKeyspaceName()).getColumnFamilyStore(tableId)))
+ .distinct()
+ .forEach(store -> store.metric.coordinatorWriteLatency.update(latency, TimeUnit.NANOSECONDS));
}
catch (Exception ex)
{
diff --git a/test/unit/org/apache/cassandra/metrics/TableMetricsTest.java b/test/unit/org/apache/cassandra/metrics/TableMetricsTest.java
index a3ae956..c5434fe 100644
--- a/test/unit/org/apache/cassandra/metrics/TableMetricsTest.java
+++ b/test/unit/org/apache/cassandra/metrics/TableMetricsTest.java
@@ -36,8 +36,8 @@ import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.schema.Schema;
import org.apache.cassandra.service.EmbeddedCassandraService;
-import static junit.framework.Assert.assertEquals;
-import static junit.framework.Assert.assertTrue;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
@RunWith(OrderedJUnit4ClassRunner.class)
public class TableMetricsTest extends SchemaLoader
@@ -66,15 +66,23 @@ public class TableMetricsTest extends SchemaLoader
private ColumnFamilyStore recreateTable()
{
- session.execute(String.format("DROP TABLE IF EXISTS %s.%s", KEYSPACE, TABLE));
- session.execute(String.format("CREATE TABLE IF NOT EXISTS %s.%s (id int, val1 text, val2 text, PRIMARY KEY(id, val1));", KEYSPACE, TABLE));
- return ColumnFamilyStore.getIfExists(KEYSPACE, TABLE);
+ return recreateTable(TABLE);
}
- private void executeBatch(boolean isLogged, int distinctPartitions, int statementsPerPartition)
+ private ColumnFamilyStore recreateTable(String table)
{
+ session.execute(String.format("DROP TABLE IF EXISTS %s.%s", KEYSPACE, table));
+ session.execute(String.format("CREATE TABLE IF NOT EXISTS %s.%s (id int, val1 text, val2 text, PRIMARY KEY(id, val1));", KEYSPACE, table));
+ return ColumnFamilyStore.getIfExists(KEYSPACE, table);
+ }
+
+ private void executeBatch(boolean isLogged, int distinctPartitions, int statementsPerPartition, String... tables)
+ {
+ if (tables == null || tables.length == 0)
+ {
+ tables = new String[] { TABLE };
+ }
BatchStatement.Type batchType;
- PreparedStatement ps = session.prepare(String.format("INSERT INTO %s.%s (id, val1, val2) VALUES (?, ?, ?);", KEYSPACE, TABLE));
if (isLogged)
{
@@ -87,6 +95,16 @@ public class TableMetricsTest extends SchemaLoader
BatchStatement batch = new BatchStatement(batchType);
+ for (String table : tables)
+ populateBatch(batch, table, distinctPartitions, statementsPerPartition);
+
+ session.execute(batch);
+ }
+
+ private static void populateBatch(BatchStatement batch, String table, int distinctPartitions, int statementsPerPartition)
+ {
+ PreparedStatement ps = session.prepare(String.format("INSERT INTO %s.%s (id, val1, val2) VALUES (?, ?, ?);", KEYSPACE, table));
+
for (int i=0; i<distinctPartitions; i++)
{
for (int j=0; j<statementsPerPartition; j++)
@@ -94,8 +112,6 @@ public class TableMetricsTest extends SchemaLoader
batch.add(ps.bind(i, j + "a", "b"));
}
}
-
- session.execute(batch);
}
@Test
@@ -103,7 +119,7 @@ public class TableMetricsTest extends SchemaLoader
{
ColumnFamilyStore cfs = recreateTable();
assertEquals(0, cfs.metric.coordinatorWriteLatency.getCount());
- assertTrue(cfs.metric.coordinatorWriteLatency.getMeanRate() == 0);
+ assertEquals(0.0, cfs.metric.coordinatorWriteLatency.getMeanRate(), 0.0);
for (int i = 0; i < 10; i++)
{
@@ -111,7 +127,7 @@ public class TableMetricsTest extends SchemaLoader
}
assertEquals(10, cfs.metric.coordinatorWriteLatency.getCount());
- assertTrue(cfs.metric.coordinatorWriteLatency.getMeanRate() > 0);
+ assertGreaterThan(cfs.metric.coordinatorWriteLatency.getMeanRate(), 0);
}
@Test
@@ -121,7 +137,7 @@ public class TableMetricsTest extends SchemaLoader
PreparedStatement metricsStatement = session.prepare(String.format("INSERT INTO %s.%s (id, val1, val2) VALUES (?, ?, ?)", KEYSPACE, TABLE));
assertEquals(0, cfs.metric.coordinatorWriteLatency.getCount());
- assertTrue(cfs.metric.coordinatorWriteLatency.getMeanRate() == 0);
+ assertEquals(0.0, cfs.metric.coordinatorWriteLatency.getMeanRate(), 0.0);
for (int i = 0; i < 10; i++)
{
@@ -129,7 +145,7 @@ public class TableMetricsTest extends SchemaLoader
}
assertEquals(10, cfs.metric.coordinatorWriteLatency.getCount());
- assertTrue(cfs.metric.coordinatorWriteLatency.getMeanRate() > 0);
+ assertGreaterThan(cfs.metric.coordinatorWriteLatency.getMeanRate(), 0);
}
@Test
@@ -137,14 +153,36 @@ public class TableMetricsTest extends SchemaLoader
{
ColumnFamilyStore cfs = recreateTable();
assertEquals(0, cfs.metric.coordinatorWriteLatency.getCount());
- assertTrue(cfs.metric.coordinatorWriteLatency.getMeanRate() == 0);
+ assertEquals(0.0, cfs.metric.coordinatorWriteLatency.getMeanRate(), 0.0);
executeBatch(true, 10, 2);
- assertEquals(10, cfs.metric.coordinatorWriteLatency.getCount());
+ assertEquals(1, cfs.metric.coordinatorWriteLatency.getCount());
executeBatch(true, 20, 2);
- assertEquals(30, cfs.metric.coordinatorWriteLatency.getCount());
- assertTrue(cfs.metric.coordinatorWriteLatency.getMeanRate() > 0);
+ assertEquals(2, cfs.metric.coordinatorWriteLatency.getCount()); // 2 for previous batch and this batch
+ assertGreaterThan(cfs.metric.coordinatorWriteLatency.getMeanRate(), 0);
+ }
+
+ @Test
+ public void testLoggedPartitionsPerBatchMultiTable()
+ {
+ ColumnFamilyStore first = recreateTable();
+ assertEquals(0, first.metric.coordinatorWriteLatency.getCount());
+ assertEquals(0.0, first.metric.coordinatorWriteLatency.getMeanRate(), 0.0);
+
+ ColumnFamilyStore second = recreateTable(TABLE + "_second");
+ assertEquals(0, second.metric.coordinatorWriteLatency.getCount());
+ assertEquals(0.0, second.metric.coordinatorWriteLatency.getMeanRate(), 0.0);
+
+ executeBatch(true, 10, 2, TABLE, TABLE + "_second");
+ assertEquals(1, first.metric.coordinatorWriteLatency.getCount());
+ assertEquals(1, second.metric.coordinatorWriteLatency.getCount());
+
+ executeBatch(true, 20, 2, TABLE, TABLE + "_second");
+ assertEquals(2, first.metric.coordinatorWriteLatency.getCount()); // 2 for previous batch and this batch
+ assertEquals(2, second.metric.coordinatorWriteLatency.getCount()); // 2 for previous batch and this batch
+ assertGreaterThan(first.metric.coordinatorWriteLatency.getMeanRate(), 0);
+ assertGreaterThan(second.metric.coordinatorWriteLatency.getMeanRate(), 0);
}
@Test
@@ -152,15 +190,35 @@ public class TableMetricsTest extends SchemaLoader
{
ColumnFamilyStore cfs = recreateTable();
assertEquals(0, cfs.metric.coordinatorWriteLatency.getCount());
- assertTrue(cfs.metric.coordinatorWriteLatency.getMeanRate() == 0);
+ assertEquals(0.0, cfs.metric.coordinatorWriteLatency.getMeanRate(), 0.0);
executeBatch(false, 5, 3);
- assertEquals(5, cfs.metric.coordinatorWriteLatency.getCount());
+ assertEquals(1, cfs.metric.coordinatorWriteLatency.getCount());
executeBatch(false, 25, 2);
- assertEquals(30, cfs.metric.coordinatorWriteLatency.getCount());
- assertTrue(cfs.metric.coordinatorWriteLatency.getMeanRate() > 0);
+ assertEquals(2, cfs.metric.coordinatorWriteLatency.getCount()); // 2 for previous batch and this batch
+ assertGreaterThan(cfs.metric.coordinatorWriteLatency.getMeanRate(), 0);
+ }
+ @Test
+ public void testUnloggedPartitionsPerBatchMultiTable()
+ {
+ ColumnFamilyStore first = recreateTable();
+ assertEquals(0, first.metric.coordinatorWriteLatency.getCount());
+ assertEquals(0.0, first.metric.coordinatorWriteLatency.getMeanRate(), 0.0);
+
+ ColumnFamilyStore second = recreateTable(TABLE + "_second");
+ assertEquals(0, second.metric.coordinatorWriteLatency.getCount());
+ assertEquals(0.0, second.metric.coordinatorWriteLatency.getMeanRate(), 0.0);
+
+ executeBatch(false, 5, 3, TABLE, TABLE + "_second");
+ assertEquals(1, first.metric.coordinatorWriteLatency.getCount());
+
+ executeBatch(false, 25, 2, TABLE, TABLE + "_second");
+ assertEquals(2, first.metric.coordinatorWriteLatency.getCount()); // 2 for previous batch and this batch
+ assertEquals(2, second.metric.coordinatorWriteLatency.getCount()); // 2 for previous batch and this batch
+ assertGreaterThan(first.metric.coordinatorWriteLatency.getMeanRate(), 0);
+ assertGreaterThan(second.metric.coordinatorWriteLatency.getMeanRate(), 0);
}
@Test
@@ -168,9 +226,13 @@ public class TableMetricsTest extends SchemaLoader
{
ColumnFamilyStore cfs = ColumnFamilyStore.getIfExists(KEYSPACE, COUNTER_TABLE);
assertEquals(0, cfs.metric.coordinatorWriteLatency.getCount());
- assertTrue(cfs.metric.coordinatorWriteLatency.getMeanRate() == 0);
+ assertEquals(0.0, cfs.metric.coordinatorWriteLatency.getMeanRate(), 0.0);
session.execute(String.format("UPDATE %s.%s SET id_c = id_c + 1 WHERE id = 1 AND val = 'val1'", KEYSPACE, COUNTER_TABLE));
assertEquals(1, cfs.metric.coordinatorWriteLatency.getCount());
- assertTrue(cfs.metric.coordinatorWriteLatency.getMeanRate() > 0);
+ assertGreaterThan(cfs.metric.coordinatorWriteLatency.getMeanRate(), 0);
+ }
+
+ private static void assertGreaterThan(double actual, double expectedLessThan) {
+ assertTrue("Expected " + actual + " > " + expectedLessThan, actual > expectedLessThan);
}
}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org