You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2021/07/21 17:57:55 UTC

[cassandra] branch cassandra-4.0 updated (cdf68e3 -> b91dcce)

This is an automated email from the ASF dual-hosted git repository.

maedhroz pushed a change to branch cassandra-4.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git.


    from cdf68e3  Merge branch 'cassandra-4.0.0' into cassandra-4.0
     new 93496e8  Avoid signaling DigestResolver until the minimum number of responses are guaranteed to be visible
     new b91dcce  Merge branch 'cassandra-4.0.0' into cassandra-4.0

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 CHANGES.txt                                        |  1 +
 .../cassandra/service/reads/ReadCallback.java      | 54 +++++++++--------
 .../service/reads/DigestResolverTest.java          | 69 +++++++++++++++++++++-
 3 files changed, 95 insertions(+), 29 deletions(-)

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


[cassandra] 01/02: Avoid signaling DigestResolver until the minimum number of responses are guaranteed to be visible

Posted by ma...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

maedhroz pushed a commit to branch cassandra-4.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit 93496e826e7382adf52a99d4df38e73a43f892de
Author: Caleb Rackliffe <ca...@gmail.com>
AuthorDate: Thu Jul 15 13:25:23 2021 -0500

    Avoid signaling DigestResolver until the minimum number of responses are guaranteed to be visible
    
    patch by Caleb Rackliffe; reviewed by Andrés de la Peña and Benedict Elliott Smith for CASSANDRA-16807
---
 CHANGES.txt                                        |  1 +
 .../cassandra/service/reads/ReadCallback.java      | 54 +++++++++--------
 .../service/reads/DigestResolverTest.java          | 69 +++++++++++++++++++++-
 3 files changed, 95 insertions(+), 29 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index b158970..51e58e1 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0.0
+ * Avoid signaling DigestResolver until the minimum number of responses are guaranteed to be visible (CASSANDRA-16807)
  * Fix pre-4.0 FWD_FRM parameter serializer (CASSANDRA-16808)
  * Fix fwd to/from headers in DC write forwarding (CASSANDRA-16797)
  * Fix CassandraVersion::compareTo (CASSANDRA-16794)
diff --git a/src/java/org/apache/cassandra/service/reads/ReadCallback.java b/src/java/org/apache/cassandra/service/reads/ReadCallback.java
index b7ee18c..91d9370 100644
--- a/src/java/org/apache/cassandra/service/reads/ReadCallback.java
+++ b/src/java/org/apache/cassandra/service/reads/ReadCallback.java
@@ -47,7 +47,7 @@ public class ReadCallback<E extends Endpoints<E>, P extends ReplicaPlan.ForRead<
 {
     protected static final Logger logger = LoggerFactory.getLogger( ReadCallback.class );
 
-    public final ResponseResolver resolver;
+    public final ResponseResolver<E, P> resolver;
     final SimpleCondition condition = new SimpleCondition();
     private final long queryStartNanoTime;
     final int blockFor; // TODO: move to replica plan as well?
@@ -55,15 +55,12 @@ public class ReadCallback<E extends Endpoints<E>, P extends ReplicaPlan.ForRead<
     // may not be visible to the threads immediately, but ReplicaPlan only contains final fields, so they will never see an uninitialised object
     final ReplicaPlan.Shared<E, P> replicaPlan;
     private final ReadCommand command;
-    private static final AtomicIntegerFieldUpdater<ReadCallback> recievedUpdater
-            = AtomicIntegerFieldUpdater.newUpdater(ReadCallback.class, "received");
-    private volatile int received = 0;
     private static final AtomicIntegerFieldUpdater<ReadCallback> failuresUpdater
             = AtomicIntegerFieldUpdater.newUpdater(ReadCallback.class, "failures");
     private volatile int failures = 0;
     private final Map<InetAddressAndPort, RequestFailureReason> failureReasonByEndpoint;
 
-    public ReadCallback(ResponseResolver resolver, ReadCommand command, ReplicaPlan.Shared<E, P> replicaPlan, long queryStartNanoTime)
+    public ReadCallback(ResponseResolver<E, P> resolver, ReadCommand command, ReplicaPlan.Shared<E, P> replicaPlan, long queryStartNanoTime)
     {
         this.command = command;
         this.resolver = resolver;
@@ -106,20 +103,20 @@ public class ReadCallback<E extends Endpoints<E>, P extends ReplicaPlan.ForRead<
          * See {@link DigestResolver#preprocess(Message)}
          * CASSANDRA-16097
          */
-        boolean failed = failures > 0 &&
-                         (blockFor > resolver.responses.size() || !resolver.isDataPresent());
+        int received = resolver.responses.size();
+        boolean failed = failures > 0 && (blockFor > received || !resolver.isDataPresent());
         if (signaled && !failed)
             return;
 
         if (Tracing.isTracing())
         {
             String gotData = received > 0 ? (resolver.isDataPresent() ? " (including data)" : " (only digests)") : "";
-            Tracing.trace("{}; received {} of {} responses{}", new Object[]{ (failed ? "Failed" : "Timed out"), received, blockFor, gotData });
+            Tracing.trace("{}; received {} of {} responses{}", failed ? "Failed" : "Timed out", received, blockFor, gotData);
         }
         else if (logger.isDebugEnabled())
         {
             String gotData = received > 0 ? (resolver.isDataPresent() ? " (including data)" : " (only digests)") : "";
-            logger.debug("{}; received {} of {} responses{}", new Object[]{ (failed ? "Failed" : "Timed out"), received, blockFor, gotData });
+            logger.debug("{}; received {} of {} responses{}", failed ? "Failed" : "Timed out", received, blockFor, gotData);
         }
 
         // Same as for writes, see AbstractWriteResponseHandler
@@ -133,25 +130,22 @@ public class ReadCallback<E extends Endpoints<E>, P extends ReplicaPlan.ForRead<
         return blockFor;
     }
 
+    @Override
     public void onResponse(Message<ReadResponse> message)
     {
+        assertWaitingFor(message.from());
         resolver.preprocess(message);
-        int n = waitingFor(message.from())
-              ? recievedUpdater.incrementAndGet(this)
-              : received;
 
-        if (n >= blockFor && resolver.isDataPresent())
+        /*
+         * Ensure that data is present and the response accumulator has properly published the
+         * responses it has received. This may result in not signaling immediately when we receive
+         * the minimum number of required results, but it guarantees at least the minimum will
+         * be accessible when we do signal. (see CASSANDRA-16807)
+         */
+        if (resolver.isDataPresent() && resolver.responses.size() >= blockFor)
             condition.signalAll();
     }
 
-    /**
-     * @return true if the message counts towards the blockFor threshold
-     */
-    private boolean waitingFor(InetAddressAndPort from)
-    {
-        return !replicaPlan().consistencyLevel().isDatacenterLocal() || DatabaseDescriptor.getLocalDataCenter().equals(DatabaseDescriptor.getEndpointSnitch().getDatacenter(from));
-    }
-
     public void response(ReadResponse result)
     {
         Verb kind = command.isRangeRequest() ? Verb.RANGE_RSP : Verb.READ_RSP;
@@ -169,13 +163,11 @@ public class ReadCallback<E extends Endpoints<E>, P extends ReplicaPlan.ForRead<
     @Override
     public void onFailure(InetAddressAndPort from, RequestFailureReason failureReason)
     {
-        int n = waitingFor(from)
-              ? failuresUpdater.incrementAndGet(this)
-              : failures;
-
+        assertWaitingFor(from);
+                
         failureReasonByEndpoint.put(from, failureReason);
 
-        if (blockFor + n > replicaPlan().contacts().size())
+        if (blockFor + failuresUpdater.incrementAndGet(this) > replicaPlan().contacts().size())
             condition.signalAll();
     }
 
@@ -184,4 +176,14 @@ public class ReadCallback<E extends Endpoints<E>, P extends ReplicaPlan.ForRead<
     {
         return true;
     }
+
+    /**
+     * Verify that a message doesn't come from an unexpected replica.
+     */
+    private void assertWaitingFor(InetAddressAndPort from)
+    {
+        assert !replicaPlan().consistencyLevel().isDatacenterLocal()
+               || DatabaseDescriptor.getLocalDataCenter().equals(DatabaseDescriptor.getEndpointSnitch().getDatacenter(from))
+               : "Received read response from unexpected replica: " + from;
+    }
 }
diff --git a/test/unit/org/apache/cassandra/service/reads/DigestResolverTest.java b/test/unit/org/apache/cassandra/service/reads/DigestResolverTest.java
index 4dee52a..36dd6d9 100644
--- a/test/unit/org/apache/cassandra/service/reads/DigestResolverTest.java
+++ b/test/unit/org/apache/cassandra/service/reads/DigestResolverTest.java
@@ -18,6 +18,11 @@
 
 package org.apache.cassandra.service.reads;
 
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
 import org.apache.cassandra.locator.ReplicaPlan;
 import org.junit.Assert;
 import org.junit.Test;
@@ -28,10 +33,7 @@ import org.apache.cassandra.db.SinglePartitionReadCommand;
 import org.apache.cassandra.db.partitions.PartitionUpdate;
 import org.apache.cassandra.db.rows.Row;
 import org.apache.cassandra.locator.EndpointsForToken;
-import org.apache.cassandra.locator.ReplicaLayout;
 import org.apache.cassandra.schema.TableMetadata;
-import org.apache.cassandra.service.reads.repair.NoopReadRepair;
-import org.apache.cassandra.service.reads.repair.TestableReadRepair;
 
 import static org.apache.cassandra.locator.ReplicaUtils.full;
 import static org.apache.cassandra.locator.ReplicaUtils.trans;
@@ -78,6 +80,55 @@ public class DigestResolverTest extends AbstractReadResponseTest
         assertPartitionsEqual(filter(iter(response)), resolver.getData());
     }
 
+    /**
+     * This test makes a time-boxed effort to reproduce the issue found in CASSANDRA-16807.
+     */
+    @Test
+    public void multiThreadedNoRepairNeededReadCallback()
+    {
+        SinglePartitionReadCommand command = SinglePartitionReadCommand.fullPartitionRead(cfm, nowInSec, dk);
+        EndpointsForToken targetReplicas = EndpointsForToken.of(dk.getToken(), full(EP1), full(EP2));
+        PartitionUpdate response = update(row(1000, 4, 4), row(1000, 5, 5)).build();
+        ReplicaPlan.SharedForTokenRead plan = plan(ConsistencyLevel.ONE, targetReplicas);
+
+        ExecutorService pool = Executors.newFixedThreadPool(2);
+        long endTime = System.nanoTime() + TimeUnit.MINUTES.toNanos(2);
+
+        try
+        {
+            while (System.nanoTime() < endTime)
+            {
+                final long startNanos = System.nanoTime();
+                final DigestResolver<EndpointsForToken, ReplicaPlan.ForTokenRead> resolver = new DigestResolver<>(command, plan, startNanos);
+                final ReadCallback<EndpointsForToken, ReplicaPlan.ForTokenRead> callback = new ReadCallback<>(resolver, command, plan, startNanos);
+                
+                final CountDownLatch startlatch = new CountDownLatch(2);
+
+                pool.execute(() ->
+                             {
+                                 startlatch.countDown();
+                                 waitForLatch(startlatch);
+                                 callback.onResponse(response(command, EP1, iter(response), true));
+                             });
+
+                pool.execute(() ->
+                             {
+                                 startlatch.countDown();
+                                 waitForLatch(startlatch);
+                                 callback.onResponse(response(command, EP2, iter(response), true));
+                             });
+
+                callback.awaitResults();
+                Assert.assertTrue(resolver.isDataPresent());
+                Assert.assertTrue(resolver.responsesMatch());
+            }
+        }
+        finally
+        {
+            pool.shutdown();
+        }
+    }
+
     @Test
     public void digestMismatch()
     {
@@ -163,4 +214,16 @@ public class DigestResolverTest extends AbstractReadResponseTest
     {
         return ReplicaPlan.shared(new ReplicaPlan.ForTokenRead(ks, ks.getReplicationStrategy(), consistencyLevel, replicas, replicas));
     }
+
+    private void waitForLatch(CountDownLatch startlatch)
+    {
+        try
+        {
+            startlatch.await();
+        }
+        catch (InterruptedException e)
+        {
+            Thread.currentThread().interrupt();
+        }
+    }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


[cassandra] 02/02: Merge branch 'cassandra-4.0.0' into cassandra-4.0

Posted by ma...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

maedhroz pushed a commit to branch cassandra-4.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit b91dcce865aae57077c4584351da6fc744e50705
Merge: cdf68e3 93496e8
Author: Caleb Rackliffe <ca...@gmail.com>
AuthorDate: Wed Jul 21 12:43:59 2021 -0500

    Merge branch 'cassandra-4.0.0' into cassandra-4.0

 CHANGES.txt                                        |  1 +
 .../cassandra/service/reads/ReadCallback.java      | 54 +++++++++--------
 .../service/reads/DigestResolverTest.java          | 69 +++++++++++++++++++++-
 3 files changed, 95 insertions(+), 29 deletions(-)

diff --cc CHANGES.txt
index b39b35d,51e58e1..4504b00
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,22 -1,5 +1,23 @@@
 +4.0.1
 + * Add repaired/unrepaired bytes back to nodetool (CASSANDRA-15282)
 + * Upgrade lz4-java to 1.8.0 to add RH6 support back (CASSANDRA-16753)
 + * Improve DiagnosticEventService.publish(event) logging message of events (CASSANDRA-16749)
 + * Cleanup dependency scopes (CASSANDRA-16704)
 + * Make JmxHistogram#getRecentValues() and JmxTimer#getRecentValues() thread-safe (CASSANDRA-16707)
 +Merged from 3.11:
 + * Make cqlsh use the same set of reserved keywords as the server uses (CASSANDRA-15663)
 + * Optimize bytes skipping when reading SSTable files (CASSANDRA-14415)
 + * Enable tombstone compactions when unchecked_tombstone_compaction is set in TWCS (CASSANDRA-14496)
 + * Read only the required SSTables for single partition queries (CASSANDRA-16737)
 +Merged from 3.0:
 + * Receipt of gossip shutdown notification updates TokenMetadata (CASSANDRA-16796)
 + * Count bloom filter misses correctly (CASSANDRA-12922)
 + * Reject token() in MV WHERE clause (CASSANDRA-13464)
 + * Ensure java executable is on the path (CASSANDRA-14325)
 + * Clean transaction log leftovers at the beginning of sstablelevelreset and sstableofflinerelevel (CASSANDRA-12519)
 +
  4.0.0
+  * Avoid signaling DigestResolver until the minimum number of responses are guaranteed to be visible (CASSANDRA-16807)
   * Fix pre-4.0 FWD_FRM parameter serializer (CASSANDRA-16808)
   * Fix fwd to/from headers in DC write forwarding (CASSANDRA-16797)
   * Fix CassandraVersion::compareTo (CASSANDRA-16794)

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org