Posted to commits@cassandra.apache.org by ma...@apache.org on 2021/07/21 17:57:56 UTC

[cassandra] 01/02: Avoid signaling DigestResolver until the minimum number of responses are guaranteed to be visible

This is an automated email from the ASF dual-hosted git repository.

maedhroz pushed a commit to branch cassandra-4.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit 93496e826e7382adf52a99d4df38e73a43f892de
Author: Caleb Rackliffe <ca...@gmail.com>
AuthorDate: Thu Jul 15 13:25:23 2021 -0500

    Avoid signaling DigestResolver until the minimum number of responses are guaranteed to be visible
    
    patch by Caleb Rackliffe; reviewed by Andrés de la Peña and Benedict Elliott Smith for CASSANDRA-16807
---
 CHANGES.txt                                        |  1 +
 .../cassandra/service/reads/ReadCallback.java      | 54 +++++++++--------
 .../service/reads/DigestResolverTest.java          | 69 +++++++++++++++++++++-
 3 files changed, 95 insertions(+), 29 deletions(-)
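
The race being closed here, in short: ReadCallback used to wake its waiter once a standalone "received" counter reached blockFor, but the responses themselves live in the resolver's response accumulator, whose visible size can lag behind the add() calls that have already returned. The toy class below is a hypothetical, heavily simplified model of that kind of publication gap (ToyAccumulator and all of its members are illustrative names, not Cassandra's Accumulator): slots are claimed first, and an entry only becomes countable once every earlier slot has also been published.

    import java.util.concurrent.atomic.AtomicInteger;
    import java.util.concurrent.atomic.AtomicReferenceArray;

    // Toy model only: an append-only container whose size() can trail completed add() calls.
    final class ToyAccumulator<T>
    {
        private final AtomicReferenceArray<T> values;
        private final AtomicInteger nextIndex = new AtomicInteger();    // slots claimed by writers
        private final AtomicInteger presentCount = new AtomicInteger(); // contiguously published slots

        ToyAccumulator(int capacity)
        {
            this.values = new AtomicReferenceArray<>(capacity);
        }

        void add(T value)
        {
            int index = nextIndex.getAndIncrement();
            values.set(index, value);
            // Advance presentCount only across contiguously filled slots. If an earlier writer
            // has claimed a slot but not yet filled it, this entry stays invisible to size()
            // even though this add() call has already returned.
            while (true)
            {
                int count = presentCount.get();
                if (count >= values.length() || values.get(count) == null)
                    return;
                presentCount.compareAndSet(count, count + 1);
            }
        }

        int size()
        {
            return presentCount.get();
        }
    }

If a waiter is released because a separate per-callback counter of add() calls reached blockFor, it may still observe size() < blockFor in a container like this; keying the signal off the container's own size is what the patch does instead.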

diff --git a/CHANGES.txt b/CHANGES.txt
index b158970..51e58e1 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0.0
+ * Avoid signaling DigestResolver until the minimum number of responses are guaranteed to be visible (CASSANDRA-16807)
  * Fix pre-4.0 FWD_FRM parameter serializer (CASSANDRA-16808)
  * Fix fwd to/from headers in DC write forwarding (CASSANDRA-16797)
  * Fix CassandraVersion::compareTo (CASSANDRA-16794)
diff --git a/src/java/org/apache/cassandra/service/reads/ReadCallback.java b/src/java/org/apache/cassandra/service/reads/ReadCallback.java
index b7ee18c..91d9370 100644
--- a/src/java/org/apache/cassandra/service/reads/ReadCallback.java
+++ b/src/java/org/apache/cassandra/service/reads/ReadCallback.java
@@ -47,7 +47,7 @@ public class ReadCallback<E extends Endpoints<E>, P extends ReplicaPlan.ForRead<
 {
     protected static final Logger logger = LoggerFactory.getLogger( ReadCallback.class );
 
-    public final ResponseResolver resolver;
+    public final ResponseResolver<E, P> resolver;
     final SimpleCondition condition = new SimpleCondition();
     private final long queryStartNanoTime;
     final int blockFor; // TODO: move to replica plan as well?
@@ -55,15 +55,12 @@ public class ReadCallback<E extends Endpoints<E>, P extends ReplicaPlan.ForRead<
     // may not be visible to the threads immediately, but ReplicaPlan only contains final fields, so they will never see an uninitialised object
     final ReplicaPlan.Shared<E, P> replicaPlan;
     private final ReadCommand command;
-    private static final AtomicIntegerFieldUpdater<ReadCallback> recievedUpdater
-            = AtomicIntegerFieldUpdater.newUpdater(ReadCallback.class, "received");
-    private volatile int received = 0;
     private static final AtomicIntegerFieldUpdater<ReadCallback> failuresUpdater
             = AtomicIntegerFieldUpdater.newUpdater(ReadCallback.class, "failures");
     private volatile int failures = 0;
     private final Map<InetAddressAndPort, RequestFailureReason> failureReasonByEndpoint;
 
-    public ReadCallback(ResponseResolver resolver, ReadCommand command, ReplicaPlan.Shared<E, P> replicaPlan, long queryStartNanoTime)
+    public ReadCallback(ResponseResolver<E, P> resolver, ReadCommand command, ReplicaPlan.Shared<E, P> replicaPlan, long queryStartNanoTime)
     {
         this.command = command;
         this.resolver = resolver;
@@ -106,20 +103,20 @@ public class ReadCallback<E extends Endpoints<E>, P extends ReplicaPlan.ForRead<
          * See {@link DigestResolver#preprocess(Message)}
          * CASSANDRA-16097
          */
-        boolean failed = failures > 0 &&
-                         (blockFor > resolver.responses.size() || !resolver.isDataPresent());
+        int received = resolver.responses.size();
+        boolean failed = failures > 0 && (blockFor > received || !resolver.isDataPresent());
         if (signaled && !failed)
             return;
 
         if (Tracing.isTracing())
         {
             String gotData = received > 0 ? (resolver.isDataPresent() ? " (including data)" : " (only digests)") : "";
-            Tracing.trace("{}; received {} of {} responses{}", new Object[]{ (failed ? "Failed" : "Timed out"), received, blockFor, gotData });
+            Tracing.trace("{}; received {} of {} responses{}", failed ? "Failed" : "Timed out", received, blockFor, gotData);
         }
         else if (logger.isDebugEnabled())
         {
             String gotData = received > 0 ? (resolver.isDataPresent() ? " (including data)" : " (only digests)") : "";
-            logger.debug("{}; received {} of {} responses{}", new Object[]{ (failed ? "Failed" : "Timed out"), received, blockFor, gotData });
+            logger.debug("{}; received {} of {} responses{}", failed ? "Failed" : "Timed out", received, blockFor, gotData);
         }
 
         // Same as for writes, see AbstractWriteResponseHandler
@@ -133,25 +130,22 @@ public class ReadCallback<E extends Endpoints<E>, P extends ReplicaPlan.ForRead<
         return blockFor;
     }
 
+    @Override
     public void onResponse(Message<ReadResponse> message)
     {
+        assertWaitingFor(message.from());
         resolver.preprocess(message);
-        int n = waitingFor(message.from())
-              ? recievedUpdater.incrementAndGet(this)
-              : received;
 
-        if (n >= blockFor && resolver.isDataPresent())
+        /*
+         * Ensure that data is present and the response accumulator has properly published the
+         * responses it has received. This may result in not signaling immediately when we receive
+         * the minimum number of required results, but it guarantees at least the minimum will
+         * be accessible when we do signal. (see CASSANDRA-16807)
+         */
+        if (resolver.isDataPresent() && resolver.responses.size() >= blockFor)
             condition.signalAll();
     }
 
-    /**
-     * @return true if the message counts towards the blockFor threshold
-     */
-    private boolean waitingFor(InetAddressAndPort from)
-    {
-        return !replicaPlan().consistencyLevel().isDatacenterLocal() || DatabaseDescriptor.getLocalDataCenter().equals(DatabaseDescriptor.getEndpointSnitch().getDatacenter(from));
-    }
-
     public void response(ReadResponse result)
     {
         Verb kind = command.isRangeRequest() ? Verb.RANGE_RSP : Verb.READ_RSP;
@@ -169,13 +163,11 @@ public class ReadCallback<E extends Endpoints<E>, P extends ReplicaPlan.ForRead<
     @Override
     public void onFailure(InetAddressAndPort from, RequestFailureReason failureReason)
     {
-        int n = waitingFor(from)
-              ? failuresUpdater.incrementAndGet(this)
-              : failures;
-
+        assertWaitingFor(from);
+                
         failureReasonByEndpoint.put(from, failureReason);
 
-        if (blockFor + n > replicaPlan().contacts().size())
+        if (blockFor + failuresUpdater.incrementAndGet(this) > replicaPlan().contacts().size())
             condition.signalAll();
     }
 
@@ -184,4 +176,14 @@ public class ReadCallback<E extends Endpoints<E>, P extends ReplicaPlan.ForRead<
     {
         return true;
     }
+
+    /**
+     * Verify that a message doesn't come from an unexpected replica.
+     */
+    private void assertWaitingFor(InetAddressAndPort from)
+    {
+        assert !replicaPlan().consistencyLevel().isDatacenterLocal()
+               || DatabaseDescriptor.getLocalDataCenter().equals(DatabaseDescriptor.getEndpointSnitch().getDatacenter(from))
+               : "Received read response from unexpected replica: " + from;
+    }
 }
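
For reference, here is a minimal sketch of the signaling rule the onResponse() hunk above adopts. SimplifiedReadCallback and its members are stand-ins invented for illustration (a CountDownLatch in place of SimpleCondition, a CopyOnWriteArrayList in place of resolver.responses); only blockFor and the shape of the condition mirror the real code.

    import java.util.List;
    import java.util.concurrent.CopyOnWriteArrayList;
    import java.util.concurrent.CountDownLatch;

    // Sketch only: wake the waiter from the response container's own size, never from a
    // separately incremented counter, so anything that satisfied the signal is also visible
    // to the thread that gets woken.
    final class SimplifiedReadCallback
    {
        private final int blockFor;
        private final List<String> responses = new CopyOnWriteArrayList<>(); // stands in for resolver.responses
        private final CountDownLatch condition = new CountDownLatch(1);      // stands in for SimpleCondition

        SimplifiedReadCallback(int blockFor)
        {
            this.blockFor = blockFor;
        }

        void onResponse(String response)
        {
            responses.add(response);
            // Signal only once the container itself reports at least blockFor entries; this may
            // be marginally later than the blockFor-th arrival, but a released waiter is then
            // guaranteed to see at least blockFor responses (the CASSANDRA-16807 guarantee).
            if (responses.size() >= blockFor)
                condition.countDown();
        }

        List<String> awaitResults() throws InterruptedException
        {
            condition.await();
            return responses;
        }
    }

In the real patch the size check is paired with resolver.isDataPresent(), since at least one full data response must be visible before the digest comparison can proceed.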
diff --git a/test/unit/org/apache/cassandra/service/reads/DigestResolverTest.java b/test/unit/org/apache/cassandra/service/reads/DigestResolverTest.java
index 4dee52a..36dd6d9 100644
--- a/test/unit/org/apache/cassandra/service/reads/DigestResolverTest.java
+++ b/test/unit/org/apache/cassandra/service/reads/DigestResolverTest.java
@@ -18,6 +18,11 @@
 
 package org.apache.cassandra.service.reads;
 
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
 import org.apache.cassandra.locator.ReplicaPlan;
 import org.junit.Assert;
 import org.junit.Test;
@@ -28,10 +33,7 @@ import org.apache.cassandra.db.SinglePartitionReadCommand;
 import org.apache.cassandra.db.partitions.PartitionUpdate;
 import org.apache.cassandra.db.rows.Row;
 import org.apache.cassandra.locator.EndpointsForToken;
-import org.apache.cassandra.locator.ReplicaLayout;
 import org.apache.cassandra.schema.TableMetadata;
-import org.apache.cassandra.service.reads.repair.NoopReadRepair;
-import org.apache.cassandra.service.reads.repair.TestableReadRepair;
 
 import static org.apache.cassandra.locator.ReplicaUtils.full;
 import static org.apache.cassandra.locator.ReplicaUtils.trans;
@@ -78,6 +80,55 @@ public class DigestResolverTest extends AbstractReadResponseTest
         assertPartitionsEqual(filter(iter(response)), resolver.getData());
     }
 
+    /**
+     * This test makes a time-boxed effort to reproduce the issue found in CASSANDRA-16807.
+     */
+    @Test
+    public void multiThreadedNoRepairNeededReadCallback()
+    {
+        SinglePartitionReadCommand command = SinglePartitionReadCommand.fullPartitionRead(cfm, nowInSec, dk);
+        EndpointsForToken targetReplicas = EndpointsForToken.of(dk.getToken(), full(EP1), full(EP2));
+        PartitionUpdate response = update(row(1000, 4, 4), row(1000, 5, 5)).build();
+        ReplicaPlan.SharedForTokenRead plan = plan(ConsistencyLevel.ONE, targetReplicas);
+
+        ExecutorService pool = Executors.newFixedThreadPool(2);
+        long endTime = System.nanoTime() + TimeUnit.MINUTES.toNanos(2);
+
+        try
+        {
+            while (System.nanoTime() < endTime)
+            {
+                final long startNanos = System.nanoTime();
+                final DigestResolver<EndpointsForToken, ReplicaPlan.ForTokenRead> resolver = new DigestResolver<>(command, plan, startNanos);
+                final ReadCallback<EndpointsForToken, ReplicaPlan.ForTokenRead> callback = new ReadCallback<>(resolver, command, plan, startNanos);
+                
+                final CountDownLatch startlatch = new CountDownLatch(2);
+
+                pool.execute(() ->
+                             {
+                                 startlatch.countDown();
+                                 waitForLatch(startlatch);
+                                 callback.onResponse(response(command, EP1, iter(response), true));
+                             });
+
+                pool.execute(() ->
+                             {
+                                 startlatch.countDown();
+                                 waitForLatch(startlatch);
+                                 callback.onResponse(response(command, EP2, iter(response), true));
+                             });
+
+                callback.awaitResults();
+                Assert.assertTrue(resolver.isDataPresent());
+                Assert.assertTrue(resolver.responsesMatch());
+            }
+        }
+        finally
+        {
+            pool.shutdown();
+        }
+    }
+
     @Test
     public void digestMismatch()
     {
@@ -163,4 +214,16 @@ public class DigestResolverTest extends AbstractReadResponseTest
     {
         return ReplicaPlan.shared(new ReplicaPlan.ForTokenRead(ks, ks.getReplicationStrategy(), consistencyLevel, replicas, replicas));
     }
+
+    private void waitForLatch(CountDownLatch startlatch)
+    {
+        try
+        {
+            startlatch.await();
+        }
+        catch (InterruptedException e)
+        {
+            Thread.currentThread().interrupt();
+        }
+    }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org