You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2010/11/11 20:58:17 UTC
svn commit: r1034086 - in /cassandra/branches/cassandra-0.6: CHANGES.txt
src/java/org/apache/cassandra/service/ConsistencyChecker.java
Author: jbellis
Date: Thu Nov 11 19:58:16 2010
New Revision: 1034086
URL: http://svn.apache.org/viewvc?rev=1034086&view=rev
Log:
more-efficient read repair
patch by jbellis; reviewed by brandonwilliams for CASSANDRA-1719
Modified:
cassandra/branches/cassandra-0.6/CHANGES.txt
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/ConsistencyChecker.java
Modified: cassandra/branches/cassandra-0.6/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/CHANGES.txt?rev=1034086&r1=1034085&r2=1034086&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.6/CHANGES.txt Thu Nov 11 19:58:16 2010
@@ -2,6 +2,7 @@ dev
* Update windows .bat files to work outside of main Cassandra
directory (CASSANDRA-1713)
* fix read repair regression from 0.6.7 (CASSANDRA-1727)
+ * more-efficient read repair (CASSANDRA-1719)
0.6.7
Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/ConsistencyChecker.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/ConsistencyChecker.java?rev=1034086&r1=1034085&r2=1034086&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/ConsistencyChecker.java (original)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/ConsistencyChecker.java Thu Nov 11 19:58:16 2010
@@ -45,7 +45,20 @@ import org.apache.cassandra.net.Messagin
import org.apache.cassandra.utils.ExpiringMap;
import org.apache.cassandra.utils.FBUtilities;
-
+/**
+ * ConsistencyChecker does the following:
+ *
+ * [ConsistencyChecker.run]
+ * (1) sends DIGEST read requests to each other replica of the given row.
+ *
+ * [DigestResponseHandler]
+ * (2) If any of the digests to not match the local one, it sends a second round of requests
+ * to each replica, this time for the full data
+ *
+ * [DataRepairHandler]
+ * (3) processes full-read responses and invokes resolve. The actual sending of messages
+ * repairing out-of-date or missing data is handled by ReadResponseResolver.
+ */
class ConsistencyChecker implements Runnable
{
private static Logger logger_ = Logger.getLogger(ConsistencyChecker.class);
@@ -62,6 +75,7 @@ class ConsistencyChecker implements Runn
row_ = row;
replicas_ = endpoints;
readCommand_ = readCommand;
+ assert replicas_.contains(FBUtilities.getLocalAddress());
}
public void run()
@@ -96,8 +110,9 @@ class ConsistencyChecker implements Runn
class DigestResponseHandler implements IAsyncCallback
{
private boolean repairInvoked;
+ private final byte[] localDigest = ColumnFamily.digest(row_.cf);
- public synchronized void response(Message response)
+ public synchronized void response(Message response)
{
if (repairInvoked)
return;
@@ -109,19 +124,15 @@ class ConsistencyChecker implements Runn
ReadResponse result = ReadResponse.serializer().deserialize(new DataInputStream(bufIn));
byte[] digest = result.digest();
- if (!Arrays.equals(ColumnFamily.digest(row_.cf), digest))
+ if (!Arrays.equals(localDigest, digest))
{
ReadResponseResolver readResponseResolver = new ReadResponseResolver(table_, replicas_.size());
- IAsyncCallback responseHandler;
- if (replicas_.contains(FBUtilities.getLocalAddress()))
- responseHandler = new DataRepairHandler(row_, replicas_.size(), readResponseResolver);
- else
- responseHandler = new DataRepairHandler(replicas_.size(), readResponseResolver);
+ IAsyncCallback responseHandler = new DataRepairHandler(row_, replicas_.size(), readResponseResolver);
ReadCommand readCommand = constructReadMessage(false);
Message message = readCommand.makeReadMessage();
if (logger_.isDebugEnabled())
- logger_.debug("Performing read repair for " + readCommand_.key + " to " + message.getMessageId() + "@[" + StringUtils.join(replicas_, ", ") + "]");
+ logger_.debug("Digest mismatch; re-reading " + readCommand_.key + " from " + message.getMessageId() + "@[" + StringUtils.join(replicas_, ", ") + "]");
MessagingService.instance.addCallback(responseHandler, message.getMessageId());
for (InetAddress endpoint : replicas_)
{
@@ -145,15 +156,10 @@ class ConsistencyChecker implements Runn
private final ReadResponseResolver readResponseResolver_;
private final int majority_;
- DataRepairHandler(int responseCount, ReadResponseResolver readResponseResolver)
- {
- readResponseResolver_ = readResponseResolver;
- majority_ = (responseCount / 2) + 1;
- }
-
public DataRepairHandler(Row localRow, int responseCount, ReadResponseResolver readResponseResolver) throws IOException
{
- this(responseCount, readResponseResolver);
+ readResponseResolver_ = readResponseResolver;
+ majority_ = (responseCount / 2) + 1;
// wrap localRow in a response Message so it doesn't need to be special-cased in the resolver
ReadResponse readResponse = new ReadResponse(localRow);
Message fakeMessage = new Message(FBUtilities.getLocalAddress(), StageManager.RESPONSE_STAGE, StorageService.Verb.READ_RESPONSE, ArrayUtils.EMPTY_BYTE_ARRAY);