You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by dc...@apache.org on 2020/12/08 19:03:56 UTC
[cassandra] branch trunk updated: DigestResolver.getData throws
AssertionError since dataResponse is null
This is an automated email from the ASF dual-hosted git repository.
dcapwell pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new 3436c3e DigestResolver.getData throws AssertionError since dataResponse is null
3436c3e is described below
commit 3436c3efc0ff785137ac299e8e09085ffa526f5c
Author: Marcus Eriksson <ma...@apache.org>
AuthorDate: Tue Dec 8 10:01:23 2020 -0800
DigestResolver.getData throws AssertionError since dataResponse is null
patch by Marcus Eriksson, Caleb Rackliffe; reviewed by Berenguer Blasi, Brandon Williams, Caleb Rackliffe, David Capwell for CASSANDRA-16097
---
CHANGES.txt | 1 +
.../service/reads/AbstractReadExecutor.java | 1 +
.../cassandra/service/reads/DigestResolver.java | 5 +-
.../cassandra/service/reads/ReadCallback.java | 10 ++-
.../distributed/test/ReadFailureTest.java | 100 +++++++++++++++++++++
.../cassandra/service/reads/ReadExecutorTest.java | 48 ++++++++++
6 files changed, 161 insertions(+), 4 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 52d2526..0a5849f 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -18,6 +18,7 @@
* Internode messaging catches OOMs and does not rethrow (CASSANDRA-15214)
* When a table attempts to clean up metrics, it was cleaning up all global table metrics (CASSANDRA-16095)
* Bring back the accepted encryption protocols list as configurable option (CASSANDRA-13325)
+ * DigestResolver.getData throws AssertionError since dataResponse is null (CASSANDRA-16097)
Merged from 3.11:
* SASI's `max_compaction_flush_memory_in_mb` settings over 100GB revert to default of 1GB (CASSANDRA-16071)
Merged from 3.0:
diff --git a/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java b/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java
index 8907e74..ae09417 100644
--- a/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java
+++ b/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java
@@ -371,6 +371,7 @@ public abstract class AbstractReadExecutor
try
{
handler.awaitResults();
+ assert digestResolver.isDataPresent() : "awaitResults returned with no data present.";
}
catch (ReadTimeoutException e)
{
diff --git a/src/java/org/apache/cassandra/service/reads/DigestResolver.java b/src/java/org/apache/cassandra/service/reads/DigestResolver.java
index dbb761b..475c8c2 100644
--- a/src/java/org/apache/cassandra/service/reads/DigestResolver.java
+++ b/src/java/org/apache/cassandra/service/reads/DigestResolver.java
@@ -74,8 +74,6 @@ public class DigestResolver<E extends Endpoints<E>, P extends ReplicaPlan.ForRea
public PartitionIterator getData()
{
- assert isDataPresent();
-
Collection<Message<ReadResponse>> responses = this.responses.snapshot();
if (!hasTransientResponse(responses))
@@ -109,7 +107,8 @@ public class DigestResolver<E extends Endpoints<E>, P extends ReplicaPlan.ForRea
// validate digests against each other; return false immediately on mismatch.
ByteBuffer digest = null;
Collection<Message<ReadResponse>> snapshot = responses.snapshot();
- if (snapshot.size() <= 1)
+ assert snapshot.size() > 0 : "Attempted response match comparison while no responses have been received.";
+ if (snapshot.size() == 1)
return true;
// TODO: should also not calculate if only one full node
diff --git a/src/java/org/apache/cassandra/service/reads/ReadCallback.java b/src/java/org/apache/cassandra/service/reads/ReadCallback.java
index 2968dbc..b7ee18c 100644
--- a/src/java/org/apache/cassandra/service/reads/ReadCallback.java
+++ b/src/java/org/apache/cassandra/service/reads/ReadCallback.java
@@ -99,7 +99,15 @@ public class ReadCallback<E extends Endpoints<E>, P extends ReplicaPlan.ForRead<
public void awaitResults() throws ReadFailureException, ReadTimeoutException
{
boolean signaled = await(command.getTimeout(MILLISECONDS), TimeUnit.MILLISECONDS);
- boolean failed = failures > 0 && blockFor + failures > replicaPlan().contacts().size();
+ /**
+ * Here we are checking isDataPresent in addition to the responses size because there is a possibility
+ * that an asynchronous speculative execution request could be returning after a local failure already
+ * signaled. Responses may already have been recorded while the data reference has not yet been set.
+ * See {@link DigestResolver#preprocess(Message)}
+ * CASSANDRA-16097
+ */
+ boolean failed = failures > 0 &&
+ (blockFor > resolver.responses.size() || !resolver.isDataPresent());
if (signaled && !failed)
return;
diff --git a/test/distributed/org/apache/cassandra/distributed/test/ReadFailureTest.java b/test/distributed/org/apache/cassandra/distributed/test/ReadFailureTest.java
new file mode 100644
index 0000000..be8db6c
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/ReadFailureTest.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test;
+
+import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.junit.Test;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.distributed.api.ICluster;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.exceptions.RequestFailureReason;
+
+public class ReadFailureTest extends TestBaseImpl
+{
+ static final int TOMBSTONE_FAIL_THRESHOLD = 20;
+ static final int TOMBSTONE_FAIL_KEY = 100001;
+ static final String TABLE = "t";
+
+ /**
+ * This test attempts to create a race condition with speculative executions that would previously cause an AssertionError.
+ * N=2, RF=2, read ONE
+ * The read will fail on the local node due to tombstone read threshold. At the same time, a spec exec is triggered
+ * reading from the other node.
+ * <p>
+ * See CASSANDRA-16097 for further details.
+ */
+ @Test
+ public void testSpecExecRace() throws Throwable
+ {
+ try (Cluster cluster = init(Cluster.build().withNodes(2).withConfig(config -> config.set("tombstone_failure_threshold", TOMBSTONE_FAIL_THRESHOLD)).start()))
+ {
+ // Create a table with the spec exec policy set to a low percentile so it's more likely to produce a spec exec racing with the local request.
+ // Not using 'Always' because that actually uses a different class/mechanism and doesn't exercise the bug
+ // we're trying to produce.
+ cluster.schemaChange(String.format("CREATE TABLE %s.%s (k int, c int, v int, PRIMARY KEY (k,c)) WITH speculative_retry = '5p';", KEYSPACE, TABLE));
+
+ // Create a partition with enough tombstones to create a read failure according to the configured threshold
+ for (int i = 0; i <= TOMBSTONE_FAIL_THRESHOLD; ++i)
+ cluster.coordinator(1).execute(String.format("DELETE FROM %s.t WHERE k=%d AND c=%d", KEYSPACE, TOMBSTONE_FAIL_KEY, i),
+ ConsistencyLevel.TWO);
+
+ // Create a bunch of latency samples for this failed operation.
+ loopFailStatement(cluster, 5000);
+ // Update the spec exec threshold based on the above samples.
+ // This would normally be done by the periodic task CassandraDaemon.SPECULATION_THRESHOLD_UPDATER.
+ cluster.get(1).runOnInstance(() ->
+ {
+ ColumnFamilyStore cfs = Keyspace.open(KEYSPACE)
+ .getColumnFamilyStore(TABLE);
+ cfs.updateSpeculationThreshold();
+ });
+
+ // Run the request a bunch of times under racy conditions.
+ loopFailStatement(cluster, 5000);
+ }
+ }
+
+ private void loopFailStatement(ICluster cluster, int iterations)
+ {
+ final String query = String.format("SELECT k FROM %s.t WHERE k=%d", KEYSPACE, TOMBSTONE_FAIL_KEY);
+ for (int i = 0; i < iterations; ++i)
+ {
+ try
+ {
+ cluster.coordinator(1).execute(query, ConsistencyLevel.ONE);
+ fail("Request did not throw a ReadFailureException as expected.");
+ }
+ catch (Throwable t) // Throwable because the raised ReadFailure is loaded from a different classloader and doesn't match "ours"
+ {
+ String onFail = String.format("Did not receive expected ReadFailureException. Instead caught %s\n%s",
+ t, ExceptionUtils.getStackTrace(t));
+ assertNotNull(onFail, t.getMessage());
+ assertTrue(onFail, t.getMessage().contains(RequestFailureReason.READ_TOO_MANY_TOMBSTONES.name()));
+ }
+ }
+ }
+}
+
diff --git a/test/unit/org/apache/cassandra/service/reads/ReadExecutorTest.java b/test/unit/org/apache/cassandra/service/reads/ReadExecutorTest.java
index e0a5927..3eb9b2e 100644
--- a/test/unit/org/apache/cassandra/service/reads/ReadExecutorTest.java
+++ b/test/unit/org/apache/cassandra/service/reads/ReadExecutorTest.java
@@ -20,6 +20,8 @@ package org.apache.cassandra.service.reads;
import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.exception.ExceptionUtils;
+
import org.apache.cassandra.dht.Murmur3Partitioner;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.locator.ReplicaPlan;
@@ -46,6 +48,8 @@ import org.apache.cassandra.schema.KeyspaceParams;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.apache.cassandra.locator.ReplicaUtils.full;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
public class ReadExecutorTest
@@ -187,6 +191,50 @@ public class ReadExecutorTest
assertEquals(1, ks.metric.speculativeFailedRetries.getCount());
}
+ /**
+ * Test that an async speculative execution racing with a local errored request does not violate assertions.
+ * CASSANDRA-16097
+ */
+ @Test
+ public void testRaceWithNonSpeculativeFailure()
+ {
+ MockSinglePartitionReadCommand command = new MockSinglePartitionReadCommand(TimeUnit.DAYS.toMillis(365));
+ ReplicaPlan.ForTokenRead plan = plan(ConsistencyLevel.LOCAL_ONE, targets, targets.subList(0, 1));
+ AbstractReadExecutor executor = new AbstractReadExecutor.SpeculatingReadExecutor(cfs, command, plan, System.nanoTime());
+
+ // Issue an initial request against the first endpoint...
+ executor.executeAsync();
+
+ // ...and then force a speculative retry against another endpoint.
+ cfs.sampleReadLatencyNanos = 0L;
+ executor.maybeTryAdditionalReplicas();
+
+ new Thread(() ->
+ {
+ // Fail the first request. When this fails the number of contacts has already been increased
+ // to 2, so the failure won't actually signal. However...
+ executor.handler.onFailure(targets.get(0).endpoint(), RequestFailureReason.READ_TOO_MANY_TOMBSTONES);
+
+ // ...speculative retries are fired after a short wait, and it is possible for the failure to
+ // reach the handler just before one is fired and the number of contacts incremented...
+ executor.handler.condition.signalAll();
+ }).start();
+
+ try
+ {
+ // ...but by the time we wait for results, the number of contacts may already have been incremented.
+ // If we rely only on the number of failures and the number of nodes blocked for, compared to the number
+ // of contacts, we may not recognize that the query has failed.
+ executor.awaitResponses();
+ fail("Awaiting responses did not throw a ReadFailureException as expected.");
+ }
+ catch (Throwable t)
+ {
+ assertSame(ExceptionUtils.getStackTrace(t), ReadFailureException.class, t.getClass());
+ assertTrue(t.getMessage().contains(RequestFailureReason.READ_TOO_MANY_TOMBSTONES.name()));
+ }
+ }
+
public static class MockSinglePartitionReadCommand extends SinglePartitionReadCommand
{
private final long timeout;
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org