You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jo...@apache.org on 2022/09/06 16:09:10 UTC
[cassandra] branch cassandra-4.1 updated: Speculative execution threshold unit mismatch
This is an automated email from the ASF dual-hosted git repository.
jonmeredith pushed a commit to branch cassandra-4.1
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cassandra-4.1 by this push:
new 6748b8b7ea Speculative execution threshold unit mismatch
6748b8b7ea is described below
commit 6748b8b7ead6fff7820045ebbfe23f6c050f2efe
Author: Jon Meredith <jo...@apache.org>
AuthorDate: Fri Sep 2 10:28:21 2022 -0600
Speculative execution threshold unit mismatch
patch by Jon Meredith; reviewed by Caleb Rackliffe, Yifan Cai for CASSANDRA-17877
---
CHANGES.txt | 1 +
.../org/apache/cassandra/db/ColumnFamilyStore.java | 13 +++---
.../metrics/CassandraMetricsRegistry.java | 7 ++--
.../apache/cassandra/metrics/MessagingMetrics.java | 5 ++-
.../org/apache/cassandra/metrics/TableMetrics.java | 5 ++-
.../service/AbstractWriteResponseHandler.java | 7 ++--
.../service/reads/AbstractReadExecutor.java | 12 ++++--
.../service/reads/FixedSpeculativeRetryPolicy.java | 2 +-
.../reads/PercentileSpeculativeRetryPolicy.java | 1 +
.../service/reads/SpeculativeRetryPolicy.java | 7 ++++
.../service/reads/repair/AbstractReadRepair.java | 6 +--
.../service/reads/repair/BlockingReadRepair.java | 3 +-
.../apache/cassandra/db/ColumnFamilyStoreTest.java | 49 ++++++++++++++++++++++
.../cassandra/service/OptionalTasksTest.java | 8 ++--
.../cassandra/service/reads/ReadExecutorTest.java | 4 +-
.../reads/repair/AbstractReadRepairTest.java | 8 ++--
.../service/reads/repair/ReadRepairTest.java | 2 +-
17 files changed, 104 insertions(+), 36 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index bf0ea108f8..6583cbfd1e 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
4.1-alpha2
+ * Speculative execution threshold unit mismatch (CASSANDRA-17877)
* Fix BulkLoader to load entireSSTableThrottle and entireSSTableInterDcThrottle (CASSANDRA-17677)
* Fix a race condition where a keyspace can be oopened while it is being removed (CASSANDRA-17658)
* DatabaseDescriptor will set the default failure detector during client initialization (CASSANDRA-17782)
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index a40e5c7ad1..2ba57683ca 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -174,7 +174,6 @@ import org.apache.cassandra.utils.concurrent.Refs;
import org.apache.cassandra.utils.concurrent.UncheckedInterruptedException;
import static com.google.common.base.Throwables.propagate;
-import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static org.apache.cassandra.concurrent.ExecutorFactory.Global.executorFactory;
import static org.apache.cassandra.config.DatabaseDescriptor.getFlushWriters;
import static org.apache.cassandra.db.commitlog.CommitLogPosition.NONE;
@@ -308,8 +307,8 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean, Memtable.Owner
private final Directories directories;
public final TableMetrics metric;
- public volatile long sampleReadLatencyNanos;
- public volatile long additionalWriteLatencyNanos;
+ public volatile long sampleReadLatencyMicros;
+ public volatile long additionalWriteLatencyMicros;
private final CassandraTableWriteHandler writeHandler;
private final CassandraStreamManager streamManager;
@@ -478,8 +477,8 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean, Memtable.Owner
crcCheckChance = new DefaultValue<>(metadata.get().params.crcCheckChance);
viewManager = keyspace.viewManager.forTable(metadata.id);
this.sstableIdGenerator = sstableIdGenerator;
- sampleReadLatencyNanos = DatabaseDescriptor.getReadRpcTimeout(NANOSECONDS) / 2;
- additionalWriteLatencyNanos = DatabaseDescriptor.getWriteRpcTimeout(NANOSECONDS) / 2;
+ sampleReadLatencyMicros = DatabaseDescriptor.getReadRpcTimeout(TimeUnit.MICROSECONDS) / 2;
+ additionalWriteLatencyMicros = DatabaseDescriptor.getWriteRpcTimeout(TimeUnit.MICROSECONDS) / 2;
memtableFactory = metadata.get().params.memtable.factory();
logger.info("Initializing {}.{}", keyspace.getName(), name);
@@ -574,8 +573,8 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean, Memtable.Owner
{
try
{
- sampleReadLatencyNanos = metadata().params.speculativeRetry.calculateThreshold(metric.coordinatorReadLatency, sampleReadLatencyNanos);
- additionalWriteLatencyNanos = metadata().params.additionalWritePolicy.calculateThreshold(metric.coordinatorWriteLatency, additionalWriteLatencyNanos);
+ sampleReadLatencyMicros = metadata().params.speculativeRetry.calculateThreshold(metric.coordinatorReadLatency, sampleReadLatencyMicros);
+ additionalWriteLatencyMicros = metadata().params.additionalWritePolicy.calculateThreshold(metric.coordinatorWriteLatency, additionalWriteLatencyMicros);
}
catch (Throwable e)
{
diff --git a/src/java/org/apache/cassandra/metrics/CassandraMetricsRegistry.java b/src/java/org/apache/cassandra/metrics/CassandraMetricsRegistry.java
index 8ba0bda44e..37c37e357b 100644
--- a/src/java/org/apache/cassandra/metrics/CassandraMetricsRegistry.java
+++ b/src/java/org/apache/cassandra/metrics/CassandraMetricsRegistry.java
@@ -45,6 +45,7 @@ public class CassandraMetricsRegistry extends MetricRegistry
private final Map<String, ThreadPoolMetrics> threadPoolMetrics = new ConcurrentHashMap<>();
private final MBeanWrapper mBeanServer = MBeanWrapper.instance;
+ public final static TimeUnit DEFAULT_TIMER_UNIT = TimeUnit.MICROSECONDS;
private CassandraMetricsRegistry()
{
@@ -98,12 +99,12 @@ public class CassandraMetricsRegistry extends MetricRegistry
public Timer timer(MetricName name)
{
- return timer(name, TimeUnit.MICROSECONDS);
+ return timer(name, DEFAULT_TIMER_UNIT);
}
public SnapshottingTimer timer(MetricName name, MetricName alias)
{
- return timer(name, alias, TimeUnit.MICROSECONDS);
+ return timer(name, alias, DEFAULT_TIMER_UNIT);
}
public SnapshottingTimer timer(MetricName name, TimeUnit durationUnit)
@@ -226,7 +227,7 @@ public class CassandraMetricsRegistry extends MetricRegistry
else if (metric instanceof Histogram)
mbean = new JmxHistogram((Histogram) metric, name);
else if (metric instanceof Timer)
- mbean = new JmxTimer((Timer) metric, name, TimeUnit.SECONDS, TimeUnit.MICROSECONDS);
+ mbean = new JmxTimer((Timer) metric, name, TimeUnit.SECONDS, DEFAULT_TIMER_UNIT);
else if (metric instanceof Metered)
mbean = new JmxMeter((Metered) metric, name, TimeUnit.SECONDS);
else
diff --git a/src/java/org/apache/cassandra/metrics/MessagingMetrics.java b/src/java/org/apache/cassandra/metrics/MessagingMetrics.java
index 4948af610f..bef6d08737 100644
--- a/src/java/org/apache/cassandra/metrics/MessagingMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/MessagingMetrics.java
@@ -41,6 +41,7 @@ import org.apache.cassandra.net.LatencyConsumer;
import org.apache.cassandra.utils.StatusLogger;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.apache.cassandra.metrics.CassandraMetricsRegistry.DEFAULT_TIMER_UNIT;
import static org.apache.cassandra.metrics.CassandraMetricsRegistry.Metrics;
/**
@@ -209,8 +210,8 @@ public class MessagingMetrics implements InboundMessageHandlers.GlobalMetricCall
LOG_DROPPED_INTERVAL_IN_MS,
droppedInternal,
droppedCrossNode,
- TimeUnit.NANOSECONDS.toMillis((long) droppedForVerb.metrics.internalDroppedLatency.getSnapshot().getMean()),
- TimeUnit.NANOSECONDS.toMillis((long) droppedForVerb.metrics.crossNodeDroppedLatency.getSnapshot().getMean())));
+ DEFAULT_TIMER_UNIT.toMillis((long) droppedForVerb.metrics.internalDroppedLatency.getSnapshot().getMean()),
+ DEFAULT_TIMER_UNIT.toMillis((long) droppedForVerb.metrics.crossNodeDroppedLatency.getSnapshot().getMean())));
++count;
}
}
diff --git a/src/java/org/apache/cassandra/metrics/TableMetrics.java b/src/java/org/apache/cassandra/metrics/TableMetrics.java
index 89ac036804..5e7ab78b29 100644
--- a/src/java/org/apache/cassandra/metrics/TableMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/TableMetrics.java
@@ -17,6 +17,7 @@
*/
package org.apache.cassandra.metrics;
+import static java.util.concurrent.TimeUnit.MICROSECONDS;
import static org.apache.cassandra.metrics.CassandraMetricsRegistry.Metrics;
import static org.apache.cassandra.utils.Clock.Global.nanoTime;
@@ -824,10 +825,10 @@ public class TableMetrics
speculativeRetries = createTableCounter("SpeculativeRetries");
speculativeFailedRetries = createTableCounter("SpeculativeFailedRetries");
speculativeInsufficientReplicas = createTableCounter("SpeculativeInsufficientReplicas");
- speculativeSampleLatencyNanos = createTableGauge("SpeculativeSampleLatencyNanos", () -> cfs.sampleReadLatencyNanos);
+ speculativeSampleLatencyNanos = createTableGauge("SpeculativeSampleLatencyNanos", () -> MICROSECONDS.toNanos(cfs.sampleReadLatencyMicros));
additionalWrites = createTableCounter("AdditionalWrites");
- additionalWriteLatencyNanos = createTableGauge("AdditionalWriteLatencyNanos", () -> cfs.additionalWriteLatencyNanos);
+ additionalWriteLatencyNanos = createTableGauge("AdditionalWriteLatencyNanos", () -> MICROSECONDS.toNanos(cfs.additionalWriteLatencyMicros));
keyCacheHitRate = createTableGauge("KeyCacheHitRate", "KeyCacheHitRate", new RatioGauge()
{
diff --git a/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java b/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
index 4d75f19bca..313f714820 100644
--- a/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
+++ b/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
@@ -49,6 +49,7 @@ import org.apache.cassandra.utils.concurrent.UncheckedInterruptedException;
import static java.lang.Long.MAX_VALUE;
import static java.lang.Math.min;
+import static java.util.concurrent.TimeUnit.MICROSECONDS;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static org.apache.cassandra.utils.Clock.Global.nanoTime;
@@ -328,15 +329,15 @@ public abstract class AbstractWriteResponseHandler<T> implements RequestCallback
.map(instance::getColumnFamilyStoreInstance)
.collect(toList());
for (ColumnFamilyStore cf : cfs)
- timeout = min(timeout, cf.additionalWriteLatencyNanos);
+ timeout = min(timeout, cf.additionalWriteLatencyMicros);
// no latency information, or we're overloaded
- if (timeout > mutation.getTimeout(NANOSECONDS))
+ if (timeout > mutation.getTimeout(MICROSECONDS))
return;
try
{
- if (!condition.await(timeout, NANOSECONDS))
+ if (!condition.await(timeout, MICROSECONDS))
{
for (ColumnFamilyStore cf : cfs)
cf.metric.additionalWrites.inc();
diff --git a/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java b/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java
index b5a759c3dc..1613283300 100644
--- a/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java
+++ b/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java
@@ -48,7 +48,7 @@ import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.utils.FBUtilities;
import static com.google.common.collect.Iterables.all;
-import static java.util.concurrent.TimeUnit.NANOSECONDS;
+import static java.util.concurrent.TimeUnit.MICROSECONDS;
/**
* Sends a read request to the replicas needed to satisfy a given ConsistencyLevel.
@@ -220,10 +220,16 @@ public abstract class AbstractReadExecutor
boolean shouldSpeculateAndMaybeWait()
{
// no latency information, or we're overloaded
- if (cfs.sampleReadLatencyNanos > command.getTimeout(NANOSECONDS))
+ if (cfs.sampleReadLatencyMicros > command.getTimeout(MICROSECONDS))
+ {
+ if (logger.isTraceEnabled())
+ logger.trace("Decided not to speculate as {} > {}", cfs.sampleReadLatencyMicros, command.getTimeout(MICROSECONDS));
return false;
+ }
- return !handler.await(cfs.sampleReadLatencyNanos, NANOSECONDS);
+ if (logger.isTraceEnabled())
+ logger.trace("Awaiting {} microseconds before speculating", cfs.sampleReadLatencyMicros);
+ return !handler.await(cfs.sampleReadLatencyMicros, MICROSECONDS);
}
ReplicaPlan.ForTokenRead replicaPlan()
diff --git a/src/java/org/apache/cassandra/service/reads/FixedSpeculativeRetryPolicy.java b/src/java/org/apache/cassandra/service/reads/FixedSpeculativeRetryPolicy.java
index ccccf5523e..ce81169211 100644
--- a/src/java/org/apache/cassandra/service/reads/FixedSpeculativeRetryPolicy.java
+++ b/src/java/org/apache/cassandra/service/reads/FixedSpeculativeRetryPolicy.java
@@ -42,7 +42,7 @@ public class FixedSpeculativeRetryPolicy implements SpeculativeRetryPolicy
@Override
public long calculateThreshold(SnapshottingTimer latency, long existingValue)
{
- return TimeUnit.MILLISECONDS.toNanos(speculateAtMilliseconds);
+ return TimeUnit.MILLISECONDS.toMicros(speculateAtMilliseconds);
}
@Override
diff --git a/src/java/org/apache/cassandra/service/reads/PercentileSpeculativeRetryPolicy.java b/src/java/org/apache/cassandra/service/reads/PercentileSpeculativeRetryPolicy.java
index 90a7edb3e9..a084a0fa33 100644
--- a/src/java/org/apache/cassandra/service/reads/PercentileSpeculativeRetryPolicy.java
+++ b/src/java/org/apache/cassandra/service/reads/PercentileSpeculativeRetryPolicy.java
@@ -57,6 +57,7 @@ public class PercentileSpeculativeRetryPolicy implements SpeculativeRetryPolicy
{
if (snapshot.size() <= 0)
return existingValue;
+ // the latency snapshot comes from a default timer, so it is already in microseconds; return the percentile as-is
return (long) snapshot.getValue(percentile / 100);
}
diff --git a/src/java/org/apache/cassandra/service/reads/SpeculativeRetryPolicy.java b/src/java/org/apache/cassandra/service/reads/SpeculativeRetryPolicy.java
index 31352fa8ee..1164076021 100644
--- a/src/java/org/apache/cassandra/service/reads/SpeculativeRetryPolicy.java
+++ b/src/java/org/apache/cassandra/service/reads/SpeculativeRetryPolicy.java
@@ -28,6 +28,13 @@ public interface SpeculativeRetryPolicy
NEVER, FIXED, PERCENTILE, HYBRID, ALWAYS
}
+ /**
+ * Calculate the delay in microseconds after which speculation takes place
+ *
+ * @param latency snapshot of coordinator latencies (in microseconds)
+ * @param existingValue existing speculation threshold (in microseconds)
+ * @return speculation delay (in microseconds).
+ */
long calculateThreshold(SnapshottingTimer latency, long existingValue);
Kind kind();
diff --git a/src/java/org/apache/cassandra/service/reads/repair/AbstractReadRepair.java b/src/java/org/apache/cassandra/service/reads/repair/AbstractReadRepair.java
index 04788c0762..28f94fdef7 100644
--- a/src/java/org/apache/cassandra/service/reads/repair/AbstractReadRepair.java
+++ b/src/java/org/apache/cassandra/service/reads/repair/AbstractReadRepair.java
@@ -47,7 +47,7 @@ import org.apache.cassandra.service.reads.DigestResolver;
import org.apache.cassandra.service.reads.ReadCallback;
import org.apache.cassandra.tracing.Tracing;
-import static java.util.concurrent.TimeUnit.NANOSECONDS;
+import static java.util.concurrent.TimeUnit.MICROSECONDS;
public abstract class AbstractReadRepair<E extends Endpoints<E>, P extends ReplicaPlan.ForRead<E, P>>
implements ReadRepair<E, P>
@@ -179,7 +179,7 @@ public abstract class AbstractReadRepair<E extends Endpoints<E>, P extends Repli
ConsistencyLevel speculativeCL = consistency.isDatacenterLocal() ? ConsistencyLevel.LOCAL_QUORUM : ConsistencyLevel.QUORUM;
return consistency != ConsistencyLevel.EACH_QUORUM
&& consistency.satisfies(speculativeCL, replicaPlan.get().replicationStrategy())
- && cfs.sampleReadLatencyNanos <= command.getTimeout(NANOSECONDS);
+ && cfs.sampleReadLatencyMicros <= command.getTimeout(MICROSECONDS);
}
public void maybeSendAdditionalReads()
@@ -190,7 +190,7 @@ public abstract class AbstractReadRepair<E extends Endpoints<E>, P extends Repli
if (repair == null)
return;
- if (shouldSpeculate() && !repair.readCallback.await(cfs.sampleReadLatencyNanos, NANOSECONDS))
+ if (shouldSpeculate() && !repair.readCallback.await(cfs.sampleReadLatencyMicros, MICROSECONDS))
{
Replica uncontacted = replicaPlan().firstUncontactedCandidate(replica -> true);
if (uncontacted == null)
diff --git a/src/java/org/apache/cassandra/service/reads/repair/BlockingReadRepair.java b/src/java/org/apache/cassandra/service/reads/repair/BlockingReadRepair.java
index 44092bd1d0..c7180058e8 100644
--- a/src/java/org/apache/cassandra/service/reads/repair/BlockingReadRepair.java
+++ b/src/java/org/apache/cassandra/service/reads/repair/BlockingReadRepair.java
@@ -39,6 +39,7 @@ import org.apache.cassandra.locator.ReplicaPlan;
import org.apache.cassandra.metrics.ReadRepairMetrics;
import org.apache.cassandra.tracing.Tracing;
+import static java.util.concurrent.TimeUnit.MICROSECONDS;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
/**
@@ -74,7 +75,7 @@ public class BlockingReadRepair<E extends Endpoints<E>, P extends ReplicaPlan.Fo
{
for (BlockingPartitionRepair repair: repairs)
{
- repair.maybeSendAdditionalWrites(cfs.additionalWriteLatencyNanos, TimeUnit.NANOSECONDS);
+ repair.maybeSendAdditionalWrites(cfs.additionalWriteLatencyMicros, MICROSECONDS);
}
}
diff --git a/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java b/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
index 6d8bd0fe05..96437db615 100644
--- a/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
+++ b/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
@@ -30,6 +30,7 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
+import java.util.concurrent.TimeUnit;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Iterators;
@@ -58,6 +59,7 @@ import org.apache.cassandra.metrics.ClearableHistogram;
import org.apache.cassandra.schema.ColumnMetadata;
import org.apache.cassandra.schema.KeyspaceParams;
import org.apache.cassandra.schema.SchemaConstants;
+import org.apache.cassandra.service.reads.SpeculativeRetryPolicy;
import org.apache.cassandra.service.snapshot.SnapshotManifest;
import org.apache.cassandra.service.snapshot.TableSnapshot;
import org.apache.cassandra.utils.ByteBufferUtil;
@@ -74,9 +76,11 @@ public class ColumnFamilyStoreTest
{
public static final String KEYSPACE1 = "ColumnFamilyStoreTest1";
public static final String KEYSPACE2 = "ColumnFamilyStoreTest2";
+ public static final String KEYSPACE3 = "ColumnFamilyStoreTest3";
public static final String CF_STANDARD1 = "Standard1";
public static final String CF_STANDARD2 = "Standard2";
public static final String CF_INDEX1 = "Indexed1";
+ public static final String CF_SPEC_RETRY1 = "SpeculativeRetryTest1";
@BeforeClass
public static void defineSchema() throws ConfigurationException
@@ -90,6 +94,11 @@ public class ColumnFamilyStoreTest
SchemaLoader.createKeyspace(KEYSPACE2,
KeyspaceParams.simple(1),
SchemaLoader.standardCFMD(KEYSPACE2, CF_STANDARD1));
+ SchemaLoader.createKeyspace(KEYSPACE3,
+ KeyspaceParams.simple(1),
+ SchemaLoader.standardCFMD(KEYSPACE3, CF_SPEC_RETRY1)
+ .speculativeRetry(SpeculativeRetryPolicy.fromString("50PERCENTILE"))
+ .additionalWritePolicy(SpeculativeRetryPolicy.fromString("75PERCENTILE")));
}
@Before
@@ -323,6 +332,46 @@ public class ColumnFamilyStoreTest
}
}
+ @Test
+ public void speculationThreshold()
+ {
+ // CF_SPEC_RETRY1 configured to use the 50th percentile for read and 75th percentile for write
+ ColumnFamilyStore cfs = Keyspace.open(KEYSPACE3).getColumnFamilyStore(CF_SPEC_RETRY1);
+
+ cfs.sampleReadLatencyMicros = 123000;
+ cfs.additionalWriteLatencyMicros = 234000;
+
+ // test updating before any stats are present
+ cfs.updateSpeculationThreshold();
+ assertThat(cfs.sampleReadLatencyMicros).isEqualTo(123000);
+ assertThat(cfs.additionalWriteLatencyMicros).isEqualTo(234000);
+
+ // Seed the column family with some latency data.
+ final int count = 10000;
+ for (int millis = 0; millis < count; millis++)
+ {
+ cfs.metric.coordinatorReadLatency.update(millis, TimeUnit.MILLISECONDS);
+ cfs.metric.coordinatorWriteLatency.update(millis, TimeUnit.MILLISECONDS);
+ }
+ // Sanity check the metrics - 50th percentile of linear 0-10000ms
+ // remember, latencies are only an estimate - off by up to 20% by the 1.2 factor between buckets.
+ assertThat(cfs.metric.coordinatorReadLatency.getCount()).isEqualTo(count);
+ assertThat(cfs.metric.coordinatorReadLatency.getSnapshot().getValue(0.5))
+ .isBetween((double) TimeUnit.MILLISECONDS.toMicros(5839),
+ (double) TimeUnit.MILLISECONDS.toMicros(5840));
+ // Sanity check the metrics - 75th percentile of linear 0-10000ms
+ assertThat(cfs.metric.coordinatorWriteLatency.getCount()).isEqualTo(count);
+ assertThat(cfs.metric.coordinatorWriteLatency.getSnapshot().getValue(0.75))
+ .isBetween((double) TimeUnit.MILLISECONDS.toMicros(8409),
+ (double) TimeUnit.MILLISECONDS.toMicros(8410));
+
+ // CF_SPEC_RETRY1 configured to use the 50th percentile for speculation
+ cfs.updateSpeculationThreshold();
+
+ assertThat(cfs.sampleReadLatencyMicros).isBetween(TimeUnit.MILLISECONDS.toMicros(5839), TimeUnit.MILLISECONDS.toMicros(5840));
+ assertThat(cfs.additionalWriteLatencyMicros).isBetween(TimeUnit.MILLISECONDS.toMicros(8409), TimeUnit.MILLISECONDS.toMicros(8410));
+ }
+
// TODO: Fix once we have working supercolumns in 8099
// // CASSANDRA-3467. the key here is that supercolumn and subcolumn comparators are different
// @Test
diff --git a/test/unit/org/apache/cassandra/service/OptionalTasksTest.java b/test/unit/org/apache/cassandra/service/OptionalTasksTest.java
index 0ccf72b57c..952abcdc33 100644
--- a/test/unit/org/apache/cassandra/service/OptionalTasksTest.java
+++ b/test/unit/org/apache/cassandra/service/OptionalTasksTest.java
@@ -64,12 +64,12 @@ public class OptionalTasksTest
try
{
- long originalValue = cfs.sampleReadLatencyNanos;
+ long originalValue = cfs.sampleReadLatencyMicros;
// ...and ensure that the speculation threshold updater doesn't run.
SPECULATION_THRESHOLD_UPDATER.run();
- assertEquals(originalValue, cfs.sampleReadLatencyNanos);
+ assertEquals(originalValue, cfs.sampleReadLatencyMicros);
}
finally
{
@@ -86,11 +86,11 @@ public class OptionalTasksTest
ColumnFamilyStore cfs = Schema.instance.getColumnFamilyStoreInstance(Objects.requireNonNull(metadata).id);
Objects.requireNonNull(cfs).metric.coordinatorReadLatency.update(100, TimeUnit.NANOSECONDS);
- long originalValue = cfs.sampleReadLatencyNanos;
+ long originalValue = cfs.sampleReadLatencyMicros;
// ...and ensure that the speculation threshold updater runs.
SPECULATION_THRESHOLD_UPDATER.run();
- assertNotEquals(originalValue, cfs.sampleReadLatencyNanos);
+ assertNotEquals(originalValue, cfs.sampleReadLatencyMicros);
}
}
diff --git a/test/unit/org/apache/cassandra/service/reads/ReadExecutorTest.java b/test/unit/org/apache/cassandra/service/reads/ReadExecutorTest.java
index 962801f161..6f6bf36cbf 100644
--- a/test/unit/org/apache/cassandra/service/reads/ReadExecutorTest.java
+++ b/test/unit/org/apache/cassandra/service/reads/ReadExecutorTest.java
@@ -75,7 +75,7 @@ public class ReadExecutorTest
full(InetAddressAndPort.getByName("127.0.0.254")),
full(InetAddressAndPort.getByName("127.0.0.253"))
);
- cfs.sampleReadLatencyNanos = 0;
+ cfs.sampleReadLatencyMicros = 0;
}
@Before
@@ -209,7 +209,7 @@ public class ReadExecutorTest
executor.executeAsync();
// ...and then force a speculative retry against another endpoint.
- cfs.sampleReadLatencyNanos = 0L;
+ cfs.sampleReadLatencyMicros = 0L;
executor.maybeTryAdditionalReplicas();
new Thread(() ->
diff --git a/test/unit/org/apache/cassandra/service/reads/repair/AbstractReadRepairTest.java b/test/unit/org/apache/cassandra/service/reads/repair/AbstractReadRepairTest.java
index 0fcc5788ab..bb10c67f1c 100644
--- a/test/unit/org/apache/cassandra/service/reads/repair/AbstractReadRepairTest.java
+++ b/test/unit/org/apache/cassandra/service/reads/repair/AbstractReadRepairTest.java
@@ -228,8 +228,8 @@ public abstract class AbstractReadRepairTest
ks = Keyspace.open(ksName);
cfs = ks.getColumnFamilyStore("tbl");
- cfs.sampleReadLatencyNanos = 0;
- cfs.additionalWriteLatencyNanos = 0;
+ cfs.sampleReadLatencyMicros = 0;
+ cfs.additionalWriteLatencyMicros = 0;
target1 = InetAddressAndPort.getByName("127.0.0.255");
target2 = InetAddressAndPort.getByName("127.0.0.254");
@@ -274,8 +274,8 @@ public abstract class AbstractReadRepairTest
{
assert configured : "configureClass must be called in a @BeforeClass method";
- cfs.sampleReadLatencyNanos = 0;
- cfs.additionalWriteLatencyNanos = 0;
+ cfs.sampleReadLatencyMicros = 0;
+ cfs.additionalWriteLatencyMicros = 0;
}
static ReplicaPlan.ForRangeRead replicaPlan(ConsistencyLevel consistencyLevel, EndpointsForRange replicas)
diff --git a/test/unit/org/apache/cassandra/service/reads/repair/ReadRepairTest.java b/test/unit/org/apache/cassandra/service/reads/repair/ReadRepairTest.java
index 14c89b5348..8b5b2c1641 100644
--- a/test/unit/org/apache/cassandra/service/reads/repair/ReadRepairTest.java
+++ b/test/unit/org/apache/cassandra/service/reads/repair/ReadRepairTest.java
@@ -124,7 +124,7 @@ public class ReadRepairTest
ks = Keyspace.open(ksName);
cfs = ks.getColumnFamilyStore("tbl");
- cfs.sampleReadLatencyNanos = 0;
+ cfs.sampleReadLatencyMicros = 0;
target1 = full(InetAddressAndPort.getByName("127.0.0.255"));
target2 = full(InetAddressAndPort.getByName("127.0.0.254"));
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org