You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2021/07/21 17:57:56 UTC
[cassandra] 01/02: Avoid signaling DigestResolver until the minimum
number of responses are guaranteed to be visible
This is an automated email from the ASF dual-hosted git repository.
maedhroz pushed a commit to branch cassandra-4.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit 93496e826e7382adf52a99d4df38e73a43f892de
Author: Caleb Rackliffe <ca...@gmail.com>
AuthorDate: Thu Jul 15 13:25:23 2021 -0500
Avoid signaling DigestResolver until the minimum number of responses are guaranteed to be visible
patch by Caleb Rackliffe; reviewed by Andrés de la Peña and Benedict Elliott Smith for CASSANDRA-16807
---
CHANGES.txt | 1 +
.../cassandra/service/reads/ReadCallback.java | 54 +++++++++--------
.../service/reads/DigestResolverTest.java | 69 +++++++++++++++++++++-
3 files changed, 95 insertions(+), 29 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index b158970..51e58e1 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
4.0.0
+ * Avoid signaling DigestResolver until the minimum number of responses are guaranteed to be visible (CASSANDRA-16807)
* Fix pre-4.0 FWD_FRM parameter serializer (CASSANDRA-16808)
* Fix fwd to/from headers in DC write forwarding (CASSANDRA-16797)
* Fix CassandraVersion::compareTo (CASSANDRA-16794)
diff --git a/src/java/org/apache/cassandra/service/reads/ReadCallback.java b/src/java/org/apache/cassandra/service/reads/ReadCallback.java
index b7ee18c..91d9370 100644
--- a/src/java/org/apache/cassandra/service/reads/ReadCallback.java
+++ b/src/java/org/apache/cassandra/service/reads/ReadCallback.java
@@ -47,7 +47,7 @@ public class ReadCallback<E extends Endpoints<E>, P extends ReplicaPlan.ForRead<
{
protected static final Logger logger = LoggerFactory.getLogger( ReadCallback.class );
- public final ResponseResolver resolver;
+ public final ResponseResolver<E, P> resolver;
final SimpleCondition condition = new SimpleCondition();
private final long queryStartNanoTime;
final int blockFor; // TODO: move to replica plan as well?
@@ -55,15 +55,12 @@ public class ReadCallback<E extends Endpoints<E>, P extends ReplicaPlan.ForRead<
// may not be visible to the threads immediately, but ReplicaPlan only contains final fields, so they will never see an uninitialised object
final ReplicaPlan.Shared<E, P> replicaPlan;
private final ReadCommand command;
- private static final AtomicIntegerFieldUpdater<ReadCallback> recievedUpdater
- = AtomicIntegerFieldUpdater.newUpdater(ReadCallback.class, "received");
- private volatile int received = 0;
private static final AtomicIntegerFieldUpdater<ReadCallback> failuresUpdater
= AtomicIntegerFieldUpdater.newUpdater(ReadCallback.class, "failures");
private volatile int failures = 0;
private final Map<InetAddressAndPort, RequestFailureReason> failureReasonByEndpoint;
- public ReadCallback(ResponseResolver resolver, ReadCommand command, ReplicaPlan.Shared<E, P> replicaPlan, long queryStartNanoTime)
+ public ReadCallback(ResponseResolver<E, P> resolver, ReadCommand command, ReplicaPlan.Shared<E, P> replicaPlan, long queryStartNanoTime)
{
this.command = command;
this.resolver = resolver;
@@ -106,20 +103,20 @@ public class ReadCallback<E extends Endpoints<E>, P extends ReplicaPlan.ForRead<
* See {@link DigestResolver#preprocess(Message)}
* CASSANDRA-16097
*/
- boolean failed = failures > 0 &&
- (blockFor > resolver.responses.size() || !resolver.isDataPresent());
+ int received = resolver.responses.size();
+ boolean failed = failures > 0 && (blockFor > received || !resolver.isDataPresent());
if (signaled && !failed)
return;
if (Tracing.isTracing())
{
String gotData = received > 0 ? (resolver.isDataPresent() ? " (including data)" : " (only digests)") : "";
- Tracing.trace("{}; received {} of {} responses{}", new Object[]{ (failed ? "Failed" : "Timed out"), received, blockFor, gotData });
+ Tracing.trace("{}; received {} of {} responses{}", failed ? "Failed" : "Timed out", received, blockFor, gotData);
}
else if (logger.isDebugEnabled())
{
String gotData = received > 0 ? (resolver.isDataPresent() ? " (including data)" : " (only digests)") : "";
- logger.debug("{}; received {} of {} responses{}", new Object[]{ (failed ? "Failed" : "Timed out"), received, blockFor, gotData });
+ logger.debug("{}; received {} of {} responses{}", failed ? "Failed" : "Timed out", received, blockFor, gotData);
}
// Same as for writes, see AbstractWriteResponseHandler
@@ -133,25 +130,22 @@ public class ReadCallback<E extends Endpoints<E>, P extends ReplicaPlan.ForRead<
return blockFor;
}
+ @Override
public void onResponse(Message<ReadResponse> message)
{
+ assertWaitingFor(message.from());
resolver.preprocess(message);
- int n = waitingFor(message.from())
- ? recievedUpdater.incrementAndGet(this)
- : received;
- if (n >= blockFor && resolver.isDataPresent())
+ /*
+ * Ensure that data is present and the response accumulator has properly published the
+ * responses it has received. This may result in not signaling immediately when we receive
+ * the minimum number of required results, but it guarantees at least the minimum will
+ * be accessible when we do signal. (see CASSANDRA-16807)
+ */
+ if (resolver.isDataPresent() && resolver.responses.size() >= blockFor)
condition.signalAll();
}
- /**
- * @return true if the message counts towards the blockFor threshold
- */
- private boolean waitingFor(InetAddressAndPort from)
- {
- return !replicaPlan().consistencyLevel().isDatacenterLocal() || DatabaseDescriptor.getLocalDataCenter().equals(DatabaseDescriptor.getEndpointSnitch().getDatacenter(from));
- }
-
public void response(ReadResponse result)
{
Verb kind = command.isRangeRequest() ? Verb.RANGE_RSP : Verb.READ_RSP;
@@ -169,13 +163,11 @@ public class ReadCallback<E extends Endpoints<E>, P extends ReplicaPlan.ForRead<
@Override
public void onFailure(InetAddressAndPort from, RequestFailureReason failureReason)
{
- int n = waitingFor(from)
- ? failuresUpdater.incrementAndGet(this)
- : failures;
-
+ assertWaitingFor(from);
+
failureReasonByEndpoint.put(from, failureReason);
- if (blockFor + n > replicaPlan().contacts().size())
+ if (blockFor + failuresUpdater.incrementAndGet(this) > replicaPlan().contacts().size())
condition.signalAll();
}
@@ -184,4 +176,14 @@ public class ReadCallback<E extends Endpoints<E>, P extends ReplicaPlan.ForRead<
{
return true;
}
+
+ /**
+ * Assert that, for datacenter-local consistency levels, the message comes from a replica in the local datacenter.
+ */
+ private void assertWaitingFor(InetAddressAndPort from)
+ {
+ assert !replicaPlan().consistencyLevel().isDatacenterLocal()
+ || DatabaseDescriptor.getLocalDataCenter().equals(DatabaseDescriptor.getEndpointSnitch().getDatacenter(from))
+ : "Received read response from unexpected replica: " + from;
+ }
}
diff --git a/test/unit/org/apache/cassandra/service/reads/DigestResolverTest.java b/test/unit/org/apache/cassandra/service/reads/DigestResolverTest.java
index 4dee52a..36dd6d9 100644
--- a/test/unit/org/apache/cassandra/service/reads/DigestResolverTest.java
+++ b/test/unit/org/apache/cassandra/service/reads/DigestResolverTest.java
@@ -18,6 +18,11 @@
package org.apache.cassandra.service.reads;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
import org.apache.cassandra.locator.ReplicaPlan;
import org.junit.Assert;
import org.junit.Test;
@@ -28,10 +33,7 @@ import org.apache.cassandra.db.SinglePartitionReadCommand;
import org.apache.cassandra.db.partitions.PartitionUpdate;
import org.apache.cassandra.db.rows.Row;
import org.apache.cassandra.locator.EndpointsForToken;
-import org.apache.cassandra.locator.ReplicaLayout;
import org.apache.cassandra.schema.TableMetadata;
-import org.apache.cassandra.service.reads.repair.NoopReadRepair;
-import org.apache.cassandra.service.reads.repair.TestableReadRepair;
import static org.apache.cassandra.locator.ReplicaUtils.full;
import static org.apache.cassandra.locator.ReplicaUtils.trans;
@@ -78,6 +80,55 @@ public class DigestResolverTest extends AbstractReadResponseTest
assertPartitionsEqual(filter(iter(response)), resolver.getData());
}
+ /**
+ * This test makes a time-boxed (two-minute) effort to reproduce the issue found in CASSANDRA-16807.
+ */
+ @Test
+ public void multiThreadedNoRepairNeededReadCallback()
+ {
+ SinglePartitionReadCommand command = SinglePartitionReadCommand.fullPartitionRead(cfm, nowInSec, dk);
+ EndpointsForToken targetReplicas = EndpointsForToken.of(dk.getToken(), full(EP1), full(EP2));
+ PartitionUpdate response = update(row(1000, 4, 4), row(1000, 5, 5)).build();
+ ReplicaPlan.SharedForTokenRead plan = plan(ConsistencyLevel.ONE, targetReplicas);
+
+ ExecutorService pool = Executors.newFixedThreadPool(2);
+ long endTime = System.nanoTime() + TimeUnit.MINUTES.toNanos(2);
+
+ try
+ {
+ while (System.nanoTime() < endTime)
+ {
+ final long startNanos = System.nanoTime();
+ final DigestResolver<EndpointsForToken, ReplicaPlan.ForTokenRead> resolver = new DigestResolver<>(command, plan, startNanos);
+ final ReadCallback<EndpointsForToken, ReplicaPlan.ForTokenRead> callback = new ReadCallback<>(resolver, command, plan, startNanos);
+
+ final CountDownLatch startlatch = new CountDownLatch(2);
+
+ pool.execute(() ->
+ {
+ startlatch.countDown();
+ waitForLatch(startlatch);
+ callback.onResponse(response(command, EP1, iter(response), true));
+ });
+
+ pool.execute(() ->
+ {
+ startlatch.countDown();
+ waitForLatch(startlatch);
+ callback.onResponse(response(command, EP2, iter(response), true));
+ });
+
+ callback.awaitResults();
+ Assert.assertTrue(resolver.isDataPresent());
+ Assert.assertTrue(resolver.responsesMatch());
+ }
+ }
+ finally
+ {
+ pool.shutdown();
+ }
+ }
+
@Test
public void digestMismatch()
{
@@ -163,4 +214,16 @@ public class DigestResolverTest extends AbstractReadResponseTest
{
return ReplicaPlan.shared(new ReplicaPlan.ForTokenRead(ks, ks.getReplicationStrategy(), consistencyLevel, replicas, replicas));
}
+
+ private void waitForLatch(CountDownLatch startlatch)
+ {
+ try
+ {
+ startlatch.await();
+ }
+ catch (InterruptedException e)
+ {
+ Thread.currentThread().interrupt();
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org