You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by bd...@apache.org on 2019/12/16 23:46:22 UTC

[cassandra] branch trunk updated: Prevent read repair mutations from increasing read timeout

This is an automated email from the ASF dual-hosted git repository.

bdeggleston pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 9abe212  Prevent read repair mutations from increasing read timeout
9abe212 is described below

commit 9abe2127dde7ea317928b37b8b5c662e787b2192
Author: yifan-c <yc...@gmail.com>
AuthorDate: Thu Dec 5 15:00:19 2019 -0800

    Prevent read repair mutations from increasing read timeout
    
    Patch by Yifan Cai; Reviewed by Blake Eggleston and Jordan West for CASSANDRA-15442
---
 CHANGES.txt                                        |  1 +
 .../reads/repair/BlockingPartitionRepair.java      | 16 +++++++---
 .../service/reads/repair/BlockingReadRepair.java   |  2 +-
 .../test/DistributedReadWritePathTest.java         | 37 ++++++++++++++++++++++
 .../reads/repair/BlockingReadRepairTest.java       | 26 ++++++++-------
 .../repair/DiagEventsBlockingReadRepairTest.java   | 11 +++++--
 .../service/reads/repair/ReadRepairTest.java       | 23 ++++++++------
 7 files changed, 87 insertions(+), 29 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 6783b2c..2c6a1d9 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0-alpha3
+ * Prevent read repair mutations from increasing read timeout (CASSANDRA-15442)
  * Document 4.0 system keyspace changes, bump generations (CASSANDRA-15454)
  * Make it possible to disable STCS-in-L0 during runtime (CASSANDRA-15445)
  * Removed obsolete OldNetworkTopologyStrategy (CASSANDRA-13990)
diff --git a/src/java/org/apache/cassandra/service/reads/repair/BlockingPartitionRepair.java b/src/java/org/apache/cassandra/service/reads/repair/BlockingPartitionRepair.java
index 220ada5..01fd7f0 100644
--- a/src/java/org/apache/cassandra/service/reads/repair/BlockingPartitionRepair.java
+++ b/src/java/org/apache/cassandra/service/reads/repair/BlockingPartitionRepair.java
@@ -161,11 +161,17 @@ public class BlockingPartitionRepair<E extends Endpoints<E>, P extends ReplicaPl
         }
     }
 
-    public boolean awaitRepairs(long timeout, TimeUnit timeoutUnit)
+    /**
+     * Wait for the repair to complete until a future point in time.
+     * If {@code timeoutAt} is already in the past, the method returns immediately with the current repair result.
+     * @param timeoutAt the deadline to wait until, expressed in the {@link System#nanoTime()} time base
+     * @param timeUnit the time unit of {@code timeoutAt}
+     * @return true if the repair is done; otherwise, false.
+     */
+    public boolean awaitRepairsUntil(long timeoutAt, TimeUnit timeUnit)
     {
-        long elapsed = System.nanoTime() - mutationsSentTime;
-        long remaining = timeoutUnit.toNanos(timeout) - elapsed;
-
+        long timeoutAtNanos = timeUnit.toNanos(timeoutAt);
+        long remaining = timeoutAtNanos - System.nanoTime();
         try
         {
             return latch.await(remaining, TimeUnit.NANOSECONDS);
@@ -190,7 +196,7 @@ public class BlockingPartitionRepair<E extends Endpoints<E>, P extends ReplicaPl
      */
     public void maybeSendAdditionalWrites(long timeout, TimeUnit timeoutUnit)
     {
-        if (awaitRepairs(timeout, timeoutUnit))
+        if (awaitRepairsUntil(timeout + timeoutUnit.convert(mutationsSentTime, TimeUnit.NANOSECONDS), timeoutUnit))
             return;
 
         E newCandidates = replicaPlan.uncontactedCandidates();
diff --git a/src/java/org/apache/cassandra/service/reads/repair/BlockingReadRepair.java b/src/java/org/apache/cassandra/service/reads/repair/BlockingReadRepair.java
index ef624d6..764765e 100644
--- a/src/java/org/apache/cassandra/service/reads/repair/BlockingReadRepair.java
+++ b/src/java/org/apache/cassandra/service/reads/repair/BlockingReadRepair.java
@@ -87,7 +87,7 @@ public class BlockingReadRepair<E extends Endpoints<E>, P extends ReplicaPlan.Fo
         boolean timedOut = false;
         for (BlockingPartitionRepair repair: repairs)
         {
-            if (!repair.awaitRepairs(DatabaseDescriptor.getWriteRpcTimeout(NANOSECONDS), NANOSECONDS))
+            if (!repair.awaitRepairsUntil(DatabaseDescriptor.getReadRpcTimeout(NANOSECONDS) + queryStartNanoTime, NANOSECONDS))
             {
                 timedOut = true;
             }
diff --git a/test/distributed/org/apache/cassandra/distributed/test/DistributedReadWritePathTest.java b/test/distributed/org/apache/cassandra/distributed/test/DistributedReadWritePathTest.java
index e0c6916..0870ab3 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/DistributedReadWritePathTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/DistributedReadWritePathTest.java
@@ -18,6 +18,8 @@
 
 package org.apache.cassandra.distributed.test;
 
+import java.util.concurrent.TimeUnit;
+
 import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -29,6 +31,7 @@ import org.apache.cassandra.distributed.Cluster;
 import org.apache.cassandra.distributed.impl.IInvokableInstance;
 
 import static org.apache.cassandra.distributed.api.Feature.NETWORK;
+import static org.apache.cassandra.net.Verb.READ_REPAIR_RSP;
 import static org.junit.Assert.assertEquals;
 
 import static org.apache.cassandra.net.Verb.READ_REPAIR_REQ;
@@ -127,6 +130,40 @@ public class DistributedReadWritePathTest extends DistributedTestBase
     }
 
     @Test
+    public void readRepairTimeoutTest() throws Throwable
+    {
+        final long reducedReadTimeout = 3000L;
+        try (Cluster cluster = init(Cluster.create(3)))
+        {
+            cluster.forEach(i -> i.runOnInstance(() -> DatabaseDescriptor.setReadRpcTimeout(reducedReadTimeout)));
+            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck)) WITH read_repair='blocking'");
+            cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1)");
+            cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1)");
+            assertRows(cluster.get(3).executeInternal("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1"));
+            cluster.verbs(READ_REPAIR_RSP).to(1).drop();
+            final long start = System.currentTimeMillis();
+            try
+            {
+                cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1", ConsistencyLevel.ALL);
+                Assert.fail("Read timeout expected but it did not occur");
+            }
+            catch (Exception ex)
+            {
+                // The exception class was loaded by another (per-instance) class loader, so compare the message text instead of the exception type.
+                Assert.assertTrue(ex.getMessage().contains("org.apache.cassandra.exceptions.ReadTimeoutException"));
+                long actualTimeTaken = System.currentTimeMillis() - start;
+                long magicDelayAmount = 100L; // allowed overshoot (ms) above the read timeout; NOTE(review): a fixed bound like this may be flaky on slow hosts.
+                // Because of processing and messaging delays, the time observed by the client is slightly longer than the timeout value...
+                Assert.assertTrue(actualTimeTaken > reducedReadTimeout);
+                // ...but it should not exceed it by much: waiting for read repair must not extend the read deadline.
+                Assert.assertTrue(actualTimeTaken < reducedReadTimeout + magicDelayAmount);
+                assertRows(cluster.get(3).executeInternal("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1"),
+                           row(1, 1, 1)); // only the READ_REPAIR_RSP acks to node 1 were dropped, so the repair mutation should still be applied on node 3.
+            }
+        }
+    }
+
+    @Test
     public void failingReadRepairTest() throws Throwable
     {
         try (Cluster cluster = init(Cluster.create(3)))
diff --git a/test/unit/org/apache/cassandra/service/reads/repair/BlockingReadRepairTest.java b/test/unit/org/apache/cassandra/service/reads/repair/BlockingReadRepairTest.java
index 7538832..3cc1a63 100644
--- a/test/unit/org/apache/cassandra/service/reads/repair/BlockingReadRepairTest.java
+++ b/test/unit/org/apache/cassandra/service/reads/repair/BlockingReadRepairTest.java
@@ -25,7 +25,6 @@ import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
 import com.google.common.collect.Lists;
-import org.apache.cassandra.locator.ReplicaPlan;
 import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -38,6 +37,7 @@ import org.apache.cassandra.locator.Endpoints;
 import org.apache.cassandra.locator.EndpointsForRange;
 import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.locator.Replica;
+import org.apache.cassandra.locator.ReplicaPlan;
 import org.apache.cassandra.locator.ReplicaUtils;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.service.reads.ReadCallback;
@@ -158,11 +158,11 @@ public class BlockingReadRepairTest extends AbstractReadRepairTest
         assertMutationEqual(resolved, handler.mutationsSent.get(target3));
 
         // check repairs stop blocking after receiving 2 acks
-        Assert.assertFalse(handler.awaitRepairs(0, TimeUnit.NANOSECONDS));
+        Assert.assertFalse(getCurrentRepairStatus(handler));
         handler.ack(target1);
-        Assert.assertFalse(handler.awaitRepairs(0, TimeUnit.NANOSECONDS));
+        Assert.assertFalse(getCurrentRepairStatus(handler));
         handler.ack(target3);
-        Assert.assertTrue(handler.awaitRepairs(0, TimeUnit.NANOSECONDS));
+        Assert.assertTrue(getCurrentRepairStatus(handler));
 
     }
 
@@ -243,14 +243,13 @@ public class BlockingReadRepairTest extends AbstractReadRepairTest
         InstrumentedReadRepairHandler handler = createRepairHandler(repairs, 2);
         handler.sendInitialRepairs();
 
-        Assert.assertFalse(handler.awaitRepairs(0, TimeUnit.NANOSECONDS));
+        Assert.assertFalse(getCurrentRepairStatus(handler));
         handler.ack(target1);
-        Assert.assertFalse(handler.awaitRepairs(0, TimeUnit.NANOSECONDS));
+        Assert.assertFalse(getCurrentRepairStatus(handler));
 
         // here we should stop blocking, even though we've sent 3 repairs
         handler.ack(target2);
-        Assert.assertTrue(handler.awaitRepairs(0, TimeUnit.NANOSECONDS));
-
+        Assert.assertTrue(getCurrentRepairStatus(handler));
     }
 
     /**
@@ -275,14 +274,19 @@ public class BlockingReadRepairTest extends AbstractReadRepairTest
         Assert.assertTrue(handler.mutationsSent.containsKey(remote1.endpoint()));
 
         Assert.assertEquals(1, handler.waitingOn());
-        Assert.assertFalse(handler.awaitRepairs(0, TimeUnit.NANOSECONDS));
+        Assert.assertFalse(getCurrentRepairStatus(handler));
 
         handler.ack(remote1.endpoint());
         Assert.assertEquals(1, handler.waitingOn());
-        Assert.assertFalse(handler.awaitRepairs(0, TimeUnit.NANOSECONDS));
+        Assert.assertFalse(getCurrentRepairStatus(handler));
 
         handler.ack(replica1.endpoint());
         Assert.assertEquals(0, handler.waitingOn());
-        Assert.assertTrue(handler.awaitRepairs(0, TimeUnit.NANOSECONDS));
+        Assert.assertTrue(getCurrentRepairStatus(handler));
+    }
+
+    private boolean getCurrentRepairStatus(BlockingPartitionRepair handler)
+    {
+        return handler.awaitRepairsUntil(System.nanoTime(), TimeUnit.NANOSECONDS); // deadline is "now", so this polls the repair status without waiting
     }
 }
diff --git a/test/unit/org/apache/cassandra/service/reads/repair/DiagEventsBlockingReadRepairTest.java b/test/unit/org/apache/cassandra/service/reads/repair/DiagEventsBlockingReadRepairTest.java
index 3bcd757..c15d7f4 100644
--- a/test/unit/org/apache/cassandra/service/reads/repair/DiagEventsBlockingReadRepairTest.java
+++ b/test/unit/org/apache/cassandra/service/reads/repair/DiagEventsBlockingReadRepairTest.java
@@ -100,11 +100,16 @@ public class DiagEventsBlockingReadRepairTest extends AbstractReadRepairTest
         Assert.assertEquals(resolved.toString(), handler.updatesByEp.get(target3));
 
         // check repairs stop blocking after receiving 2 acks
-        Assert.assertFalse(handler.awaitRepairs(0, TimeUnit.NANOSECONDS));
+        Assert.assertFalse(getCurrentRepairStatus(handler));
         handler.ack(target1);
-        Assert.assertFalse(handler.awaitRepairs(0, TimeUnit.NANOSECONDS));
+        Assert.assertFalse(getCurrentRepairStatus(handler));
         handler.ack(target3);
-        Assert.assertTrue(handler.awaitRepairs(0, TimeUnit.NANOSECONDS));
+        Assert.assertTrue(getCurrentRepairStatus(handler));
+    }
+
+    private boolean getCurrentRepairStatus(BlockingPartitionRepair handler)
+    {
+        return handler.awaitRepairsUntil(System.nanoTime(), TimeUnit.NANOSECONDS); // deadline is "now", so this polls the repair status without waiting
     }
 
     public InstrumentedReadRepair createInstrumentedReadRepair(ReadCommand command, ReplicaPlan.Shared<?,?> replicaPlan, long queryStartNanoTime)
diff --git a/test/unit/org/apache/cassandra/service/reads/repair/ReadRepairTest.java b/test/unit/org/apache/cassandra/service/reads/repair/ReadRepairTest.java
index 232644d..5ae9dd8 100644
--- a/test/unit/org/apache/cassandra/service/reads/repair/ReadRepairTest.java
+++ b/test/unit/org/apache/cassandra/service/reads/repair/ReadRepairTest.java
@@ -216,11 +216,11 @@ public class ReadRepairTest
         assertMutationEqual(resolved, handler.mutationsSent.get(target3.endpoint()));
 
         // check repairs stop blocking after receiving 2 acks
-        Assert.assertFalse(handler.awaitRepairs(0, TimeUnit.NANOSECONDS));
+        Assert.assertFalse(getCurrentRepairStatus(handler));
         handler.ack(target1.endpoint());
-        Assert.assertFalse(handler.awaitRepairs(0, TimeUnit.NANOSECONDS));
+        Assert.assertFalse(getCurrentRepairStatus(handler));
         handler.ack(target3.endpoint());
-        Assert.assertTrue(handler.awaitRepairs(0, TimeUnit.NANOSECONDS));
+        Assert.assertTrue(getCurrentRepairStatus(handler));
     }
 
     /**
@@ -304,13 +304,13 @@ public class ReadRepairTest
         InstrumentedReadRepairHandler handler = createRepairHandler(repairs, 2, replicas, replicas);
         handler.sendInitialRepairs();
 
-        Assert.assertFalse(handler.awaitRepairs(0, TimeUnit.NANOSECONDS));
+        Assert.assertFalse(getCurrentRepairStatus(handler));
         handler.ack(target1.endpoint());
-        Assert.assertFalse(handler.awaitRepairs(0, TimeUnit.NANOSECONDS));
+        Assert.assertFalse(getCurrentRepairStatus(handler));
 
         // here we should stop blocking, even though we've sent 3 repairs
         handler.ack(target2.endpoint());
-        Assert.assertTrue(handler.awaitRepairs(0, TimeUnit.NANOSECONDS));
+        Assert.assertTrue(getCurrentRepairStatus(handler));
 
     }
 
@@ -337,14 +337,19 @@ public class ReadRepairTest
         Assert.assertTrue(handler.mutationsSent.containsKey(remote1.endpoint()));
 
         Assert.assertEquals(1, handler.waitingOn());
-        Assert.assertFalse(handler.awaitRepairs(0, TimeUnit.NANOSECONDS));
+        Assert.assertFalse(getCurrentRepairStatus(handler));
 
         handler.ack(remote1.endpoint());
         Assert.assertEquals(1, handler.waitingOn());
-        Assert.assertFalse(handler.awaitRepairs(0, TimeUnit.NANOSECONDS));
+        Assert.assertFalse(getCurrentRepairStatus(handler));
 
         handler.ack(target1.endpoint());
         Assert.assertEquals(0, handler.waitingOn());
-        Assert.assertTrue(handler.awaitRepairs(0, TimeUnit.NANOSECONDS));
+        Assert.assertTrue(getCurrentRepairStatus(handler));
+    }
+
+    private boolean getCurrentRepairStatus(BlockingPartitionRepair handler)
+    {
+        return handler.awaitRepairsUntil(System.nanoTime(), TimeUnit.NANOSECONDS); // deadline is "now", so this polls the repair status without waiting
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org