You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2017/01/19 15:55:30 UTC

[2/6] cassandra git commit: Read repair is not blocking repair to finish in foreground repair

Read repair is not blocking repair to finish in foreground repair

patch by Sylvain Lebresne; reviewed by Xiaolong Jiang and Jason Brown for CASSANDRA-13115


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/48fed801
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/48fed801
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/48fed801

Branch: refs/heads/cassandra-3.11
Commit: 48fed80162592f741bf29298e2064452d53de4d8
Parents: 3f41d7a
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Thu Jan 12 10:03:11 2017 +0100
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Thu Jan 19 16:49:14 2017 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../UnfilteredPartitionIterators.java           |   1 +
 .../cassandra/service/AsyncRepairCallback.java  |   5 +-
 .../apache/cassandra/service/DataResolver.java  |  14 ++-
 .../cassandra/service/DigestResolver.java       |   9 +-
 .../apache/cassandra/service/ReadCallback.java  |   4 +-
 .../cassandra/service/ResponseResolver.java     |  12 ++
 .../cassandra/service/DataResolverTest.java     | 117 +++++++++++++------
 8 files changed, 119 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/48fed801/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 97d49af..6293cfa 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0.11
+ * Read repair is not blocking repair to finish in foreground repair (CASSANDRA-13115)
  * Stress daemon help is incorrect (CASSANDRA-12563)
  * Remove ALTER TYPE support (CASSANDRA-12443)
  * Fix assertion for certain legacy range tombstone pattern (CASSANDRA-12203)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/48fed801/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java b/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
index 41b1424..1abbb19 100644
--- a/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
+++ b/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
@@ -159,6 +159,7 @@ public abstract class UnfilteredPartitionIterators
             public void close()
             {
                 merged.close();
+                listener.close();
             }
         };
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/48fed801/src/java/org/apache/cassandra/service/AsyncRepairCallback.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/AsyncRepairCallback.java b/src/java/org/apache/cassandra/service/AsyncRepairCallback.java
index dec5319..d613f3d 100644
--- a/src/java/org/apache/cassandra/service/AsyncRepairCallback.java
+++ b/src/java/org/apache/cassandra/service/AsyncRepairCallback.java
@@ -17,7 +17,6 @@
  */
 package org.apache.cassandra.service;
 
-import java.io.IOException;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.cassandra.concurrent.Stage;
@@ -46,9 +45,9 @@ public class AsyncRepairCallback implements IAsyncCallback<ReadResponse>
         {
             StageManager.getStage(Stage.READ_REPAIR).execute(new WrappedRunnable()
             {
-                protected void runMayThrow() throws DigestMismatchException, IOException
+                protected void runMayThrow()
                 {
-                    repairResolver.resolve();
+                    repairResolver.compareResponses();
                 }
             });
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/48fed801/src/java/org/apache/cassandra/service/DataResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/DataResolver.java b/src/java/org/apache/cassandra/service/DataResolver.java
index 4e5bfb8..01953e1 100644
--- a/src/java/org/apache/cassandra/service/DataResolver.java
+++ b/src/java/org/apache/cassandra/service/DataResolver.java
@@ -21,6 +21,8 @@ import java.net.InetAddress;
 import java.util.*;
 import java.util.concurrent.TimeoutException;
 
+import com.google.common.annotations.VisibleForTesting;
+
 import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.concurrent.StageManager;
 import org.apache.cassandra.config.CFMetaData;
@@ -40,7 +42,8 @@ import org.apache.cassandra.utils.FBUtilities;
 
 public class DataResolver extends ResponseResolver
 {
-    private final List<AsyncOneResponse> repairResults = Collections.synchronizedList(new ArrayList<>());
+    @VisibleForTesting
+    final List<AsyncOneResponse> repairResults = Collections.synchronizedList(new ArrayList<>());
 
     public DataResolver(Keyspace keyspace, ReadCommand command, ConsistencyLevel consistency, int maxResponseCount)
     {
@@ -73,6 +76,15 @@ public class DataResolver extends ResponseResolver
         return counter.applyTo(mergeWithShortReadProtection(iters, sources, counter));
     }
 
+    public void compareResponses()
+    {
+        // We need to fully consume the results to trigger read repairs if appropriate
+        try (PartitionIterator iterator = resolve())
+        {
+            PartitionIterators.consume(iterator);
+        }
+    }
+
     private PartitionIterator mergeWithShortReadProtection(List<UnfilteredPartitionIterator> results, InetAddress[] sources, DataLimits.Counter resultCounter)
     {
         // If we have only one results, there is no read repair to do and we can't get short reads

http://git-wip-us.apache.org/repos/asf/cassandra/blob/48fed801/src/java/org/apache/cassandra/service/DigestResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/DigestResolver.java b/src/java/org/apache/cassandra/service/DigestResolver.java
index 4a918a3..6a528e9 100644
--- a/src/java/org/apache/cassandra/service/DigestResolver.java
+++ b/src/java/org/apache/cassandra/service/DigestResolver.java
@@ -69,6 +69,13 @@ public class DigestResolver extends ResponseResolver
         if (logger.isTraceEnabled())
             logger.trace("resolving {} responses", responses.size());
 
+        compareResponses();
+
+        return UnfilteredPartitionIterators.filter(dataResponse.makeIterator(command), command.nowInSec());
+    }
+
+    public void compareResponses() throws DigestMismatchException
+    {
         long start = System.nanoTime();
 
         // validate digests against each other; throw immediately on mismatch.
@@ -87,8 +94,6 @@ public class DigestResolver extends ResponseResolver
 
         if (logger.isTraceEnabled())
             logger.trace("resolve: {} ms.", TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start));
-
-        return UnfilteredPartitionIterators.filter(dataResponse.makeIterator(command), command.nowInSec());
     }
 
     public boolean isDataPresent()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/48fed801/src/java/org/apache/cassandra/service/ReadCallback.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ReadCallback.java b/src/java/org/apache/cassandra/service/ReadCallback.java
index 8747004..516384a 100644
--- a/src/java/org/apache/cassandra/service/ReadCallback.java
+++ b/src/java/org/apache/cassandra/service/ReadCallback.java
@@ -219,10 +219,10 @@ public class ReadCallback implements IAsyncCallbackWithFailure<ReadResponse>
         {
             // If the resolver is a DigestResolver, we need to do a full data read if there is a mismatch.
             // Otherwise, resolve will send the repairs directly if needs be (and in that case we should never
-            // get a digest mismatch)
+            // get a digest mismatch).
             try
             {
-                resolver.resolve();
+                resolver.compareResponses();
             }
             catch (DigestMismatchException e)
             {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/48fed801/src/java/org/apache/cassandra/service/ResponseResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ResponseResolver.java b/src/java/org/apache/cassandra/service/ResponseResolver.java
index e7c94a1..81b18b6 100644
--- a/src/java/org/apache/cassandra/service/ResponseResolver.java
+++ b/src/java/org/apache/cassandra/service/ResponseResolver.java
@@ -47,6 +47,18 @@ public abstract class ResponseResolver
     public abstract PartitionIterator getData();
     public abstract PartitionIterator resolve() throws DigestMismatchException;
 
+    /**
+     * Compares received responses, potentially triggering a digest mismatch (for a digest resolver) and read-repairs
+     * (for a data resolver).
+     * <p>
+     * This is functionally equivalent to calling {@link #resolve()} and consuming the result, but can be slightly more
+     * efficient in some cases because we don't care about the result itself. This is used when doing
+     * asynchronous read-repairs.
+     *
+     * @throws DigestMismatchException if it's a digest resolver and the responses don't match.
+     */
+    public abstract void compareResponses() throws DigestMismatchException;
+
     public abstract boolean isDataPresent();
 
     public void preprocess(MessageIn<ReadResponse> message)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/48fed801/test/unit/org/apache/cassandra/service/DataResolverTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/DataResolverTest.java b/test/unit/org/apache/cassandra/service/DataResolverTest.java
index c9878d4..fd1e54e 100644
--- a/test/unit/org/apache/cassandra/service/DataResolverTest.java
+++ b/test/unit/org/apache/cassandra/service/DataResolverTest.java
@@ -129,6 +129,21 @@ public class DataResolverTest
         MessagingService.instance().clearMessageSinks();
     }
 
+    /**
+     * Checks that the provided data resolver has the expected number of repair futures created.
+     * This method also "releases" those futures by faking replica responses to those repairs, which is necessary;
+     * otherwise every test would time out when closing the result of resolver.resolve(), since it waits on those futures.
+     */
+    private void assertRepairFuture(DataResolver resolver, int expectedRepairs)
+    {
+        assertEquals(expectedRepairs, resolver.repairResults.size());
+
+        // Signal all futures. We pass a completely fake response message, but it doesn't matter as we just want
+        // AsyncOneResponse to signal success, and it only cares about a non-null MessageIn (it collects the payload).
+        for (AsyncOneResponse<?> future : resolver.repairResults)
+            future.response(MessageIn.create(null, null, null, null, -1));
+    }
+
     @Test
     public void testResolveNewerSingleRow() throws UnknownHostException
     {
@@ -142,12 +157,15 @@ public class DataResolverTest
                                                                                                        .add("c1", "v2")
                                                                                                        .buildUpdate())));
 
-        try(PartitionIterator data = resolver.resolve();
-            RowIterator rows = Iterators.getOnlyElement(data))
+        try(PartitionIterator data = resolver.resolve())
         {
-            Row row = Iterators.getOnlyElement(rows);
-            assertColumns(row, "c1");
-            assertColumn(cfm, row, "c1", "v2", 1);
+            try (RowIterator rows = Iterators.getOnlyElement(data))
+            {
+                Row row = Iterators.getOnlyElement(rows);
+                assertColumns(row, "c1");
+                assertColumn(cfm, row, "c1", "v2", 1);
+            }
+            assertRepairFuture(resolver, 1);
         }
 
         assertEquals(1, messageRecorder.sent.size());
@@ -172,13 +190,16 @@ public class DataResolverTest
                                                                                                        .add("c2", "v2")
                                                                                                        .buildUpdate())));
 
-        try(PartitionIterator data = resolver.resolve();
-            RowIterator rows = Iterators.getOnlyElement(data))
+        try(PartitionIterator data = resolver.resolve())
         {
-            Row row = Iterators.getOnlyElement(rows);
-            assertColumns(row, "c1", "c2");
-            assertColumn(cfm, row, "c1", "v1", 0);
-            assertColumn(cfm, row, "c2", "v2", 1);
+            try (RowIterator rows = Iterators.getOnlyElement(data))
+            {
+                Row row = Iterators.getOnlyElement(rows);
+                assertColumns(row, "c1", "c2");
+                assertColumn(cfm, row, "c1", "v1", 0);
+                assertColumn(cfm, row, "c2", "v2", 1);
+            }
+            assertRepairFuture(resolver, 2);
         }
 
         assertEquals(2, messageRecorder.sent.size());
@@ -224,6 +245,7 @@ public class DataResolverTest
                 assertFalse(rows.hasNext());
                 assertFalse(data.hasNext());
             }
+            assertRepairFuture(resolver, 2);
         }
 
         assertEquals(2, messageRecorder.sent.size());
@@ -289,6 +311,7 @@ public class DataResolverTest
 
                 assertFalse(rows.hasNext());
             }
+            assertRepairFuture(resolver, 4);
         }
 
         assertEquals(4, messageRecorder.sent.size());
@@ -330,12 +353,15 @@ public class DataResolverTest
         InetAddress peer2 = peer();
         resolver.preprocess(readResponseMessage(peer2, EmptyIterators.unfilteredPartition(cfm, false)));
 
-        try(PartitionIterator data = resolver.resolve();
-            RowIterator rows = Iterators.getOnlyElement(data))
+        try(PartitionIterator data = resolver.resolve())
         {
-            Row row = Iterators.getOnlyElement(rows);
-            assertColumns(row, "c2");
-            assertColumn(cfm, row, "c2", "v2", 1);
+            try (RowIterator rows = Iterators.getOnlyElement(data))
+            {
+                Row row = Iterators.getOnlyElement(rows);
+                assertColumns(row, "c2");
+                assertColumn(cfm, row, "c2", "v2", 1);
+            }
+            assertRepairFuture(resolver, 1);
         }
 
         assertEquals(1, messageRecorder.sent.size());
@@ -356,6 +382,7 @@ public class DataResolverTest
         try(PartitionIterator data = resolver.resolve())
         {
             assertFalse(data.hasNext());
+            assertRepairFuture(resolver, 0);
         }
 
         assertTrue(messageRecorder.sent.isEmpty());
@@ -376,6 +403,7 @@ public class DataResolverTest
         try (PartitionIterator data = resolver.resolve())
         {
             assertFalse(data.hasNext());
+            assertRepairFuture(resolver, 1);
         }
 
         // peer1 should get the deletion from peer2
@@ -407,12 +435,15 @@ public class DataResolverTest
         InetAddress peer4 = peer();
         resolver.preprocess(readResponseMessage(peer4, fullPartitionDelete(cfm, dk, 2, nowInSec)));
 
-        try(PartitionIterator data = resolver.resolve();
-            RowIterator rows = Iterators.getOnlyElement(data))
+        try(PartitionIterator data = resolver.resolve())
         {
-            Row row = Iterators.getOnlyElement(rows);
-            assertColumns(row, "two");
-            assertColumn(cfm, row, "two", "B", 3);
+            try (RowIterator rows = Iterators.getOnlyElement(data))
+            {
+                Row row = Iterators.getOnlyElement(rows);
+                assertColumns(row, "two");
+                assertColumn(cfm, row, "two", "B", 3);
+            }
+            assertRepairFuture(resolver, 4);
         }
 
         // peer 1 needs to get the partition delete from peer 4 and the row from peer 3
@@ -498,6 +529,7 @@ public class DataResolverTest
         try (PartitionIterator data = resolver.resolve())
         {
             assertFalse(data.hasNext());
+            assertRepairFuture(resolver, 2);
         }
 
         assertEquals(2, messageRecorder.sent.size());
@@ -575,12 +607,16 @@ public class DataResolverTest
         InetAddress peer2 = peer();
         resolver.preprocess(readResponseMessage(peer2, iter(PartitionUpdate.singleRowUpdate(cfm2, dk, builder.build())), cmd));
 
-        try(PartitionIterator data = resolver.resolve(); RowIterator rows = Iterators.getOnlyElement(data))
+        try(PartitionIterator data = resolver.resolve())
         {
-            Row row = Iterators.getOnlyElement(rows);
-            assertColumns(row, "m");
-            Assert.assertNull(row.getCell(m, CellPath.create(bb(0))));
-            Assert.assertNotNull(row.getCell(m, CellPath.create(bb(1))));
+            try (RowIterator rows = Iterators.getOnlyElement(data))
+            {
+                Row row = Iterators.getOnlyElement(rows);
+                assertColumns(row, "m");
+                Assert.assertNull(row.getCell(m, CellPath.create(bb(0))));
+                Assert.assertNotNull(row.getCell(m, CellPath.create(bb(1))));
+            }
+            assertRepairFuture(resolver, 1);
         }
 
         MessageOut<Mutation> msg;
@@ -625,6 +661,7 @@ public class DataResolverTest
         try(PartitionIterator data = resolver.resolve())
         {
             assertFalse(data.hasNext());
+            assertRepairFuture(resolver, 1);
         }
 
         MessageOut<Mutation> msg;
@@ -665,12 +702,16 @@ public class DataResolverTest
         InetAddress peer2 = peer();
         resolver.preprocess(readResponseMessage(peer2, iter(PartitionUpdate.emptyUpdate(cfm2, dk))));
 
-        try(PartitionIterator data = resolver.resolve(); RowIterator rows = Iterators.getOnlyElement(data))
+        try(PartitionIterator data = resolver.resolve())
         {
-            Row row = Iterators.getOnlyElement(rows);
-            assertColumns(row, "m");
-            ComplexColumnData cd = row.getComplexColumnData(m);
-            assertEquals(Collections.singleton(expectedCell), Sets.newHashSet(cd));
+            try (RowIterator rows = Iterators.getOnlyElement(data))
+            {
+                Row row = Iterators.getOnlyElement(rows);
+                assertColumns(row, "m");
+                ComplexColumnData cd = row.getComplexColumnData(m);
+                assertEquals(Collections.singleton(expectedCell), Sets.newHashSet(cd));
+            }
+            assertRepairFuture(resolver, 1);
         }
 
         Assert.assertNull(messageRecorder.sent.get(peer1));
@@ -714,12 +755,16 @@ public class DataResolverTest
         InetAddress peer2 = peer();
         resolver.preprocess(readResponseMessage(peer2, iter(PartitionUpdate.singleRowUpdate(cfm2, dk, builder.build())), cmd));
 
-        try(PartitionIterator data = resolver.resolve(); RowIterator rows = Iterators.getOnlyElement(data))
+        try(PartitionIterator data = resolver.resolve())
         {
-            Row row = Iterators.getOnlyElement(rows);
-            assertColumns(row, "m");
-            ComplexColumnData cd = row.getComplexColumnData(m);
-            assertEquals(Collections.singleton(expectedCell), Sets.newHashSet(cd));
+            try (RowIterator rows = Iterators.getOnlyElement(data))
+            {
+                Row row = Iterators.getOnlyElement(rows);
+                assertColumns(row, "m");
+                ComplexColumnData cd = row.getComplexColumnData(m);
+                assertEquals(Collections.singleton(expectedCell), Sets.newHashSet(cd));
+            }
+            assertRepairFuture(resolver, 1);
         }
 
         MessageOut<Mutation> msg;