You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2017/01/19 15:55:29 UTC
[1/6] cassandra git commit: Read repair is not blocking repair to
finish in foreground repair
Repository: cassandra
Updated Branches:
refs/heads/cassandra-3.0 3f41d7a76 -> 48fed8016
refs/heads/cassandra-3.11 affa68fd1 -> 74559de50
refs/heads/trunk 52df6a58d -> c3d724445
Read repair is not blocking repair to finish in foreground repair
patch by Sylvain Lebresne; reviewed by Xiaolong Jiang and Jason Brown for CASSANDRA-13115
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/48fed801
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/48fed801
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/48fed801
Branch: refs/heads/cassandra-3.0
Commit: 48fed80162592f741bf29298e2064452d53de4d8
Parents: 3f41d7a
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Thu Jan 12 10:03:11 2017 +0100
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Thu Jan 19 16:49:14 2017 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../UnfilteredPartitionIterators.java | 1 +
.../cassandra/service/AsyncRepairCallback.java | 5 +-
.../apache/cassandra/service/DataResolver.java | 14 ++-
.../cassandra/service/DigestResolver.java | 9 +-
.../apache/cassandra/service/ReadCallback.java | 4 +-
.../cassandra/service/ResponseResolver.java | 12 ++
.../cassandra/service/DataResolverTest.java | 117 +++++++++++++------
8 files changed, 119 insertions(+), 44 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/48fed801/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 97d49af..6293cfa 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
3.0.11
+ * Read repair is not blocking repair to finish in foreground repair (CASSANDRA-13115)
* Stress daemon help is incorrect (CASSANDRA-12563)
* Remove ALTER TYPE support (CASSANDRA-12443)
* Fix assertion for certain legacy range tombstone pattern (CASSANDRA-12203)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/48fed801/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java b/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
index 41b1424..1abbb19 100644
--- a/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
+++ b/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
@@ -159,6 +159,7 @@ public abstract class UnfilteredPartitionIterators
public void close()
{
merged.close();
+ listener.close();
}
};
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/48fed801/src/java/org/apache/cassandra/service/AsyncRepairCallback.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/AsyncRepairCallback.java b/src/java/org/apache/cassandra/service/AsyncRepairCallback.java
index dec5319..d613f3d 100644
--- a/src/java/org/apache/cassandra/service/AsyncRepairCallback.java
+++ b/src/java/org/apache/cassandra/service/AsyncRepairCallback.java
@@ -17,7 +17,6 @@
*/
package org.apache.cassandra.service;
-import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.cassandra.concurrent.Stage;
@@ -46,9 +45,9 @@ public class AsyncRepairCallback implements IAsyncCallback<ReadResponse>
{
StageManager.getStage(Stage.READ_REPAIR).execute(new WrappedRunnable()
{
- protected void runMayThrow() throws DigestMismatchException, IOException
+ protected void runMayThrow()
{
- repairResolver.resolve();
+ repairResolver.compareResponses();
}
});
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/48fed801/src/java/org/apache/cassandra/service/DataResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/DataResolver.java b/src/java/org/apache/cassandra/service/DataResolver.java
index 4e5bfb8..01953e1 100644
--- a/src/java/org/apache/cassandra/service/DataResolver.java
+++ b/src/java/org/apache/cassandra/service/DataResolver.java
@@ -21,6 +21,8 @@ import java.net.InetAddress;
import java.util.*;
import java.util.concurrent.TimeoutException;
+import com.google.common.annotations.VisibleForTesting;
+
import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.concurrent.StageManager;
import org.apache.cassandra.config.CFMetaData;
@@ -40,7 +42,8 @@ import org.apache.cassandra.utils.FBUtilities;
public class DataResolver extends ResponseResolver
{
- private final List<AsyncOneResponse> repairResults = Collections.synchronizedList(new ArrayList<>());
+ @VisibleForTesting
+ final List<AsyncOneResponse> repairResults = Collections.synchronizedList(new ArrayList<>());
public DataResolver(Keyspace keyspace, ReadCommand command, ConsistencyLevel consistency, int maxResponseCount)
{
@@ -73,6 +76,15 @@ public class DataResolver extends ResponseResolver
return counter.applyTo(mergeWithShortReadProtection(iters, sources, counter));
}
+ public void compareResponses()
+ {
+ // We need to fully consume the results to trigger read repairs if appropriate
+ try (PartitionIterator iterator = resolve())
+ {
+ PartitionIterators.consume(iterator);
+ }
+ }
+
private PartitionIterator mergeWithShortReadProtection(List<UnfilteredPartitionIterator> results, InetAddress[] sources, DataLimits.Counter resultCounter)
{
// If we have only one results, there is no read repair to do and we can't get short reads
http://git-wip-us.apache.org/repos/asf/cassandra/blob/48fed801/src/java/org/apache/cassandra/service/DigestResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/DigestResolver.java b/src/java/org/apache/cassandra/service/DigestResolver.java
index 4a918a3..6a528e9 100644
--- a/src/java/org/apache/cassandra/service/DigestResolver.java
+++ b/src/java/org/apache/cassandra/service/DigestResolver.java
@@ -69,6 +69,13 @@ public class DigestResolver extends ResponseResolver
if (logger.isTraceEnabled())
logger.trace("resolving {} responses", responses.size());
+ compareResponses();
+
+ return UnfilteredPartitionIterators.filter(dataResponse.makeIterator(command), command.nowInSec());
+ }
+
+ public void compareResponses() throws DigestMismatchException
+ {
long start = System.nanoTime();
// validate digests against each other; throw immediately on mismatch.
@@ -87,8 +94,6 @@ public class DigestResolver extends ResponseResolver
if (logger.isTraceEnabled())
logger.trace("resolve: {} ms.", TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start));
-
- return UnfilteredPartitionIterators.filter(dataResponse.makeIterator(command), command.nowInSec());
}
public boolean isDataPresent()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/48fed801/src/java/org/apache/cassandra/service/ReadCallback.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ReadCallback.java b/src/java/org/apache/cassandra/service/ReadCallback.java
index 8747004..516384a 100644
--- a/src/java/org/apache/cassandra/service/ReadCallback.java
+++ b/src/java/org/apache/cassandra/service/ReadCallback.java
@@ -219,10 +219,10 @@ public class ReadCallback implements IAsyncCallbackWithFailure<ReadResponse>
{
// If the resolver is a DigestResolver, we need to do a full data read if there is a mismatch.
// Otherwise, resolve will send the repairs directly if needs be (and in that case we should never
- // get a digest mismatch)
+ // get a digest mismatch).
try
{
- resolver.resolve();
+ resolver.compareResponses();
}
catch (DigestMismatchException e)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/48fed801/src/java/org/apache/cassandra/service/ResponseResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ResponseResolver.java b/src/java/org/apache/cassandra/service/ResponseResolver.java
index e7c94a1..81b18b6 100644
--- a/src/java/org/apache/cassandra/service/ResponseResolver.java
+++ b/src/java/org/apache/cassandra/service/ResponseResolver.java
@@ -47,6 +47,18 @@ public abstract class ResponseResolver
public abstract PartitionIterator getData();
public abstract PartitionIterator resolve() throws DigestMismatchException;
+ /**
+ * Compares received responses, potentially triggering a digest mismatch (for a digest resolver) and read-repairs
+ * (for a data resolver).
+ * <p>
+ * This is functionally equivalent to calling {@link #resolve()} and consuming the result, but can be slightly more
+ * efficient in some case due to the fact that we don't care about the result itself. This is used when doing
+ * asynchronous read-repairs.
+ *
+ * @throws DigestMismatchException if it's a digest resolver and the responses don't match.
+ */
+ public abstract void compareResponses() throws DigestMismatchException;
+
public abstract boolean isDataPresent();
public void preprocess(MessageIn<ReadResponse> message)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/48fed801/test/unit/org/apache/cassandra/service/DataResolverTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/DataResolverTest.java b/test/unit/org/apache/cassandra/service/DataResolverTest.java
index c9878d4..fd1e54e 100644
--- a/test/unit/org/apache/cassandra/service/DataResolverTest.java
+++ b/test/unit/org/apache/cassandra/service/DataResolverTest.java
@@ -129,6 +129,21 @@ public class DataResolverTest
MessagingService.instance().clearMessageSinks();
}
+ /**
+ * Checks that the provided data resolver has the expected number of repair futures created.
+ * This method also "release" those future by faking replica responses to those repair, which is necessary or
+ * every test would timeout when closing the result of resolver.resolve(), since it waits on those futures.
+ */
+ private void assertRepairFuture(DataResolver resolver, int expectedRepairs)
+ {
+ assertEquals(expectedRepairs, resolver.repairResults.size());
+
+ // Signal all future. We pass a completely fake response message, but it doesn't matter as we just want
+ // AsyncOneResponse to signal success, and it only cares about a non-null MessageIn (it collects the payload).
+ for (AsyncOneResponse<?> future : resolver.repairResults)
+ future.response(MessageIn.create(null, null, null, null, -1));
+ }
+
@Test
public void testResolveNewerSingleRow() throws UnknownHostException
{
@@ -142,12 +157,15 @@ public class DataResolverTest
.add("c1", "v2")
.buildUpdate())));
- try(PartitionIterator data = resolver.resolve();
- RowIterator rows = Iterators.getOnlyElement(data))
+ try(PartitionIterator data = resolver.resolve())
{
- Row row = Iterators.getOnlyElement(rows);
- assertColumns(row, "c1");
- assertColumn(cfm, row, "c1", "v2", 1);
+ try (RowIterator rows = Iterators.getOnlyElement(data))
+ {
+ Row row = Iterators.getOnlyElement(rows);
+ assertColumns(row, "c1");
+ assertColumn(cfm, row, "c1", "v2", 1);
+ }
+ assertRepairFuture(resolver, 1);
}
assertEquals(1, messageRecorder.sent.size());
@@ -172,13 +190,16 @@ public class DataResolverTest
.add("c2", "v2")
.buildUpdate())));
- try(PartitionIterator data = resolver.resolve();
- RowIterator rows = Iterators.getOnlyElement(data))
+ try(PartitionIterator data = resolver.resolve())
{
- Row row = Iterators.getOnlyElement(rows);
- assertColumns(row, "c1", "c2");
- assertColumn(cfm, row, "c1", "v1", 0);
- assertColumn(cfm, row, "c2", "v2", 1);
+ try (RowIterator rows = Iterators.getOnlyElement(data))
+ {
+ Row row = Iterators.getOnlyElement(rows);
+ assertColumns(row, "c1", "c2");
+ assertColumn(cfm, row, "c1", "v1", 0);
+ assertColumn(cfm, row, "c2", "v2", 1);
+ }
+ assertRepairFuture(resolver, 2);
}
assertEquals(2, messageRecorder.sent.size());
@@ -224,6 +245,7 @@ public class DataResolverTest
assertFalse(rows.hasNext());
assertFalse(data.hasNext());
}
+ assertRepairFuture(resolver, 2);
}
assertEquals(2, messageRecorder.sent.size());
@@ -289,6 +311,7 @@ public class DataResolverTest
assertFalse(rows.hasNext());
}
+ assertRepairFuture(resolver, 4);
}
assertEquals(4, messageRecorder.sent.size());
@@ -330,12 +353,15 @@ public class DataResolverTest
InetAddress peer2 = peer();
resolver.preprocess(readResponseMessage(peer2, EmptyIterators.unfilteredPartition(cfm, false)));
- try(PartitionIterator data = resolver.resolve();
- RowIterator rows = Iterators.getOnlyElement(data))
+ try(PartitionIterator data = resolver.resolve())
{
- Row row = Iterators.getOnlyElement(rows);
- assertColumns(row, "c2");
- assertColumn(cfm, row, "c2", "v2", 1);
+ try (RowIterator rows = Iterators.getOnlyElement(data))
+ {
+ Row row = Iterators.getOnlyElement(rows);
+ assertColumns(row, "c2");
+ assertColumn(cfm, row, "c2", "v2", 1);
+ }
+ assertRepairFuture(resolver, 1);
}
assertEquals(1, messageRecorder.sent.size());
@@ -356,6 +382,7 @@ public class DataResolverTest
try(PartitionIterator data = resolver.resolve())
{
assertFalse(data.hasNext());
+ assertRepairFuture(resolver, 0);
}
assertTrue(messageRecorder.sent.isEmpty());
@@ -376,6 +403,7 @@ public class DataResolverTest
try (PartitionIterator data = resolver.resolve())
{
assertFalse(data.hasNext());
+ assertRepairFuture(resolver, 1);
}
// peer1 should get the deletion from peer2
@@ -407,12 +435,15 @@ public class DataResolverTest
InetAddress peer4 = peer();
resolver.preprocess(readResponseMessage(peer4, fullPartitionDelete(cfm, dk, 2, nowInSec)));
- try(PartitionIterator data = resolver.resolve();
- RowIterator rows = Iterators.getOnlyElement(data))
+ try(PartitionIterator data = resolver.resolve())
{
- Row row = Iterators.getOnlyElement(rows);
- assertColumns(row, "two");
- assertColumn(cfm, row, "two", "B", 3);
+ try (RowIterator rows = Iterators.getOnlyElement(data))
+ {
+ Row row = Iterators.getOnlyElement(rows);
+ assertColumns(row, "two");
+ assertColumn(cfm, row, "two", "B", 3);
+ }
+ assertRepairFuture(resolver, 4);
}
// peer 1 needs to get the partition delete from peer 4 and the row from peer 3
@@ -498,6 +529,7 @@ public class DataResolverTest
try (PartitionIterator data = resolver.resolve())
{
assertFalse(data.hasNext());
+ assertRepairFuture(resolver, 2);
}
assertEquals(2, messageRecorder.sent.size());
@@ -575,12 +607,16 @@ public class DataResolverTest
InetAddress peer2 = peer();
resolver.preprocess(readResponseMessage(peer2, iter(PartitionUpdate.singleRowUpdate(cfm2, dk, builder.build())), cmd));
- try(PartitionIterator data = resolver.resolve(); RowIterator rows = Iterators.getOnlyElement(data))
+ try(PartitionIterator data = resolver.resolve())
{
- Row row = Iterators.getOnlyElement(rows);
- assertColumns(row, "m");
- Assert.assertNull(row.getCell(m, CellPath.create(bb(0))));
- Assert.assertNotNull(row.getCell(m, CellPath.create(bb(1))));
+ try (RowIterator rows = Iterators.getOnlyElement(data))
+ {
+ Row row = Iterators.getOnlyElement(rows);
+ assertColumns(row, "m");
+ Assert.assertNull(row.getCell(m, CellPath.create(bb(0))));
+ Assert.assertNotNull(row.getCell(m, CellPath.create(bb(1))));
+ }
+ assertRepairFuture(resolver, 1);
}
MessageOut<Mutation> msg;
@@ -625,6 +661,7 @@ public class DataResolverTest
try(PartitionIterator data = resolver.resolve())
{
assertFalse(data.hasNext());
+ assertRepairFuture(resolver, 1);
}
MessageOut<Mutation> msg;
@@ -665,12 +702,16 @@ public class DataResolverTest
InetAddress peer2 = peer();
resolver.preprocess(readResponseMessage(peer2, iter(PartitionUpdate.emptyUpdate(cfm2, dk))));
- try(PartitionIterator data = resolver.resolve(); RowIterator rows = Iterators.getOnlyElement(data))
+ try(PartitionIterator data = resolver.resolve())
{
- Row row = Iterators.getOnlyElement(rows);
- assertColumns(row, "m");
- ComplexColumnData cd = row.getComplexColumnData(m);
- assertEquals(Collections.singleton(expectedCell), Sets.newHashSet(cd));
+ try (RowIterator rows = Iterators.getOnlyElement(data))
+ {
+ Row row = Iterators.getOnlyElement(rows);
+ assertColumns(row, "m");
+ ComplexColumnData cd = row.getComplexColumnData(m);
+ assertEquals(Collections.singleton(expectedCell), Sets.newHashSet(cd));
+ }
+ assertRepairFuture(resolver, 1);
}
Assert.assertNull(messageRecorder.sent.get(peer1));
@@ -714,12 +755,16 @@ public class DataResolverTest
InetAddress peer2 = peer();
resolver.preprocess(readResponseMessage(peer2, iter(PartitionUpdate.singleRowUpdate(cfm2, dk, builder.build())), cmd));
- try(PartitionIterator data = resolver.resolve(); RowIterator rows = Iterators.getOnlyElement(data))
+ try(PartitionIterator data = resolver.resolve())
{
- Row row = Iterators.getOnlyElement(rows);
- assertColumns(row, "m");
- ComplexColumnData cd = row.getComplexColumnData(m);
- assertEquals(Collections.singleton(expectedCell), Sets.newHashSet(cd));
+ try (RowIterator rows = Iterators.getOnlyElement(data))
+ {
+ Row row = Iterators.getOnlyElement(rows);
+ assertColumns(row, "m");
+ ComplexColumnData cd = row.getComplexColumnData(m);
+ assertEquals(Collections.singleton(expectedCell), Sets.newHashSet(cd));
+ }
+ assertRepairFuture(resolver, 1);
}
MessageOut<Mutation> msg;
[2/6] cassandra git commit: Read repair is not blocking repair to
finish in foreground repair
Posted by sl...@apache.org.
Read repair is not blocking repair to finish in foreground repair
patch by Sylvain Lebresne; reviewed by Xiaolong Jiang and Jason Brown for CASSANDRA-13115
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/48fed801
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/48fed801
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/48fed801
Branch: refs/heads/cassandra-3.11
Commit: 48fed80162592f741bf29298e2064452d53de4d8
Parents: 3f41d7a
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Thu Jan 12 10:03:11 2017 +0100
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Thu Jan 19 16:49:14 2017 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../UnfilteredPartitionIterators.java | 1 +
.../cassandra/service/AsyncRepairCallback.java | 5 +-
.../apache/cassandra/service/DataResolver.java | 14 ++-
.../cassandra/service/DigestResolver.java | 9 +-
.../apache/cassandra/service/ReadCallback.java | 4 +-
.../cassandra/service/ResponseResolver.java | 12 ++
.../cassandra/service/DataResolverTest.java | 117 +++++++++++++------
8 files changed, 119 insertions(+), 44 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/48fed801/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 97d49af..6293cfa 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
3.0.11
+ * Read repair is not blocking repair to finish in foreground repair (CASSANDRA-13115)
* Stress daemon help is incorrect (CASSANDRA-12563)
* Remove ALTER TYPE support (CASSANDRA-12443)
* Fix assertion for certain legacy range tombstone pattern (CASSANDRA-12203)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/48fed801/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java b/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
index 41b1424..1abbb19 100644
--- a/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
+++ b/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
@@ -159,6 +159,7 @@ public abstract class UnfilteredPartitionIterators
public void close()
{
merged.close();
+ listener.close();
}
};
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/48fed801/src/java/org/apache/cassandra/service/AsyncRepairCallback.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/AsyncRepairCallback.java b/src/java/org/apache/cassandra/service/AsyncRepairCallback.java
index dec5319..d613f3d 100644
--- a/src/java/org/apache/cassandra/service/AsyncRepairCallback.java
+++ b/src/java/org/apache/cassandra/service/AsyncRepairCallback.java
@@ -17,7 +17,6 @@
*/
package org.apache.cassandra.service;
-import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.cassandra.concurrent.Stage;
@@ -46,9 +45,9 @@ public class AsyncRepairCallback implements IAsyncCallback<ReadResponse>
{
StageManager.getStage(Stage.READ_REPAIR).execute(new WrappedRunnable()
{
- protected void runMayThrow() throws DigestMismatchException, IOException
+ protected void runMayThrow()
{
- repairResolver.resolve();
+ repairResolver.compareResponses();
}
});
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/48fed801/src/java/org/apache/cassandra/service/DataResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/DataResolver.java b/src/java/org/apache/cassandra/service/DataResolver.java
index 4e5bfb8..01953e1 100644
--- a/src/java/org/apache/cassandra/service/DataResolver.java
+++ b/src/java/org/apache/cassandra/service/DataResolver.java
@@ -21,6 +21,8 @@ import java.net.InetAddress;
import java.util.*;
import java.util.concurrent.TimeoutException;
+import com.google.common.annotations.VisibleForTesting;
+
import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.concurrent.StageManager;
import org.apache.cassandra.config.CFMetaData;
@@ -40,7 +42,8 @@ import org.apache.cassandra.utils.FBUtilities;
public class DataResolver extends ResponseResolver
{
- private final List<AsyncOneResponse> repairResults = Collections.synchronizedList(new ArrayList<>());
+ @VisibleForTesting
+ final List<AsyncOneResponse> repairResults = Collections.synchronizedList(new ArrayList<>());
public DataResolver(Keyspace keyspace, ReadCommand command, ConsistencyLevel consistency, int maxResponseCount)
{
@@ -73,6 +76,15 @@ public class DataResolver extends ResponseResolver
return counter.applyTo(mergeWithShortReadProtection(iters, sources, counter));
}
+ public void compareResponses()
+ {
+ // We need to fully consume the results to trigger read repairs if appropriate
+ try (PartitionIterator iterator = resolve())
+ {
+ PartitionIterators.consume(iterator);
+ }
+ }
+
private PartitionIterator mergeWithShortReadProtection(List<UnfilteredPartitionIterator> results, InetAddress[] sources, DataLimits.Counter resultCounter)
{
// If we have only one results, there is no read repair to do and we can't get short reads
http://git-wip-us.apache.org/repos/asf/cassandra/blob/48fed801/src/java/org/apache/cassandra/service/DigestResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/DigestResolver.java b/src/java/org/apache/cassandra/service/DigestResolver.java
index 4a918a3..6a528e9 100644
--- a/src/java/org/apache/cassandra/service/DigestResolver.java
+++ b/src/java/org/apache/cassandra/service/DigestResolver.java
@@ -69,6 +69,13 @@ public class DigestResolver extends ResponseResolver
if (logger.isTraceEnabled())
logger.trace("resolving {} responses", responses.size());
+ compareResponses();
+
+ return UnfilteredPartitionIterators.filter(dataResponse.makeIterator(command), command.nowInSec());
+ }
+
+ public void compareResponses() throws DigestMismatchException
+ {
long start = System.nanoTime();
// validate digests against each other; throw immediately on mismatch.
@@ -87,8 +94,6 @@ public class DigestResolver extends ResponseResolver
if (logger.isTraceEnabled())
logger.trace("resolve: {} ms.", TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start));
-
- return UnfilteredPartitionIterators.filter(dataResponse.makeIterator(command), command.nowInSec());
}
public boolean isDataPresent()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/48fed801/src/java/org/apache/cassandra/service/ReadCallback.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ReadCallback.java b/src/java/org/apache/cassandra/service/ReadCallback.java
index 8747004..516384a 100644
--- a/src/java/org/apache/cassandra/service/ReadCallback.java
+++ b/src/java/org/apache/cassandra/service/ReadCallback.java
@@ -219,10 +219,10 @@ public class ReadCallback implements IAsyncCallbackWithFailure<ReadResponse>
{
// If the resolver is a DigestResolver, we need to do a full data read if there is a mismatch.
// Otherwise, resolve will send the repairs directly if needs be (and in that case we should never
- // get a digest mismatch)
+ // get a digest mismatch).
try
{
- resolver.resolve();
+ resolver.compareResponses();
}
catch (DigestMismatchException e)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/48fed801/src/java/org/apache/cassandra/service/ResponseResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ResponseResolver.java b/src/java/org/apache/cassandra/service/ResponseResolver.java
index e7c94a1..81b18b6 100644
--- a/src/java/org/apache/cassandra/service/ResponseResolver.java
+++ b/src/java/org/apache/cassandra/service/ResponseResolver.java
@@ -47,6 +47,18 @@ public abstract class ResponseResolver
public abstract PartitionIterator getData();
public abstract PartitionIterator resolve() throws DigestMismatchException;
+ /**
+ * Compares received responses, potentially triggering a digest mismatch (for a digest resolver) and read-repairs
+ * (for a data resolver).
+ * <p>
+ * This is functionally equivalent to calling {@link #resolve()} and consuming the result, but can be slightly more
+ * efficient in some case due to the fact that we don't care about the result itself. This is used when doing
+ * asynchronous read-repairs.
+ *
+ * @throws DigestMismatchException if it's a digest resolver and the responses don't match.
+ */
+ public abstract void compareResponses() throws DigestMismatchException;
+
public abstract boolean isDataPresent();
public void preprocess(MessageIn<ReadResponse> message)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/48fed801/test/unit/org/apache/cassandra/service/DataResolverTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/DataResolverTest.java b/test/unit/org/apache/cassandra/service/DataResolverTest.java
index c9878d4..fd1e54e 100644
--- a/test/unit/org/apache/cassandra/service/DataResolverTest.java
+++ b/test/unit/org/apache/cassandra/service/DataResolverTest.java
@@ -129,6 +129,21 @@ public class DataResolverTest
MessagingService.instance().clearMessageSinks();
}
+ /**
+ * Checks that the provided data resolver has the expected number of repair futures created.
+ * This method also "release" those future by faking replica responses to those repair, which is necessary or
+ * every test would timeout when closing the result of resolver.resolve(), since it waits on those futures.
+ */
+ private void assertRepairFuture(DataResolver resolver, int expectedRepairs)
+ {
+ assertEquals(expectedRepairs, resolver.repairResults.size());
+
+ // Signal all future. We pass a completely fake response message, but it doesn't matter as we just want
+ // AsyncOneResponse to signal success, and it only cares about a non-null MessageIn (it collects the payload).
+ for (AsyncOneResponse<?> future : resolver.repairResults)
+ future.response(MessageIn.create(null, null, null, null, -1));
+ }
+
@Test
public void testResolveNewerSingleRow() throws UnknownHostException
{
@@ -142,12 +157,15 @@ public class DataResolverTest
.add("c1", "v2")
.buildUpdate())));
- try(PartitionIterator data = resolver.resolve();
- RowIterator rows = Iterators.getOnlyElement(data))
+ try(PartitionIterator data = resolver.resolve())
{
- Row row = Iterators.getOnlyElement(rows);
- assertColumns(row, "c1");
- assertColumn(cfm, row, "c1", "v2", 1);
+ try (RowIterator rows = Iterators.getOnlyElement(data))
+ {
+ Row row = Iterators.getOnlyElement(rows);
+ assertColumns(row, "c1");
+ assertColumn(cfm, row, "c1", "v2", 1);
+ }
+ assertRepairFuture(resolver, 1);
}
assertEquals(1, messageRecorder.sent.size());
@@ -172,13 +190,16 @@ public class DataResolverTest
.add("c2", "v2")
.buildUpdate())));
- try(PartitionIterator data = resolver.resolve();
- RowIterator rows = Iterators.getOnlyElement(data))
+ try(PartitionIterator data = resolver.resolve())
{
- Row row = Iterators.getOnlyElement(rows);
- assertColumns(row, "c1", "c2");
- assertColumn(cfm, row, "c1", "v1", 0);
- assertColumn(cfm, row, "c2", "v2", 1);
+ try (RowIterator rows = Iterators.getOnlyElement(data))
+ {
+ Row row = Iterators.getOnlyElement(rows);
+ assertColumns(row, "c1", "c2");
+ assertColumn(cfm, row, "c1", "v1", 0);
+ assertColumn(cfm, row, "c2", "v2", 1);
+ }
+ assertRepairFuture(resolver, 2);
}
assertEquals(2, messageRecorder.sent.size());
@@ -224,6 +245,7 @@ public class DataResolverTest
assertFalse(rows.hasNext());
assertFalse(data.hasNext());
}
+ assertRepairFuture(resolver, 2);
}
assertEquals(2, messageRecorder.sent.size());
@@ -289,6 +311,7 @@ public class DataResolverTest
assertFalse(rows.hasNext());
}
+ assertRepairFuture(resolver, 4);
}
assertEquals(4, messageRecorder.sent.size());
@@ -330,12 +353,15 @@ public class DataResolverTest
InetAddress peer2 = peer();
resolver.preprocess(readResponseMessage(peer2, EmptyIterators.unfilteredPartition(cfm, false)));
- try(PartitionIterator data = resolver.resolve();
- RowIterator rows = Iterators.getOnlyElement(data))
+ try(PartitionIterator data = resolver.resolve())
{
- Row row = Iterators.getOnlyElement(rows);
- assertColumns(row, "c2");
- assertColumn(cfm, row, "c2", "v2", 1);
+ try (RowIterator rows = Iterators.getOnlyElement(data))
+ {
+ Row row = Iterators.getOnlyElement(rows);
+ assertColumns(row, "c2");
+ assertColumn(cfm, row, "c2", "v2", 1);
+ }
+ assertRepairFuture(resolver, 1);
}
assertEquals(1, messageRecorder.sent.size());
@@ -356,6 +382,7 @@ public class DataResolverTest
try(PartitionIterator data = resolver.resolve())
{
assertFalse(data.hasNext());
+ assertRepairFuture(resolver, 0);
}
assertTrue(messageRecorder.sent.isEmpty());
@@ -376,6 +403,7 @@ public class DataResolverTest
try (PartitionIterator data = resolver.resolve())
{
assertFalse(data.hasNext());
+ assertRepairFuture(resolver, 1);
}
// peer1 should get the deletion from peer2
@@ -407,12 +435,15 @@ public class DataResolverTest
InetAddress peer4 = peer();
resolver.preprocess(readResponseMessage(peer4, fullPartitionDelete(cfm, dk, 2, nowInSec)));
- try(PartitionIterator data = resolver.resolve();
- RowIterator rows = Iterators.getOnlyElement(data))
+ try(PartitionIterator data = resolver.resolve())
{
- Row row = Iterators.getOnlyElement(rows);
- assertColumns(row, "two");
- assertColumn(cfm, row, "two", "B", 3);
+ try (RowIterator rows = Iterators.getOnlyElement(data))
+ {
+ Row row = Iterators.getOnlyElement(rows);
+ assertColumns(row, "two");
+ assertColumn(cfm, row, "two", "B", 3);
+ }
+ assertRepairFuture(resolver, 4);
}
// peer 1 needs to get the partition delete from peer 4 and the row from peer 3
@@ -498,6 +529,7 @@ public class DataResolverTest
try (PartitionIterator data = resolver.resolve())
{
assertFalse(data.hasNext());
+ assertRepairFuture(resolver, 2);
}
assertEquals(2, messageRecorder.sent.size());
@@ -575,12 +607,16 @@ public class DataResolverTest
InetAddress peer2 = peer();
resolver.preprocess(readResponseMessage(peer2, iter(PartitionUpdate.singleRowUpdate(cfm2, dk, builder.build())), cmd));
- try(PartitionIterator data = resolver.resolve(); RowIterator rows = Iterators.getOnlyElement(data))
+ try(PartitionIterator data = resolver.resolve())
{
- Row row = Iterators.getOnlyElement(rows);
- assertColumns(row, "m");
- Assert.assertNull(row.getCell(m, CellPath.create(bb(0))));
- Assert.assertNotNull(row.getCell(m, CellPath.create(bb(1))));
+ try (RowIterator rows = Iterators.getOnlyElement(data))
+ {
+ Row row = Iterators.getOnlyElement(rows);
+ assertColumns(row, "m");
+ Assert.assertNull(row.getCell(m, CellPath.create(bb(0))));
+ Assert.assertNotNull(row.getCell(m, CellPath.create(bb(1))));
+ }
+ assertRepairFuture(resolver, 1);
}
MessageOut<Mutation> msg;
@@ -625,6 +661,7 @@ public class DataResolverTest
try(PartitionIterator data = resolver.resolve())
{
assertFalse(data.hasNext());
+ assertRepairFuture(resolver, 1);
}
MessageOut<Mutation> msg;
@@ -665,12 +702,16 @@ public class DataResolverTest
InetAddress peer2 = peer();
resolver.preprocess(readResponseMessage(peer2, iter(PartitionUpdate.emptyUpdate(cfm2, dk))));
- try(PartitionIterator data = resolver.resolve(); RowIterator rows = Iterators.getOnlyElement(data))
+ try(PartitionIterator data = resolver.resolve())
{
- Row row = Iterators.getOnlyElement(rows);
- assertColumns(row, "m");
- ComplexColumnData cd = row.getComplexColumnData(m);
- assertEquals(Collections.singleton(expectedCell), Sets.newHashSet(cd));
+ try (RowIterator rows = Iterators.getOnlyElement(data))
+ {
+ Row row = Iterators.getOnlyElement(rows);
+ assertColumns(row, "m");
+ ComplexColumnData cd = row.getComplexColumnData(m);
+ assertEquals(Collections.singleton(expectedCell), Sets.newHashSet(cd));
+ }
+ assertRepairFuture(resolver, 1);
}
Assert.assertNull(messageRecorder.sent.get(peer1));
@@ -714,12 +755,16 @@ public class DataResolverTest
InetAddress peer2 = peer();
resolver.preprocess(readResponseMessage(peer2, iter(PartitionUpdate.singleRowUpdate(cfm2, dk, builder.build())), cmd));
- try(PartitionIterator data = resolver.resolve(); RowIterator rows = Iterators.getOnlyElement(data))
+ try(PartitionIterator data = resolver.resolve())
{
- Row row = Iterators.getOnlyElement(rows);
- assertColumns(row, "m");
- ComplexColumnData cd = row.getComplexColumnData(m);
- assertEquals(Collections.singleton(expectedCell), Sets.newHashSet(cd));
+ try (RowIterator rows = Iterators.getOnlyElement(data))
+ {
+ Row row = Iterators.getOnlyElement(rows);
+ assertColumns(row, "m");
+ ComplexColumnData cd = row.getComplexColumnData(m);
+ assertEquals(Collections.singleton(expectedCell), Sets.newHashSet(cd));
+ }
+ assertRepairFuture(resolver, 1);
}
MessageOut<Mutation> msg;
[4/6] cassandra git commit: Merge branch 'cassandra-3.0' into
cassandra-3.11
Posted by sl...@apache.org.
Merge branch 'cassandra-3.0' into cassandra-3.11
* cassandra-3.0:
Read repair is not blocking repair to finish in foreground repair
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/74559de5
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/74559de5
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/74559de5
Branch: refs/heads/trunk
Commit: 74559de50fa6974eba56a00ca37cfc7746134211
Parents: affa68f 48fed80
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Thu Jan 19 16:54:22 2017 +0100
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Thu Jan 19 16:54:22 2017 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../UnfilteredPartitionIterators.java | 1 +
.../cassandra/service/AsyncRepairCallback.java | 2 +-
.../apache/cassandra/service/DataResolver.java | 14 ++-
.../cassandra/service/DigestResolver.java | 9 +-
.../apache/cassandra/service/ReadCallback.java | 4 +-
.../cassandra/service/ResponseResolver.java | 12 ++
.../cassandra/service/DataResolverTest.java | 117 +++++++++++++------
8 files changed, 118 insertions(+), 42 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/74559de5/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 853ca09,6293cfa..8b613e6
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,120 -1,11 +1,121 @@@
-3.0.11
+3.10
+ * Fix TestHintedHandoff.hintedhandoff_decom_test (CASSANDRA-13058)
+ * Fixed query monitoring for range queries (CASSANDRA-13050)
+ * Remove outboundBindAny configuration property (CASSANDRA-12673)
+ * Use correct bounds for all-data range when filtering (CASSANDRA-12666)
+ * Remove timing window in test case (CASSANDRA-12875)
+ * Resolve unit testing without JCE security libraries installed (CASSANDRA-12945)
+ * Fix inconsistencies in cassandra-stress load balancing policy (CASSANDRA-12919)
+ * Fix validation of non-frozen UDT cells (CASSANDRA-12916)
+ * Don't shut down socket input/output on StreamSession (CASSANDRA-12903)
+ * Fix Murmur3PartitionerTest (CASSANDRA-12858)
+ * Move cqlsh syntax rules into separate module and allow easier customization (CASSANDRA-12897)
+ * Fix CommitLogSegmentManagerTest (CASSANDRA-12283)
+ * Fix cassandra-stress truncate option (CASSANDRA-12695)
+ * Fix crossNode value when receiving messages (CASSANDRA-12791)
+ * Don't load MX4J beans twice (CASSANDRA-12869)
+ * Extend native protocol request flags, add versions to SUPPORTED, and introduce ProtocolVersion enum (CASSANDRA-12838)
+ * Set JOINING mode when running pre-join tasks (CASSANDRA-12836)
+ * remove net.mintern.primitive library due to license issue (CASSANDRA-12845)
+ * Properly format IPv6 addresses when logging JMX service URL (CASSANDRA-12454)
+ * Optimize the vnode allocation for single replica per DC (CASSANDRA-12777)
+ * Use non-token restrictions for bounds when token restrictions are overridden (CASSANDRA-12419)
+ * Fix CQLSH auto completion for PER PARTITION LIMIT (CASSANDRA-12803)
+ * Use different build directories for Eclipse and Ant (CASSANDRA-12466)
+ * Avoid potential AttributeError in cqlsh due to no table metadata (CASSANDRA-12815)
+ * Fix RandomReplicationAwareTokenAllocatorTest.testExistingCluster (CASSANDRA-12812)
+ * Upgrade commons-codec to 1.9 (CASSANDRA-12790)
+ * Make the fanout size for LeveledCompactionStrategy to be configurable (CASSANDRA-11550)
+ * Add duration data type (CASSANDRA-11873)
+ * Fix timeout in ReplicationAwareTokenAllocatorTest (CASSANDRA-12784)
+ * Improve sum aggregate functions (CASSANDRA-12417)
+ * Make cassandra.yaml docs for batch_size_*_threshold_in_kb reflect changes in CASSANDRA-10876 (CASSANDRA-12761)
+ * cqlsh fails to format collections when using aliases (CASSANDRA-11534)
+ * Check for hash conflicts in prepared statements (CASSANDRA-12733)
+ * Exit query parsing upon first error (CASSANDRA-12598)
+ * Fix cassandra-stress to use single seed in UUID generation (CASSANDRA-12729)
+ * CQLSSTableWriter does not allow Update statement (CASSANDRA-12450)
+ * Config class uses boxed types but DD exposes primitive types (CASSANDRA-12199)
+ * Add pre- and post-shutdown hooks to Storage Service (CASSANDRA-12461)
+ * Add hint delivery metrics (CASSANDRA-12693)
+ * Remove IndexInfo cache from FileIndexInfoRetriever (CASSANDRA-12731)
+ * ColumnIndex does not reuse buffer (CASSANDRA-12502)
+ * cdc column addition still breaks schema migration tasks (CASSANDRA-12697)
+ * Upgrade metrics-reporter dependencies (CASSANDRA-12089)
+ * Tune compaction thread count via nodetool (CASSANDRA-12248)
+ * Add +=/-= shortcut syntax for update queries (CASSANDRA-12232)
+ * Include repair session IDs in repair start message (CASSANDRA-12532)
+ * Add a blocking task to Index, run before joining the ring (CASSANDRA-12039)
+ * Fix NPE when using CQLSSTableWriter (CASSANDRA-12667)
+ * Support optional backpressure strategies at the coordinator (CASSANDRA-9318)
+ * Make randompartitioner work with new vnode allocation (CASSANDRA-12647)
+ * Fix cassandra-stress graphing (CASSANDRA-12237)
+ * Allow filtering on partition key columns for queries without secondary indexes (CASSANDRA-11031)
+ * Fix Cassandra Stress reporting thread model and precision (CASSANDRA-12585)
+ * Add JMH benchmarks.jar (CASSANDRA-12586)
+ * Cleanup uses of AlterTableStatementColumn (CASSANDRA-12567)
+ * Add keep-alive to streaming (CASSANDRA-11841)
+ * Tracing payload is passed through newSession(..) (CASSANDRA-11706)
+ * avoid deleting non existing sstable files and improve related log messages (CASSANDRA-12261)
+ * json/yaml output format for nodetool compactionhistory (CASSANDRA-12486)
+ * Retry all internode messages once after a connection is
+ closed and reopened (CASSANDRA-12192)
+ * Add support to rebuild from targeted replica (CASSANDRA-9875)
+ * Add sequence distribution type to cassandra stress (CASSANDRA-12490)
+ * "SELECT * FROM foo LIMIT ;" does not error out (CASSANDRA-12154)
+ * Define executeLocally() at the ReadQuery Level (CASSANDRA-12474)
+ * Extend read/write failure messages with a map of replica addresses
+ to error codes in the v5 native protocol (CASSANDRA-12311)
+ * Fix rebuild of SASI indexes with existing index files (CASSANDRA-12374)
+ * Let DatabaseDescriptor not implicitly startup services (CASSANDRA-9054, 12550)
+ * Fix clustering indexes in presence of static columns in SASI (CASSANDRA-12378)
+ * Fix queries on columns with reversed type on SASI indexes (CASSANDRA-12223)
+ * Added slow query log (CASSANDRA-12403)
+ * Count full coordinated request against timeout (CASSANDRA-12256)
+ * Allow TTL with null value on insert and update (CASSANDRA-12216)
+ * Make decommission operation resumable (CASSANDRA-12008)
+ * Add support to one-way targeted repair (CASSANDRA-9876)
+ * Remove clientutil jar (CASSANDRA-11635)
+ * Fix compaction throughput throttle (CASSANDRA-12366, CASSANDRA-12717)
+ * Delay releasing Memtable memory on flush until PostFlush has finished running (CASSANDRA-12358)
+ * Cassandra stress should dump all setting on startup (CASSANDRA-11914)
+ * Make it possible to compact a given token range (CASSANDRA-10643)
+ * Allow updating DynamicEndpointSnitch properties via JMX (CASSANDRA-12179)
+ * Collect metrics on queries by consistency level (CASSANDRA-7384)
+ * Add support for GROUP BY to SELECT statement (CASSANDRA-10707)
+ * Deprecate memtable_cleanup_threshold and update default for memtable_flush_writers (CASSANDRA-12228)
+ * Upgrade to OHC 0.4.4 (CASSANDRA-12133)
+ * Add version command to cassandra-stress (CASSANDRA-12258)
+ * Create compaction-stress tool (CASSANDRA-11844)
+ * Garbage-collecting compaction operation and schema option (CASSANDRA-7019)
+ * Add beta protocol flag for v5 native protocol (CASSANDRA-12142)
+ * Support filtering on non-PRIMARY KEY columns in the CREATE
+ MATERIALIZED VIEW statement's WHERE clause (CASSANDRA-10368)
+ * Unify STDOUT and SYSTEMLOG logback format (CASSANDRA-12004)
+ * COPY FROM should raise error for non-existing input files (CASSANDRA-12174)
+ * Faster write path (CASSANDRA-12269)
+ * Option to leave omitted columns in INSERT JSON unset (CASSANDRA-11424)
+ * Support json/yaml output in nodetool tpstats (CASSANDRA-12035)
+ * Expose metrics for successful/failed authentication attempts (CASSANDRA-10635)
+ * Prepend snapshot name with "truncated" or "dropped" when a snapshot
+ is taken before truncating or dropping a table (CASSANDRA-12178)
+ * Optimize RestrictionSet (CASSANDRA-12153)
+ * cqlsh does not automatically downgrade CQL version (CASSANDRA-12150)
+ * Omit (de)serialization of state variable in UDAs (CASSANDRA-9613)
+ * Create a system table to expose prepared statements (CASSANDRA-8831)
+ * Reuse DataOutputBuffer from ColumnIndex (CASSANDRA-11970)
+ * Remove DatabaseDescriptor dependency from SegmentedFile (CASSANDRA-11580)
+ * Add supplied username to authentication error messages (CASSANDRA-12076)
+ * Remove pre-startup check for open JMX port (CASSANDRA-12074)
+ * Remove compaction Severity from DynamicEndpointSnitch (CASSANDRA-11738)
+ * Restore resumable hints delivery (CASSANDRA-11960)
+ * Properly report LWT contention (CASSANDRA-12626)
+Merged from 3.0:
+ * Read repair is not blocking repair to finish in foreground repair (CASSANDRA-13115)
- * Stress daemon help is incorrect (CASSANDRA-12563)
+ * Stress daemon help is incorrect(CASSANDRA-12563)
* Remove ALTER TYPE support (CASSANDRA-12443)
* Fix assertion for certain legacy range tombstone pattern (CASSANDRA-12203)
- * Set javac encoding to utf-8 (CASSANDRA-11077)
* Replace empty strings with null values if they cannot be converted (CASSANDRA-12794)
- * Fixed flacky SSTableRewriterTest: check file counts before calling validateCFS (CASSANDRA-12348)
* Fix deserialization of 2.x DeletedCells (CASSANDRA-12620)
* Add parent repair session id to anticompaction log message (CASSANDRA-12186)
* Improve contention handling on failure to acquire MV lock for streaming and hints (CASSANDRA-12905)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/74559de5/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/74559de5/src/java/org/apache/cassandra/service/DataResolver.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/DataResolver.java
index be8eca1,01953e1..1bbfecd
--- a/src/java/org/apache/cassandra/service/DataResolver.java
+++ b/src/java/org/apache/cassandra/service/DataResolver.java
@@@ -42,13 -42,12 +44,14 @@@ import org.apache.cassandra.utils.FBUti
public class DataResolver extends ResponseResolver
{
- private final List<AsyncOneResponse> repairResults = Collections.synchronizedList(new ArrayList<>());
+ @VisibleForTesting
+ final List<AsyncOneResponse> repairResults = Collections.synchronizedList(new ArrayList<>());
+ private final long queryStartNanoTime;
- public DataResolver(Keyspace keyspace, ReadCommand command, ConsistencyLevel consistency, int maxResponseCount)
+ public DataResolver(Keyspace keyspace, ReadCommand command, ConsistencyLevel consistency, int maxResponseCount, long queryStartNanoTime)
{
super(keyspace, command, consistency, maxResponseCount);
+ this.queryStartNanoTime = queryStartNanoTime;
}
public PartitionIterator getData()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/74559de5/src/java/org/apache/cassandra/service/ReadCallback.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/74559de5/test/unit/org/apache/cassandra/service/DataResolverTest.java
----------------------------------------------------------------------
[6/6] cassandra git commit: Merge branch 'cassandra-3.11' into trunk
Posted by sl...@apache.org.
Merge branch 'cassandra-3.11' into trunk
* cassandra-3.11:
Read repair is not blocking repair to finish in foreground repair
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c3d72444
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c3d72444
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c3d72444
Branch: refs/heads/trunk
Commit: c3d72444512861c75ec11e17f52770d2613e5364
Parents: 52df6a5 74559de
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Thu Jan 19 16:55:11 2017 +0100
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Thu Jan 19 16:55:11 2017 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../UnfilteredPartitionIterators.java | 1 +
.../cassandra/service/AsyncRepairCallback.java | 2 +-
.../apache/cassandra/service/DataResolver.java | 14 ++-
.../cassandra/service/DigestResolver.java | 9 +-
.../apache/cassandra/service/ReadCallback.java | 4 +-
.../cassandra/service/ResponseResolver.java | 12 ++
.../cassandra/service/DataResolverTest.java | 117 +++++++++++++------
8 files changed, 118 insertions(+), 42 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c3d72444/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 8f68618,8b613e6..cc16a8f
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -139,11 -109,13 +139,12 @@@
* Remove pre-startup check for open JMX port (CASSANDRA-12074)
* Remove compaction Severity from DynamicEndpointSnitch (CASSANDRA-11738)
* Restore resumable hints delivery (CASSANDRA-11960)
- * Properly report LWT contention (CASSANDRA-12626)
+ * Properly record CAS contention (CASSANDRA-12626)
Merged from 3.0:
+ * Stress daemon help is incorrect (CASSANDRA-12563)
+ * Read repair is not blocking repair to finish in foreground repair (CASSANDRA-13115)
- * Stress daemon help is incorrect(CASSANDRA-12563)
- * Remove ALTER TYPE support (CASSANDRA-12443)
- * Fix assertion for certain legacy range tombstone pattern (CASSANDRA-12203)
* Replace empty strings with null values if they cannot be converted (CASSANDRA-12794)
+ * Remove support for non-JavaScript UDFs (CASSANDRA-12883)
* Fix deserialization of 2.x DeletedCells (CASSANDRA-12620)
* Add parent repair session id to anticompaction log message (CASSANDRA-12186)
* Improve contention handling on failure to acquire MV lock for streaming and hints (CASSANDRA-12905)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c3d72444/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c3d72444/src/java/org/apache/cassandra/service/DataResolver.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c3d72444/src/java/org/apache/cassandra/service/ReadCallback.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c3d72444/test/unit/org/apache/cassandra/service/DataResolverTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/service/DataResolverTest.java
index 3dbc166,7c38224..dba3e95
--- a/test/unit/org/apache/cassandra/service/DataResolverTest.java
+++ b/test/unit/org/apache/cassandra/service/DataResolverTest.java
@@@ -330,14 -353,17 +353,17 @@@ public class DataResolverTes
.add("c2", "v2")
.buildUpdate())));
InetAddress peer2 = peer();
- resolver.preprocess(readResponseMessage(peer2, EmptyIterators.unfilteredPartition(cfm, false)));
+ resolver.preprocess(readResponseMessage(peer2, EmptyIterators.unfilteredPartition(cfm)));
- try(PartitionIterator data = resolver.resolve();
- RowIterator rows = Iterators.getOnlyElement(data))
+ try(PartitionIterator data = resolver.resolve())
{
- Row row = Iterators.getOnlyElement(rows);
- assertColumns(row, "c2");
- assertColumn(cfm, row, "c2", "v2", 1);
+ try (RowIterator rows = Iterators.getOnlyElement(data))
+ {
+ Row row = Iterators.getOnlyElement(rows);
+ assertColumns(row, "c2");
+ assertColumn(cfm, row, "c2", "v2", 1);
+ }
+ assertRepairFuture(resolver, 1);
}
assertEquals(1, messageRecorder.sent.size());
[5/6] cassandra git commit: Merge branch 'cassandra-3.0' into
cassandra-3.11
Posted by sl...@apache.org.
Merge branch 'cassandra-3.0' into cassandra-3.11
* cassandra-3.0:
Read repair is not blocking repair to finish in foreground repair
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/74559de5
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/74559de5
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/74559de5
Branch: refs/heads/cassandra-3.11
Commit: 74559de50fa6974eba56a00ca37cfc7746134211
Parents: affa68f 48fed80
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Thu Jan 19 16:54:22 2017 +0100
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Thu Jan 19 16:54:22 2017 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../UnfilteredPartitionIterators.java | 1 +
.../cassandra/service/AsyncRepairCallback.java | 2 +-
.../apache/cassandra/service/DataResolver.java | 14 ++-
.../cassandra/service/DigestResolver.java | 9 +-
.../apache/cassandra/service/ReadCallback.java | 4 +-
.../cassandra/service/ResponseResolver.java | 12 ++
.../cassandra/service/DataResolverTest.java | 117 +++++++++++++------
8 files changed, 118 insertions(+), 42 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/74559de5/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 853ca09,6293cfa..8b613e6
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,120 -1,11 +1,121 @@@
-3.0.11
+3.10
+ * Fix TestHintedHandoff.hintedhandoff_decom_test (CASSANDRA-13058)
+ * Fixed query monitoring for range queries (CASSANDRA-13050)
+ * Remove outboundBindAny configuration property (CASSANDRA-12673)
+ * Use correct bounds for all-data range when filtering (CASSANDRA-12666)
+ * Remove timing window in test case (CASSANDRA-12875)
+ * Resolve unit testing without JCE security libraries installed (CASSANDRA-12945)
+ * Fix inconsistencies in cassandra-stress load balancing policy (CASSANDRA-12919)
+ * Fix validation of non-frozen UDT cells (CASSANDRA-12916)
+ * Don't shut down socket input/output on StreamSession (CASSANDRA-12903)
+ * Fix Murmur3PartitionerTest (CASSANDRA-12858)
+ * Move cqlsh syntax rules into separate module and allow easier customization (CASSANDRA-12897)
+ * Fix CommitLogSegmentManagerTest (CASSANDRA-12283)
+ * Fix cassandra-stress truncate option (CASSANDRA-12695)
+ * Fix crossNode value when receiving messages (CASSANDRA-12791)
+ * Don't load MX4J beans twice (CASSANDRA-12869)
+ * Extend native protocol request flags, add versions to SUPPORTED, and introduce ProtocolVersion enum (CASSANDRA-12838)
+ * Set JOINING mode when running pre-join tasks (CASSANDRA-12836)
+ * remove net.mintern.primitive library due to license issue (CASSANDRA-12845)
+ * Properly format IPv6 addresses when logging JMX service URL (CASSANDRA-12454)
+ * Optimize the vnode allocation for single replica per DC (CASSANDRA-12777)
+ * Use non-token restrictions for bounds when token restrictions are overridden (CASSANDRA-12419)
+ * Fix CQLSH auto completion for PER PARTITION LIMIT (CASSANDRA-12803)
+ * Use different build directories for Eclipse and Ant (CASSANDRA-12466)
+ * Avoid potential AttributeError in cqlsh due to no table metadata (CASSANDRA-12815)
+ * Fix RandomReplicationAwareTokenAllocatorTest.testExistingCluster (CASSANDRA-12812)
+ * Upgrade commons-codec to 1.9 (CASSANDRA-12790)
+ * Make the fanout size for LeveledCompactionStrategy to be configurable (CASSANDRA-11550)
+ * Add duration data type (CASSANDRA-11873)
+ * Fix timeout in ReplicationAwareTokenAllocatorTest (CASSANDRA-12784)
+ * Improve sum aggregate functions (CASSANDRA-12417)
+ * Make cassandra.yaml docs for batch_size_*_threshold_in_kb reflect changes in CASSANDRA-10876 (CASSANDRA-12761)
+ * cqlsh fails to format collections when using aliases (CASSANDRA-11534)
+ * Check for hash conflicts in prepared statements (CASSANDRA-12733)
+ * Exit query parsing upon first error (CASSANDRA-12598)
+ * Fix cassandra-stress to use single seed in UUID generation (CASSANDRA-12729)
+ * CQLSSTableWriter does not allow Update statement (CASSANDRA-12450)
+ * Config class uses boxed types but DD exposes primitive types (CASSANDRA-12199)
+ * Add pre- and post-shutdown hooks to Storage Service (CASSANDRA-12461)
+ * Add hint delivery metrics (CASSANDRA-12693)
+ * Remove IndexInfo cache from FileIndexInfoRetriever (CASSANDRA-12731)
+ * ColumnIndex does not reuse buffer (CASSANDRA-12502)
+ * cdc column addition still breaks schema migration tasks (CASSANDRA-12697)
+ * Upgrade metrics-reporter dependencies (CASSANDRA-12089)
+ * Tune compaction thread count via nodetool (CASSANDRA-12248)
+ * Add +=/-= shortcut syntax for update queries (CASSANDRA-12232)
+ * Include repair session IDs in repair start message (CASSANDRA-12532)
+ * Add a blocking task to Index, run before joining the ring (CASSANDRA-12039)
+ * Fix NPE when using CQLSSTableWriter (CASSANDRA-12667)
+ * Support optional backpressure strategies at the coordinator (CASSANDRA-9318)
+ * Make randompartitioner work with new vnode allocation (CASSANDRA-12647)
+ * Fix cassandra-stress graphing (CASSANDRA-12237)
+ * Allow filtering on partition key columns for queries without secondary indexes (CASSANDRA-11031)
+ * Fix Cassandra Stress reporting thread model and precision (CASSANDRA-12585)
+ * Add JMH benchmarks.jar (CASSANDRA-12586)
+ * Cleanup uses of AlterTableStatementColumn (CASSANDRA-12567)
+ * Add keep-alive to streaming (CASSANDRA-11841)
+ * Tracing payload is passed through newSession(..) (CASSANDRA-11706)
+ * avoid deleting non existing sstable files and improve related log messages (CASSANDRA-12261)
+ * json/yaml output format for nodetool compactionhistory (CASSANDRA-12486)
+ * Retry all internode messages once after a connection is
+ closed and reopened (CASSANDRA-12192)
+ * Add support to rebuild from targeted replica (CASSANDRA-9875)
+ * Add sequence distribution type to cassandra stress (CASSANDRA-12490)
+ * "SELECT * FROM foo LIMIT ;" does not error out (CASSANDRA-12154)
+ * Define executeLocally() at the ReadQuery Level (CASSANDRA-12474)
+ * Extend read/write failure messages with a map of replica addresses
+ to error codes in the v5 native protocol (CASSANDRA-12311)
+ * Fix rebuild of SASI indexes with existing index files (CASSANDRA-12374)
+ * Let DatabaseDescriptor not implicitly startup services (CASSANDRA-9054, 12550)
+ * Fix clustering indexes in presence of static columns in SASI (CASSANDRA-12378)
+ * Fix queries on columns with reversed type on SASI indexes (CASSANDRA-12223)
+ * Added slow query log (CASSANDRA-12403)
+ * Count full coordinated request against timeout (CASSANDRA-12256)
+ * Allow TTL with null value on insert and update (CASSANDRA-12216)
+ * Make decommission operation resumable (CASSANDRA-12008)
+ * Add support to one-way targeted repair (CASSANDRA-9876)
+ * Remove clientutil jar (CASSANDRA-11635)
+ * Fix compaction throughput throttle (CASSANDRA-12366, CASSANDRA-12717)
+ * Delay releasing Memtable memory on flush until PostFlush has finished running (CASSANDRA-12358)
+ * Cassandra stress should dump all setting on startup (CASSANDRA-11914)
+ * Make it possible to compact a given token range (CASSANDRA-10643)
+ * Allow updating DynamicEndpointSnitch properties via JMX (CASSANDRA-12179)
+ * Collect metrics on queries by consistency level (CASSANDRA-7384)
+ * Add support for GROUP BY to SELECT statement (CASSANDRA-10707)
+ * Deprecate memtable_cleanup_threshold and update default for memtable_flush_writers (CASSANDRA-12228)
+ * Upgrade to OHC 0.4.4 (CASSANDRA-12133)
+ * Add version command to cassandra-stress (CASSANDRA-12258)
+ * Create compaction-stress tool (CASSANDRA-11844)
+ * Garbage-collecting compaction operation and schema option (CASSANDRA-7019)
+ * Add beta protocol flag for v5 native protocol (CASSANDRA-12142)
+ * Support filtering on non-PRIMARY KEY columns in the CREATE
+ MATERIALIZED VIEW statement's WHERE clause (CASSANDRA-10368)
+ * Unify STDOUT and SYSTEMLOG logback format (CASSANDRA-12004)
+ * COPY FROM should raise error for non-existing input files (CASSANDRA-12174)
+ * Faster write path (CASSANDRA-12269)
+ * Option to leave omitted columns in INSERT JSON unset (CASSANDRA-11424)
+ * Support json/yaml output in nodetool tpstats (CASSANDRA-12035)
+ * Expose metrics for successful/failed authentication attempts (CASSANDRA-10635)
+ * Prepend snapshot name with "truncated" or "dropped" when a snapshot
+ is taken before truncating or dropping a table (CASSANDRA-12178)
+ * Optimize RestrictionSet (CASSANDRA-12153)
+ * cqlsh does not automatically downgrade CQL version (CASSANDRA-12150)
+ * Omit (de)serialization of state variable in UDAs (CASSANDRA-9613)
+ * Create a system table to expose prepared statements (CASSANDRA-8831)
+ * Reuse DataOutputBuffer from ColumnIndex (CASSANDRA-11970)
+ * Remove DatabaseDescriptor dependency from SegmentedFile (CASSANDRA-11580)
+ * Add supplied username to authentication error messages (CASSANDRA-12076)
+ * Remove pre-startup check for open JMX port (CASSANDRA-12074)
+ * Remove compaction Severity from DynamicEndpointSnitch (CASSANDRA-11738)
+ * Restore resumable hints delivery (CASSANDRA-11960)
+ * Properly report LWT contention (CASSANDRA-12626)
+Merged from 3.0:
+ * Read repair is not blocking repair to finish in foreground repair (CASSANDRA-13115)
- * Stress daemon help is incorrect (CASSANDRA-12563)
+ * Stress daemon help is incorrect(CASSANDRA-12563)
* Remove ALTER TYPE support (CASSANDRA-12443)
* Fix assertion for certain legacy range tombstone pattern (CASSANDRA-12203)
- * Set javac encoding to utf-8 (CASSANDRA-11077)
* Replace empty strings with null values if they cannot be converted (CASSANDRA-12794)
- * Fixed flacky SSTableRewriterTest: check file counts before calling validateCFS (CASSANDRA-12348)
* Fix deserialization of 2.x DeletedCells (CASSANDRA-12620)
* Add parent repair session id to anticompaction log message (CASSANDRA-12186)
* Improve contention handling on failure to acquire MV lock for streaming and hints (CASSANDRA-12905)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/74559de5/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/74559de5/src/java/org/apache/cassandra/service/DataResolver.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/DataResolver.java
index be8eca1,01953e1..1bbfecd
--- a/src/java/org/apache/cassandra/service/DataResolver.java
+++ b/src/java/org/apache/cassandra/service/DataResolver.java
@@@ -42,13 -42,12 +44,14 @@@ import org.apache.cassandra.utils.FBUti
public class DataResolver extends ResponseResolver
{
- private final List<AsyncOneResponse> repairResults = Collections.synchronizedList(new ArrayList<>());
+ @VisibleForTesting
+ final List<AsyncOneResponse> repairResults = Collections.synchronizedList(new ArrayList<>());
+ private final long queryStartNanoTime;
- public DataResolver(Keyspace keyspace, ReadCommand command, ConsistencyLevel consistency, int maxResponseCount)
+ public DataResolver(Keyspace keyspace, ReadCommand command, ConsistencyLevel consistency, int maxResponseCount, long queryStartNanoTime)
{
super(keyspace, command, consistency, maxResponseCount);
+ this.queryStartNanoTime = queryStartNanoTime;
}
public PartitionIterator getData()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/74559de5/src/java/org/apache/cassandra/service/ReadCallback.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/74559de5/test/unit/org/apache/cassandra/service/DataResolverTest.java
----------------------------------------------------------------------
[3/6] cassandra git commit: Read repair is not blocking repair to
finish in foreground repair
Posted by sl...@apache.org.
Read repair is not blocking repair to finish in foreground repair
patch by Sylvain Lebresne; reviewed by Xiaolong Jiang and Jason Brown for CASSANDRA-13115
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/48fed801
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/48fed801
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/48fed801
Branch: refs/heads/trunk
Commit: 48fed80162592f741bf29298e2064452d53de4d8
Parents: 3f41d7a
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Thu Jan 12 10:03:11 2017 +0100
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Thu Jan 19 16:49:14 2017 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../UnfilteredPartitionIterators.java | 1 +
.../cassandra/service/AsyncRepairCallback.java | 5 +-
.../apache/cassandra/service/DataResolver.java | 14 ++-
.../cassandra/service/DigestResolver.java | 9 +-
.../apache/cassandra/service/ReadCallback.java | 4 +-
.../cassandra/service/ResponseResolver.java | 12 ++
.../cassandra/service/DataResolverTest.java | 117 +++++++++++++------
8 files changed, 119 insertions(+), 44 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/48fed801/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 97d49af..6293cfa 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
3.0.11
+ * Read repair is not blocking repair to finish in foreground repair (CASSANDRA-13115)
* Stress daemon help is incorrect (CASSANDRA-12563)
* Remove ALTER TYPE support (CASSANDRA-12443)
* Fix assertion for certain legacy range tombstone pattern (CASSANDRA-12203)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/48fed801/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java b/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
index 41b1424..1abbb19 100644
--- a/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
+++ b/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
@@ -159,6 +159,7 @@ public abstract class UnfilteredPartitionIterators
public void close()
{
merged.close();
+ listener.close();
}
};
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/48fed801/src/java/org/apache/cassandra/service/AsyncRepairCallback.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/AsyncRepairCallback.java b/src/java/org/apache/cassandra/service/AsyncRepairCallback.java
index dec5319..d613f3d 100644
--- a/src/java/org/apache/cassandra/service/AsyncRepairCallback.java
+++ b/src/java/org/apache/cassandra/service/AsyncRepairCallback.java
@@ -17,7 +17,6 @@
*/
package org.apache.cassandra.service;
-import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.cassandra.concurrent.Stage;
@@ -46,9 +45,9 @@ public class AsyncRepairCallback implements IAsyncCallback<ReadResponse>
{
StageManager.getStage(Stage.READ_REPAIR).execute(new WrappedRunnable()
{
- protected void runMayThrow() throws DigestMismatchException, IOException
+ protected void runMayThrow()
{
- repairResolver.resolve();
+ repairResolver.compareResponses();
}
});
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/48fed801/src/java/org/apache/cassandra/service/DataResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/DataResolver.java b/src/java/org/apache/cassandra/service/DataResolver.java
index 4e5bfb8..01953e1 100644
--- a/src/java/org/apache/cassandra/service/DataResolver.java
+++ b/src/java/org/apache/cassandra/service/DataResolver.java
@@ -21,6 +21,8 @@ import java.net.InetAddress;
import java.util.*;
import java.util.concurrent.TimeoutException;
+import com.google.common.annotations.VisibleForTesting;
+
import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.concurrent.StageManager;
import org.apache.cassandra.config.CFMetaData;
@@ -40,7 +42,8 @@ import org.apache.cassandra.utils.FBUtilities;
public class DataResolver extends ResponseResolver
{
- private final List<AsyncOneResponse> repairResults = Collections.synchronizedList(new ArrayList<>());
+ @VisibleForTesting
+ final List<AsyncOneResponse> repairResults = Collections.synchronizedList(new ArrayList<>());
public DataResolver(Keyspace keyspace, ReadCommand command, ConsistencyLevel consistency, int maxResponseCount)
{
@@ -73,6 +76,15 @@ public class DataResolver extends ResponseResolver
return counter.applyTo(mergeWithShortReadProtection(iters, sources, counter));
}
+ public void compareResponses()
+ {
+ // We need to fully consume the results to trigger read repairs if appropriate
+ try (PartitionIterator iterator = resolve())
+ {
+ PartitionIterators.consume(iterator);
+ }
+ }
+
private PartitionIterator mergeWithShortReadProtection(List<UnfilteredPartitionIterator> results, InetAddress[] sources, DataLimits.Counter resultCounter)
{
// If we have only one results, there is no read repair to do and we can't get short reads
http://git-wip-us.apache.org/repos/asf/cassandra/blob/48fed801/src/java/org/apache/cassandra/service/DigestResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/DigestResolver.java b/src/java/org/apache/cassandra/service/DigestResolver.java
index 4a918a3..6a528e9 100644
--- a/src/java/org/apache/cassandra/service/DigestResolver.java
+++ b/src/java/org/apache/cassandra/service/DigestResolver.java
@@ -69,6 +69,13 @@ public class DigestResolver extends ResponseResolver
if (logger.isTraceEnabled())
logger.trace("resolving {} responses", responses.size());
+ compareResponses();
+
+ return UnfilteredPartitionIterators.filter(dataResponse.makeIterator(command), command.nowInSec());
+ }
+
+ public void compareResponses() throws DigestMismatchException
+ {
long start = System.nanoTime();
// validate digests against each other; throw immediately on mismatch.
@@ -87,8 +94,6 @@ public class DigestResolver extends ResponseResolver
if (logger.isTraceEnabled())
logger.trace("resolve: {} ms.", TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start));
-
- return UnfilteredPartitionIterators.filter(dataResponse.makeIterator(command), command.nowInSec());
}
public boolean isDataPresent()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/48fed801/src/java/org/apache/cassandra/service/ReadCallback.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ReadCallback.java b/src/java/org/apache/cassandra/service/ReadCallback.java
index 8747004..516384a 100644
--- a/src/java/org/apache/cassandra/service/ReadCallback.java
+++ b/src/java/org/apache/cassandra/service/ReadCallback.java
@@ -219,10 +219,10 @@ public class ReadCallback implements IAsyncCallbackWithFailure<ReadResponse>
{
// If the resolver is a DigestResolver, we need to do a full data read if there is a mismatch.
// Otherwise, resolve will send the repairs directly if needs be (and in that case we should never
- // get a digest mismatch)
+ // get a digest mismatch).
try
{
- resolver.resolve();
+ resolver.compareResponses();
}
catch (DigestMismatchException e)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/48fed801/src/java/org/apache/cassandra/service/ResponseResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ResponseResolver.java b/src/java/org/apache/cassandra/service/ResponseResolver.java
index e7c94a1..81b18b6 100644
--- a/src/java/org/apache/cassandra/service/ResponseResolver.java
+++ b/src/java/org/apache/cassandra/service/ResponseResolver.java
@@ -47,6 +47,18 @@ public abstract class ResponseResolver
public abstract PartitionIterator getData();
public abstract PartitionIterator resolve() throws DigestMismatchException;
+ /**
+ * Compares received responses, potentially triggering a digest mismatch (for a digest resolver) and read-repairs
+ * (for a data resolver).
+ * <p>
+ * This is functionally equivalent to calling {@link #resolve()} and consuming the result, but can be slightly more
+ * efficient in some case due to the fact that we don't care about the result itself. This is used when doing
+ * asynchronous read-repairs.
+ *
+ * @throws DigestMismatchException if it's a digest resolver and the responses don't match.
+ */
+ public abstract void compareResponses() throws DigestMismatchException;
+
public abstract boolean isDataPresent();
public void preprocess(MessageIn<ReadResponse> message)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/48fed801/test/unit/org/apache/cassandra/service/DataResolverTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/DataResolverTest.java b/test/unit/org/apache/cassandra/service/DataResolverTest.java
index c9878d4..fd1e54e 100644
--- a/test/unit/org/apache/cassandra/service/DataResolverTest.java
+++ b/test/unit/org/apache/cassandra/service/DataResolverTest.java
@@ -129,6 +129,21 @@ public class DataResolverTest
MessagingService.instance().clearMessageSinks();
}
+ /**
+ * Checks that the provided data resolver has the expected number of repair futures created.
+ * This method also "release" those future by faking replica responses to those repair, which is necessary or
+ * every test would timeout when closing the result of resolver.resolve(), since it waits on those futures.
+ */
+ private void assertRepairFuture(DataResolver resolver, int expectedRepairs)
+ {
+ assertEquals(expectedRepairs, resolver.repairResults.size());
+
+ // Signal all future. We pass a completely fake response message, but it doesn't matter as we just want
+ // AsyncOneResponse to signal success, and it only cares about a non-null MessageIn (it collects the payload).
+ for (AsyncOneResponse<?> future : resolver.repairResults)
+ future.response(MessageIn.create(null, null, null, null, -1));
+ }
+
@Test
public void testResolveNewerSingleRow() throws UnknownHostException
{
@@ -142,12 +157,15 @@ public class DataResolverTest
.add("c1", "v2")
.buildUpdate())));
- try(PartitionIterator data = resolver.resolve();
- RowIterator rows = Iterators.getOnlyElement(data))
+ try(PartitionIterator data = resolver.resolve())
{
- Row row = Iterators.getOnlyElement(rows);
- assertColumns(row, "c1");
- assertColumn(cfm, row, "c1", "v2", 1);
+ try (RowIterator rows = Iterators.getOnlyElement(data))
+ {
+ Row row = Iterators.getOnlyElement(rows);
+ assertColumns(row, "c1");
+ assertColumn(cfm, row, "c1", "v2", 1);
+ }
+ assertRepairFuture(resolver, 1);
}
assertEquals(1, messageRecorder.sent.size());
@@ -172,13 +190,16 @@ public class DataResolverTest
.add("c2", "v2")
.buildUpdate())));
- try(PartitionIterator data = resolver.resolve();
- RowIterator rows = Iterators.getOnlyElement(data))
+ try(PartitionIterator data = resolver.resolve())
{
- Row row = Iterators.getOnlyElement(rows);
- assertColumns(row, "c1", "c2");
- assertColumn(cfm, row, "c1", "v1", 0);
- assertColumn(cfm, row, "c2", "v2", 1);
+ try (RowIterator rows = Iterators.getOnlyElement(data))
+ {
+ Row row = Iterators.getOnlyElement(rows);
+ assertColumns(row, "c1", "c2");
+ assertColumn(cfm, row, "c1", "v1", 0);
+ assertColumn(cfm, row, "c2", "v2", 1);
+ }
+ assertRepairFuture(resolver, 2);
}
assertEquals(2, messageRecorder.sent.size());
@@ -224,6 +245,7 @@ public class DataResolverTest
assertFalse(rows.hasNext());
assertFalse(data.hasNext());
}
+ assertRepairFuture(resolver, 2);
}
assertEquals(2, messageRecorder.sent.size());
@@ -289,6 +311,7 @@ public class DataResolverTest
assertFalse(rows.hasNext());
}
+ assertRepairFuture(resolver, 4);
}
assertEquals(4, messageRecorder.sent.size());
@@ -330,12 +353,15 @@ public class DataResolverTest
InetAddress peer2 = peer();
resolver.preprocess(readResponseMessage(peer2, EmptyIterators.unfilteredPartition(cfm, false)));
- try(PartitionIterator data = resolver.resolve();
- RowIterator rows = Iterators.getOnlyElement(data))
+ try(PartitionIterator data = resolver.resolve())
{
- Row row = Iterators.getOnlyElement(rows);
- assertColumns(row, "c2");
- assertColumn(cfm, row, "c2", "v2", 1);
+ try (RowIterator rows = Iterators.getOnlyElement(data))
+ {
+ Row row = Iterators.getOnlyElement(rows);
+ assertColumns(row, "c2");
+ assertColumn(cfm, row, "c2", "v2", 1);
+ }
+ assertRepairFuture(resolver, 1);
}
assertEquals(1, messageRecorder.sent.size());
@@ -356,6 +382,7 @@ public class DataResolverTest
try(PartitionIterator data = resolver.resolve())
{
assertFalse(data.hasNext());
+ assertRepairFuture(resolver, 0);
}
assertTrue(messageRecorder.sent.isEmpty());
@@ -376,6 +403,7 @@ public class DataResolverTest
try (PartitionIterator data = resolver.resolve())
{
assertFalse(data.hasNext());
+ assertRepairFuture(resolver, 1);
}
// peer1 should get the deletion from peer2
@@ -407,12 +435,15 @@ public class DataResolverTest
InetAddress peer4 = peer();
resolver.preprocess(readResponseMessage(peer4, fullPartitionDelete(cfm, dk, 2, nowInSec)));
- try(PartitionIterator data = resolver.resolve();
- RowIterator rows = Iterators.getOnlyElement(data))
+ try(PartitionIterator data = resolver.resolve())
{
- Row row = Iterators.getOnlyElement(rows);
- assertColumns(row, "two");
- assertColumn(cfm, row, "two", "B", 3);
+ try (RowIterator rows = Iterators.getOnlyElement(data))
+ {
+ Row row = Iterators.getOnlyElement(rows);
+ assertColumns(row, "two");
+ assertColumn(cfm, row, "two", "B", 3);
+ }
+ assertRepairFuture(resolver, 4);
}
// peer 1 needs to get the partition delete from peer 4 and the row from peer 3
@@ -498,6 +529,7 @@ public class DataResolverTest
try (PartitionIterator data = resolver.resolve())
{
assertFalse(data.hasNext());
+ assertRepairFuture(resolver, 2);
}
assertEquals(2, messageRecorder.sent.size());
@@ -575,12 +607,16 @@ public class DataResolverTest
InetAddress peer2 = peer();
resolver.preprocess(readResponseMessage(peer2, iter(PartitionUpdate.singleRowUpdate(cfm2, dk, builder.build())), cmd));
- try(PartitionIterator data = resolver.resolve(); RowIterator rows = Iterators.getOnlyElement(data))
+ try(PartitionIterator data = resolver.resolve())
{
- Row row = Iterators.getOnlyElement(rows);
- assertColumns(row, "m");
- Assert.assertNull(row.getCell(m, CellPath.create(bb(0))));
- Assert.assertNotNull(row.getCell(m, CellPath.create(bb(1))));
+ try (RowIterator rows = Iterators.getOnlyElement(data))
+ {
+ Row row = Iterators.getOnlyElement(rows);
+ assertColumns(row, "m");
+ Assert.assertNull(row.getCell(m, CellPath.create(bb(0))));
+ Assert.assertNotNull(row.getCell(m, CellPath.create(bb(1))));
+ }
+ assertRepairFuture(resolver, 1);
}
MessageOut<Mutation> msg;
@@ -625,6 +661,7 @@ public class DataResolverTest
try(PartitionIterator data = resolver.resolve())
{
assertFalse(data.hasNext());
+ assertRepairFuture(resolver, 1);
}
MessageOut<Mutation> msg;
@@ -665,12 +702,16 @@ public class DataResolverTest
InetAddress peer2 = peer();
resolver.preprocess(readResponseMessage(peer2, iter(PartitionUpdate.emptyUpdate(cfm2, dk))));
- try(PartitionIterator data = resolver.resolve(); RowIterator rows = Iterators.getOnlyElement(data))
+ try(PartitionIterator data = resolver.resolve())
{
- Row row = Iterators.getOnlyElement(rows);
- assertColumns(row, "m");
- ComplexColumnData cd = row.getComplexColumnData(m);
- assertEquals(Collections.singleton(expectedCell), Sets.newHashSet(cd));
+ try (RowIterator rows = Iterators.getOnlyElement(data))
+ {
+ Row row = Iterators.getOnlyElement(rows);
+ assertColumns(row, "m");
+ ComplexColumnData cd = row.getComplexColumnData(m);
+ assertEquals(Collections.singleton(expectedCell), Sets.newHashSet(cd));
+ }
+ assertRepairFuture(resolver, 1);
}
Assert.assertNull(messageRecorder.sent.get(peer1));
@@ -714,12 +755,16 @@ public class DataResolverTest
InetAddress peer2 = peer();
resolver.preprocess(readResponseMessage(peer2, iter(PartitionUpdate.singleRowUpdate(cfm2, dk, builder.build())), cmd));
- try(PartitionIterator data = resolver.resolve(); RowIterator rows = Iterators.getOnlyElement(data))
+ try(PartitionIterator data = resolver.resolve())
{
- Row row = Iterators.getOnlyElement(rows);
- assertColumns(row, "m");
- ComplexColumnData cd = row.getComplexColumnData(m);
- assertEquals(Collections.singleton(expectedCell), Sets.newHashSet(cd));
+ try (RowIterator rows = Iterators.getOnlyElement(data))
+ {
+ Row row = Iterators.getOnlyElement(rows);
+ assertColumns(row, "m");
+ ComplexColumnData cd = row.getComplexColumnData(m);
+ assertEquals(Collections.singleton(expectedCell), Sets.newHashSet(cd));
+ }
+ assertRepairFuture(resolver, 1);
}
MessageOut<Mutation> msg;