You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2021/08/31 21:40:48 UTC
[cassandra] branch cassandra-3.0 updated: Avoid signaling
DigestResolver until the minimum number of responses are guaranteed to be
visible
This is an automated email from the ASF dual-hosted git repository.
maedhroz pushed a commit to branch cassandra-3.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cassandra-3.0 by this push:
new f9d41ff Avoid signaling DigestResolver until the minimum number of responses are guaranteed to be visible
f9d41ff is described below
commit f9d41ff83655ead37ac6083d7ee43f2c35a346da
Author: Caleb Rackliffe <ca...@gmail.com>
AuthorDate: Tue Aug 24 14:42:24 2021 -0500
Avoid signaling DigestResolver until the minimum number of responses are guaranteed to be visible
patch by Caleb Rackliffe; reviewed by Jon Meredith for CASSANDRA-16883
---
CHANGES.txt | 1 +
.../apache/cassandra/service/DigestResolver.java | 8 +-
.../org/apache/cassandra/service/ReadCallback.java | 13 +-
.../service/reads/DigestResolverTest.java | 164 +++++++++++++++++++++
4 files changed, 180 insertions(+), 6 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 9089836..434c1d2 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
3.0.26:
+ * Avoid signaling DigestResolver until the minimum number of responses are guaranteed to be visible (CASSANDRA-16883)
* Fix secondary indexes on primary key columns skipping some writes (CASSANDRA-16868)
* Fix incorrect error message in LegacyLayout (CASSANDRA-15136)
* Use JMX to validate nodetool --jobs parameter (CASSANDRA-16104)
diff --git a/src/java/org/apache/cassandra/service/DigestResolver.java b/src/java/org/apache/cassandra/service/DigestResolver.java
index 6a528e9..21db1ce 100644
--- a/src/java/org/apache/cassandra/service/DigestResolver.java
+++ b/src/java/org/apache/cassandra/service/DigestResolver.java
@@ -63,11 +63,15 @@ public class DigestResolver extends ResponseResolver
*/
public PartitionIterator resolve() throws DigestMismatchException
{
- if (responses.size() == 1)
+ int responseCount = responses.size();
+
+ assert responseCount > 0 : "Attempted response match comparison while no responses have been received.";
+
+ if (responseCount == 1)
return getData();
if (logger.isTraceEnabled())
- logger.trace("resolving {} responses", responses.size());
+ logger.trace("resolving {} responses", responseCount);
compareResponses();
diff --git a/src/java/org/apache/cassandra/service/ReadCallback.java b/src/java/org/apache/cassandra/service/ReadCallback.java
index 837ab5e..4dc7b8d 100644
--- a/src/java/org/apache/cassandra/service/ReadCallback.java
+++ b/src/java/org/apache/cassandra/service/ReadCallback.java
@@ -158,7 +158,14 @@ public class ReadCallback implements IAsyncCallbackWithFailure<ReadResponse>
int n = waitingFor(message.from)
? recievedUpdater.incrementAndGet(this)
: received;
- if (n >= blockfor && resolver.isDataPresent())
+
+ /*
+ * Ensure that data is present and the response accumulator has properly published the
+ * responses it has received. This may result in not signaling immediately when we receive
+ * the minimum number of required results, but it guarantees at least the minimum will
+ * be accessible when we do signal. (see rdar://77320313)
+ */
+ if (n >= blockfor && resolver.responses.size() >= blockfor && resolver.isDataPresent())
{
condition.signalAll();
// kick off a background digest comparison if this is a result that (may have) arrived after
@@ -178,9 +185,7 @@ public class ReadCallback implements IAsyncCallbackWithFailure<ReadResponse>
*/
private boolean waitingFor(InetAddress from)
{
- return consistencyLevel.isDatacenterLocal()
- ? DatabaseDescriptor.getLocalDataCenter().equals(DatabaseDescriptor.getEndpointSnitch().getDatacenter(from))
- : true;
+ return !consistencyLevel.isDatacenterLocal() || DatabaseDescriptor.getLocalDataCenter().equals(DatabaseDescriptor.getEndpointSnitch().getDatacenter(from));
}
/**
diff --git a/test/unit/org/apache/cassandra/service/reads/DigestResolverTest.java b/test/unit/org/apache/cassandra/service/reads/DigestResolverTest.java
new file mode 100644
index 0000000..d246a83
--- /dev/null
+++ b/test/unit/org/apache/cassandra/service/reads/DigestResolverTest.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.reads;
+
+import java.net.UnknownHostException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.collect.ImmutableList;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.Util;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.marshal.AsciiType;
+import org.apache.cassandra.db.marshal.BytesType;
+import org.apache.cassandra.db.partitions.PartitionIterator;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.db.partitions.SingletonUnfilteredPartitionIterator;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
+import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.service.DigestMismatchException;
+import org.apache.cassandra.service.DigestResolver;
+import org.apache.cassandra.service.ReadCallback;
+import org.apache.cassandra.utils.FBUtilities;
+
+import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
+import static org.junit.Assert.assertTrue;
+
+public class DigestResolverTest
+{
+ public static final String KEYSPACE1 = "DigestResolverTest";
+ public static final String CF_STANDARD = "Standard1";
+
+ private static Keyspace ks;
+ private static CFMetaData cfm;
+
+ private static final InetAddressAndPort EP1;
+ private static final InetAddressAndPort EP2;
+
+ static
+ {
+ try
+ {
+ EP1 = InetAddressAndPort.getByName("127.0.0.1");
+ EP2 = InetAddressAndPort.getByName("127.0.0.2");
+ }
+ catch (UnknownHostException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @BeforeClass
+ public static void setupClass() throws Throwable
+ {
+ DatabaseDescriptor.setPartitionerUnsafe(Murmur3Partitioner.instance);
+
+ CFMetaData.Builder builder1 = CFMetaData.Builder.create(KEYSPACE1, CF_STANDARD)
+ .addPartitionKey("key", BytesType.instance)
+ .addClusteringColumn("col1", AsciiType.instance)
+ .addRegularColumn("c1", AsciiType.instance);
+
+ SchemaLoader.prepareServer();
+ SchemaLoader.createKeyspace(KEYSPACE1, KeyspaceParams.simple(2), builder1.build());
+
+ ks = Keyspace.open(KEYSPACE1);
+ ColumnFamilyStore cfs = ks.getColumnFamilyStore(CF_STANDARD);
+ cfm = cfs.metadata;
+ }
+
+ /**
+ * This test makes a time-boxed effort to reproduce the issue found in CASSANDRA-16883.
+ */
+ @Test
+ public void multiThreadedNoRepairNeededReadCallback() throws DigestMismatchException
+ {
+ DecoratedKey dk = Util.dk("key1");
+ SinglePartitionReadCommand command = SinglePartitionReadCommand.fullPartitionRead(cfm, FBUtilities.nowInSeconds(), dk);
+ BufferCell cell = BufferCell.live(cfm, cfm.partitionColumns().regulars.getSimple(0), 1000, bytes("1"));
+ PartitionUpdate response = PartitionUpdate.singleRowUpdate(cfm, dk, BTreeRow.singleCellRow(cfm.comparator.make("1"), cell));
+
+ ExecutorService pool = Executors.newFixedThreadPool(2);
+ long endTime = System.nanoTime() + TimeUnit.MINUTES.toNanos(2);
+
+ try
+ {
+ while (System.nanoTime() < endTime)
+ {
+ final DigestResolver resolver = new DigestResolver(ks, command, ConsistencyLevel.ONE, 2);
+ final ReadCallback callback = new ReadCallback(resolver, ConsistencyLevel.ONE, command, ImmutableList.of(EP1.address, EP2.address));
+
+ final CountDownLatch startlatch = new CountDownLatch(2);
+
+ pool.execute(() ->
+ {
+ startlatch.countDown();
+ waitForLatch(startlatch);
+ callback.response(ReadResponse.createDataResponse(iter(response), command));
+ });
+
+ pool.execute(() ->
+ {
+ startlatch.countDown();
+ waitForLatch(startlatch);
+ callback.response(ReadResponse.createDataResponse(iter(response), command));
+ });
+
+ callback.awaitResults();
+ assertTrue(resolver.isDataPresent());
+
+ try (PartitionIterator result = resolver.resolve())
+ {
+ assertTrue(result.hasNext());
+ }
+ }
+ }
+ finally
+ {
+ pool.shutdown();
+ }
+ }
+
+ public UnfilteredPartitionIterator iter(PartitionUpdate update)
+ {
+ return new SingletonUnfilteredPartitionIterator(update.unfilteredIterator(), false);
+ }
+
+ private void waitForLatch(CountDownLatch startlatch)
+ {
+ try
+ {
+ startlatch.await();
+ }
+ catch (InterruptedException e)
+ {
+ Thread.currentThread().interrupt();
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org