You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by bd...@apache.org on 2019/12/16 23:46:22 UTC
[cassandra] branch trunk updated: Prevent read repair mutations
from increasing read timeout
This is an automated email from the ASF dual-hosted git repository.
bdeggleston pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new 9abe212 Prevent read repair mutations from increasing read timeout
9abe212 is described below
commit 9abe2127dde7ea317928b37b8b5c662e787b2192
Author: yifan-c <yc...@gmail.com>
AuthorDate: Thu Dec 5 15:00:19 2019 -0800
Prevent read repair mutations from increasing read timeout
Patch by Yifan Cai; Reviewed by Blake Eggleston and Jordan West for CASSANDRA-15442
---
CHANGES.txt | 1 +
.../reads/repair/BlockingPartitionRepair.java | 16 +++++++---
.../service/reads/repair/BlockingReadRepair.java | 2 +-
.../test/DistributedReadWritePathTest.java | 37 ++++++++++++++++++++++
.../reads/repair/BlockingReadRepairTest.java | 26 ++++++++-------
.../repair/DiagEventsBlockingReadRepairTest.java | 11 +++++--
.../service/reads/repair/ReadRepairTest.java | 23 ++++++++------
7 files changed, 87 insertions(+), 29 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 6783b2c..2c6a1d9 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
4.0-alpha3
+ * Prevent read repair mutations from increasing read timeout (CASSANDRA-15442)
* Document 4.0 system keyspace changes, bump generations (CASSANDRA-15454)
* Make it possible to disable STCS-in-L0 during runtime (CASSANDRA-15445)
* Removed obsolete OldNetworkTopologyStrategy (CASSANDRA-13990)
diff --git a/src/java/org/apache/cassandra/service/reads/repair/BlockingPartitionRepair.java b/src/java/org/apache/cassandra/service/reads/repair/BlockingPartitionRepair.java
index 220ada5..01fd7f0 100644
--- a/src/java/org/apache/cassandra/service/reads/repair/BlockingPartitionRepair.java
+++ b/src/java/org/apache/cassandra/service/reads/repair/BlockingPartitionRepair.java
@@ -161,11 +161,17 @@ public class BlockingPartitionRepair<E extends Endpoints<E>, P extends ReplicaPl
}
}
- public boolean awaitRepairs(long timeout, TimeUnit timeoutUnit)
+ /**
+ * Wait for the repair to complete until the given deadline.
+ * If {@code timeoutAt} is already in the past, the method returns immediately with the current repair result.
+ * @param timeoutAt the deadline, on the {@code System.nanoTime()} timeline, expressed in {@code timeUnit}
+ * @param timeUnit the time unit of {@code timeoutAt}
+ * @return true if repair is done; otherwise, false.
+ */
+ public boolean awaitRepairsUntil(long timeoutAt, TimeUnit timeUnit)
{
- long elapsed = System.nanoTime() - mutationsSentTime;
- long remaining = timeoutUnit.toNanos(timeout) - elapsed;
-
+ long timeoutAtNanos = timeUnit.toNanos(timeoutAt);
+ long remaining = timeoutAtNanos - System.nanoTime();
try
{
return latch.await(remaining, TimeUnit.NANOSECONDS);
@@ -190,7 +196,7 @@ public class BlockingPartitionRepair<E extends Endpoints<E>, P extends ReplicaPl
*/
public void maybeSendAdditionalWrites(long timeout, TimeUnit timeoutUnit)
{
- if (awaitRepairs(timeout, timeoutUnit))
+ if (awaitRepairsUntil(timeout + timeoutUnit.convert(mutationsSentTime, TimeUnit.NANOSECONDS), timeoutUnit))
return;
E newCandidates = replicaPlan.uncontactedCandidates();
diff --git a/src/java/org/apache/cassandra/service/reads/repair/BlockingReadRepair.java b/src/java/org/apache/cassandra/service/reads/repair/BlockingReadRepair.java
index ef624d6..764765e 100644
--- a/src/java/org/apache/cassandra/service/reads/repair/BlockingReadRepair.java
+++ b/src/java/org/apache/cassandra/service/reads/repair/BlockingReadRepair.java
@@ -87,7 +87,7 @@ public class BlockingReadRepair<E extends Endpoints<E>, P extends ReplicaPlan.Fo
boolean timedOut = false;
for (BlockingPartitionRepair repair: repairs)
{
- if (!repair.awaitRepairs(DatabaseDescriptor.getWriteRpcTimeout(NANOSECONDS), NANOSECONDS))
+ if (!repair.awaitRepairsUntil(DatabaseDescriptor.getReadRpcTimeout(NANOSECONDS) + queryStartNanoTime, NANOSECONDS))
{
timedOut = true;
}
diff --git a/test/distributed/org/apache/cassandra/distributed/test/DistributedReadWritePathTest.java b/test/distributed/org/apache/cassandra/distributed/test/DistributedReadWritePathTest.java
index e0c6916..0870ab3 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/DistributedReadWritePathTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/DistributedReadWritePathTest.java
@@ -18,6 +18,8 @@
package org.apache.cassandra.distributed.test;
+import java.util.concurrent.TimeUnit;
+
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -29,6 +31,7 @@ import org.apache.cassandra.distributed.Cluster;
import org.apache.cassandra.distributed.impl.IInvokableInstance;
import static org.apache.cassandra.distributed.api.Feature.NETWORK;
+import static org.apache.cassandra.net.Verb.READ_REPAIR_RSP;
import static org.junit.Assert.assertEquals;
import static org.apache.cassandra.net.Verb.READ_REPAIR_REQ;
@@ -127,6 +130,40 @@ public class DistributedReadWritePathTest extends DistributedTestBase
}
@Test
+ public void readRepairTimeoutTest() throws Throwable
+ {
+ final long reducedReadTimeout = 3000L;
+ try (Cluster cluster = init(Cluster.create(3)))
+ {
+ cluster.forEach(i -> i.runOnInstance(() -> DatabaseDescriptor.setReadRpcTimeout(reducedReadTimeout)));
+ cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck)) WITH read_repair='blocking'");
+ cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1)");
+ cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1)");
+ assertRows(cluster.get(3).executeInternal("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1"));
+ cluster.verbs(READ_REPAIR_RSP).to(1).drop();
+ final long start = System.currentTimeMillis();
+ try
+ {
+ cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1", ConsistencyLevel.ALL);
+ Assert.fail("Read timeout expected but it did not occur");
+ }
+ catch (Exception ex)
+ {
+ // The exception class was loaded by a different class loader, so assert on the message instead of the exception type
+ Assert.assertTrue(ex.getMessage().contains("org.apache.cassandra.exceptions.ReadTimeoutException"));
+ long actualTimeTaken = System.currentTimeMillis() - start;
+ long magicDelayAmount = 100L; // it might not be the best way to check if the time taken is around the timeout value.
+ // Due to processing and network delays, the time observed by the client is slightly more than the timeout value
+ Assert.assertTrue(actualTimeTaken > reducedReadTimeout);
+ // but it should not exceed the timeout by much
+ Assert.assertTrue(actualTimeTaken < reducedReadTimeout + magicDelayAmount);
+ assertRows(cluster.get(3).executeInternal("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1"),
+ row(1, 1, 1)); // only the repaired node's ack (READ_REPAIR_RSP) was dropped, so the repair mutation should in fact have been applied.
+ }
+ }
+ }
+
+ @Test
public void failingReadRepairTest() throws Throwable
{
try (Cluster cluster = init(Cluster.create(3)))
diff --git a/test/unit/org/apache/cassandra/service/reads/repair/BlockingReadRepairTest.java b/test/unit/org/apache/cassandra/service/reads/repair/BlockingReadRepairTest.java
index 7538832..3cc1a63 100644
--- a/test/unit/org/apache/cassandra/service/reads/repair/BlockingReadRepairTest.java
+++ b/test/unit/org/apache/cassandra/service/reads/repair/BlockingReadRepairTest.java
@@ -25,7 +25,6 @@ import java.util.Set;
import java.util.concurrent.TimeUnit;
import com.google.common.collect.Lists;
-import org.apache.cassandra.locator.ReplicaPlan;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -38,6 +37,7 @@ import org.apache.cassandra.locator.Endpoints;
import org.apache.cassandra.locator.EndpointsForRange;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.locator.Replica;
+import org.apache.cassandra.locator.ReplicaPlan;
import org.apache.cassandra.locator.ReplicaUtils;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.service.reads.ReadCallback;
@@ -158,11 +158,11 @@ public class BlockingReadRepairTest extends AbstractReadRepairTest
assertMutationEqual(resolved, handler.mutationsSent.get(target3));
// check repairs stop blocking after receiving 2 acks
- Assert.assertFalse(handler.awaitRepairs(0, TimeUnit.NANOSECONDS));
+ Assert.assertFalse(getCurrentRepairStatus(handler));
handler.ack(target1);
- Assert.assertFalse(handler.awaitRepairs(0, TimeUnit.NANOSECONDS));
+ Assert.assertFalse(getCurrentRepairStatus(handler));
handler.ack(target3);
- Assert.assertTrue(handler.awaitRepairs(0, TimeUnit.NANOSECONDS));
+ Assert.assertTrue(getCurrentRepairStatus(handler));
}
@@ -243,14 +243,13 @@ public class BlockingReadRepairTest extends AbstractReadRepairTest
InstrumentedReadRepairHandler handler = createRepairHandler(repairs, 2);
handler.sendInitialRepairs();
- Assert.assertFalse(handler.awaitRepairs(0, TimeUnit.NANOSECONDS));
+ Assert.assertFalse(getCurrentRepairStatus(handler));
handler.ack(target1);
- Assert.assertFalse(handler.awaitRepairs(0, TimeUnit.NANOSECONDS));
+ Assert.assertFalse(getCurrentRepairStatus(handler));
// here we should stop blocking, even though we've sent 3 repairs
handler.ack(target2);
- Assert.assertTrue(handler.awaitRepairs(0, TimeUnit.NANOSECONDS));
-
+ Assert.assertTrue(getCurrentRepairStatus(handler));
}
/**
@@ -275,14 +274,19 @@ public class BlockingReadRepairTest extends AbstractReadRepairTest
Assert.assertTrue(handler.mutationsSent.containsKey(remote1.endpoint()));
Assert.assertEquals(1, handler.waitingOn());
- Assert.assertFalse(handler.awaitRepairs(0, TimeUnit.NANOSECONDS));
+ Assert.assertFalse(getCurrentRepairStatus(handler));
handler.ack(remote1.endpoint());
Assert.assertEquals(1, handler.waitingOn());
- Assert.assertFalse(handler.awaitRepairs(0, TimeUnit.NANOSECONDS));
+ Assert.assertFalse(getCurrentRepairStatus(handler));
handler.ack(replica1.endpoint());
Assert.assertEquals(0, handler.waitingOn());
- Assert.assertTrue(handler.awaitRepairs(0, TimeUnit.NANOSECONDS));
+ Assert.assertTrue(getCurrentRepairStatus(handler));
+ }
+
+ private boolean getCurrentRepairStatus(BlockingPartitionRepair handler)
+ {
+ return handler.awaitRepairsUntil(System.nanoTime(), TimeUnit.NANOSECONDS);
}
}
diff --git a/test/unit/org/apache/cassandra/service/reads/repair/DiagEventsBlockingReadRepairTest.java b/test/unit/org/apache/cassandra/service/reads/repair/DiagEventsBlockingReadRepairTest.java
index 3bcd757..c15d7f4 100644
--- a/test/unit/org/apache/cassandra/service/reads/repair/DiagEventsBlockingReadRepairTest.java
+++ b/test/unit/org/apache/cassandra/service/reads/repair/DiagEventsBlockingReadRepairTest.java
@@ -100,11 +100,16 @@ public class DiagEventsBlockingReadRepairTest extends AbstractReadRepairTest
Assert.assertEquals(resolved.toString(), handler.updatesByEp.get(target3));
// check repairs stop blocking after receiving 2 acks
- Assert.assertFalse(handler.awaitRepairs(0, TimeUnit.NANOSECONDS));
+ Assert.assertFalse(getCurrentRepairStatus(handler));
handler.ack(target1);
- Assert.assertFalse(handler.awaitRepairs(0, TimeUnit.NANOSECONDS));
+ Assert.assertFalse(getCurrentRepairStatus(handler));
handler.ack(target3);
- Assert.assertTrue(handler.awaitRepairs(0, TimeUnit.NANOSECONDS));
+ Assert.assertTrue(getCurrentRepairStatus(handler));
+ }
+
+ private boolean getCurrentRepairStatus(BlockingPartitionRepair handler)
+ {
+ return handler.awaitRepairsUntil(System.nanoTime(), TimeUnit.NANOSECONDS);
}
public InstrumentedReadRepair createInstrumentedReadRepair(ReadCommand command, ReplicaPlan.Shared<?,?> replicaPlan, long queryStartNanoTime)
diff --git a/test/unit/org/apache/cassandra/service/reads/repair/ReadRepairTest.java b/test/unit/org/apache/cassandra/service/reads/repair/ReadRepairTest.java
index 232644d..5ae9dd8 100644
--- a/test/unit/org/apache/cassandra/service/reads/repair/ReadRepairTest.java
+++ b/test/unit/org/apache/cassandra/service/reads/repair/ReadRepairTest.java
@@ -216,11 +216,11 @@ public class ReadRepairTest
assertMutationEqual(resolved, handler.mutationsSent.get(target3.endpoint()));
// check repairs stop blocking after receiving 2 acks
- Assert.assertFalse(handler.awaitRepairs(0, TimeUnit.NANOSECONDS));
+ Assert.assertFalse(getCurrentRepairStatus(handler));
handler.ack(target1.endpoint());
- Assert.assertFalse(handler.awaitRepairs(0, TimeUnit.NANOSECONDS));
+ Assert.assertFalse(getCurrentRepairStatus(handler));
handler.ack(target3.endpoint());
- Assert.assertTrue(handler.awaitRepairs(0, TimeUnit.NANOSECONDS));
+ Assert.assertTrue(getCurrentRepairStatus(handler));
}
/**
@@ -304,13 +304,13 @@ public class ReadRepairTest
InstrumentedReadRepairHandler handler = createRepairHandler(repairs, 2, replicas, replicas);
handler.sendInitialRepairs();
- Assert.assertFalse(handler.awaitRepairs(0, TimeUnit.NANOSECONDS));
+ Assert.assertFalse(getCurrentRepairStatus(handler));
handler.ack(target1.endpoint());
- Assert.assertFalse(handler.awaitRepairs(0, TimeUnit.NANOSECONDS));
+ Assert.assertFalse(getCurrentRepairStatus(handler));
// here we should stop blocking, even though we've sent 3 repairs
handler.ack(target2.endpoint());
- Assert.assertTrue(handler.awaitRepairs(0, TimeUnit.NANOSECONDS));
+ Assert.assertTrue(getCurrentRepairStatus(handler));
}
@@ -337,14 +337,19 @@ public class ReadRepairTest
Assert.assertTrue(handler.mutationsSent.containsKey(remote1.endpoint()));
Assert.assertEquals(1, handler.waitingOn());
- Assert.assertFalse(handler.awaitRepairs(0, TimeUnit.NANOSECONDS));
+ Assert.assertFalse(getCurrentRepairStatus(handler));
handler.ack(remote1.endpoint());
Assert.assertEquals(1, handler.waitingOn());
- Assert.assertFalse(handler.awaitRepairs(0, TimeUnit.NANOSECONDS));
+ Assert.assertFalse(getCurrentRepairStatus(handler));
handler.ack(target1.endpoint());
Assert.assertEquals(0, handler.waitingOn());
- Assert.assertTrue(handler.awaitRepairs(0, TimeUnit.NANOSECONDS));
+ Assert.assertTrue(getCurrentRepairStatus(handler));
+ }
+
+ private boolean getCurrentRepairStatus(BlockingPartitionRepair handler)
+ {
+ return handler.awaitRepairsUntil(System.nanoTime(), TimeUnit.NANOSECONDS);
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org