Posted to commits@cassandra.apache.org by sl...@apache.org on 2017/01/19 15:55:29 UTC

[1/6] cassandra git commit: Read repair is not blocking repair to finish in foreground repair

Repository: cassandra
Updated Branches:
  refs/heads/cassandra-3.0 3f41d7a76 -> 48fed8016
  refs/heads/cassandra-3.11 affa68fd1 -> 74559de50
  refs/heads/trunk 52df6a58d -> c3d724445


Read repair is not blocking repair to finish in foreground repair

patch by Sylvain Lebresne; reviewed by Xiaolong Jiang and Jason Brown for CASSANDRA-13115


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/48fed801
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/48fed801
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/48fed801

Branch: refs/heads/cassandra-3.0
Commit: 48fed80162592f741bf29298e2064452d53de4d8
Parents: 3f41d7a
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Thu Jan 12 10:03:11 2017 +0100
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Thu Jan 19 16:49:14 2017 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../UnfilteredPartitionIterators.java           |   1 +
 .../cassandra/service/AsyncRepairCallback.java  |   5 +-
 .../apache/cassandra/service/DataResolver.java  |  14 ++-
 .../cassandra/service/DigestResolver.java       |   9 +-
 .../apache/cassandra/service/ReadCallback.java  |   4 +-
 .../cassandra/service/ResponseResolver.java     |  12 ++
 .../cassandra/service/DataResolverTest.java     | 117 +++++++++++++------
 8 files changed, 119 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/48fed801/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 97d49af..6293cfa 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0.11
+ * Read repair is not blocking repair to finish in foreground repair (CASSANDRA-13115)
  * Stress daemon help is incorrect (CASSANDRA-12563)
  * Remove ALTER TYPE support (CASSANDRA-12443)
  * Fix assertion for certain legacy range tombstone pattern (CASSANDRA-12203)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/48fed801/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java b/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
index 41b1424..1abbb19 100644
--- a/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
+++ b/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
@@ -159,6 +159,7 @@ public abstract class UnfilteredPartitionIterators
             public void close()
             {
                 merged.close();
+                listener.close();
             }
         };
     }
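
Note on the one-line change above: the iterator returned by the merge(...) variant that takes a MergeListener now closes that listener together with the merged iterator, so any work the listener defers to close time actually runs when the caller closes the resolved result. A minimal sketch of that close-propagation pattern, using illustrative names rather than the actual Cassandra types:

    // Illustrative sketch only: a wrapping iterator that propagates close() to a listener,
    // mirroring the shape of the anonymous iterator in UnfilteredPartitionIterators.
    import java.util.Iterator;

    interface CloseableIterator<T> extends Iterator<T>, AutoCloseable
    {
        void close();   // unchecked close(), as in Cassandra's CloseableIterator
    }

    interface MergeListener
    {
        void close();   // e.g. flush or wait on work accumulated while merging
    }

    final class MergedWithListener<T> implements CloseableIterator<T>
    {
        private final CloseableIterator<T> merged;
        private final MergeListener listener;

        MergedWithListener(CloseableIterator<T> merged, MergeListener listener)
        {
            this.merged = merged;
            this.listener = listener;
        }

        public boolean hasNext() { return merged.hasNext(); }
        public T next()          { return merged.next(); }

        public void close()
        {
            merged.close();
            listener.close();   // the line this commit adds: without it, the listener's
                                // close-time work would silently never happen
        }
    }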

http://git-wip-us.apache.org/repos/asf/cassandra/blob/48fed801/src/java/org/apache/cassandra/service/AsyncRepairCallback.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/AsyncRepairCallback.java b/src/java/org/apache/cassandra/service/AsyncRepairCallback.java
index dec5319..d613f3d 100644
--- a/src/java/org/apache/cassandra/service/AsyncRepairCallback.java
+++ b/src/java/org/apache/cassandra/service/AsyncRepairCallback.java
@@ -17,7 +17,6 @@
  */
 package org.apache.cassandra.service;
 
-import java.io.IOException;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.cassandra.concurrent.Stage;
@@ -46,9 +45,9 @@ public class AsyncRepairCallback implements IAsyncCallback<ReadResponse>
         {
             StageManager.getStage(Stage.READ_REPAIR).execute(new WrappedRunnable()
             {
-                protected void runMayThrow() throws DigestMismatchException, IOException
+                protected void runMayThrow()
                 {
-                    repairResolver.resolve();
+                    repairResolver.compareResponses();
                 }
             });
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/48fed801/src/java/org/apache/cassandra/service/DataResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/DataResolver.java b/src/java/org/apache/cassandra/service/DataResolver.java
index 4e5bfb8..01953e1 100644
--- a/src/java/org/apache/cassandra/service/DataResolver.java
+++ b/src/java/org/apache/cassandra/service/DataResolver.java
@@ -21,6 +21,8 @@ import java.net.InetAddress;
 import java.util.*;
 import java.util.concurrent.TimeoutException;
 
+import com.google.common.annotations.VisibleForTesting;
+
 import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.concurrent.StageManager;
 import org.apache.cassandra.config.CFMetaData;
@@ -40,7 +42,8 @@ import org.apache.cassandra.utils.FBUtilities;
 
 public class DataResolver extends ResponseResolver
 {
-    private final List<AsyncOneResponse> repairResults = Collections.synchronizedList(new ArrayList<>());
+    @VisibleForTesting
+    final List<AsyncOneResponse> repairResults = Collections.synchronizedList(new ArrayList<>());
 
     public DataResolver(Keyspace keyspace, ReadCommand command, ConsistencyLevel consistency, int maxResponseCount)
     {
@@ -73,6 +76,15 @@ public class DataResolver extends ResponseResolver
         return counter.applyTo(mergeWithShortReadProtection(iters, sources, counter));
     }
 
+    public void compareResponses()
+    {
+        // We need to fully consume the results to trigger read repairs if appropriate
+        try (PartitionIterator iterator = resolve())
+        {
+            PartitionIterators.consume(iterator);
+        }
+    }
+
     private PartitionIterator mergeWithShortReadProtection(List<UnfilteredPartitionIterator> results, InetAddress[] sources, DataLimits.Counter resultCounter)
     {
         // If we have only one results, there is no read repair to do and we can't get short reads
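
For reference, the new compareResponses() works purely through side effects: read repairs are scheduled while the merged result is iterated, so fully draining the resolved iterator (and then closing it) is enough even though the caller never looks at the rows. A hedged sketch of what PartitionIterators.consume amounts to, assuming the 3.0-era iterator shapes (a PartitionIterator of closeable RowIterators):

    // Sketch (assumption about PartitionIterators.consume): drain every partition and row
    // so that side effects of merging, such as read-repair mutations, are triggered.
    public static void consume(PartitionIterator iterator)
    {
        while (iterator.hasNext())
        {
            try (RowIterator partition = iterator.next())
            {
                while (partition.hasNext())
                    partition.next();
            }
        }
    }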

http://git-wip-us.apache.org/repos/asf/cassandra/blob/48fed801/src/java/org/apache/cassandra/service/DigestResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/DigestResolver.java b/src/java/org/apache/cassandra/service/DigestResolver.java
index 4a918a3..6a528e9 100644
--- a/src/java/org/apache/cassandra/service/DigestResolver.java
+++ b/src/java/org/apache/cassandra/service/DigestResolver.java
@@ -69,6 +69,13 @@ public class DigestResolver extends ResponseResolver
         if (logger.isTraceEnabled())
             logger.trace("resolving {} responses", responses.size());
 
+        compareResponses();
+
+        return UnfilteredPartitionIterators.filter(dataResponse.makeIterator(command), command.nowInSec());
+    }
+
+    public void compareResponses() throws DigestMismatchException
+    {
         long start = System.nanoTime();
 
         // validate digests against each other; throw immediately on mismatch.
@@ -87,8 +94,6 @@ public class DigestResolver extends ResponseResolver
 
         if (logger.isTraceEnabled())
             logger.trace("resolve: {} ms.", TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start));
-
-        return UnfilteredPartitionIterators.filter(dataResponse.makeIterator(command), command.nowInSec());
     }
 
     public boolean isDataPresent()
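
The refactoring above only moves code: resolve() now delegates the digest check to the new compareResponses(), which keeps the "validate digests against each other; throw immediately on mismatch" loop, and then builds the filtered data iterator. A hedged approximation of that comparison loop (it is unchanged by this patch and not shown in the hunk):

    // Approximation of the digest comparison compareResponses() performs.
    static void compareDigests(Iterable<MessageIn<ReadResponse>> responses, ReadCommand command)
    throws DigestMismatchException
    {
        ByteBuffer digest = null;
        for (MessageIn<ReadResponse> message : responses)
        {
            ByteBuffer newDigest = message.payload.digest(command);
            if (digest == null)
                digest = newDigest;
            else if (!digest.equals(newDigest))
                // only single partition reads use digests, hence the cast
                throw new DigestMismatchException(((SinglePartitionReadCommand) command).partitionKey(),
                                                  digest, newDigest);
        }
    }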

http://git-wip-us.apache.org/repos/asf/cassandra/blob/48fed801/src/java/org/apache/cassandra/service/ReadCallback.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ReadCallback.java b/src/java/org/apache/cassandra/service/ReadCallback.java
index 8747004..516384a 100644
--- a/src/java/org/apache/cassandra/service/ReadCallback.java
+++ b/src/java/org/apache/cassandra/service/ReadCallback.java
@@ -219,10 +219,10 @@ public class ReadCallback implements IAsyncCallbackWithFailure<ReadResponse>
         {
             // If the resolver is a DigestResolver, we need to do a full data read if there is a mismatch.
             // Otherwise, resolve will send the repairs directly if needs be (and in that case we should never
-            // get a digest mismatch)
+            // get a digest mismatch).
             try
             {
-                resolver.resolve();
+                resolver.compareResponses();
             }
             catch (DigestMismatchException e)
             {
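
In the blocking read path shown above, compareResponses() does everything a DataResolver needs (it sends the read repairs itself), whereas for a DigestResolver it can only detect a mismatch; in that case the coordinator falls back to a full data read. A hedged sketch of that control flow, with retryWithFullDataRead standing in for coordinator internals not shown in this hunk:

    // Illustrative control flow only; retryWithFullDataRead is a hypothetical helper.
    try
    {
        resolver.compareResponses();   // DataResolver: sends read repairs; DigestResolver: may throw
    }
    catch (DigestMismatchException e)
    {
        retryWithFullDataRead(e);      // re-read full data and resolve it with a DataResolver
    }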

http://git-wip-us.apache.org/repos/asf/cassandra/blob/48fed801/src/java/org/apache/cassandra/service/ResponseResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ResponseResolver.java b/src/java/org/apache/cassandra/service/ResponseResolver.java
index e7c94a1..81b18b6 100644
--- a/src/java/org/apache/cassandra/service/ResponseResolver.java
+++ b/src/java/org/apache/cassandra/service/ResponseResolver.java
@@ -47,6 +47,18 @@ public abstract class ResponseResolver
     public abstract PartitionIterator getData();
     public abstract PartitionIterator resolve() throws DigestMismatchException;
 
+    /**
+     * Compares received responses, potentially triggering a digest mismatch (for a digest resolver) and read-repairs
+     * (for a data resolver).
+     * <p>
+     * This is functionally equivalent to calling {@link #resolve()} and consuming the result, but can be slightly more
+     * efficient in some case due to the fact that we don't care about the result itself. This is used when doing
+     * asynchronous read-repairs.
+     *
+     * @throws DigestMismatchException if it's a digest resolver and the responses don't match.
+     */
+    public abstract void compareResponses() throws DigestMismatchException;
+
     public abstract boolean isDataPresent();
 
     public void preprocess(MessageIn<ReadResponse> message)
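
The new abstract method makes the two usages of a resolver explicit: the client-facing read path wants the merged rows from resolve(), while the asynchronous read-repair path only wants the comparison's side effects. A small illustrative sketch of both call shapes (not actual StorageProxy code), assuming a resolver that has already collected its responses:

    // Foreground read: the merged data itself is returned to the client.
    static void foregroundRead(ResponseResolver resolver) throws DigestMismatchException
    {
        try (PartitionIterator data = resolver.resolve())
        {
            // ... iterate partitions and rows, hand them back to the client
        }
    }

    // Asynchronous read repair: only the side effects matter (a digest mismatch
    // for a digest resolver, or read-repair mutations for a data resolver).
    static void asyncReadRepair(ResponseResolver resolver) throws DigestMismatchException
    {
        resolver.compareResponses();
    }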

http://git-wip-us.apache.org/repos/asf/cassandra/blob/48fed801/test/unit/org/apache/cassandra/service/DataResolverTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/DataResolverTest.java b/test/unit/org/apache/cassandra/service/DataResolverTest.java
index c9878d4..fd1e54e 100644
--- a/test/unit/org/apache/cassandra/service/DataResolverTest.java
+++ b/test/unit/org/apache/cassandra/service/DataResolverTest.java
@@ -129,6 +129,21 @@ public class DataResolverTest
         MessagingService.instance().clearMessageSinks();
     }
 
+    /**
+     * Checks that the provided data resolver has the expected number of repair futures created.
+     * This method also "release" those future by faking replica responses to those repair, which is necessary or
+     * every test would timeout when closing the result of resolver.resolve(), since it waits on those futures.
+     */
+    private void assertRepairFuture(DataResolver resolver, int expectedRepairs)
+    {
+        assertEquals(expectedRepairs, resolver.repairResults.size());
+
+        // Signal all future. We pass a completely fake response message, but it doesn't matter as we just want
+        // AsyncOneResponse to signal success, and it only cares about a non-null MessageIn (it collects the payload).
+        for (AsyncOneResponse<?> future : resolver.repairResults)
+            future.response(MessageIn.create(null, null, null, null, -1));
+    }
+
     @Test
     public void testResolveNewerSingleRow() throws UnknownHostException
     {
@@ -142,12 +157,15 @@ public class DataResolverTest
                                                                                                        .add("c1", "v2")
                                                                                                        .buildUpdate())));
 
-        try(PartitionIterator data = resolver.resolve();
-            RowIterator rows = Iterators.getOnlyElement(data))
+        try(PartitionIterator data = resolver.resolve())
         {
-            Row row = Iterators.getOnlyElement(rows);
-            assertColumns(row, "c1");
-            assertColumn(cfm, row, "c1", "v2", 1);
+            try (RowIterator rows = Iterators.getOnlyElement(data))
+            {
+                Row row = Iterators.getOnlyElement(rows);
+                assertColumns(row, "c1");
+                assertColumn(cfm, row, "c1", "v2", 1);
+            }
+            assertRepairFuture(resolver, 1);
         }
 
         assertEquals(1, messageRecorder.sent.size());
@@ -172,13 +190,16 @@ public class DataResolverTest
                                                                                                        .add("c2", "v2")
                                                                                                        .buildUpdate())));
 
-        try(PartitionIterator data = resolver.resolve();
-            RowIterator rows = Iterators.getOnlyElement(data))
+        try(PartitionIterator data = resolver.resolve())
         {
-            Row row = Iterators.getOnlyElement(rows);
-            assertColumns(row, "c1", "c2");
-            assertColumn(cfm, row, "c1", "v1", 0);
-            assertColumn(cfm, row, "c2", "v2", 1);
+            try (RowIterator rows = Iterators.getOnlyElement(data))
+            {
+                Row row = Iterators.getOnlyElement(rows);
+                assertColumns(row, "c1", "c2");
+                assertColumn(cfm, row, "c1", "v1", 0);
+                assertColumn(cfm, row, "c2", "v2", 1);
+            }
+            assertRepairFuture(resolver, 2);
         }
 
         assertEquals(2, messageRecorder.sent.size());
@@ -224,6 +245,7 @@ public class DataResolverTest
                 assertFalse(rows.hasNext());
                 assertFalse(data.hasNext());
             }
+            assertRepairFuture(resolver, 2);
         }
 
         assertEquals(2, messageRecorder.sent.size());
@@ -289,6 +311,7 @@ public class DataResolverTest
 
                 assertFalse(rows.hasNext());
             }
+            assertRepairFuture(resolver, 4);
         }
 
         assertEquals(4, messageRecorder.sent.size());
@@ -330,12 +353,15 @@ public class DataResolverTest
         InetAddress peer2 = peer();
         resolver.preprocess(readResponseMessage(peer2, EmptyIterators.unfilteredPartition(cfm, false)));
 
-        try(PartitionIterator data = resolver.resolve();
-            RowIterator rows = Iterators.getOnlyElement(data))
+        try(PartitionIterator data = resolver.resolve())
         {
-            Row row = Iterators.getOnlyElement(rows);
-            assertColumns(row, "c2");
-            assertColumn(cfm, row, "c2", "v2", 1);
+            try (RowIterator rows = Iterators.getOnlyElement(data))
+            {
+                Row row = Iterators.getOnlyElement(rows);
+                assertColumns(row, "c2");
+                assertColumn(cfm, row, "c2", "v2", 1);
+            }
+            assertRepairFuture(resolver, 1);
         }
 
         assertEquals(1, messageRecorder.sent.size());
@@ -356,6 +382,7 @@ public class DataResolverTest
         try(PartitionIterator data = resolver.resolve())
         {
             assertFalse(data.hasNext());
+            assertRepairFuture(resolver, 0);
         }
 
         assertTrue(messageRecorder.sent.isEmpty());
@@ -376,6 +403,7 @@ public class DataResolverTest
         try (PartitionIterator data = resolver.resolve())
         {
             assertFalse(data.hasNext());
+            assertRepairFuture(resolver, 1);
         }
 
         // peer1 should get the deletion from peer2
@@ -407,12 +435,15 @@ public class DataResolverTest
         InetAddress peer4 = peer();
         resolver.preprocess(readResponseMessage(peer4, fullPartitionDelete(cfm, dk, 2, nowInSec)));
 
-        try(PartitionIterator data = resolver.resolve();
-            RowIterator rows = Iterators.getOnlyElement(data))
+        try(PartitionIterator data = resolver.resolve())
         {
-            Row row = Iterators.getOnlyElement(rows);
-            assertColumns(row, "two");
-            assertColumn(cfm, row, "two", "B", 3);
+            try (RowIterator rows = Iterators.getOnlyElement(data))
+            {
+                Row row = Iterators.getOnlyElement(rows);
+                assertColumns(row, "two");
+                assertColumn(cfm, row, "two", "B", 3);
+            }
+            assertRepairFuture(resolver, 4);
         }
 
         // peer 1 needs to get the partition delete from peer 4 and the row from peer 3
@@ -498,6 +529,7 @@ public class DataResolverTest
         try (PartitionIterator data = resolver.resolve())
         {
             assertFalse(data.hasNext());
+            assertRepairFuture(resolver, 2);
         }
 
         assertEquals(2, messageRecorder.sent.size());
@@ -575,12 +607,16 @@ public class DataResolverTest
         InetAddress peer2 = peer();
         resolver.preprocess(readResponseMessage(peer2, iter(PartitionUpdate.singleRowUpdate(cfm2, dk, builder.build())), cmd));
 
-        try(PartitionIterator data = resolver.resolve(); RowIterator rows = Iterators.getOnlyElement(data))
+        try(PartitionIterator data = resolver.resolve())
         {
-            Row row = Iterators.getOnlyElement(rows);
-            assertColumns(row, "m");
-            Assert.assertNull(row.getCell(m, CellPath.create(bb(0))));
-            Assert.assertNotNull(row.getCell(m, CellPath.create(bb(1))));
+            try (RowIterator rows = Iterators.getOnlyElement(data))
+            {
+                Row row = Iterators.getOnlyElement(rows);
+                assertColumns(row, "m");
+                Assert.assertNull(row.getCell(m, CellPath.create(bb(0))));
+                Assert.assertNotNull(row.getCell(m, CellPath.create(bb(1))));
+            }
+            assertRepairFuture(resolver, 1);
         }
 
         MessageOut<Mutation> msg;
@@ -625,6 +661,7 @@ public class DataResolverTest
         try(PartitionIterator data = resolver.resolve())
         {
             assertFalse(data.hasNext());
+            assertRepairFuture(resolver, 1);
         }
 
         MessageOut<Mutation> msg;
@@ -665,12 +702,16 @@ public class DataResolverTest
         InetAddress peer2 = peer();
         resolver.preprocess(readResponseMessage(peer2, iter(PartitionUpdate.emptyUpdate(cfm2, dk))));
 
-        try(PartitionIterator data = resolver.resolve(); RowIterator rows = Iterators.getOnlyElement(data))
+        try(PartitionIterator data = resolver.resolve())
         {
-            Row row = Iterators.getOnlyElement(rows);
-            assertColumns(row, "m");
-            ComplexColumnData cd = row.getComplexColumnData(m);
-            assertEquals(Collections.singleton(expectedCell), Sets.newHashSet(cd));
+            try (RowIterator rows = Iterators.getOnlyElement(data))
+            {
+                Row row = Iterators.getOnlyElement(rows);
+                assertColumns(row, "m");
+                ComplexColumnData cd = row.getComplexColumnData(m);
+                assertEquals(Collections.singleton(expectedCell), Sets.newHashSet(cd));
+            }
+            assertRepairFuture(resolver, 1);
         }
 
         Assert.assertNull(messageRecorder.sent.get(peer1));
@@ -714,12 +755,16 @@ public class DataResolverTest
         InetAddress peer2 = peer();
         resolver.preprocess(readResponseMessage(peer2, iter(PartitionUpdate.singleRowUpdate(cfm2, dk, builder.build())), cmd));
 
-        try(PartitionIterator data = resolver.resolve(); RowIterator rows = Iterators.getOnlyElement(data))
+        try(PartitionIterator data = resolver.resolve())
         {
-            Row row = Iterators.getOnlyElement(rows);
-            assertColumns(row, "m");
-            ComplexColumnData cd = row.getComplexColumnData(m);
-            assertEquals(Collections.singleton(expectedCell), Sets.newHashSet(cd));
+            try (RowIterator rows = Iterators.getOnlyElement(data))
+            {
+                Row row = Iterators.getOnlyElement(rows);
+                assertColumns(row, "m");
+                ComplexColumnData cd = row.getComplexColumnData(m);
+                assertEquals(Collections.singleton(expectedCell), Sets.newHashSet(cd));
+            }
+            assertRepairFuture(resolver, 1);
         }
 
         MessageOut<Mutation> msg;


[2/6] cassandra git commit: Read repair is not blocking repair to finish in foreground repair

Posted by sl...@apache.org.
Read repair is not blocking repair to finish in foreground repair

patch by Sylvain Lebresne; reviewed by Xiaolong Jiang and Jason Brown for CASSANDRA-13115


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/48fed801
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/48fed801
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/48fed801

Branch: refs/heads/cassandra-3.11
Commit: 48fed80162592f741bf29298e2064452d53de4d8
Parents: 3f41d7a
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Thu Jan 12 10:03:11 2017 +0100
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Thu Jan 19 16:49:14 2017 +0100


[4/6] cassandra git commit: Merge branch 'cassandra-3.0' into cassandra-3.11

Posted by sl...@apache.org.
Merge branch 'cassandra-3.0' into cassandra-3.11

* cassandra-3.0:
  Read repair is not blocking repair to finish in foreground repair


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/74559de5
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/74559de5
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/74559de5

Branch: refs/heads/trunk
Commit: 74559de50fa6974eba56a00ca37cfc7746134211
Parents: affa68f 48fed80
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Thu Jan 19 16:54:22 2017 +0100
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Thu Jan 19 16:54:22 2017 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../UnfilteredPartitionIterators.java           |   1 +
 .../cassandra/service/AsyncRepairCallback.java  |   2 +-
 .../apache/cassandra/service/DataResolver.java  |  14 ++-
 .../cassandra/service/DigestResolver.java       |   9 +-
 .../apache/cassandra/service/ReadCallback.java  |   4 +-
 .../cassandra/service/ResponseResolver.java     |  12 ++
 .../cassandra/service/DataResolverTest.java     | 117 +++++++++++++------
 8 files changed, 118 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/74559de5/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 853ca09,6293cfa..8b613e6
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,120 -1,11 +1,121 @@@
 -3.0.11
 +3.10
 + * Fix TestHintedHandoff.hintedhandoff_decom_test (CASSANDRA-13058)
 + * Fixed query monitoring for range queries (CASSANDRA-13050)
 + * Remove outboundBindAny configuration property (CASSANDRA-12673)
 + * Use correct bounds for all-data range when filtering (CASSANDRA-12666)
 + * Remove timing window in test case (CASSANDRA-12875)
 + * Resolve unit testing without JCE security libraries installed (CASSANDRA-12945)
 + * Fix inconsistencies in cassandra-stress load balancing policy (CASSANDRA-12919)
 + * Fix validation of non-frozen UDT cells (CASSANDRA-12916)
 + * Don't shut down socket input/output on StreamSession (CASSANDRA-12903)
 + * Fix Murmur3PartitionerTest (CASSANDRA-12858)
 + * Move cqlsh syntax rules into separate module and allow easier customization (CASSANDRA-12897)
 + * Fix CommitLogSegmentManagerTest (CASSANDRA-12283)
 + * Fix cassandra-stress truncate option (CASSANDRA-12695)
 + * Fix crossNode value when receiving messages (CASSANDRA-12791)
 + * Don't load MX4J beans twice (CASSANDRA-12869)
 + * Extend native protocol request flags, add versions to SUPPORTED, and introduce ProtocolVersion enum (CASSANDRA-12838)
 + * Set JOINING mode when running pre-join tasks (CASSANDRA-12836)
 + * remove net.mintern.primitive library due to license issue (CASSANDRA-12845)
 + * Properly format IPv6 addresses when logging JMX service URL (CASSANDRA-12454)
 + * Optimize the vnode allocation for single replica per DC (CASSANDRA-12777)
 + * Use non-token restrictions for bounds when token restrictions are overridden (CASSANDRA-12419)
 + * Fix CQLSH auto completion for PER PARTITION LIMIT (CASSANDRA-12803)
 + * Use different build directories for Eclipse and Ant (CASSANDRA-12466)
 + * Avoid potential AttributeError in cqlsh due to no table metadata (CASSANDRA-12815)
 + * Fix RandomReplicationAwareTokenAllocatorTest.testExistingCluster (CASSANDRA-12812)
 + * Upgrade commons-codec to 1.9 (CASSANDRA-12790)
 + * Make the fanout size for LeveledCompactionStrategy to be configurable (CASSANDRA-11550)
 + * Add duration data type (CASSANDRA-11873)
 + * Fix timeout in ReplicationAwareTokenAllocatorTest (CASSANDRA-12784)
 + * Improve sum aggregate functions (CASSANDRA-12417)
 + * Make cassandra.yaml docs for batch_size_*_threshold_in_kb reflect changes in CASSANDRA-10876 (CASSANDRA-12761)
 + * cqlsh fails to format collections when using aliases (CASSANDRA-11534)
 + * Check for hash conflicts in prepared statements (CASSANDRA-12733)
 + * Exit query parsing upon first error (CASSANDRA-12598)
 + * Fix cassandra-stress to use single seed in UUID generation (CASSANDRA-12729)
 + * CQLSSTableWriter does not allow Update statement (CASSANDRA-12450)
 + * Config class uses boxed types but DD exposes primitive types (CASSANDRA-12199)
 + * Add pre- and post-shutdown hooks to Storage Service (CASSANDRA-12461)
 + * Add hint delivery metrics (CASSANDRA-12693)
 + * Remove IndexInfo cache from FileIndexInfoRetriever (CASSANDRA-12731)
 + * ColumnIndex does not reuse buffer (CASSANDRA-12502)
 + * cdc column addition still breaks schema migration tasks (CASSANDRA-12697)
 + * Upgrade metrics-reporter dependencies (CASSANDRA-12089)
 + * Tune compaction thread count via nodetool (CASSANDRA-12248)
 + * Add +=/-= shortcut syntax for update queries (CASSANDRA-12232)
 + * Include repair session IDs in repair start message (CASSANDRA-12532)
 + * Add a blocking task to Index, run before joining the ring (CASSANDRA-12039)
 + * Fix NPE when using CQLSSTableWriter (CASSANDRA-12667)
 + * Support optional backpressure strategies at the coordinator (CASSANDRA-9318)
 + * Make randompartitioner work with new vnode allocation (CASSANDRA-12647)
 + * Fix cassandra-stress graphing (CASSANDRA-12237)
 + * Allow filtering on partition key columns for queries without secondary indexes (CASSANDRA-11031)
 + * Fix Cassandra Stress reporting thread model and precision (CASSANDRA-12585)
 + * Add JMH benchmarks.jar (CASSANDRA-12586)
 + * Cleanup uses of AlterTableStatementColumn (CASSANDRA-12567)
 + * Add keep-alive to streaming (CASSANDRA-11841)
 + * Tracing payload is passed through newSession(..) (CASSANDRA-11706)
 + * avoid deleting non existing sstable files and improve related log messages (CASSANDRA-12261)
 + * json/yaml output format for nodetool compactionhistory (CASSANDRA-12486)
 + * Retry all internode messages once after a connection is
 +   closed and reopened (CASSANDRA-12192)
 + * Add support to rebuild from targeted replica (CASSANDRA-9875)
 + * Add sequence distribution type to cassandra stress (CASSANDRA-12490)
 + * "SELECT * FROM foo LIMIT ;" does not error out (CASSANDRA-12154)
 + * Define executeLocally() at the ReadQuery Level (CASSANDRA-12474)
 + * Extend read/write failure messages with a map of replica addresses
 +   to error codes in the v5 native protocol (CASSANDRA-12311)
 + * Fix rebuild of SASI indexes with existing index files (CASSANDRA-12374)
 + * Let DatabaseDescriptor not implicitly startup services (CASSANDRA-9054, 12550)
 + * Fix clustering indexes in presence of static columns in SASI (CASSANDRA-12378)
 + * Fix queries on columns with reversed type on SASI indexes (CASSANDRA-12223)
 + * Added slow query log (CASSANDRA-12403)
 + * Count full coordinated request against timeout (CASSANDRA-12256)
 + * Allow TTL with null value on insert and update (CASSANDRA-12216)
 + * Make decommission operation resumable (CASSANDRA-12008)
 + * Add support to one-way targeted repair (CASSANDRA-9876)
 + * Remove clientutil jar (CASSANDRA-11635)
 + * Fix compaction throughput throttle (CASSANDRA-12366, CASSANDRA-12717)
 + * Delay releasing Memtable memory on flush until PostFlush has finished running (CASSANDRA-12358)
 + * Cassandra stress should dump all setting on startup (CASSANDRA-11914)
 + * Make it possible to compact a given token range (CASSANDRA-10643)
 + * Allow updating DynamicEndpointSnitch properties via JMX (CASSANDRA-12179)
 + * Collect metrics on queries by consistency level (CASSANDRA-7384)
 + * Add support for GROUP BY to SELECT statement (CASSANDRA-10707)
 + * Deprecate memtable_cleanup_threshold and update default for memtable_flush_writers (CASSANDRA-12228)
 + * Upgrade to OHC 0.4.4 (CASSANDRA-12133)
 + * Add version command to cassandra-stress (CASSANDRA-12258)
 + * Create compaction-stress tool (CASSANDRA-11844)
 + * Garbage-collecting compaction operation and schema option (CASSANDRA-7019)
 + * Add beta protocol flag for v5 native protocol (CASSANDRA-12142)
 + * Support filtering on non-PRIMARY KEY columns in the CREATE
 +   MATERIALIZED VIEW statement's WHERE clause (CASSANDRA-10368)
 + * Unify STDOUT and SYSTEMLOG logback format (CASSANDRA-12004)
 + * COPY FROM should raise error for non-existing input files (CASSANDRA-12174)
 + * Faster write path (CASSANDRA-12269)
 + * Option to leave omitted columns in INSERT JSON unset (CASSANDRA-11424)
 + * Support json/yaml output in nodetool tpstats (CASSANDRA-12035)
 + * Expose metrics for successful/failed authentication attempts (CASSANDRA-10635)
 + * Prepend snapshot name with "truncated" or "dropped" when a snapshot
 +   is taken before truncating or dropping a table (CASSANDRA-12178)
 + * Optimize RestrictionSet (CASSANDRA-12153)
 + * cqlsh does not automatically downgrade CQL version (CASSANDRA-12150)
 + * Omit (de)serialization of state variable in UDAs (CASSANDRA-9613)
 + * Create a system table to expose prepared statements (CASSANDRA-8831)
 + * Reuse DataOutputBuffer from ColumnIndex (CASSANDRA-11970)
 + * Remove DatabaseDescriptor dependency from SegmentedFile (CASSANDRA-11580)
 + * Add supplied username to authentication error messages (CASSANDRA-12076)
 + * Remove pre-startup check for open JMX port (CASSANDRA-12074)
 + * Remove compaction Severity from DynamicEndpointSnitch (CASSANDRA-11738)
 + * Restore resumable hints delivery (CASSANDRA-11960)
 + * Properly report LWT contention (CASSANDRA-12626)
 +Merged from 3.0:
+  * Read repair is not blocking repair to finish in foreground repair (CASSANDRA-13115)
 - * Stress daemon help is incorrect (CASSANDRA-12563)
 + * Stress daemon help is incorrect(CASSANDRA-12563)
   * Remove ALTER TYPE support (CASSANDRA-12443)
   * Fix assertion for certain legacy range tombstone pattern (CASSANDRA-12203)
 - * Set javac encoding to utf-8 (CASSANDRA-11077)
   * Replace empty strings with null values if they cannot be converted (CASSANDRA-12794)
 - * Fixed flacky SSTableRewriterTest: check file counts before calling validateCFS (CASSANDRA-12348)
   * Fix deserialization of 2.x DeletedCells (CASSANDRA-12620)
   * Add parent repair session id to anticompaction log message (CASSANDRA-12186)
   * Improve contention handling on failure to acquire MV lock for streaming and hints (CASSANDRA-12905)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/74559de5/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/74559de5/src/java/org/apache/cassandra/service/DataResolver.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/DataResolver.java
index be8eca1,01953e1..1bbfecd
--- a/src/java/org/apache/cassandra/service/DataResolver.java
+++ b/src/java/org/apache/cassandra/service/DataResolver.java
@@@ -42,13 -42,12 +44,14 @@@ import org.apache.cassandra.utils.FBUti
  
  public class DataResolver extends ResponseResolver
  {
-     private final List<AsyncOneResponse> repairResults = Collections.synchronizedList(new ArrayList<>());
+     @VisibleForTesting
+     final List<AsyncOneResponse> repairResults = Collections.synchronizedList(new ArrayList<>());
 +    private final long queryStartNanoTime;
  
 -    public DataResolver(Keyspace keyspace, ReadCommand command, ConsistencyLevel consistency, int maxResponseCount)
 +    public DataResolver(Keyspace keyspace, ReadCommand command, ConsistencyLevel consistency, int maxResponseCount, long queryStartNanoTime)
      {
          super(keyspace, command, consistency, maxResponseCount);
 +        this.queryStartNanoTime = queryStartNanoTime;
      }
  
      public PartitionIterator getData()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/74559de5/src/java/org/apache/cassandra/service/ReadCallback.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/74559de5/test/unit/org/apache/cassandra/service/DataResolverTest.java
----------------------------------------------------------------------


[6/6] cassandra git commit: Merge branch 'cassandra-3.11' into trunk

Posted by sl...@apache.org.
Merge branch 'cassandra-3.11' into trunk

* cassandra-3.11:
  Read repair is not blocking repair to finish in foreground repair


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c3d72444
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c3d72444
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c3d72444

Branch: refs/heads/trunk
Commit: c3d72444512861c75ec11e17f52770d2613e5364
Parents: 52df6a5 74559de
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Thu Jan 19 16:55:11 2017 +0100
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Thu Jan 19 16:55:11 2017 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../UnfilteredPartitionIterators.java           |   1 +
 .../cassandra/service/AsyncRepairCallback.java  |   2 +-
 .../apache/cassandra/service/DataResolver.java  |  14 ++-
 .../cassandra/service/DigestResolver.java       |   9 +-
 .../apache/cassandra/service/ReadCallback.java  |   4 +-
 .../cassandra/service/ResponseResolver.java     |  12 ++
 .../cassandra/service/DataResolverTest.java     | 117 +++++++++++++------
 8 files changed, 118 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/c3d72444/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 8f68618,8b613e6..cc16a8f
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -139,11 -109,13 +139,12 @@@
   * Remove pre-startup check for open JMX port (CASSANDRA-12074)
   * Remove compaction Severity from DynamicEndpointSnitch (CASSANDRA-11738)
   * Restore resumable hints delivery (CASSANDRA-11960)
 - * Properly report LWT contention (CASSANDRA-12626)
 + * Properly record CAS contention (CASSANDRA-12626)
  Merged from 3.0:
 + * Stress daemon help is incorrect (CASSANDRA-12563)
+  * Read repair is not blocking repair to finish in foreground repair (CASSANDRA-13115)
 - * Stress daemon help is incorrect(CASSANDRA-12563)
 - * Remove ALTER TYPE support (CASSANDRA-12443)
 - * Fix assertion for certain legacy range tombstone pattern (CASSANDRA-12203)
   * Replace empty strings with null values if they cannot be converted (CASSANDRA-12794)
 + * Remove support for non-JavaScript UDFs (CASSANDRA-12883)
   * Fix deserialization of 2.x DeletedCells (CASSANDRA-12620)
   * Add parent repair session id to anticompaction log message (CASSANDRA-12186)
   * Improve contention handling on failure to acquire MV lock for streaming and hints (CASSANDRA-12905)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c3d72444/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c3d72444/src/java/org/apache/cassandra/service/DataResolver.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c3d72444/src/java/org/apache/cassandra/service/ReadCallback.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c3d72444/test/unit/org/apache/cassandra/service/DataResolverTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/service/DataResolverTest.java
index 3dbc166,7c38224..dba3e95
--- a/test/unit/org/apache/cassandra/service/DataResolverTest.java
+++ b/test/unit/org/apache/cassandra/service/DataResolverTest.java
@@@ -330,14 -353,17 +353,17 @@@ public class DataResolverTes
                                                                                                         .add("c2", "v2")
                                                                                                         .buildUpdate())));
          InetAddress peer2 = peer();
 -        resolver.preprocess(readResponseMessage(peer2, EmptyIterators.unfilteredPartition(cfm, false)));
 +        resolver.preprocess(readResponseMessage(peer2, EmptyIterators.unfilteredPartition(cfm)));
  
-         try(PartitionIterator data = resolver.resolve();
-             RowIterator rows = Iterators.getOnlyElement(data))
+         try(PartitionIterator data = resolver.resolve())
          {
-             Row row = Iterators.getOnlyElement(rows);
-             assertColumns(row, "c2");
-             assertColumn(cfm, row, "c2", "v2", 1);
+             try (RowIterator rows = Iterators.getOnlyElement(data))
+             {
+                 Row row = Iterators.getOnlyElement(rows);
+                 assertColumns(row, "c2");
+                 assertColumn(cfm, row, "c2", "v2", 1);
+             }
+             assertRepairFuture(resolver, 1);
          }
  
          assertEquals(1, messageRecorder.sent.size());


[5/6] cassandra git commit: Merge branch 'cassandra-3.0' into cassandra-3.11

Posted by sl...@apache.org.
Merge branch 'cassandra-3.0' into cassandra-3.11

* cassandra-3.0:
  Read repair is not blocking repair to finish in foreground repair


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/74559de5
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/74559de5
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/74559de5

Branch: refs/heads/cassandra-3.11
Commit: 74559de50fa6974eba56a00ca37cfc7746134211
Parents: affa68f 48fed80
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Thu Jan 19 16:54:22 2017 +0100
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Thu Jan 19 16:54:22 2017 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../UnfilteredPartitionIterators.java           |   1 +
 .../cassandra/service/AsyncRepairCallback.java  |   2 +-
 .../apache/cassandra/service/DataResolver.java  |  14 ++-
 .../cassandra/service/DigestResolver.java       |   9 +-
 .../apache/cassandra/service/ReadCallback.java  |   4 +-
 .../cassandra/service/ResponseResolver.java     |  12 ++
 .../cassandra/service/DataResolverTest.java     | 117 +++++++++++++------
 8 files changed, 118 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/74559de5/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 853ca09,6293cfa..8b613e6
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,120 -1,11 +1,121 @@@
 -3.0.11
 +3.10
 + * Fix TestHintedHandoff.hintedhandoff_decom_test (CASSANDRA-13058)
 + * Fixed query monitoring for range queries (CASSANDRA-13050)
 + * Remove outboundBindAny configuration property (CASSANDRA-12673)
 + * Use correct bounds for all-data range when filtering (CASSANDRA-12666)
 + * Remove timing window in test case (CASSANDRA-12875)
 + * Resolve unit testing without JCE security libraries installed (CASSANDRA-12945)
 + * Fix inconsistencies in cassandra-stress load balancing policy (CASSANDRA-12919)
 + * Fix validation of non-frozen UDT cells (CASSANDRA-12916)
 + * Don't shut down socket input/output on StreamSession (CASSANDRA-12903)
 + * Fix Murmur3PartitionerTest (CASSANDRA-12858)
 + * Move cqlsh syntax rules into separate module and allow easier customization (CASSANDRA-12897)
 + * Fix CommitLogSegmentManagerTest (CASSANDRA-12283)
 + * Fix cassandra-stress truncate option (CASSANDRA-12695)
 + * Fix crossNode value when receiving messages (CASSANDRA-12791)
 + * Don't load MX4J beans twice (CASSANDRA-12869)
 + * Extend native protocol request flags, add versions to SUPPORTED, and introduce ProtocolVersion enum (CASSANDRA-12838)
 + * Set JOINING mode when running pre-join tasks (CASSANDRA-12836)
 + * remove net.mintern.primitive library due to license issue (CASSANDRA-12845)
 + * Properly format IPv6 addresses when logging JMX service URL (CASSANDRA-12454)
 + * Optimize the vnode allocation for single replica per DC (CASSANDRA-12777)
 + * Use non-token restrictions for bounds when token restrictions are overridden (CASSANDRA-12419)
 + * Fix CQLSH auto completion for PER PARTITION LIMIT (CASSANDRA-12803)
 + * Use different build directories for Eclipse and Ant (CASSANDRA-12466)
 + * Avoid potential AttributeError in cqlsh due to no table metadata (CASSANDRA-12815)
 + * Fix RandomReplicationAwareTokenAllocatorTest.testExistingCluster (CASSANDRA-12812)
 + * Upgrade commons-codec to 1.9 (CASSANDRA-12790)
 + * Make the fanout size for LeveledCompactionStrategy to be configurable (CASSANDRA-11550)
 + * Add duration data type (CASSANDRA-11873)
 + * Fix timeout in ReplicationAwareTokenAllocatorTest (CASSANDRA-12784)
 + * Improve sum aggregate functions (CASSANDRA-12417)
 + * Make cassandra.yaml docs for batch_size_*_threshold_in_kb reflect changes in CASSANDRA-10876 (CASSANDRA-12761)
 + * cqlsh fails to format collections when using aliases (CASSANDRA-11534)
 + * Check for hash conflicts in prepared statements (CASSANDRA-12733)
 + * Exit query parsing upon first error (CASSANDRA-12598)
 + * Fix cassandra-stress to use single seed in UUID generation (CASSANDRA-12729)
 + * CQLSSTableWriter does not allow Update statement (CASSANDRA-12450)
 + * Config class uses boxed types but DD exposes primitive types (CASSANDRA-12199)
 + * Add pre- and post-shutdown hooks to Storage Service (CASSANDRA-12461)
 + * Add hint delivery metrics (CASSANDRA-12693)
 + * Remove IndexInfo cache from FileIndexInfoRetriever (CASSANDRA-12731)
 + * ColumnIndex does not reuse buffer (CASSANDRA-12502)
 + * cdc column addition still breaks schema migration tasks (CASSANDRA-12697)
 + * Upgrade metrics-reporter dependencies (CASSANDRA-12089)
 + * Tune compaction thread count via nodetool (CASSANDRA-12248)
 + * Add +=/-= shortcut syntax for update queries (CASSANDRA-12232)
 + * Include repair session IDs in repair start message (CASSANDRA-12532)
 + * Add a blocking task to Index, run before joining the ring (CASSANDRA-12039)
 + * Fix NPE when using CQLSSTableWriter (CASSANDRA-12667)
 + * Support optional backpressure strategies at the coordinator (CASSANDRA-9318)
 + * Make randompartitioner work with new vnode allocation (CASSANDRA-12647)
 + * Fix cassandra-stress graphing (CASSANDRA-12237)
 + * Allow filtering on partition key columns for queries without secondary indexes (CASSANDRA-11031)
 + * Fix Cassandra Stress reporting thread model and precision (CASSANDRA-12585)
 + * Add JMH benchmarks.jar (CASSANDRA-12586)
 + * Cleanup uses of AlterTableStatementColumn (CASSANDRA-12567)
 + * Add keep-alive to streaming (CASSANDRA-11841)
 + * Tracing payload is passed through newSession(..) (CASSANDRA-11706)
 + * avoid deleting non existing sstable files and improve related log messages (CASSANDRA-12261)
 + * json/yaml output format for nodetool compactionhistory (CASSANDRA-12486)
 + * Retry all internode messages once after a connection is
 +   closed and reopened (CASSANDRA-12192)
 + * Add support to rebuild from targeted replica (CASSANDRA-9875)
 + * Add sequence distribution type to cassandra stress (CASSANDRA-12490)
 + * "SELECT * FROM foo LIMIT ;" does not error out (CASSANDRA-12154)
 + * Define executeLocally() at the ReadQuery Level (CASSANDRA-12474)
 + * Extend read/write failure messages with a map of replica addresses
 +   to error codes in the v5 native protocol (CASSANDRA-12311)
 + * Fix rebuild of SASI indexes with existing index files (CASSANDRA-12374)
 + * Let DatabaseDescriptor not implicitly startup services (CASSANDRA-9054, 12550)
 + * Fix clustering indexes in presence of static columns in SASI (CASSANDRA-12378)
 + * Fix queries on columns with reversed type on SASI indexes (CASSANDRA-12223)
 + * Added slow query log (CASSANDRA-12403)
 + * Count full coordinated request against timeout (CASSANDRA-12256)
 + * Allow TTL with null value on insert and update (CASSANDRA-12216)
 + * Make decommission operation resumable (CASSANDRA-12008)
 + * Add support to one-way targeted repair (CASSANDRA-9876)
 + * Remove clientutil jar (CASSANDRA-11635)
 + * Fix compaction throughput throttle (CASSANDRA-12366, CASSANDRA-12717)
 + * Delay releasing Memtable memory on flush until PostFlush has finished running (CASSANDRA-12358)
 + * Cassandra stress should dump all setting on startup (CASSANDRA-11914)
 + * Make it possible to compact a given token range (CASSANDRA-10643)
 + * Allow updating DynamicEndpointSnitch properties via JMX (CASSANDRA-12179)
 + * Collect metrics on queries by consistency level (CASSANDRA-7384)
 + * Add support for GROUP BY to SELECT statement (CASSANDRA-10707)
 + * Deprecate memtable_cleanup_threshold and update default for memtable_flush_writers (CASSANDRA-12228)
 + * Upgrade to OHC 0.4.4 (CASSANDRA-12133)
 + * Add version command to cassandra-stress (CASSANDRA-12258)
 + * Create compaction-stress tool (CASSANDRA-11844)
 + * Garbage-collecting compaction operation and schema option (CASSANDRA-7019)
 + * Add beta protocol flag for v5 native protocol (CASSANDRA-12142)
 + * Support filtering on non-PRIMARY KEY columns in the CREATE
 +   MATERIALIZED VIEW statement's WHERE clause (CASSANDRA-10368)
 + * Unify STDOUT and SYSTEMLOG logback format (CASSANDRA-12004)
 + * COPY FROM should raise error for non-existing input files (CASSANDRA-12174)
 + * Faster write path (CASSANDRA-12269)
 + * Option to leave omitted columns in INSERT JSON unset (CASSANDRA-11424)
 + * Support json/yaml output in nodetool tpstats (CASSANDRA-12035)
 + * Expose metrics for successful/failed authentication attempts (CASSANDRA-10635)
 + * Prepend snapshot name with "truncated" or "dropped" when a snapshot
 +   is taken before truncating or dropping a table (CASSANDRA-12178)
 + * Optimize RestrictionSet (CASSANDRA-12153)
 + * cqlsh does not automatically downgrade CQL version (CASSANDRA-12150)
 + * Omit (de)serialization of state variable in UDAs (CASSANDRA-9613)
 + * Create a system table to expose prepared statements (CASSANDRA-8831)
 + * Reuse DataOutputBuffer from ColumnIndex (CASSANDRA-11970)
 + * Remove DatabaseDescriptor dependency from SegmentedFile (CASSANDRA-11580)
 + * Add supplied username to authentication error messages (CASSANDRA-12076)
 + * Remove pre-startup check for open JMX port (CASSANDRA-12074)
 + * Remove compaction Severity from DynamicEndpointSnitch (CASSANDRA-11738)
 + * Restore resumable hints delivery (CASSANDRA-11960)
 + * Properly report LWT contention (CASSANDRA-12626)
 +Merged from 3.0:
+  * Read repair is not blocking repair to finish in foreground repair (CASSANDRA-13115)
 - * Stress daemon help is incorrect (CASSANDRA-12563)
 + * Stress daemon help is incorrect(CASSANDRA-12563)
   * Remove ALTER TYPE support (CASSANDRA-12443)
   * Fix assertion for certain legacy range tombstone pattern (CASSANDRA-12203)
 - * Set javac encoding to utf-8 (CASSANDRA-11077)
   * Replace empty strings with null values if they cannot be converted (CASSANDRA-12794)
 - * Fixed flacky SSTableRewriterTest: check file counts before calling validateCFS (CASSANDRA-12348)
   * Fix deserialization of 2.x DeletedCells (CASSANDRA-12620)
   * Add parent repair session id to anticompaction log message (CASSANDRA-12186)
   * Improve contention handling on failure to acquire MV lock for streaming and hints (CASSANDRA-12905)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/74559de5/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/74559de5/src/java/org/apache/cassandra/service/DataResolver.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/DataResolver.java
index be8eca1,01953e1..1bbfecd
--- a/src/java/org/apache/cassandra/service/DataResolver.java
+++ b/src/java/org/apache/cassandra/service/DataResolver.java
@@@ -42,13 -42,12 +44,14 @@@ import org.apache.cassandra.utils.FBUti
  
  public class DataResolver extends ResponseResolver
  {
-     private final List<AsyncOneResponse> repairResults = Collections.synchronizedList(new ArrayList<>());
+     @VisibleForTesting
+     final List<AsyncOneResponse> repairResults = Collections.synchronizedList(new ArrayList<>());
 +    private final long queryStartNanoTime;
  
 -    public DataResolver(Keyspace keyspace, ReadCommand command, ConsistencyLevel consistency, int maxResponseCount)
 +    public DataResolver(Keyspace keyspace, ReadCommand command, ConsistencyLevel consistency, int maxResponseCount, long queryStartNanoTime)
      {
          super(keyspace, command, consistency, maxResponseCount);
 +        this.queryStartNanoTime = queryStartNanoTime;
      }
  
      public PartitionIterator getData()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/74559de5/src/java/org/apache/cassandra/service/ReadCallback.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/74559de5/test/unit/org/apache/cassandra/service/DataResolverTest.java
----------------------------------------------------------------------


[3/6] cassandra git commit: Read repair is not blocking repair to finish in foreground repair

Posted by sl...@apache.org.
Read repair is not blocking repair to finish in foreground repair

patch by Sylvain Lebresne; reviewed by Xiaolong Jiang and Jason Brown for CASSANDRA-13115


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/48fed801
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/48fed801
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/48fed801

Branch: refs/heads/trunk
Commit: 48fed80162592f741bf29298e2064452d53de4d8
Parents: 3f41d7a
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Thu Jan 12 10:03:11 2017 +0100
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Thu Jan 19 16:49:14 2017 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../UnfilteredPartitionIterators.java           |   1 +
 .../cassandra/service/AsyncRepairCallback.java  |   5 +-
 .../apache/cassandra/service/DataResolver.java  |  14 ++-
 .../cassandra/service/DigestResolver.java       |   9 +-
 .../apache/cassandra/service/ReadCallback.java  |   4 +-
 .../cassandra/service/ResponseResolver.java     |  12 ++
 .../cassandra/service/DataResolverTest.java     | 117 +++++++++++++------
 8 files changed, 119 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/48fed801/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 97d49af..6293cfa 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0.11
+ * Read repair is not blocking repair to finish in foreground repair (CASSANDRA-13115)
  * Stress daemon help is incorrect (CASSANDRA-12563)
  * Remove ALTER TYPE support (CASSANDRA-12443)
  * Fix assertion for certain legacy range tombstone pattern (CASSANDRA-12203)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/48fed801/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java b/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
index 41b1424..1abbb19 100644
--- a/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
+++ b/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
@@ -159,6 +159,7 @@ public abstract class UnfilteredPartitionIterators
             public void close()
             {
                 merged.close();
+                listener.close();
             }
         };
     }

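For illustration only: the one-line listener.close() change above is what lets DataResolver's merge listener observe the end of the merge, so the resolver can then block until the read-repair mutations it issued are acknowledged (this is what the test helper further down relies on when it "releases" repair futures). The following is a minimal, self-contained Java sketch of that waiting pattern; the class and method names are illustrative stand-ins, not Cassandra code.

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class WaitOnRepairsSketch
{
    // Block until every pending read-repair ack arrives, or fail once the overall timeout is exceeded.
    static void awaitRepairs(List<? extends Future<?>> repairResults, long timeoutMillis) throws Exception
    {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        for (Future<?> ack : repairResults)
        {
            long remaining = deadline - System.nanoTime();
            if (remaining <= 0)
                throw new TimeoutException("read repair did not complete in time");
            ack.get(remaining, TimeUnit.NANOSECONDS); // waits for the replica to acknowledge the repair mutation
        }
    }

    public static void main(String[] args) throws Exception
    {
        awaitRepairs(Arrays.asList(CompletableFuture.completedFuture(null)), 1000);
        System.out.println("all repairs acknowledged");
    }
}
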
http://git-wip-us.apache.org/repos/asf/cassandra/blob/48fed801/src/java/org/apache/cassandra/service/AsyncRepairCallback.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/AsyncRepairCallback.java b/src/java/org/apache/cassandra/service/AsyncRepairCallback.java
index dec5319..d613f3d 100644
--- a/src/java/org/apache/cassandra/service/AsyncRepairCallback.java
+++ b/src/java/org/apache/cassandra/service/AsyncRepairCallback.java
@@ -17,7 +17,6 @@
  */
 package org.apache.cassandra.service;
 
-import java.io.IOException;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.cassandra.concurrent.Stage;
@@ -46,9 +45,9 @@ public class AsyncRepairCallback implements IAsyncCallback<ReadResponse>
         {
             StageManager.getStage(Stage.READ_REPAIR).execute(new WrappedRunnable()
             {
-                protected void runMayThrow() throws DigestMismatchException, IOException
+                protected void runMayThrow()
                 {
-                    repairResolver.resolve();
+                    repairResolver.compareResponses();
                 }
             });
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/48fed801/src/java/org/apache/cassandra/service/DataResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/DataResolver.java b/src/java/org/apache/cassandra/service/DataResolver.java
index 4e5bfb8..01953e1 100644
--- a/src/java/org/apache/cassandra/service/DataResolver.java
+++ b/src/java/org/apache/cassandra/service/DataResolver.java
@@ -21,6 +21,8 @@ import java.net.InetAddress;
 import java.util.*;
 import java.util.concurrent.TimeoutException;
 
+import com.google.common.annotations.VisibleForTesting;
+
 import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.concurrent.StageManager;
 import org.apache.cassandra.config.CFMetaData;
@@ -40,7 +42,8 @@ import org.apache.cassandra.utils.FBUtilities;
 
 public class DataResolver extends ResponseResolver
 {
-    private final List<AsyncOneResponse> repairResults = Collections.synchronizedList(new ArrayList<>());
+    @VisibleForTesting
+    final List<AsyncOneResponse> repairResults = Collections.synchronizedList(new ArrayList<>());
 
     public DataResolver(Keyspace keyspace, ReadCommand command, ConsistencyLevel consistency, int maxResponseCount)
     {
@@ -73,6 +76,15 @@ public class DataResolver extends ResponseResolver
         return counter.applyTo(mergeWithShortReadProtection(iters, sources, counter));
     }
 
+    public void compareResponses()
+    {
+        // We need to fully consume the results to trigger read repairs if appropriate
+        try (PartitionIterator iterator = resolve())
+        {
+            PartitionIterators.consume(iterator);
+        }
+    }
+
     private PartitionIterator mergeWithShortReadProtection(List<UnfilteredPartitionIterator> results, InetAddress[] sources, DataLimits.Counter resultCounter)
     {
         // If we have only one result, there is no read repair to do and we can't get short reads

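The new compareResponses() in the hunk above relies on fully consuming the resolved iterator so that every per-row difference is seen and repaired. As a rough illustration of what "consume" means here (stand-in types, not the real PartitionIterator/RowIterator):

import java.util.Arrays;
import java.util.Iterator;

public class ConsumeSketch
{
    // Walk every partition and every row purely for side effects; in the real resolver,
    // visiting each row is what lets the merge logic trigger read repairs where appropriate.
    static void consume(Iterator<? extends Iterator<?>> partitions)
    {
        while (partitions.hasNext())
        {
            Iterator<?> rows = partitions.next();
            while (rows.hasNext())
                rows.next();
        }
    }

    public static void main(String[] args)
    {
        consume(Arrays.asList(Arrays.asList("row1", "row2").iterator()).iterator());
    }
}
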
http://git-wip-us.apache.org/repos/asf/cassandra/blob/48fed801/src/java/org/apache/cassandra/service/DigestResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/DigestResolver.java b/src/java/org/apache/cassandra/service/DigestResolver.java
index 4a918a3..6a528e9 100644
--- a/src/java/org/apache/cassandra/service/DigestResolver.java
+++ b/src/java/org/apache/cassandra/service/DigestResolver.java
@@ -69,6 +69,13 @@ public class DigestResolver extends ResponseResolver
         if (logger.isTraceEnabled())
             logger.trace("resolving {} responses", responses.size());
 
+        compareResponses();
+
+        return UnfilteredPartitionIterators.filter(dataResponse.makeIterator(command), command.nowInSec());
+    }
+
+    public void compareResponses() throws DigestMismatchException
+    {
         long start = System.nanoTime();
 
         // validate digests against each other; throw immediately on mismatch.
@@ -87,8 +94,6 @@ public class DigestResolver extends ResponseResolver
 
         if (logger.isTraceEnabled())
             logger.trace("resolve: {} ms.", TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start));
-
-        return UnfilteredPartitionIterators.filter(dataResponse.makeIterator(command), command.nowInSec());
     }
 
     public boolean isDataPresent()

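For readers less familiar with the digest path: the comparison that DigestResolver.compareResponses() now exposes on its own boils down to checking that all responses carry the same digest and failing fast otherwise. A self-contained, hypothetical sketch of that pattern (not the actual DigestResolver body, which this patch only moves, not changes):

import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;

public class DigestCompareSketch
{
    static class DigestMismatch extends RuntimeException {}

    // Compare every digest against the first one received; throw on the first mismatch,
    // which is the signal for the coordinator to fall back to a full data read.
    static void compare(List<ByteBuffer> digests)
    {
        ByteBuffer reference = null;
        for (ByteBuffer digest : digests)
        {
            if (reference == null)
                reference = digest;
            else if (!reference.equals(digest))
                throw new DigestMismatch();
        }
    }

    public static void main(String[] args)
    {
        compare(Arrays.asList(ByteBuffer.wrap(new byte[]{ 1 }), ByteBuffer.wrap(new byte[]{ 1 })));
        System.out.println("digests match");
    }
}
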
http://git-wip-us.apache.org/repos/asf/cassandra/blob/48fed801/src/java/org/apache/cassandra/service/ReadCallback.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ReadCallback.java b/src/java/org/apache/cassandra/service/ReadCallback.java
index 8747004..516384a 100644
--- a/src/java/org/apache/cassandra/service/ReadCallback.java
+++ b/src/java/org/apache/cassandra/service/ReadCallback.java
@@ -219,10 +219,10 @@ public class ReadCallback implements IAsyncCallbackWithFailure<ReadResponse>
         {
             // If the resolver is a DigestResolver, we need to do a full data read if there is a mismatch.
             // Otherwise, resolve will send the repairs directly if needs be (and in that case we should never
-            // get a digest mismatch)
+            // get a digest mismatch).
             try
             {
-                resolver.resolve();
+                resolver.compareResponses();
             }
             catch (DigestMismatchException e)
             {

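To make the control flow in the ReadCallback hunk above concrete: responses are compared first, and only on a digest mismatch is the full data read launched. Below is a rough, self-contained sketch of that flow; the interfaces are hypothetical stand-ins, not Cassandra's.

public class AsyncReadRepairFlowSketch
{
    static class MismatchException extends Exception {}

    interface Resolver
    {
        void compareResponses() throws MismatchException;
    }

    // Rough mirror of the flow shown above: cheap comparison first, full data read only on mismatch.
    static void maybeRepair(Resolver resolver, Runnable fullDataRead)
    {
        try
        {
            resolver.compareResponses();
        }
        catch (MismatchException e)
        {
            fullDataRead.run();
        }
    }

    public static void main(String[] args)
    {
        maybeRepair(() -> { throw new MismatchException(); },
                    () -> System.out.println("digest mismatch: issuing full data read"));
    }
}
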
http://git-wip-us.apache.org/repos/asf/cassandra/blob/48fed801/src/java/org/apache/cassandra/service/ResponseResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ResponseResolver.java b/src/java/org/apache/cassandra/service/ResponseResolver.java
index e7c94a1..81b18b6 100644
--- a/src/java/org/apache/cassandra/service/ResponseResolver.java
+++ b/src/java/org/apache/cassandra/service/ResponseResolver.java
@@ -47,6 +47,18 @@ public abstract class ResponseResolver
     public abstract PartitionIterator getData();
     public abstract PartitionIterator resolve() throws DigestMismatchException;
 
+    /**
+     * Compares received responses, potentially triggering a digest mismatch (for a digest resolver) and read-repairs
+     * (for a data resolver).
+     * <p>
+     * This is functionally equivalent to calling {@link #resolve()} and consuming the result, but can be slightly more
+     * efficient in some cases because we don't care about the result itself. This is used when doing
+     * asynchronous read-repairs.
+     *
+     * @throws DigestMismatchException if it's a digest resolver and the responses don't match.
+     */
+    public abstract void compareResponses() throws DigestMismatchException;
+
     public abstract boolean isDataPresent();
 
     public void preprocess(MessageIn<ReadResponse> message)

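The javadoc above frames compareResponses() as "resolve and discard". A tiny stand-in interface makes that equivalence explicit; the types here are illustrative only, not the real ResponseResolver contract.

public interface ResolverContractSketch<T extends Iterable<?>>
{
    T resolve() throws Exception;

    // Default implementation of the documented equivalence: consume the resolved result
    // purely for its side effects (digest checks, read repairs), discarding the values.
    default void compareResponses() throws Exception
    {
        for (Object ignored : resolve())
        {
            // nothing to do with the value itself
        }
    }
}
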
http://git-wip-us.apache.org/repos/asf/cassandra/blob/48fed801/test/unit/org/apache/cassandra/service/DataResolverTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/DataResolverTest.java b/test/unit/org/apache/cassandra/service/DataResolverTest.java
index c9878d4..fd1e54e 100644
--- a/test/unit/org/apache/cassandra/service/DataResolverTest.java
+++ b/test/unit/org/apache/cassandra/service/DataResolverTest.java
@@ -129,6 +129,21 @@ public class DataResolverTest
         MessagingService.instance().clearMessageSinks();
     }
 
+    /**
+     * Checks that the provided data resolver has created the expected number of repair futures.
+     * This method also "releases" those futures by faking replica responses to them, which is necessary because
+     * otherwise every test would time out when closing the result of resolver.resolve(), since it waits on those futures.
+     */
+    private void assertRepairFuture(DataResolver resolver, int expectedRepairs)
+    {
+        assertEquals(expectedRepairs, resolver.repairResults.size());
+
+        // Signal all futures. We pass a completely fake response message, but it doesn't matter as we just want
+        // AsyncOneResponse to signal success, and it only cares about a non-null MessageIn (it collects the payload).
+        for (AsyncOneResponse<?> future : resolver.repairResults)
+            future.response(MessageIn.create(null, null, null, null, -1));
+    }
+
     @Test
     public void testResolveNewerSingleRow() throws UnknownHostException
     {
@@ -142,12 +157,15 @@ public class DataResolverTest
                                                                                                        .add("c1", "v2")
                                                                                                        .buildUpdate())));
 
-        try(PartitionIterator data = resolver.resolve();
-            RowIterator rows = Iterators.getOnlyElement(data))
+        try(PartitionIterator data = resolver.resolve())
         {
-            Row row = Iterators.getOnlyElement(rows);
-            assertColumns(row, "c1");
-            assertColumn(cfm, row, "c1", "v2", 1);
+            try (RowIterator rows = Iterators.getOnlyElement(data))
+            {
+                Row row = Iterators.getOnlyElement(rows);
+                assertColumns(row, "c1");
+                assertColumn(cfm, row, "c1", "v2", 1);
+            }
+            assertRepairFuture(resolver, 1);
         }
 
         assertEquals(1, messageRecorder.sent.size());
@@ -172,13 +190,16 @@ public class DataResolverTest
                                                                                                        .add("c2", "v2")
                                                                                                        .buildUpdate())));
 
-        try(PartitionIterator data = resolver.resolve();
-            RowIterator rows = Iterators.getOnlyElement(data))
+        try(PartitionIterator data = resolver.resolve())
         {
-            Row row = Iterators.getOnlyElement(rows);
-            assertColumns(row, "c1", "c2");
-            assertColumn(cfm, row, "c1", "v1", 0);
-            assertColumn(cfm, row, "c2", "v2", 1);
+            try (RowIterator rows = Iterators.getOnlyElement(data))
+            {
+                Row row = Iterators.getOnlyElement(rows);
+                assertColumns(row, "c1", "c2");
+                assertColumn(cfm, row, "c1", "v1", 0);
+                assertColumn(cfm, row, "c2", "v2", 1);
+            }
+            assertRepairFuture(resolver, 2);
         }
 
         assertEquals(2, messageRecorder.sent.size());
@@ -224,6 +245,7 @@ public class DataResolverTest
                 assertFalse(rows.hasNext());
                 assertFalse(data.hasNext());
             }
+            assertRepairFuture(resolver, 2);
         }
 
         assertEquals(2, messageRecorder.sent.size());
@@ -289,6 +311,7 @@ public class DataResolverTest
 
                 assertFalse(rows.hasNext());
             }
+            assertRepairFuture(resolver, 4);
         }
 
         assertEquals(4, messageRecorder.sent.size());
@@ -330,12 +353,15 @@ public class DataResolverTest
         InetAddress peer2 = peer();
         resolver.preprocess(readResponseMessage(peer2, EmptyIterators.unfilteredPartition(cfm, false)));
 
-        try(PartitionIterator data = resolver.resolve();
-            RowIterator rows = Iterators.getOnlyElement(data))
+        try(PartitionIterator data = resolver.resolve())
         {
-            Row row = Iterators.getOnlyElement(rows);
-            assertColumns(row, "c2");
-            assertColumn(cfm, row, "c2", "v2", 1);
+            try (RowIterator rows = Iterators.getOnlyElement(data))
+            {
+                Row row = Iterators.getOnlyElement(rows);
+                assertColumns(row, "c2");
+                assertColumn(cfm, row, "c2", "v2", 1);
+            }
+            assertRepairFuture(resolver, 1);
         }
 
         assertEquals(1, messageRecorder.sent.size());
@@ -356,6 +382,7 @@ public class DataResolverTest
         try(PartitionIterator data = resolver.resolve())
         {
             assertFalse(data.hasNext());
+            assertRepairFuture(resolver, 0);
         }
 
         assertTrue(messageRecorder.sent.isEmpty());
@@ -376,6 +403,7 @@ public class DataResolverTest
         try (PartitionIterator data = resolver.resolve())
         {
             assertFalse(data.hasNext());
+            assertRepairFuture(resolver, 1);
         }
 
         // peer1 should get the deletion from peer2
@@ -407,12 +435,15 @@ public class DataResolverTest
         InetAddress peer4 = peer();
         resolver.preprocess(readResponseMessage(peer4, fullPartitionDelete(cfm, dk, 2, nowInSec)));
 
-        try(PartitionIterator data = resolver.resolve();
-            RowIterator rows = Iterators.getOnlyElement(data))
+        try(PartitionIterator data = resolver.resolve())
         {
-            Row row = Iterators.getOnlyElement(rows);
-            assertColumns(row, "two");
-            assertColumn(cfm, row, "two", "B", 3);
+            try (RowIterator rows = Iterators.getOnlyElement(data))
+            {
+                Row row = Iterators.getOnlyElement(rows);
+                assertColumns(row, "two");
+                assertColumn(cfm, row, "two", "B", 3);
+            }
+            assertRepairFuture(resolver, 4);
         }
 
         // peer 1 needs to get the partition delete from peer 4 and the row from peer 3
@@ -498,6 +529,7 @@ public class DataResolverTest
         try (PartitionIterator data = resolver.resolve())
         {
             assertFalse(data.hasNext());
+            assertRepairFuture(resolver, 2);
         }
 
         assertEquals(2, messageRecorder.sent.size());
@@ -575,12 +607,16 @@ public class DataResolverTest
         InetAddress peer2 = peer();
         resolver.preprocess(readResponseMessage(peer2, iter(PartitionUpdate.singleRowUpdate(cfm2, dk, builder.build())), cmd));
 
-        try(PartitionIterator data = resolver.resolve(); RowIterator rows = Iterators.getOnlyElement(data))
+        try(PartitionIterator data = resolver.resolve())
         {
-            Row row = Iterators.getOnlyElement(rows);
-            assertColumns(row, "m");
-            Assert.assertNull(row.getCell(m, CellPath.create(bb(0))));
-            Assert.assertNotNull(row.getCell(m, CellPath.create(bb(1))));
+            try (RowIterator rows = Iterators.getOnlyElement(data))
+            {
+                Row row = Iterators.getOnlyElement(rows);
+                assertColumns(row, "m");
+                Assert.assertNull(row.getCell(m, CellPath.create(bb(0))));
+                Assert.assertNotNull(row.getCell(m, CellPath.create(bb(1))));
+            }
+            assertRepairFuture(resolver, 1);
         }
 
         MessageOut<Mutation> msg;
@@ -625,6 +661,7 @@ public class DataResolverTest
         try(PartitionIterator data = resolver.resolve())
         {
             assertFalse(data.hasNext());
+            assertRepairFuture(resolver, 1);
         }
 
         MessageOut<Mutation> msg;
@@ -665,12 +702,16 @@ public class DataResolverTest
         InetAddress peer2 = peer();
         resolver.preprocess(readResponseMessage(peer2, iter(PartitionUpdate.emptyUpdate(cfm2, dk))));
 
-        try(PartitionIterator data = resolver.resolve(); RowIterator rows = Iterators.getOnlyElement(data))
+        try(PartitionIterator data = resolver.resolve())
         {
-            Row row = Iterators.getOnlyElement(rows);
-            assertColumns(row, "m");
-            ComplexColumnData cd = row.getComplexColumnData(m);
-            assertEquals(Collections.singleton(expectedCell), Sets.newHashSet(cd));
+            try (RowIterator rows = Iterators.getOnlyElement(data))
+            {
+                Row row = Iterators.getOnlyElement(rows);
+                assertColumns(row, "m");
+                ComplexColumnData cd = row.getComplexColumnData(m);
+                assertEquals(Collections.singleton(expectedCell), Sets.newHashSet(cd));
+            }
+            assertRepairFuture(resolver, 1);
         }
 
         Assert.assertNull(messageRecorder.sent.get(peer1));
@@ -714,12 +755,16 @@ public class DataResolverTest
         InetAddress peer2 = peer();
         resolver.preprocess(readResponseMessage(peer2, iter(PartitionUpdate.singleRowUpdate(cfm2, dk, builder.build())), cmd));
 
-        try(PartitionIterator data = resolver.resolve(); RowIterator rows = Iterators.getOnlyElement(data))
+        try(PartitionIterator data = resolver.resolve())
         {
-            Row row = Iterators.getOnlyElement(rows);
-            assertColumns(row, "m");
-            ComplexColumnData cd = row.getComplexColumnData(m);
-            assertEquals(Collections.singleton(expectedCell), Sets.newHashSet(cd));
+            try (RowIterator rows = Iterators.getOnlyElement(data))
+            {
+                Row row = Iterators.getOnlyElement(rows);
+                assertColumns(row, "m");
+                ComplexColumnData cd = row.getComplexColumnData(m);
+                assertEquals(Collections.singleton(expectedCell), Sets.newHashSet(cd));
+            }
+            assertRepairFuture(resolver, 1);
         }
 
         MessageOut<Mutation> msg;