You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jo...@apache.org on 2022/09/06 16:09:10 UTC

[cassandra] branch cassandra-4.1 updated: Speculative execution threshold unit mismatch

This is an automated email from the ASF dual-hosted git repository.

jonmeredith pushed a commit to branch cassandra-4.1
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/cassandra-4.1 by this push:
     new 6748b8b7ea Speculative execution threshold unit mismatch
6748b8b7ea is described below

commit 6748b8b7ead6fff7820045ebbfe23f6c050f2efe
Author: Jon Meredith <jo...@apache.org>
AuthorDate: Fri Sep 2 10:28:21 2022 -0600

    Speculative execution threshold unit mismatch
    
    patch by Jon Meredith; reviewed by Caleb Rackliffe, Yifan Cai for CASSANDRA-17877
---
 CHANGES.txt                                        |  1 +
 .../org/apache/cassandra/db/ColumnFamilyStore.java | 13 +++---
 .../metrics/CassandraMetricsRegistry.java          |  7 ++--
 .../apache/cassandra/metrics/MessagingMetrics.java |  5 ++-
 .../org/apache/cassandra/metrics/TableMetrics.java |  5 ++-
 .../service/AbstractWriteResponseHandler.java      |  7 ++--
 .../service/reads/AbstractReadExecutor.java        | 12 ++++--
 .../service/reads/FixedSpeculativeRetryPolicy.java |  2 +-
 .../reads/PercentileSpeculativeRetryPolicy.java    |  1 +
 .../service/reads/SpeculativeRetryPolicy.java      |  7 ++++
 .../service/reads/repair/AbstractReadRepair.java   |  6 +--
 .../service/reads/repair/BlockingReadRepair.java   |  3 +-
 .../apache/cassandra/db/ColumnFamilyStoreTest.java | 49 ++++++++++++++++++++++
 .../cassandra/service/OptionalTasksTest.java       |  8 ++--
 .../cassandra/service/reads/ReadExecutorTest.java  |  4 +-
 .../reads/repair/AbstractReadRepairTest.java       |  8 ++--
 .../service/reads/repair/ReadRepairTest.java       |  2 +-
 17 files changed, 104 insertions(+), 36 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index bf0ea108f8..6583cbfd1e 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.1-alpha2
+ * Speculative execution threshold unit mismatch (CASSANDRA-17877)
  * Fix BulkLoader to load entireSSTableThrottle and entireSSTableInterDcThrottle (CASSANDRA-17677)
  * Fix a race condition where a keyspace can be oopened while it is being removed (CASSANDRA-17658)
  * DatabaseDescriptor will set the default failure detector during client initialization (CASSANDRA-17782)
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index a40e5c7ad1..2ba57683ca 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -174,7 +174,6 @@ import org.apache.cassandra.utils.concurrent.Refs;
 import org.apache.cassandra.utils.concurrent.UncheckedInterruptedException;
 
 import static com.google.common.base.Throwables.propagate;
-import static java.util.concurrent.TimeUnit.NANOSECONDS;
 import static org.apache.cassandra.concurrent.ExecutorFactory.Global.executorFactory;
 import static org.apache.cassandra.config.DatabaseDescriptor.getFlushWriters;
 import static org.apache.cassandra.db.commitlog.CommitLogPosition.NONE;
@@ -308,8 +307,8 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean, Memtable.Owner
     private final Directories directories;
 
     public final TableMetrics metric;
-    public volatile long sampleReadLatencyNanos;
-    public volatile long additionalWriteLatencyNanos;
+    public volatile long sampleReadLatencyMicros;
+    public volatile long additionalWriteLatencyMicros;
 
     private final CassandraTableWriteHandler writeHandler;
     private final CassandraStreamManager streamManager;
@@ -478,8 +477,8 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean, Memtable.Owner
         crcCheckChance = new DefaultValue<>(metadata.get().params.crcCheckChance);
         viewManager = keyspace.viewManager.forTable(metadata.id);
         this.sstableIdGenerator = sstableIdGenerator;
-        sampleReadLatencyNanos = DatabaseDescriptor.getReadRpcTimeout(NANOSECONDS) / 2;
-        additionalWriteLatencyNanos = DatabaseDescriptor.getWriteRpcTimeout(NANOSECONDS) / 2;
+        sampleReadLatencyMicros = DatabaseDescriptor.getReadRpcTimeout(TimeUnit.MICROSECONDS) / 2;
+        additionalWriteLatencyMicros = DatabaseDescriptor.getWriteRpcTimeout(TimeUnit.MICROSECONDS) / 2;
         memtableFactory = metadata.get().params.memtable.factory();
 
         logger.info("Initializing {}.{}", keyspace.getName(), name);
@@ -574,8 +573,8 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean, Memtable.Owner
     {
         try
         {
-            sampleReadLatencyNanos = metadata().params.speculativeRetry.calculateThreshold(metric.coordinatorReadLatency, sampleReadLatencyNanos);
-            additionalWriteLatencyNanos = metadata().params.additionalWritePolicy.calculateThreshold(metric.coordinatorWriteLatency, additionalWriteLatencyNanos);
+            sampleReadLatencyMicros = metadata().params.speculativeRetry.calculateThreshold(metric.coordinatorReadLatency, sampleReadLatencyMicros);
+            additionalWriteLatencyMicros = metadata().params.additionalWritePolicy.calculateThreshold(metric.coordinatorWriteLatency, additionalWriteLatencyMicros);
         }
         catch (Throwable e)
         {
diff --git a/src/java/org/apache/cassandra/metrics/CassandraMetricsRegistry.java b/src/java/org/apache/cassandra/metrics/CassandraMetricsRegistry.java
index 8ba0bda44e..37c37e357b 100644
--- a/src/java/org/apache/cassandra/metrics/CassandraMetricsRegistry.java
+++ b/src/java/org/apache/cassandra/metrics/CassandraMetricsRegistry.java
@@ -45,6 +45,7 @@ public class CassandraMetricsRegistry extends MetricRegistry
     private final Map<String, ThreadPoolMetrics> threadPoolMetrics = new ConcurrentHashMap<>();
 
     private final MBeanWrapper mBeanServer = MBeanWrapper.instance;
+    public final static TimeUnit DEFAULT_TIMER_UNIT = TimeUnit.MICROSECONDS;
 
     private CassandraMetricsRegistry()
     {
@@ -98,12 +99,12 @@ public class CassandraMetricsRegistry extends MetricRegistry
 
     public Timer timer(MetricName name)
     {
-        return timer(name, TimeUnit.MICROSECONDS);
+        return timer(name, DEFAULT_TIMER_UNIT);
     }
 
     public SnapshottingTimer timer(MetricName name, MetricName alias)
     {
-        return timer(name, alias, TimeUnit.MICROSECONDS);
+        return timer(name, alias, DEFAULT_TIMER_UNIT);
     }
 
     public SnapshottingTimer timer(MetricName name, TimeUnit durationUnit)
@@ -226,7 +227,7 @@ public class CassandraMetricsRegistry extends MetricRegistry
         else if (metric instanceof Histogram)
             mbean = new JmxHistogram((Histogram) metric, name);
         else if (metric instanceof Timer)
-            mbean = new JmxTimer((Timer) metric, name, TimeUnit.SECONDS, TimeUnit.MICROSECONDS);
+            mbean = new JmxTimer((Timer) metric, name, TimeUnit.SECONDS, DEFAULT_TIMER_UNIT);
         else if (metric instanceof Metered)
             mbean = new JmxMeter((Metered) metric, name, TimeUnit.SECONDS);
         else
diff --git a/src/java/org/apache/cassandra/metrics/MessagingMetrics.java b/src/java/org/apache/cassandra/metrics/MessagingMetrics.java
index 4948af610f..bef6d08737 100644
--- a/src/java/org/apache/cassandra/metrics/MessagingMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/MessagingMetrics.java
@@ -41,6 +41,7 @@ import org.apache.cassandra.net.LatencyConsumer;
 import org.apache.cassandra.utils.StatusLogger;
 
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.apache.cassandra.metrics.CassandraMetricsRegistry.DEFAULT_TIMER_UNIT;
 import static org.apache.cassandra.metrics.CassandraMetricsRegistry.Metrics;
 
 /**
@@ -209,8 +210,8 @@ public class MessagingMetrics implements InboundMessageHandlers.GlobalMetricCall
                                       LOG_DROPPED_INTERVAL_IN_MS,
                                       droppedInternal,
                                       droppedCrossNode,
-                                      TimeUnit.NANOSECONDS.toMillis((long) droppedForVerb.metrics.internalDroppedLatency.getSnapshot().getMean()),
-                                      TimeUnit.NANOSECONDS.toMillis((long) droppedForVerb.metrics.crossNodeDroppedLatency.getSnapshot().getMean())));
+                                      DEFAULT_TIMER_UNIT.toMillis((long) droppedForVerb.metrics.internalDroppedLatency.getSnapshot().getMean()),
+                                      DEFAULT_TIMER_UNIT.toMillis((long) droppedForVerb.metrics.crossNodeDroppedLatency.getSnapshot().getMean())));
                 ++count;
             }
         }
diff --git a/src/java/org/apache/cassandra/metrics/TableMetrics.java b/src/java/org/apache/cassandra/metrics/TableMetrics.java
index 89ac036804..5e7ab78b29 100644
--- a/src/java/org/apache/cassandra/metrics/TableMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/TableMetrics.java
@@ -17,6 +17,7 @@
  */
 package org.apache.cassandra.metrics;
 
+import static java.util.concurrent.TimeUnit.MICROSECONDS;
 import static org.apache.cassandra.metrics.CassandraMetricsRegistry.Metrics;
 import static org.apache.cassandra.utils.Clock.Global.nanoTime;
 
@@ -824,10 +825,10 @@ public class TableMetrics
         speculativeRetries = createTableCounter("SpeculativeRetries");
         speculativeFailedRetries = createTableCounter("SpeculativeFailedRetries");
         speculativeInsufficientReplicas = createTableCounter("SpeculativeInsufficientReplicas");
-        speculativeSampleLatencyNanos = createTableGauge("SpeculativeSampleLatencyNanos", () -> cfs.sampleReadLatencyNanos);
+        speculativeSampleLatencyNanos = createTableGauge("SpeculativeSampleLatencyNanos", () -> MICROSECONDS.toNanos(cfs.sampleReadLatencyMicros));
 
         additionalWrites = createTableCounter("AdditionalWrites");
-        additionalWriteLatencyNanos = createTableGauge("AdditionalWriteLatencyNanos", () -> cfs.additionalWriteLatencyNanos);
+        additionalWriteLatencyNanos = createTableGauge("AdditionalWriteLatencyNanos", () -> MICROSECONDS.toNanos(cfs.additionalWriteLatencyMicros));
 
         keyCacheHitRate = createTableGauge("KeyCacheHitRate", "KeyCacheHitRate", new RatioGauge()
         {
diff --git a/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java b/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
index 4d75f19bca..313f714820 100644
--- a/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
+++ b/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
@@ -49,6 +49,7 @@ import org.apache.cassandra.utils.concurrent.UncheckedInterruptedException;
 
 import static java.lang.Long.MAX_VALUE;
 import static java.lang.Math.min;
+import static java.util.concurrent.TimeUnit.MICROSECONDS;
 import static java.util.concurrent.TimeUnit.NANOSECONDS;
 
 import static org.apache.cassandra.utils.Clock.Global.nanoTime;
@@ -328,15 +329,15 @@ public abstract class AbstractWriteResponseHandler<T> implements RequestCallback
                                               .map(instance::getColumnFamilyStoreInstance)
                                               .collect(toList());
         for (ColumnFamilyStore cf : cfs)
-            timeout = min(timeout, cf.additionalWriteLatencyNanos);
+            timeout = min(timeout, cf.additionalWriteLatencyMicros);
 
         // no latency information, or we're overloaded
-        if (timeout > mutation.getTimeout(NANOSECONDS))
+        if (timeout > mutation.getTimeout(MICROSECONDS))
             return;
 
         try
         {
-            if (!condition.await(timeout, NANOSECONDS))
+            if (!condition.await(timeout, MICROSECONDS))
             {
                 for (ColumnFamilyStore cf : cfs)
                     cf.metric.additionalWrites.inc();
diff --git a/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java b/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java
index b5a759c3dc..1613283300 100644
--- a/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java
+++ b/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java
@@ -48,7 +48,7 @@ import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.utils.FBUtilities;
 
 import static com.google.common.collect.Iterables.all;
-import static java.util.concurrent.TimeUnit.NANOSECONDS;
+import static java.util.concurrent.TimeUnit.MICROSECONDS;
 
 /**
  * Sends a read request to the replicas needed to satisfy a given ConsistencyLevel.
@@ -220,10 +220,16 @@ public abstract class AbstractReadExecutor
     boolean shouldSpeculateAndMaybeWait()
     {
         // no latency information, or we're overloaded
-        if (cfs.sampleReadLatencyNanos > command.getTimeout(NANOSECONDS))
+        if (cfs.sampleReadLatencyMicros > command.getTimeout(MICROSECONDS))
+        {
+            if (logger.isTraceEnabled())
+                logger.trace("Decided not to speculate as {} > {}", cfs.sampleReadLatencyMicros, command.getTimeout(MICROSECONDS));
             return false;
+        }
 
-        return !handler.await(cfs.sampleReadLatencyNanos, NANOSECONDS);
+        if (logger.isTraceEnabled())
+            logger.trace("Awaiting {} microseconds before speculating", cfs.sampleReadLatencyMicros);
+        return !handler.await(cfs.sampleReadLatencyMicros, MICROSECONDS);
     }
 
     ReplicaPlan.ForTokenRead replicaPlan()
diff --git a/src/java/org/apache/cassandra/service/reads/FixedSpeculativeRetryPolicy.java b/src/java/org/apache/cassandra/service/reads/FixedSpeculativeRetryPolicy.java
index ccccf5523e..ce81169211 100644
--- a/src/java/org/apache/cassandra/service/reads/FixedSpeculativeRetryPolicy.java
+++ b/src/java/org/apache/cassandra/service/reads/FixedSpeculativeRetryPolicy.java
@@ -42,7 +42,7 @@ public class FixedSpeculativeRetryPolicy implements SpeculativeRetryPolicy
     @Override
     public long calculateThreshold(SnapshottingTimer latency, long existingValue)
     {
-        return TimeUnit.MILLISECONDS.toNanos(speculateAtMilliseconds);
+        return TimeUnit.MILLISECONDS.toMicros(speculateAtMilliseconds);
     }
 
     @Override
diff --git a/src/java/org/apache/cassandra/service/reads/PercentileSpeculativeRetryPolicy.java b/src/java/org/apache/cassandra/service/reads/PercentileSpeculativeRetryPolicy.java
index 90a7edb3e9..a084a0fa33 100644
--- a/src/java/org/apache/cassandra/service/reads/PercentileSpeculativeRetryPolicy.java
+++ b/src/java/org/apache/cassandra/service/reads/PercentileSpeculativeRetryPolicy.java
@@ -57,6 +57,7 @@ public class PercentileSpeculativeRetryPolicy implements SpeculativeRetryPolicy
     {
         if (snapshot.size() <= 0)
             return existingValue;
+        // latency snapshot uses a default timer so is in microseconds, so just return percentile
         return (long) snapshot.getValue(percentile / 100);
     }
 
diff --git a/src/java/org/apache/cassandra/service/reads/SpeculativeRetryPolicy.java b/src/java/org/apache/cassandra/service/reads/SpeculativeRetryPolicy.java
index 31352fa8ee..1164076021 100644
--- a/src/java/org/apache/cassandra/service/reads/SpeculativeRetryPolicy.java
+++ b/src/java/org/apache/cassandra/service/reads/SpeculativeRetryPolicy.java
@@ -28,6 +28,13 @@ public interface SpeculativeRetryPolicy
         NEVER, FIXED, PERCENTILE, HYBRID, ALWAYS
     }
 
+    /**
+     * Calculate the delay in microseconds after which speculation takes place
+     *
+     * @param latency       snapshot of coordinator latencies (in microseconds)
+     * @param existingValue existing speculation threshold (in microseconds)
+     * @return speculation delay (in microseconds).
+     */
     long calculateThreshold(SnapshottingTimer latency, long existingValue);
 
     Kind kind();
diff --git a/src/java/org/apache/cassandra/service/reads/repair/AbstractReadRepair.java b/src/java/org/apache/cassandra/service/reads/repair/AbstractReadRepair.java
index 04788c0762..28f94fdef7 100644
--- a/src/java/org/apache/cassandra/service/reads/repair/AbstractReadRepair.java
+++ b/src/java/org/apache/cassandra/service/reads/repair/AbstractReadRepair.java
@@ -47,7 +47,7 @@ import org.apache.cassandra.service.reads.DigestResolver;
 import org.apache.cassandra.service.reads.ReadCallback;
 import org.apache.cassandra.tracing.Tracing;
 
-import static java.util.concurrent.TimeUnit.NANOSECONDS;
+import static java.util.concurrent.TimeUnit.MICROSECONDS;
 
 public abstract class AbstractReadRepair<E extends Endpoints<E>, P extends ReplicaPlan.ForRead<E, P>>
         implements ReadRepair<E, P>
@@ -179,7 +179,7 @@ public abstract class AbstractReadRepair<E extends Endpoints<E>, P extends Repli
         ConsistencyLevel speculativeCL = consistency.isDatacenterLocal() ? ConsistencyLevel.LOCAL_QUORUM : ConsistencyLevel.QUORUM;
         return  consistency != ConsistencyLevel.EACH_QUORUM
                 && consistency.satisfies(speculativeCL, replicaPlan.get().replicationStrategy())
-                && cfs.sampleReadLatencyNanos <= command.getTimeout(NANOSECONDS);
+                && cfs.sampleReadLatencyMicros <= command.getTimeout(MICROSECONDS);
     }
 
     public void maybeSendAdditionalReads()
@@ -190,7 +190,7 @@ public abstract class AbstractReadRepair<E extends Endpoints<E>, P extends Repli
         if (repair == null)
             return;
 
-        if (shouldSpeculate() && !repair.readCallback.await(cfs.sampleReadLatencyNanos, NANOSECONDS))
+        if (shouldSpeculate() && !repair.readCallback.await(cfs.sampleReadLatencyMicros, MICROSECONDS))
         {
             Replica uncontacted = replicaPlan().firstUncontactedCandidate(replica -> true);
             if (uncontacted == null)
diff --git a/src/java/org/apache/cassandra/service/reads/repair/BlockingReadRepair.java b/src/java/org/apache/cassandra/service/reads/repair/BlockingReadRepair.java
index 44092bd1d0..c7180058e8 100644
--- a/src/java/org/apache/cassandra/service/reads/repair/BlockingReadRepair.java
+++ b/src/java/org/apache/cassandra/service/reads/repair/BlockingReadRepair.java
@@ -39,6 +39,7 @@ import org.apache.cassandra.locator.ReplicaPlan;
 import org.apache.cassandra.metrics.ReadRepairMetrics;
 import org.apache.cassandra.tracing.Tracing;
 
+import static java.util.concurrent.TimeUnit.MICROSECONDS;
 import static java.util.concurrent.TimeUnit.NANOSECONDS;
 
 /**
@@ -74,7 +75,7 @@ public class BlockingReadRepair<E extends Endpoints<E>, P extends ReplicaPlan.Fo
     {
         for (BlockingPartitionRepair repair: repairs)
         {
-            repair.maybeSendAdditionalWrites(cfs.additionalWriteLatencyNanos, TimeUnit.NANOSECONDS);
+            repair.maybeSendAdditionalWrites(cfs.additionalWriteLatencyMicros, MICROSECONDS);
         }
     }
 
diff --git a/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java b/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
index 6d8bd0fe05..96437db615 100644
--- a/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
+++ b/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
@@ -30,6 +30,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Iterators;
@@ -58,6 +59,7 @@ import org.apache.cassandra.metrics.ClearableHistogram;
 import org.apache.cassandra.schema.ColumnMetadata;
 import org.apache.cassandra.schema.KeyspaceParams;
 import org.apache.cassandra.schema.SchemaConstants;
+import org.apache.cassandra.service.reads.SpeculativeRetryPolicy;
 import org.apache.cassandra.service.snapshot.SnapshotManifest;
 import org.apache.cassandra.service.snapshot.TableSnapshot;
 import org.apache.cassandra.utils.ByteBufferUtil;
@@ -74,9 +76,11 @@ public class ColumnFamilyStoreTest
 {
     public static final String KEYSPACE1 = "ColumnFamilyStoreTest1";
     public static final String KEYSPACE2 = "ColumnFamilyStoreTest2";
+    public static final String KEYSPACE3 = "ColumnFamilyStoreTest3";
     public static final String CF_STANDARD1 = "Standard1";
     public static final String CF_STANDARD2 = "Standard2";
     public static final String CF_INDEX1 = "Indexed1";
+    public static final String CF_SPEC_RETRY1 = "SpeculativeRetryTest1";
 
     @BeforeClass
     public static void defineSchema() throws ConfigurationException
@@ -90,6 +94,11 @@ public class ColumnFamilyStoreTest
         SchemaLoader.createKeyspace(KEYSPACE2,
                                     KeyspaceParams.simple(1),
                                     SchemaLoader.standardCFMD(KEYSPACE2, CF_STANDARD1));
+        SchemaLoader.createKeyspace(KEYSPACE3,
+                                    KeyspaceParams.simple(1),
+                                    SchemaLoader.standardCFMD(KEYSPACE3, CF_SPEC_RETRY1)
+                                                .speculativeRetry(SpeculativeRetryPolicy.fromString("50PERCENTILE"))
+                                                .additionalWritePolicy(SpeculativeRetryPolicy.fromString("75PERCENTILE")));
     }
 
     @Before
@@ -323,6 +332,46 @@ public class ColumnFamilyStoreTest
         }
     }
 
+    @Test
+    public void speculationThreshold()
+    {
+        // CF_SPEC_RETRY1 configured to use the 50th percentile for read and 75th percentile for write
+        ColumnFamilyStore cfs = Keyspace.open(KEYSPACE3).getColumnFamilyStore(CF_SPEC_RETRY1);
+
+        cfs.sampleReadLatencyMicros = 123000;
+        cfs.additionalWriteLatencyMicros = 234000;
+
+        // test updating before any stats are present
+        cfs.updateSpeculationThreshold();
+        assertThat(cfs.sampleReadLatencyMicros).isEqualTo(123000);
+        assertThat(cfs.additionalWriteLatencyMicros).isEqualTo(234000);
+
+        // Seed the column family with some latency data.
+        final int count = 10000;
+        for (int millis = 0; millis < count; millis++)
+        {
+            cfs.metric.coordinatorReadLatency.update(millis, TimeUnit.MILLISECONDS);
+            cfs.metric.coordinatorWriteLatency.update(millis, TimeUnit.MILLISECONDS);
+        }
+        // Sanity check the metrics - 50th percentile of linear 0-10000ms
+        // remember, latencies are only an estimate - off by up to 20% by the 1.2 factor between buckets.
+        assertThat(cfs.metric.coordinatorReadLatency.getCount()).isEqualTo(count);
+        assertThat(cfs.metric.coordinatorReadLatency.getSnapshot().getValue(0.5))
+            .isBetween((double) TimeUnit.MILLISECONDS.toMicros(5839),
+                       (double) TimeUnit.MILLISECONDS.toMicros(5840));
+        // Sanity check the metrics - 75th percentileof linear 0-10000ms
+        assertThat(cfs.metric.coordinatorWriteLatency.getCount()).isEqualTo(count);
+        assertThat(cfs.metric.coordinatorWriteLatency.getSnapshot().getValue(0.75))
+        .isBetween((double) TimeUnit.MILLISECONDS.toMicros(8409),
+                   (double) TimeUnit.MILLISECONDS.toMicros(8410));
+
+        // CF_SPEC_RETRY1 configured to use the 50th percentile for speculation
+        cfs.updateSpeculationThreshold();
+
+        assertThat(cfs.sampleReadLatencyMicros).isBetween(TimeUnit.MILLISECONDS.toMicros(5839), TimeUnit.MILLISECONDS.toMicros(5840));
+        assertThat(cfs.additionalWriteLatencyMicros).isBetween(TimeUnit.MILLISECONDS.toMicros(8409), TimeUnit.MILLISECONDS.toMicros(8410));
+    }
+
     // TODO: Fix once we have working supercolumns in 8099
 //    // CASSANDRA-3467.  the key here is that supercolumn and subcolumn comparators are different
 //    @Test
diff --git a/test/unit/org/apache/cassandra/service/OptionalTasksTest.java b/test/unit/org/apache/cassandra/service/OptionalTasksTest.java
index 0ccf72b57c..952abcdc33 100644
--- a/test/unit/org/apache/cassandra/service/OptionalTasksTest.java
+++ b/test/unit/org/apache/cassandra/service/OptionalTasksTest.java
@@ -64,12 +64,12 @@ public class OptionalTasksTest
 
         try
         {
-            long originalValue = cfs.sampleReadLatencyNanos;
+            long originalValue = cfs.sampleReadLatencyMicros;
 
             // ...and ensure that the speculation threshold updater doesn't run.
             SPECULATION_THRESHOLD_UPDATER.run();
 
-            assertEquals(originalValue, cfs.sampleReadLatencyNanos);
+            assertEquals(originalValue, cfs.sampleReadLatencyMicros);
         }
         finally
         {
@@ -86,11 +86,11 @@ public class OptionalTasksTest
         ColumnFamilyStore cfs = Schema.instance.getColumnFamilyStoreInstance(Objects.requireNonNull(metadata).id);
         Objects.requireNonNull(cfs).metric.coordinatorReadLatency.update(100, TimeUnit.NANOSECONDS);
 
-        long originalValue = cfs.sampleReadLatencyNanos;
+        long originalValue = cfs.sampleReadLatencyMicros;
 
         // ...and ensure that the speculation threshold updater runs.
         SPECULATION_THRESHOLD_UPDATER.run();
 
-        assertNotEquals(originalValue, cfs.sampleReadLatencyNanos);
+        assertNotEquals(originalValue, cfs.sampleReadLatencyMicros);
     }
 }
diff --git a/test/unit/org/apache/cassandra/service/reads/ReadExecutorTest.java b/test/unit/org/apache/cassandra/service/reads/ReadExecutorTest.java
index 962801f161..6f6bf36cbf 100644
--- a/test/unit/org/apache/cassandra/service/reads/ReadExecutorTest.java
+++ b/test/unit/org/apache/cassandra/service/reads/ReadExecutorTest.java
@@ -75,7 +75,7 @@ public class ReadExecutorTest
                 full(InetAddressAndPort.getByName("127.0.0.254")),
                 full(InetAddressAndPort.getByName("127.0.0.253"))
         );
-        cfs.sampleReadLatencyNanos = 0;
+        cfs.sampleReadLatencyMicros = 0;
     }
 
     @Before
@@ -209,7 +209,7 @@ public class ReadExecutorTest
         executor.executeAsync();
 
         // ...and then force a speculative retry against another endpoint.
-        cfs.sampleReadLatencyNanos = 0L;
+        cfs.sampleReadLatencyMicros = 0L;
         executor.maybeTryAdditionalReplicas();
 
         new Thread(() ->
diff --git a/test/unit/org/apache/cassandra/service/reads/repair/AbstractReadRepairTest.java b/test/unit/org/apache/cassandra/service/reads/repair/AbstractReadRepairTest.java
index 0fcc5788ab..bb10c67f1c 100644
--- a/test/unit/org/apache/cassandra/service/reads/repair/AbstractReadRepairTest.java
+++ b/test/unit/org/apache/cassandra/service/reads/repair/AbstractReadRepairTest.java
@@ -228,8 +228,8 @@ public abstract  class AbstractReadRepairTest
         ks = Keyspace.open(ksName);
         cfs = ks.getColumnFamilyStore("tbl");
 
-        cfs.sampleReadLatencyNanos = 0;
-        cfs.additionalWriteLatencyNanos = 0;
+        cfs.sampleReadLatencyMicros = 0;
+        cfs.additionalWriteLatencyMicros = 0;
 
         target1 = InetAddressAndPort.getByName("127.0.0.255");
         target2 = InetAddressAndPort.getByName("127.0.0.254");
@@ -274,8 +274,8 @@ public abstract  class AbstractReadRepairTest
     {
         assert configured : "configureClass must be called in a @BeforeClass method";
 
-        cfs.sampleReadLatencyNanos = 0;
-        cfs.additionalWriteLatencyNanos = 0;
+        cfs.sampleReadLatencyMicros = 0;
+        cfs.additionalWriteLatencyMicros = 0;
     }
 
     static ReplicaPlan.ForRangeRead replicaPlan(ConsistencyLevel consistencyLevel, EndpointsForRange replicas)
diff --git a/test/unit/org/apache/cassandra/service/reads/repair/ReadRepairTest.java b/test/unit/org/apache/cassandra/service/reads/repair/ReadRepairTest.java
index 14c89b5348..8b5b2c1641 100644
--- a/test/unit/org/apache/cassandra/service/reads/repair/ReadRepairTest.java
+++ b/test/unit/org/apache/cassandra/service/reads/repair/ReadRepairTest.java
@@ -124,7 +124,7 @@ public class ReadRepairTest
         ks = Keyspace.open(ksName);
         cfs = ks.getColumnFamilyStore("tbl");
 
-        cfs.sampleReadLatencyNanos = 0;
+        cfs.sampleReadLatencyMicros = 0;
 
         target1 = full(InetAddressAndPort.getByName("127.0.0.255"));
         target2 = full(InetAddressAndPort.getByName("127.0.0.254"));


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org