You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2017/01/19 15:55:31 UTC
[3/6] cassandra git commit: Read repair is not blocking repair to
finish in foreground repair
Read repair is not blocking repair to finish in foreground repair
patch by Sylvain Lebresne; reviewed by Xiaolong Jiang and Jason Brown for CASSANDRA-13115
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/48fed801
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/48fed801
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/48fed801
Branch: refs/heads/trunk
Commit: 48fed80162592f741bf29298e2064452d53de4d8
Parents: 3f41d7a
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Thu Jan 12 10:03:11 2017 +0100
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Thu Jan 19 16:49:14 2017 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../UnfilteredPartitionIterators.java | 1 +
.../cassandra/service/AsyncRepairCallback.java | 5 +-
.../apache/cassandra/service/DataResolver.java | 14 ++-
.../cassandra/service/DigestResolver.java | 9 +-
.../apache/cassandra/service/ReadCallback.java | 4 +-
.../cassandra/service/ResponseResolver.java | 12 ++
.../cassandra/service/DataResolverTest.java | 117 +++++++++++++------
8 files changed, 119 insertions(+), 44 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/48fed801/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 97d49af..6293cfa 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
3.0.11
+ * Read repair is not blocking repair to finish in foreground repair (CASSANDRA-13115)
* Stress daemon help is incorrect (CASSANDRA-12563)
* Remove ALTER TYPE support (CASSANDRA-12443)
* Fix assertion for certain legacy range tombstone pattern (CASSANDRA-12203)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/48fed801/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java b/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
index 41b1424..1abbb19 100644
--- a/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
+++ b/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
@@ -159,6 +159,7 @@ public abstract class UnfilteredPartitionIterators
public void close()
{
merged.close();
+ listener.close(); // NOTE(review): not reached if merged.close() throws; consider a try/finally
}
};
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/48fed801/src/java/org/apache/cassandra/service/AsyncRepairCallback.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/AsyncRepairCallback.java b/src/java/org/apache/cassandra/service/AsyncRepairCallback.java
index dec5319..d613f3d 100644
--- a/src/java/org/apache/cassandra/service/AsyncRepairCallback.java
+++ b/src/java/org/apache/cassandra/service/AsyncRepairCallback.java
@@ -17,7 +17,6 @@
*/
package org.apache.cassandra.service;
-import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.cassandra.concurrent.Stage;
@@ -46,9 +45,9 @@ public class AsyncRepairCallback implements IAsyncCallback<ReadResponse>
{
StageManager.getStage(Stage.READ_REPAIR).execute(new WrappedRunnable()
{
- protected void runMayThrow() throws DigestMismatchException, IOException
+ protected void runMayThrow()
{
- repairResolver.resolve();
+ repairResolver.compareResponses();
}
});
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/48fed801/src/java/org/apache/cassandra/service/DataResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/DataResolver.java b/src/java/org/apache/cassandra/service/DataResolver.java
index 4e5bfb8..01953e1 100644
--- a/src/java/org/apache/cassandra/service/DataResolver.java
+++ b/src/java/org/apache/cassandra/service/DataResolver.java
@@ -21,6 +21,8 @@ import java.net.InetAddress;
import java.util.*;
import java.util.concurrent.TimeoutException;
+import com.google.common.annotations.VisibleForTesting;
+
import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.concurrent.StageManager;
import org.apache.cassandra.config.CFMetaData;
@@ -40,7 +42,8 @@ import org.apache.cassandra.utils.FBUtilities;
public class DataResolver extends ResponseResolver
{
- private final List<AsyncOneResponse> repairResults = Collections.synchronizedList(new ArrayList<>());
+ @VisibleForTesting
+ final List<AsyncOneResponse> repairResults = Collections.synchronizedList(new ArrayList<>());
public DataResolver(Keyspace keyspace, ReadCommand command, ConsistencyLevel consistency, int maxResponseCount)
{
@@ -73,6 +76,15 @@ public class DataResolver extends ResponseResolver
return counter.applyTo(mergeWithShortReadProtection(iters, sources, counter));
}
+ public void compareResponses()
+ {
+ // Fully consume the resolved results so any needed read repairs are triggered; closing then waits on them
+ try (PartitionIterator iterator = resolve())
+ {
+ PartitionIterators.consume(iterator);
+ }
+ }
+
private PartitionIterator mergeWithShortReadProtection(List<UnfilteredPartitionIterator> results, InetAddress[] sources, DataLimits.Counter resultCounter)
{
// If we have only one results, there is no read repair to do and we can't get short reads
http://git-wip-us.apache.org/repos/asf/cassandra/blob/48fed801/src/java/org/apache/cassandra/service/DigestResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/DigestResolver.java b/src/java/org/apache/cassandra/service/DigestResolver.java
index 4a918a3..6a528e9 100644
--- a/src/java/org/apache/cassandra/service/DigestResolver.java
+++ b/src/java/org/apache/cassandra/service/DigestResolver.java
@@ -69,6 +69,13 @@ public class DigestResolver extends ResponseResolver
if (logger.isTraceEnabled())
logger.trace("resolving {} responses", responses.size());
+ compareResponses();
+
+ return UnfilteredPartitionIterators.filter(dataResponse.makeIterator(command), command.nowInSec());
+ }
+
+ public void compareResponses() throws DigestMismatchException
+ {
long start = System.nanoTime();
// validate digests against each other; throw immediately on mismatch.
@@ -87,8 +94,6 @@ public class DigestResolver extends ResponseResolver
if (logger.isTraceEnabled())
logger.trace("resolve: {} ms.", TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start));
-
- return UnfilteredPartitionIterators.filter(dataResponse.makeIterator(command), command.nowInSec());
}
public boolean isDataPresent()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/48fed801/src/java/org/apache/cassandra/service/ReadCallback.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ReadCallback.java b/src/java/org/apache/cassandra/service/ReadCallback.java
index 8747004..516384a 100644
--- a/src/java/org/apache/cassandra/service/ReadCallback.java
+++ b/src/java/org/apache/cassandra/service/ReadCallback.java
@@ -219,10 +219,10 @@ public class ReadCallback implements IAsyncCallbackWithFailure<ReadResponse>
{
// If the resolver is a DigestResolver, we need to do a full data read if there is a mismatch.
// Otherwise, resolve will send the repairs directly if needs be (and in that case we should never
- // get a digest mismatch)
+ // get a digest mismatch).
try
{
- resolver.resolve();
+ resolver.compareResponses();
}
catch (DigestMismatchException e)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/48fed801/src/java/org/apache/cassandra/service/ResponseResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ResponseResolver.java b/src/java/org/apache/cassandra/service/ResponseResolver.java
index e7c94a1..81b18b6 100644
--- a/src/java/org/apache/cassandra/service/ResponseResolver.java
+++ b/src/java/org/apache/cassandra/service/ResponseResolver.java
@@ -47,6 +47,18 @@ public abstract class ResponseResolver
public abstract PartitionIterator getData();
public abstract PartitionIterator resolve() throws DigestMismatchException;
+ /**
+ * Compares the received responses, potentially triggering a digest mismatch (for a digest resolver) or read-repairs
+ * (for a data resolver).
+ * <p>
+ * This is functionally equivalent to calling {@link #resolve()} and consuming the result, but can be slightly more
+ * efficient in some cases because the result itself is not needed. This is used when doing
+ * asynchronous read-repairs.
+ *
+ * @throws DigestMismatchException if this is a digest resolver and the responses don't match.
+ */
+ public abstract void compareResponses() throws DigestMismatchException;
+
public abstract boolean isDataPresent();
public void preprocess(MessageIn<ReadResponse> message)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/48fed801/test/unit/org/apache/cassandra/service/DataResolverTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/DataResolverTest.java b/test/unit/org/apache/cassandra/service/DataResolverTest.java
index c9878d4..fd1e54e 100644
--- a/test/unit/org/apache/cassandra/service/DataResolverTest.java
+++ b/test/unit/org/apache/cassandra/service/DataResolverTest.java
@@ -129,6 +129,21 @@ public class DataResolverTest
MessagingService.instance().clearMessageSinks();
}
+ /**
+ * Checks that the provided data resolver has created the expected number of repair futures.
+ * This method also "releases" those futures by faking replica responses to those repairs, which is necessary or
+ * every test would time out when closing the result of resolver.resolve(), since closing waits on those futures.
+ */
+ private void assertRepairFuture(DataResolver resolver, int expectedRepairs)
+ {
+ assertEquals(expectedRepairs, resolver.repairResults.size());
+
+ // Signal all futures. We pass a completely fake response message, but it doesn't matter as we just want
+ // AsyncOneResponse to signal success, and it only cares about a non-null MessageIn (it collects the payload).
+ for (AsyncOneResponse<?> future : resolver.repairResults)
+ future.response(MessageIn.create(null, null, null, null, -1));
+ }
+
@Test
public void testResolveNewerSingleRow() throws UnknownHostException
{
@@ -142,12 +157,15 @@ public class DataResolverTest
.add("c1", "v2")
.buildUpdate())));
- try(PartitionIterator data = resolver.resolve();
- RowIterator rows = Iterators.getOnlyElement(data))
+ try(PartitionIterator data = resolver.resolve())
{
- Row row = Iterators.getOnlyElement(rows);
- assertColumns(row, "c1");
- assertColumn(cfm, row, "c1", "v2", 1);
+ try (RowIterator rows = Iterators.getOnlyElement(data))
+ {
+ Row row = Iterators.getOnlyElement(rows);
+ assertColumns(row, "c1");
+ assertColumn(cfm, row, "c1", "v2", 1);
+ }
+ assertRepairFuture(resolver, 1);
}
assertEquals(1, messageRecorder.sent.size());
@@ -172,13 +190,16 @@ public class DataResolverTest
.add("c2", "v2")
.buildUpdate())));
- try(PartitionIterator data = resolver.resolve();
- RowIterator rows = Iterators.getOnlyElement(data))
+ try(PartitionIterator data = resolver.resolve())
{
- Row row = Iterators.getOnlyElement(rows);
- assertColumns(row, "c1", "c2");
- assertColumn(cfm, row, "c1", "v1", 0);
- assertColumn(cfm, row, "c2", "v2", 1);
+ try (RowIterator rows = Iterators.getOnlyElement(data))
+ {
+ Row row = Iterators.getOnlyElement(rows);
+ assertColumns(row, "c1", "c2");
+ assertColumn(cfm, row, "c1", "v1", 0);
+ assertColumn(cfm, row, "c2", "v2", 1);
+ }
+ assertRepairFuture(resolver, 2);
}
assertEquals(2, messageRecorder.sent.size());
@@ -224,6 +245,7 @@ public class DataResolverTest
assertFalse(rows.hasNext());
assertFalse(data.hasNext());
}
+ assertRepairFuture(resolver, 2);
}
assertEquals(2, messageRecorder.sent.size());
@@ -289,6 +311,7 @@ public class DataResolverTest
assertFalse(rows.hasNext());
}
+ assertRepairFuture(resolver, 4);
}
assertEquals(4, messageRecorder.sent.size());
@@ -330,12 +353,15 @@ public class DataResolverTest
InetAddress peer2 = peer();
resolver.preprocess(readResponseMessage(peer2, EmptyIterators.unfilteredPartition(cfm, false)));
- try(PartitionIterator data = resolver.resolve();
- RowIterator rows = Iterators.getOnlyElement(data))
+ try(PartitionIterator data = resolver.resolve())
{
- Row row = Iterators.getOnlyElement(rows);
- assertColumns(row, "c2");
- assertColumn(cfm, row, "c2", "v2", 1);
+ try (RowIterator rows = Iterators.getOnlyElement(data))
+ {
+ Row row = Iterators.getOnlyElement(rows);
+ assertColumns(row, "c2");
+ assertColumn(cfm, row, "c2", "v2", 1);
+ }
+ assertRepairFuture(resolver, 1);
}
assertEquals(1, messageRecorder.sent.size());
@@ -356,6 +382,7 @@ public class DataResolverTest
try(PartitionIterator data = resolver.resolve())
{
assertFalse(data.hasNext());
+ assertRepairFuture(resolver, 0);
}
assertTrue(messageRecorder.sent.isEmpty());
@@ -376,6 +403,7 @@ public class DataResolverTest
try (PartitionIterator data = resolver.resolve())
{
assertFalse(data.hasNext());
+ assertRepairFuture(resolver, 1);
}
// peer1 should get the deletion from peer2
@@ -407,12 +435,15 @@ public class DataResolverTest
InetAddress peer4 = peer();
resolver.preprocess(readResponseMessage(peer4, fullPartitionDelete(cfm, dk, 2, nowInSec)));
- try(PartitionIterator data = resolver.resolve();
- RowIterator rows = Iterators.getOnlyElement(data))
+ try(PartitionIterator data = resolver.resolve())
{
- Row row = Iterators.getOnlyElement(rows);
- assertColumns(row, "two");
- assertColumn(cfm, row, "two", "B", 3);
+ try (RowIterator rows = Iterators.getOnlyElement(data))
+ {
+ Row row = Iterators.getOnlyElement(rows);
+ assertColumns(row, "two");
+ assertColumn(cfm, row, "two", "B", 3);
+ }
+ assertRepairFuture(resolver, 4);
}
// peer 1 needs to get the partition delete from peer 4 and the row from peer 3
@@ -498,6 +529,7 @@ public class DataResolverTest
try (PartitionIterator data = resolver.resolve())
{
assertFalse(data.hasNext());
+ assertRepairFuture(resolver, 2);
}
assertEquals(2, messageRecorder.sent.size());
@@ -575,12 +607,16 @@ public class DataResolverTest
InetAddress peer2 = peer();
resolver.preprocess(readResponseMessage(peer2, iter(PartitionUpdate.singleRowUpdate(cfm2, dk, builder.build())), cmd));
- try(PartitionIterator data = resolver.resolve(); RowIterator rows = Iterators.getOnlyElement(data))
+ try(PartitionIterator data = resolver.resolve())
{
- Row row = Iterators.getOnlyElement(rows);
- assertColumns(row, "m");
- Assert.assertNull(row.getCell(m, CellPath.create(bb(0))));
- Assert.assertNotNull(row.getCell(m, CellPath.create(bb(1))));
+ try (RowIterator rows = Iterators.getOnlyElement(data))
+ {
+ Row row = Iterators.getOnlyElement(rows);
+ assertColumns(row, "m");
+ Assert.assertNull(row.getCell(m, CellPath.create(bb(0))));
+ Assert.assertNotNull(row.getCell(m, CellPath.create(bb(1))));
+ }
+ assertRepairFuture(resolver, 1);
}
MessageOut<Mutation> msg;
@@ -625,6 +661,7 @@ public class DataResolverTest
try(PartitionIterator data = resolver.resolve())
{
assertFalse(data.hasNext());
+ assertRepairFuture(resolver, 1);
}
MessageOut<Mutation> msg;
@@ -665,12 +702,16 @@ public class DataResolverTest
InetAddress peer2 = peer();
resolver.preprocess(readResponseMessage(peer2, iter(PartitionUpdate.emptyUpdate(cfm2, dk))));
- try(PartitionIterator data = resolver.resolve(); RowIterator rows = Iterators.getOnlyElement(data))
+ try(PartitionIterator data = resolver.resolve())
{
- Row row = Iterators.getOnlyElement(rows);
- assertColumns(row, "m");
- ComplexColumnData cd = row.getComplexColumnData(m);
- assertEquals(Collections.singleton(expectedCell), Sets.newHashSet(cd));
+ try (RowIterator rows = Iterators.getOnlyElement(data))
+ {
+ Row row = Iterators.getOnlyElement(rows);
+ assertColumns(row, "m");
+ ComplexColumnData cd = row.getComplexColumnData(m);
+ assertEquals(Collections.singleton(expectedCell), Sets.newHashSet(cd));
+ }
+ assertRepairFuture(resolver, 1);
}
Assert.assertNull(messageRecorder.sent.get(peer1));
@@ -714,12 +755,16 @@ public class DataResolverTest
InetAddress peer2 = peer();
resolver.preprocess(readResponseMessage(peer2, iter(PartitionUpdate.singleRowUpdate(cfm2, dk, builder.build())), cmd));
- try(PartitionIterator data = resolver.resolve(); RowIterator rows = Iterators.getOnlyElement(data))
+ try(PartitionIterator data = resolver.resolve())
{
- Row row = Iterators.getOnlyElement(rows);
- assertColumns(row, "m");
- ComplexColumnData cd = row.getComplexColumnData(m);
- assertEquals(Collections.singleton(expectedCell), Sets.newHashSet(cd));
+ try (RowIterator rows = Iterators.getOnlyElement(data))
+ {
+ Row row = Iterators.getOnlyElement(rows);
+ assertColumns(row, "m");
+ ComplexColumnData cd = row.getComplexColumnData(m);
+ assertEquals(Collections.singleton(expectedCell), Sets.newHashSet(cd));
+ }
+ assertRepairFuture(resolver, 1);
}
MessageOut<Mutation> msg;