You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2010/11/11 20:58:17 UTC

svn commit: r1034086 - in /cassandra/branches/cassandra-0.6: CHANGES.txt src/java/org/apache/cassandra/service/ConsistencyChecker.java

Author: jbellis
Date: Thu Nov 11 19:58:16 2010
New Revision: 1034086

URL: http://svn.apache.org/viewvc?rev=1034086&view=rev
Log:
more-efficient read repair
patch by jbellis; reviewed by brandonwilliams for CASSANDRA-1719

Modified:
    cassandra/branches/cassandra-0.6/CHANGES.txt
    cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/ConsistencyChecker.java

Modified: cassandra/branches/cassandra-0.6/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/CHANGES.txt?rev=1034086&r1=1034085&r2=1034086&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.6/CHANGES.txt Thu Nov 11 19:58:16 2010
@@ -2,6 +2,7 @@ dev
  * Update windows .bat files to work outside of main Cassandra
    directory (CASSANDRA-1713)
  * fix read repair regression from 0.6.7 (CASSANDRA-1727)
+ * more-efficient read repair (CASSANDRA-1719)
 
 
 0.6.7

Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/ConsistencyChecker.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/ConsistencyChecker.java?rev=1034086&r1=1034085&r2=1034086&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/ConsistencyChecker.java (original)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/ConsistencyChecker.java Thu Nov 11 19:58:16 2010
@@ -45,7 +45,20 @@ import org.apache.cassandra.net.Messagin
 import org.apache.cassandra.utils.ExpiringMap;
 import org.apache.cassandra.utils.FBUtilities;
 
-
+/**
+ * ConsistencyChecker does the following:
+ *
+ * [ConsistencyChecker.run]
+ * (1) sends DIGEST read requests to each other replica of the given row.
+ *
+ * [DigestResponseHandler]
+ * (2) If any of the digests do not match the local one, it sends a second round of requests
+ * to each replica, this time for the full data.
+ *
+ * [DataRepairHandler]
+ * (3) processes full-read responses and invokes resolve.  The actual sending of messages
+ * repairing out-of-date or missing data is handled by ReadResponseResolver.
+ */
 class ConsistencyChecker implements Runnable
 {
 	private static Logger logger_ = Logger.getLogger(ConsistencyChecker.class);
@@ -62,6 +75,7 @@ class ConsistencyChecker implements Runn
         row_ = row;
         replicas_ = endpoints;
         readCommand_ = readCommand;
+        assert replicas_.contains(FBUtilities.getLocalAddress());
     }
 
     public void run()
@@ -96,8 +110,9 @@ class ConsistencyChecker implements Runn
     class DigestResponseHandler implements IAsyncCallback
 	{
         private boolean repairInvoked;
+        private final byte[] localDigest = ColumnFamily.digest(row_.cf);
 
-		public synchronized void response(Message response)
+        public synchronized void response(Message response)
 		{
             if (repairInvoked)
                 return;
@@ -109,19 +124,15 @@ class ConsistencyChecker implements Runn
                 ReadResponse result = ReadResponse.serializer().deserialize(new DataInputStream(bufIn));
                 byte[] digest = result.digest();
 
-                if (!Arrays.equals(ColumnFamily.digest(row_.cf), digest))
+                if (!Arrays.equals(localDigest, digest))
                 {
                     ReadResponseResolver readResponseResolver = new ReadResponseResolver(table_, replicas_.size());
-                    IAsyncCallback responseHandler;
-                    if (replicas_.contains(FBUtilities.getLocalAddress()))
-                        responseHandler = new DataRepairHandler(row_, replicas_.size(), readResponseResolver);
-                    else
-                        responseHandler = new DataRepairHandler(replicas_.size(), readResponseResolver);
+                    IAsyncCallback responseHandler = new DataRepairHandler(row_, replicas_.size(), readResponseResolver);
 
                     ReadCommand readCommand = constructReadMessage(false);
                     Message message = readCommand.makeReadMessage();
                     if (logger_.isDebugEnabled())
-                      logger_.debug("Performing read repair for " + readCommand_.key + " to " + message.getMessageId() + "@[" + StringUtils.join(replicas_, ", ") + "]");
+                        logger_.debug("Digest mismatch; re-reading " + readCommand_.key + " from " + message.getMessageId() + "@[" + StringUtils.join(replicas_, ", ") + "]");                         
                     MessagingService.instance.addCallback(responseHandler, message.getMessageId());
                     for (InetAddress endpoint : replicas_)
                     {
@@ -145,15 +156,10 @@ class ConsistencyChecker implements Runn
 		private final ReadResponseResolver readResponseResolver_;
 		private final int majority_;
 		
-		DataRepairHandler(int responseCount, ReadResponseResolver readResponseResolver)
-		{
-			readResponseResolver_ = readResponseResolver;
-			majority_ = (responseCount / 2) + 1;  
-		}
-
         public DataRepairHandler(Row localRow, int responseCount, ReadResponseResolver readResponseResolver) throws IOException
         {
-            this(responseCount, readResponseResolver);
+            readResponseResolver_ = readResponseResolver;
+            majority_ = (responseCount / 2) + 1;
             // wrap localRow in a response Message so it doesn't need to be special-cased in the resolver
             ReadResponse readResponse = new ReadResponse(localRow);
             Message fakeMessage = new Message(FBUtilities.getLocalAddress(), StageManager.RESPONSE_STAGE, StorageService.Verb.READ_RESPONSE, ArrayUtils.EMPTY_BYTE_ARRAY);