You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by dc...@apache.org on 2020/12/08 19:03:56 UTC

[cassandra] branch trunk updated: DigestResolver.getData throws AssertionError since dataResponse is null

This is an automated email from the ASF dual-hosted git repository.

dcapwell pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 3436c3e  DigestResolver.getData throws AssertionError since dataResponse is null
3436c3e is described below

commit 3436c3efc0ff785137ac299e8e09085ffa526f5c
Author: Marcus Eriksson <ma...@apache.org>
AuthorDate: Tue Dec 8 10:01:23 2020 -0800

    DigestResolver.getData throws AssertionError since dataResponse is null
    
    patch by Marcus Eriksson, Caleb Rackliffe; reviewed by Berenguer Blasi, Brandon Williams, Caleb Rackliffe, David Capwell for CASSANDRA-16097
---
 CHANGES.txt                                        |   1 +
 .../service/reads/AbstractReadExecutor.java        |   1 +
 .../cassandra/service/reads/DigestResolver.java    |   5 +-
 .../cassandra/service/reads/ReadCallback.java      |  10 ++-
 .../distributed/test/ReadFailureTest.java          | 100 +++++++++++++++++++++
 .../cassandra/service/reads/ReadExecutorTest.java  |  48 ++++++++++
 6 files changed, 161 insertions(+), 4 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 52d2526..0a5849f 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -18,6 +18,7 @@
  * Internode messaging catches OOMs and does not rethrow (CASSANDRA-15214)
  * When a table attempts to clean up metrics, it was cleaning up all global table metrics (CASSANDRA-16095)
  * Bring back the accepted encryption protocols list as configurable option (CASSANDRA-13325)
+ * DigestResolver.getData throws AssertionError since dataResponse is null (CASSANDRA-16097)
 Merged from 3.11:
  * SASI's `max_compaction_flush_memory_in_mb` settings over 100GB revert to default of 1GB (CASSANDRA-16071)
 Merged from 3.0:
diff --git a/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java b/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java
index 8907e74..ae09417 100644
--- a/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java
+++ b/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java
@@ -371,6 +371,7 @@ public abstract class AbstractReadExecutor
         try
         {
             handler.awaitResults();
+            assert digestResolver.isDataPresent() : "awaitResults returned with no data present.";
         }
         catch (ReadTimeoutException e)
         {
diff --git a/src/java/org/apache/cassandra/service/reads/DigestResolver.java b/src/java/org/apache/cassandra/service/reads/DigestResolver.java
index dbb761b..475c8c2 100644
--- a/src/java/org/apache/cassandra/service/reads/DigestResolver.java
+++ b/src/java/org/apache/cassandra/service/reads/DigestResolver.java
@@ -74,8 +74,6 @@ public class DigestResolver<E extends Endpoints<E>, P extends ReplicaPlan.ForRea
 
     public PartitionIterator getData()
     {
-        assert isDataPresent();
-
         Collection<Message<ReadResponse>> responses = this.responses.snapshot();
 
         if (!hasTransientResponse(responses))
@@ -109,7 +107,8 @@ public class DigestResolver<E extends Endpoints<E>, P extends ReplicaPlan.ForRea
         // validate digests against each other; return false immediately on mismatch.
         ByteBuffer digest = null;
         Collection<Message<ReadResponse>> snapshot = responses.snapshot();
-        if (snapshot.size() <= 1)
+        assert snapshot.size() > 0 : "Attempted response match comparison while no responses have been received.";
+        if (snapshot.size() == 1)
             return true;
 
         // TODO: should also not calculate if only one full node
diff --git a/src/java/org/apache/cassandra/service/reads/ReadCallback.java b/src/java/org/apache/cassandra/service/reads/ReadCallback.java
index 2968dbc..b7ee18c 100644
--- a/src/java/org/apache/cassandra/service/reads/ReadCallback.java
+++ b/src/java/org/apache/cassandra/service/reads/ReadCallback.java
@@ -99,7 +99,15 @@ public class ReadCallback<E extends Endpoints<E>, P extends ReplicaPlan.ForRead<
     public void awaitResults() throws ReadFailureException, ReadTimeoutException
     {
         boolean signaled = await(command.getTimeout(MILLISECONDS), TimeUnit.MILLISECONDS);
-        boolean failed = failures > 0 && blockFor + failures > replicaPlan().contacts().size();
+        /**
+         * Here we are checking isDataPresent in addition to the responses size because there is a possibility
+         * that an asynchronous speculative execution request could be returning after a local failure has already
+         * signaled. Responses may already have been added while the data reference has not yet been set.
+         * See {@link DigestResolver#preprocess(Message)}
+         * CASSANDRA-16097
+         */
+        boolean failed = failures > 0 &&
+                         (blockFor > resolver.responses.size() || !resolver.isDataPresent());
         if (signaled && !failed)
             return;
 
diff --git a/test/distributed/org/apache/cassandra/distributed/test/ReadFailureTest.java b/test/distributed/org/apache/cassandra/distributed/test/ReadFailureTest.java
new file mode 100644
index 0000000..be8db6c
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/ReadFailureTest.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test;
+
+import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.junit.Test;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.distributed.api.ICluster;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.exceptions.RequestFailureReason;
+
+public class ReadFailureTest extends TestBaseImpl
+{
+    static final int TOMBSTONE_FAIL_THRESHOLD = 20;
+    static final int TOMBSTONE_FAIL_KEY = 100001;
+    static final String TABLE = "t";
+
+    /**
+     * This test attempts to create a race condition with speculative executions that would previously cause an AssertionError.
+     * N=2, RF=2, read ONE
+     * The read will fail on the local node due to the tombstone failure threshold. At the same time, a spec exec is triggered
+     * reading from the other node.
+     * <p>
+     * See CASSANDRA-16097 for further details.
+     */
+    @Test
+    public void testSpecExecRace() throws Throwable
+    {
+        try (Cluster cluster = init(Cluster.build().withNodes(2).withConfig(config -> config.set("tombstone_failure_threshold", TOMBSTONE_FAIL_THRESHOLD)).start()))
+        {
+            // Create a table with the spec exec policy set to a low percentile so it's more likely to produce a spec exec racing with the local request.
+            // Not using 'Always' because that actually uses a different class/mechanism and doesn't exercise the bug
+            // we're trying to produce.
+            cluster.schemaChange(String.format("CREATE TABLE %s.%s (k int, c int, v int, PRIMARY KEY (k,c)) WITH speculative_retry = '5p';", KEYSPACE, TABLE));
+
+            // Create a partition with enough tombstones to create a read failure according to the configured threshold
+            for (int i = 0; i <= TOMBSTONE_FAIL_THRESHOLD; ++i)
+                cluster.coordinator(1).execute(String.format("DELETE FROM %s.t WHERE k=%d AND c=%d", KEYSPACE, TOMBSTONE_FAIL_KEY, i),
+                                               ConsistencyLevel.TWO);
+
+            // Create a bunch of latency samples for this failed operation.
+            loopFailStatement(cluster, 5000);
+            // Update the spec exec threshold based on the above samples.
+            // This would normally be done by the periodic task CassandraDaemon.SPECULATION_THRESHOLD_UPDATER.
+            cluster.get(1).runOnInstance(() ->
+                                         {
+                                             ColumnFamilyStore cfs = Keyspace.open(KEYSPACE)
+                                                                             .getColumnFamilyStore(TABLE);
+                                             cfs.updateSpeculationThreshold();
+                                         });
+
+            // Run the request a bunch of times under racy conditions.
+            loopFailStatement(cluster, 5000);
+        }
+    }
+
+    private void loopFailStatement(ICluster cluster, int iterations)
+    {
+        final String query = String.format("SELECT k FROM %s.t WHERE k=%d", KEYSPACE, TOMBSTONE_FAIL_KEY);
+        for (int i = 0; i < iterations; ++i)
+        {
+            try
+            {
+                cluster.coordinator(1).execute(query, ConsistencyLevel.ONE);
+                fail("Request did not throw a ReadFailureException as expected.");
+            }
+            catch (Throwable t) // Throwable because the raised ReadFailure is loaded from a different classloader and doesn't match "ours"
+            {
+                String onFail = String.format("Did not receive expected ReadFailureException. Instead caught %s\n%s",
+                                              t, ExceptionUtils.getStackTrace(t));
+                assertNotNull(onFail, t.getMessage());
+                assertTrue(onFail, t.getMessage().contains(RequestFailureReason.READ_TOO_MANY_TOMBSTONES.name()));
+            }
+        }
+    }
+}
+
diff --git a/test/unit/org/apache/cassandra/service/reads/ReadExecutorTest.java b/test/unit/org/apache/cassandra/service/reads/ReadExecutorTest.java
index e0a5927..3eb9b2e 100644
--- a/test/unit/org/apache/cassandra/service/reads/ReadExecutorTest.java
+++ b/test/unit/org/apache/cassandra/service/reads/ReadExecutorTest.java
@@ -20,6 +20,8 @@ package org.apache.cassandra.service.reads;
 
 import java.util.concurrent.TimeUnit;
 
+import org.apache.commons.lang3.exception.ExceptionUtils;
+
 import org.apache.cassandra.dht.Murmur3Partitioner;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.locator.ReplicaPlan;
@@ -46,6 +48,8 @@ import org.apache.cassandra.schema.KeyspaceParams;
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static org.apache.cassandra.locator.ReplicaUtils.full;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 public class ReadExecutorTest
@@ -187,6 +191,50 @@ public class ReadExecutorTest
         assertEquals(1, ks.metric.speculativeFailedRetries.getCount());
     }
 
+    /**
+     * Test that an async speculative execution racing with a local errored request does not violate assertions.
+     * CASSANDRA-16097
+     */
+    @Test
+    public void testRaceWithNonSpeculativeFailure()
+    {
+        MockSinglePartitionReadCommand command = new MockSinglePartitionReadCommand(TimeUnit.DAYS.toMillis(365));
+        ReplicaPlan.ForTokenRead plan = plan(ConsistencyLevel.LOCAL_ONE, targets, targets.subList(0, 1));
+        AbstractReadExecutor executor = new AbstractReadExecutor.SpeculatingReadExecutor(cfs, command, plan, System.nanoTime());
+
+        // Issue an initial request against the first endpoint...
+        executor.executeAsync();
+
+        // ...and then force a speculative retry against another endpoint.
+        cfs.sampleReadLatencyNanos = 0L;
+        executor.maybeTryAdditionalReplicas();
+
+        new Thread(() ->
+                   {
+                       // Fail the first request. When this fails the number of contacts has already been increased
+                       // to 2, so the failure won't actually signal. However...
+                       executor.handler.onFailure(targets.get(0).endpoint(), RequestFailureReason.READ_TOO_MANY_TOMBSTONES);
+
+                       // ...speculative retries are fired after a short wait, and it is possible for the failure to
+                       // reach the handler just before one is fired and the number of contacts incremented...
+                       executor.handler.condition.signalAll();
+                   }).start();
+
+        try
+        {
+            // ...but by the time we await results, the number of contacts may already have been incremented.
+            // If we rely only on the number of failures and the number of nodes blocked for, compared to the number
+            // of contacts, we may not recognize that the query has failed.
+            executor.awaitResponses();
+            fail("Awaiting responses did not throw a ReadFailureException as expected.");
+        }
+        catch (Throwable t)
+        {
+            assertSame(ExceptionUtils.getStackTrace(t), ReadFailureException.class, t.getClass());
+            assertTrue(t.getMessage().contains(RequestFailureReason.READ_TOO_MANY_TOMBSTONES.name()));
+        }
+    }
+
     public static class MockSinglePartitionReadCommand extends SinglePartitionReadCommand
     {
         private final long timeout;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org