You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2021/08/31 21:40:48 UTC

[cassandra] branch cassandra-3.0 updated: Avoid signaling DigestResolver until the minimum number of responses are guaranteed to be visible

This is an automated email from the ASF dual-hosted git repository.

maedhroz pushed a commit to branch cassandra-3.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/cassandra-3.0 by this push:
     new f9d41ff  Avoid signaling DigestResolver until the minimum number of responses are guaranteed to be visible
f9d41ff is described below

commit f9d41ff83655ead37ac6083d7ee43f2c35a346da
Author: Caleb Rackliffe <ca...@gmail.com>
AuthorDate: Tue Aug 24 14:42:24 2021 -0500

    Avoid signaling DigestResolver until the minimum number of responses are guaranteed to be visible
    
    patch by Caleb Rackliffe; reviewed by Jon Meredith for CASSANDRA-16883
---
 CHANGES.txt                                        |   1 +
 .../apache/cassandra/service/DigestResolver.java   |   8 +-
 .../org/apache/cassandra/service/ReadCallback.java |  13 +-
 .../service/reads/DigestResolverTest.java          | 164 +++++++++++++++++++++
 4 files changed, 180 insertions(+), 6 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 9089836..434c1d2 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0.26:
+ * Avoid signaling DigestResolver until the minimum number of responses are guaranteed to be visible (CASSANDRA-16883)
  * Fix secondary indexes on primary key columns skipping some writes (CASSANDRA-16868)
  * Fix incorrect error message in LegacyLayout (CASSANDRA-15136)
  * Use JMX to validate nodetool --jobs parameter (CASSANDRA-16104)
diff --git a/src/java/org/apache/cassandra/service/DigestResolver.java b/src/java/org/apache/cassandra/service/DigestResolver.java
index 6a528e9..21db1ce 100644
--- a/src/java/org/apache/cassandra/service/DigestResolver.java
+++ b/src/java/org/apache/cassandra/service/DigestResolver.java
@@ -63,11 +63,15 @@ public class DigestResolver extends ResponseResolver
      */
     public PartitionIterator resolve() throws DigestMismatchException
     {
-        if (responses.size() == 1)
+        int responseCount = responses.size();
+        
+        assert responseCount > 0 : "Attempted response match comparison while no responses have been received.";
+        
+        if (responseCount == 1)
             return getData();
 
         if (logger.isTraceEnabled())
-            logger.trace("resolving {} responses", responses.size());
+            logger.trace("resolving {} responses", responseCount);
 
         compareResponses();
 
diff --git a/src/java/org/apache/cassandra/service/ReadCallback.java b/src/java/org/apache/cassandra/service/ReadCallback.java
index 837ab5e..4dc7b8d 100644
--- a/src/java/org/apache/cassandra/service/ReadCallback.java
+++ b/src/java/org/apache/cassandra/service/ReadCallback.java
@@ -158,7 +158,14 @@ public class ReadCallback implements IAsyncCallbackWithFailure<ReadResponse>
         int n = waitingFor(message.from)
               ? recievedUpdater.incrementAndGet(this)
               : received;
-        if (n >= blockfor && resolver.isDataPresent())
+
+        /*
+         * Ensure that data is present and the response accumulator has properly published the
+         * responses it has received. This may result in not signaling immediately when we receive
+         * the minimum number of required results, but it guarantees at least the minimum will
+         * be accessible when we do signal. (see CASSANDRA-16883)
+         */
+        if (n >= blockfor && resolver.responses.size() >= blockfor && resolver.isDataPresent())
         {
             condition.signalAll();
             // kick off a background digest comparison if this is a result that (may have) arrived after
@@ -178,9 +185,7 @@ public class ReadCallback implements IAsyncCallbackWithFailure<ReadResponse>
      */
     private boolean waitingFor(InetAddress from)
     {
-        return consistencyLevel.isDatacenterLocal()
-             ? DatabaseDescriptor.getLocalDataCenter().equals(DatabaseDescriptor.getEndpointSnitch().getDatacenter(from))
-             : true;
+        return !consistencyLevel.isDatacenterLocal() || DatabaseDescriptor.getLocalDataCenter().equals(DatabaseDescriptor.getEndpointSnitch().getDatacenter(from));
     }
 
     /**
diff --git a/test/unit/org/apache/cassandra/service/reads/DigestResolverTest.java b/test/unit/org/apache/cassandra/service/reads/DigestResolverTest.java
new file mode 100644
index 0000000..d246a83
--- /dev/null
+++ b/test/unit/org/apache/cassandra/service/reads/DigestResolverTest.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.reads;
+
+import java.net.UnknownHostException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.collect.ImmutableList;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.Util;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.marshal.AsciiType;
+import org.apache.cassandra.db.marshal.BytesType;
+import org.apache.cassandra.db.partitions.PartitionIterator;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.db.partitions.SingletonUnfilteredPartitionIterator;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
+import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.service.DigestMismatchException;
+import org.apache.cassandra.service.DigestResolver;
+import org.apache.cassandra.service.ReadCallback;
+import org.apache.cassandra.utils.FBUtilities;
+
+import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
+import static org.junit.Assert.assertTrue;
+
+public class DigestResolverTest
+{
+    public static final String KEYSPACE1 = "DigestResolverTest";
+    public static final String CF_STANDARD = "Standard1";
+
+    private static Keyspace ks;
+    private static CFMetaData cfm;
+
+    private static final InetAddressAndPort EP1;
+    private static final InetAddressAndPort EP2;
+
+    static
+    {
+        try
+        {
+            EP1 = InetAddressAndPort.getByName("127.0.0.1");
+            EP2 = InetAddressAndPort.getByName("127.0.0.2");
+        }
+        catch (UnknownHostException e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @BeforeClass
+    public static void setupClass() throws Throwable
+    {
+        DatabaseDescriptor.setPartitionerUnsafe(Murmur3Partitioner.instance);
+
+        CFMetaData.Builder builder1 = CFMetaData.Builder.create(KEYSPACE1, CF_STANDARD)
+                                                        .addPartitionKey("key", BytesType.instance)
+                                                        .addClusteringColumn("col1", AsciiType.instance)
+                                                        .addRegularColumn("c1", AsciiType.instance);
+
+        SchemaLoader.prepareServer();
+        SchemaLoader.createKeyspace(KEYSPACE1, KeyspaceParams.simple(2), builder1.build());
+
+        ks = Keyspace.open(KEYSPACE1);
+        ColumnFamilyStore cfs = ks.getColumnFamilyStore(CF_STANDARD);
+        cfm = cfs.metadata;
+    }
+    
+    /**
+     * This test makes a time-boxed effort to reproduce the issue found in CASSANDRA-16883 (the reader being signaled before the minimum number of responses is visible to the resolver).
+     */
+    @Test
+    public void multiThreadedNoRepairNeededReadCallback() throws DigestMismatchException
+    {
+        DecoratedKey dk = Util.dk("key1");
+        SinglePartitionReadCommand command = SinglePartitionReadCommand.fullPartitionRead(cfm, FBUtilities.nowInSeconds(), dk);
+        BufferCell cell = BufferCell.live(cfm, cfm.partitionColumns().regulars.getSimple(0), 1000, bytes("1"));
+        PartitionUpdate response = PartitionUpdate.singleRowUpdate(cfm, dk, BTreeRow.singleCellRow(cfm.comparator.make("1"), cell));
+
+        ExecutorService pool = Executors.newFixedThreadPool(2);
+        long endTime = System.nanoTime() + TimeUnit.MINUTES.toNanos(2);
+
+        try
+        {
+            while (System.nanoTime() < endTime)
+            {
+                final DigestResolver resolver = new DigestResolver(ks, command, ConsistencyLevel.ONE, 2);
+                final ReadCallback callback = new ReadCallback(resolver, ConsistencyLevel.ONE, command, ImmutableList.of(EP1.address, EP2.address));
+                
+                final CountDownLatch startlatch = new CountDownLatch(2);
+
+                pool.execute(() ->
+                             {
+                                 startlatch.countDown();
+                                 waitForLatch(startlatch);
+                                 callback.response(ReadResponse.createDataResponse(iter(response), command));
+                             });
+
+                pool.execute(() ->
+                             {
+                                 startlatch.countDown();
+                                 waitForLatch(startlatch);
+                                 callback.response(ReadResponse.createDataResponse(iter(response), command));
+                             });
+
+                callback.awaitResults();
+                assertTrue(resolver.isDataPresent());
+                
+                try (PartitionIterator result = resolver.resolve())
+                {
+                    assertTrue(result.hasNext());
+                }
+            }
+        }
+        finally
+        {
+            pool.shutdown();
+        }
+    }
+
+    public UnfilteredPartitionIterator iter(PartitionUpdate update)
+    {
+        return new SingletonUnfilteredPartitionIterator(update.unfilteredIterator(), false);
+    }
+
+    private void waitForLatch(CountDownLatch startlatch)
+    {
+        try
+        {
+            startlatch.await();
+        }
+        catch (InterruptedException e)
+        {
+            Thread.currentThread().interrupt();
+        }
+    }
+}

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org