You are viewing a plain text version of this content. The hyperlink to the canonical version was not preserved in this plain text copy.
Posted to commits@cassandra.apache.org by bd...@apache.org on 2020/02/14 21:45:50 UTC

[cassandra] branch trunk updated: updateCoordinatorWriteLatencyTableMetric can produce misleading metrics

This is an automated email from the ASF dual-hosted git repository.

bdeggleston pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 247502c  updateCoordinatorWriteLatencyTableMetric can produce misleading metrics
247502c is described below

commit 247502c5d19c181bbe0a224da3ad6ebd0156f607
Author: David Capwell <dc...@gmail.com>
AuthorDate: Tue Feb 11 18:41:21 2020 -0800

    updateCoordinatorWriteLatencyTableMetric can produce misleading metrics
    
    Patch by David Capwell; Reviewed by Blake Eggleston for CASSANDRA-15569
---
 CHANGES.txt                                        |   1 +
 .../org/apache/cassandra/service/StorageProxy.java | 129 ++++++++++++++++-----
 .../apache/cassandra/metrics/TableMetricsTest.java | 108 +++++++++++++----
 3 files changed, 188 insertions(+), 50 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index c4481ae..5fe958c 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0-alpha4
+ * updateCoordinatorWriteLatencyTableMetric can produce misleading metrics (CASSANDRA-15569)
  * Added documentation for read repair and an example of full repair (CASSANDRA-15485)
  * Make cqlsh and cqlshlib Python 2 & 3 compatible (CASSANDRA-10190)
  * Added documentation for Full Query Logging (CASSANDRA-15475)
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 9fc6b52..0caee86 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -19,19 +19,37 @@ package org.apache.cassandra.service;
 
 import java.nio.ByteBuffer;
 import java.nio.file.Paths;
-import java.util.*;
-import java.util.concurrent.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.EnumMap;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
 import com.google.common.base.Preconditions;
 import com.google.common.cache.CacheLoader;
-import com.google.common.collect.*;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.PeekingIterator;
 import com.google.common.primitives.Ints;
 import com.google.common.util.concurrent.Uninterruptibles;
-
 import org.apache.commons.lang3.StringUtils;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -40,38 +58,97 @@ import org.apache.cassandra.audit.FullQueryLoggerOptions;
 import org.apache.cassandra.batchlog.Batch;
 import org.apache.cassandra.batchlog.BatchlogManager;
 import org.apache.cassandra.concurrent.Stage;
-import org.apache.cassandra.service.reads.AbstractReadExecutor;
-import org.apache.cassandra.service.reads.DataResolver;
-import org.apache.cassandra.service.reads.ReadCallback;
-import org.apache.cassandra.service.reads.repair.ReadRepair;
-import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.schema.Schema;
-import org.apache.cassandra.schema.SchemaConstants;
-import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.CounterMutation;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.HintedHandOffManager;
+import org.apache.cassandra.db.IMutation;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.db.PartitionPosition;
+import org.apache.cassandra.db.PartitionRangeReadCommand;
+import org.apache.cassandra.db.ReadCommand;
+import org.apache.cassandra.db.ReadExecutionController;
+import org.apache.cassandra.db.ReadResponse;
+import org.apache.cassandra.db.SinglePartitionReadCommand;
+import org.apache.cassandra.db.TruncateRequest;
+import org.apache.cassandra.db.WriteType;
 import org.apache.cassandra.db.filter.DataLimits;
 import org.apache.cassandra.db.filter.TombstoneOverwhelmingException;
-import org.apache.cassandra.db.partitions.*;
+import org.apache.cassandra.db.partitions.FilteredPartition;
+import org.apache.cassandra.db.partitions.PartitionIterator;
+import org.apache.cassandra.db.partitions.PartitionIterators;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
 import org.apache.cassandra.db.rows.RowIterator;
 import org.apache.cassandra.db.view.ViewUtils;
-import org.apache.cassandra.dht.*;
-import org.apache.cassandra.exceptions.*;
+import org.apache.cassandra.dht.AbstractBounds;
+import org.apache.cassandra.dht.Bounds;
+import org.apache.cassandra.dht.RingPosition;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.exceptions.CasWriteTimeoutException;
+import org.apache.cassandra.exceptions.CasWriteUnknownResultException;
+import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.exceptions.IsBootstrappingException;
+import org.apache.cassandra.exceptions.OverloadedException;
+import org.apache.cassandra.exceptions.ReadFailureException;
+import org.apache.cassandra.exceptions.ReadTimeoutException;
+import org.apache.cassandra.exceptions.RequestFailureException;
+import org.apache.cassandra.exceptions.RequestFailureReason;
+import org.apache.cassandra.exceptions.RequestTimeoutException;
+import org.apache.cassandra.exceptions.UnavailableException;
+import org.apache.cassandra.exceptions.WriteFailureException;
+import org.apache.cassandra.exceptions.WriteTimeoutException;
 import org.apache.cassandra.gms.Gossiper;
 import org.apache.cassandra.hints.Hint;
 import org.apache.cassandra.hints.HintsService;
 import org.apache.cassandra.index.Index;
-import org.apache.cassandra.locator.*;
-import org.apache.cassandra.metrics.*;
-import org.apache.cassandra.net.*;
+import org.apache.cassandra.locator.AbstractReplicationStrategy;
+import org.apache.cassandra.locator.EndpointsForRange;
+import org.apache.cassandra.locator.EndpointsForToken;
+import org.apache.cassandra.locator.IEndpointSnitch;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.LocalStrategy;
+import org.apache.cassandra.locator.Replica;
+import org.apache.cassandra.locator.ReplicaLayout;
+import org.apache.cassandra.locator.ReplicaPlan;
+import org.apache.cassandra.locator.ReplicaPlans;
+import org.apache.cassandra.locator.Replicas;
+import org.apache.cassandra.locator.TokenMetadata;
+import org.apache.cassandra.metrics.CASClientRequestMetrics;
+import org.apache.cassandra.metrics.CASClientWriteRequestMetrics;
+import org.apache.cassandra.metrics.ClientRequestMetrics;
+import org.apache.cassandra.metrics.ClientWriteRequestMetrics;
+import org.apache.cassandra.metrics.ReadRepairMetrics;
+import org.apache.cassandra.metrics.StorageMetrics;
+import org.apache.cassandra.metrics.ViewWriteMetrics;
+import org.apache.cassandra.net.ForwardingInfo;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessageFlag;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.net.RequestCallback;
+import org.apache.cassandra.net.Verb;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.schema.SchemaConstants;
+import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.service.paxos.Commit;
 import org.apache.cassandra.service.paxos.PaxosState;
 import org.apache.cassandra.service.paxos.PrepareCallback;
 import org.apache.cassandra.service.paxos.ProposeCallback;
-import org.apache.cassandra.net.Verb;
+import org.apache.cassandra.service.reads.AbstractReadExecutor;
+import org.apache.cassandra.service.reads.DataResolver;
+import org.apache.cassandra.service.reads.ReadCallback;
+import org.apache.cassandra.service.reads.repair.ReadRepair;
 import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.triggers.TriggerExecutor;
-import org.apache.cassandra.utils.*;
 import org.apache.cassandra.utils.AbstractIterator;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.MBeanWrapper;
+import org.apache.cassandra.utils.MonotonicClock;
+import org.apache.cassandra.utils.Pair;
+import org.apache.cassandra.utils.UUIDGen;
 
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static java.util.concurrent.TimeUnit.NANOSECONDS;
@@ -980,14 +1057,12 @@ public class StorageProxy implements StorageProxyMBean
 
         try
         {
-            //TODO: Avoid giving same latency number for each CF in each mutation in a given set of mutations
             //We could potentially pass a callback into performWrite. And add callback provision for mutateCounter or mutateAtomically (sendToHintedEndPoints)
             //However, Trade off between write metric per CF accuracy vs performance hit due to callbacks. Similar issue exists with CoordinatorReadLatency metric.
-            mutations.forEach(mutation -> {
-                mutation.getTableIds().forEach(tableId -> {
-                    Keyspace.open(mutation.getKeyspaceName()).getColumnFamilyStore(tableId).metric.coordinatorWriteLatency.update(latency, TimeUnit.NANOSECONDS);
-                });
-            });
+            mutations.stream()
+                     .flatMap(m -> m.getTableIds().stream().map(tableId -> Keyspace.open(m.getKeyspaceName()).getColumnFamilyStore(tableId)))
+                     .distinct()
+                     .forEach(store -> store.metric.coordinatorWriteLatency.update(latency, TimeUnit.NANOSECONDS));
         }
         catch (Exception ex)
         {
diff --git a/test/unit/org/apache/cassandra/metrics/TableMetricsTest.java b/test/unit/org/apache/cassandra/metrics/TableMetricsTest.java
index a3ae956..c5434fe 100644
--- a/test/unit/org/apache/cassandra/metrics/TableMetricsTest.java
+++ b/test/unit/org/apache/cassandra/metrics/TableMetricsTest.java
@@ -36,8 +36,8 @@ import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.service.EmbeddedCassandraService;
 
-import static junit.framework.Assert.assertEquals;
-import static junit.framework.Assert.assertTrue;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 @RunWith(OrderedJUnit4ClassRunner.class)
 public class TableMetricsTest extends SchemaLoader
@@ -66,15 +66,23 @@ public class TableMetricsTest extends SchemaLoader
 
     private ColumnFamilyStore recreateTable()
     {
-        session.execute(String.format("DROP TABLE IF EXISTS %s.%s", KEYSPACE, TABLE));
-        session.execute(String.format("CREATE TABLE IF NOT EXISTS %s.%s (id int, val1 text, val2 text, PRIMARY KEY(id, val1));", KEYSPACE, TABLE));
-        return ColumnFamilyStore.getIfExists(KEYSPACE, TABLE);
+        return recreateTable(TABLE);
     }
 
-    private void executeBatch(boolean isLogged, int distinctPartitions, int statementsPerPartition)
+    private ColumnFamilyStore recreateTable(String table)
     {
+        session.execute(String.format("DROP TABLE IF EXISTS %s.%s", KEYSPACE, table));
+        session.execute(String.format("CREATE TABLE IF NOT EXISTS %s.%s (id int, val1 text, val2 text, PRIMARY KEY(id, val1));", KEYSPACE, table));
+        return ColumnFamilyStore.getIfExists(KEYSPACE, table);
+    }
+
+    private void executeBatch(boolean isLogged, int distinctPartitions, int statementsPerPartition, String... tables)
+    {
+        if (tables == null || tables.length == 0)
+        {
+            tables = new String[] { TABLE };
+        }
         BatchStatement.Type batchType;
-        PreparedStatement ps = session.prepare(String.format("INSERT INTO %s.%s (id, val1, val2) VALUES (?, ?, ?);", KEYSPACE, TABLE));
 
         if (isLogged)
         {
@@ -87,6 +95,16 @@ public class TableMetricsTest extends SchemaLoader
 
         BatchStatement batch = new BatchStatement(batchType);
 
+        for (String table : tables)
+            populateBatch(batch, table, distinctPartitions, statementsPerPartition);
+
+        session.execute(batch);
+    }
+
+    private static void populateBatch(BatchStatement batch, String table, int distinctPartitions, int statementsPerPartition)
+    {
+        PreparedStatement ps = session.prepare(String.format("INSERT INTO %s.%s (id, val1, val2) VALUES (?, ?, ?);", KEYSPACE, table));
+
         for (int i=0; i<distinctPartitions; i++)
         {
             for (int j=0; j<statementsPerPartition; j++)
@@ -94,8 +112,6 @@ public class TableMetricsTest extends SchemaLoader
                 batch.add(ps.bind(i, j + "a", "b"));
             }
         }
-
-        session.execute(batch);
     }
 
     @Test
@@ -103,7 +119,7 @@ public class TableMetricsTest extends SchemaLoader
     {
         ColumnFamilyStore cfs = recreateTable();
         assertEquals(0, cfs.metric.coordinatorWriteLatency.getCount());
-        assertTrue(cfs.metric.coordinatorWriteLatency.getMeanRate() == 0);
+        assertEquals(0.0, cfs.metric.coordinatorWriteLatency.getMeanRate(), 0.0);
 
         for (int i = 0; i < 10; i++)
         {
@@ -111,7 +127,7 @@ public class TableMetricsTest extends SchemaLoader
         }
 
         assertEquals(10, cfs.metric.coordinatorWriteLatency.getCount());
-        assertTrue(cfs.metric.coordinatorWriteLatency.getMeanRate() > 0);
+        assertGreaterThan(cfs.metric.coordinatorWriteLatency.getMeanRate(), 0);
     }
 
     @Test
@@ -121,7 +137,7 @@ public class TableMetricsTest extends SchemaLoader
         PreparedStatement metricsStatement = session.prepare(String.format("INSERT INTO %s.%s (id, val1, val2) VALUES (?, ?, ?)", KEYSPACE, TABLE));
 
         assertEquals(0, cfs.metric.coordinatorWriteLatency.getCount());
-        assertTrue(cfs.metric.coordinatorWriteLatency.getMeanRate() == 0);
+        assertEquals(0.0, cfs.metric.coordinatorWriteLatency.getMeanRate(), 0.0);
 
         for (int i = 0; i < 10; i++)
         {
@@ -129,7 +145,7 @@ public class TableMetricsTest extends SchemaLoader
         }
 
         assertEquals(10, cfs.metric.coordinatorWriteLatency.getCount());
-        assertTrue(cfs.metric.coordinatorWriteLatency.getMeanRate() > 0);
+        assertGreaterThan(cfs.metric.coordinatorWriteLatency.getMeanRate(), 0);
     }
 
     @Test
@@ -137,14 +153,36 @@ public class TableMetricsTest extends SchemaLoader
     {
         ColumnFamilyStore cfs = recreateTable();
         assertEquals(0, cfs.metric.coordinatorWriteLatency.getCount());
-        assertTrue(cfs.metric.coordinatorWriteLatency.getMeanRate() == 0);
+        assertEquals(0.0, cfs.metric.coordinatorWriteLatency.getMeanRate(), 0.0);
 
         executeBatch(true, 10, 2);
-        assertEquals(10, cfs.metric.coordinatorWriteLatency.getCount());
+        assertEquals(1, cfs.metric.coordinatorWriteLatency.getCount());
 
         executeBatch(true, 20, 2);
-        assertEquals(30, cfs.metric.coordinatorWriteLatency.getCount());
-        assertTrue(cfs.metric.coordinatorWriteLatency.getMeanRate() > 0);
+        assertEquals(2, cfs.metric.coordinatorWriteLatency.getCount()); // 2 for previous batch and this batch
+        assertGreaterThan(cfs.metric.coordinatorWriteLatency.getMeanRate(), 0);
+    }
+
+    @Test
+    public void testLoggedPartitionsPerBatchMultiTable()
+    {
+        ColumnFamilyStore first = recreateTable();
+        assertEquals(0, first.metric.coordinatorWriteLatency.getCount());
+        assertEquals(0.0, first.metric.coordinatorWriteLatency.getMeanRate(), 0.0);
+
+        ColumnFamilyStore second = recreateTable(TABLE + "_second");
+        assertEquals(0, second.metric.coordinatorWriteLatency.getCount());
+        assertEquals(0.0, second.metric.coordinatorWriteLatency.getMeanRate(), 0.0);
+
+        executeBatch(true, 10, 2, TABLE, TABLE + "_second");
+        assertEquals(1, first.metric.coordinatorWriteLatency.getCount());
+        assertEquals(1, second.metric.coordinatorWriteLatency.getCount());
+
+        executeBatch(true, 20, 2, TABLE, TABLE + "_second");
+        assertEquals(2, first.metric.coordinatorWriteLatency.getCount()); // 2 for previous batch and this batch
+        assertEquals(2, second.metric.coordinatorWriteLatency.getCount()); // 2 for previous batch and this batch
+        assertGreaterThan(first.metric.coordinatorWriteLatency.getMeanRate(), 0);
+        assertGreaterThan(second.metric.coordinatorWriteLatency.getMeanRate(), 0);
     }
 
     @Test
@@ -152,15 +190,35 @@ public class TableMetricsTest extends SchemaLoader
     {
         ColumnFamilyStore cfs = recreateTable();
         assertEquals(0, cfs.metric.coordinatorWriteLatency.getCount());
-        assertTrue(cfs.metric.coordinatorWriteLatency.getMeanRate() == 0);
+        assertEquals(0.0, cfs.metric.coordinatorWriteLatency.getMeanRate(), 0.0);
 
         executeBatch(false, 5, 3);
-        assertEquals(5, cfs.metric.coordinatorWriteLatency.getCount());
+        assertEquals(1, cfs.metric.coordinatorWriteLatency.getCount());
 
         executeBatch(false, 25, 2);
-        assertEquals(30, cfs.metric.coordinatorWriteLatency.getCount());
-        assertTrue(cfs.metric.coordinatorWriteLatency.getMeanRate() > 0);
+        assertEquals(2, cfs.metric.coordinatorWriteLatency.getCount()); // 2 for previous batch and this batch
+        assertGreaterThan(cfs.metric.coordinatorWriteLatency.getMeanRate(), 0);
+    }
 
+    @Test
+    public void testUnloggedPartitionsPerBatchMultiTable()
+    {
+        ColumnFamilyStore first = recreateTable();
+        assertEquals(0, first.metric.coordinatorWriteLatency.getCount());
+        assertEquals(0.0, first.metric.coordinatorWriteLatency.getMeanRate(), 0.0);
+
+        ColumnFamilyStore second = recreateTable(TABLE + "_second");
+        assertEquals(0, second.metric.coordinatorWriteLatency.getCount());
+        assertEquals(0.0, second.metric.coordinatorWriteLatency.getMeanRate(), 0.0);
+
+        executeBatch(false, 5, 3, TABLE, TABLE + "_second");
+        assertEquals(1, first.metric.coordinatorWriteLatency.getCount());
+
+        executeBatch(false, 25, 2, TABLE, TABLE + "_second");
+        assertEquals(2, first.metric.coordinatorWriteLatency.getCount()); // 2 for previous batch and this batch
+        assertEquals(2, second.metric.coordinatorWriteLatency.getCount()); // 2 for previous batch and this batch
+        assertGreaterThan(first.metric.coordinatorWriteLatency.getMeanRate(), 0);
+        assertGreaterThan(second.metric.coordinatorWriteLatency.getMeanRate(), 0);
     }
 
     @Test
@@ -168,9 +226,13 @@ public class TableMetricsTest extends SchemaLoader
     {
         ColumnFamilyStore cfs = ColumnFamilyStore.getIfExists(KEYSPACE, COUNTER_TABLE);
         assertEquals(0, cfs.metric.coordinatorWriteLatency.getCount());
-        assertTrue(cfs.metric.coordinatorWriteLatency.getMeanRate() == 0);
+        assertEquals(0.0, cfs.metric.coordinatorWriteLatency.getMeanRate(), 0.0);
         session.execute(String.format("UPDATE %s.%s SET id_c = id_c + 1 WHERE id = 1 AND val = 'val1'", KEYSPACE, COUNTER_TABLE));
         assertEquals(1, cfs.metric.coordinatorWriteLatency.getCount());
-        assertTrue(cfs.metric.coordinatorWriteLatency.getMeanRate() > 0);
+        assertGreaterThan(cfs.metric.coordinatorWriteLatency.getMeanRate(), 0);
+    }
+
+    private static void assertGreaterThan(double actual, double expectedLessThan) {
+        assertTrue("Expected " + actual + " > " + expectedLessThan, actual > expectedLessThan);
     }
 }
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org