You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2010/11/11 01:32:54 UTC

svn commit: r1033785 - in /cassandra/branches/cassandra-0.6: CHANGES.txt src/java/org/apache/cassandra/service/ConsistencyChecker.java src/java/org/apache/cassandra/service/ReadResponseResolver.java

Author: jbellis
Date: Thu Nov 11 00:32:53 2010
New Revision: 1033785

URL: http://svn.apache.org/viewvc?rev=1033785&view=rev
Log:
fix read repair regression from 0.6.7
patch by jbellis; reviewed by brandonwilliams for CASSANDRA-1727

Modified:
    cassandra/branches/cassandra-0.6/CHANGES.txt
    cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/ConsistencyChecker.java
    cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/ReadResponseResolver.java

Modified: cassandra/branches/cassandra-0.6/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/CHANGES.txt?rev=1033785&r1=1033784&r2=1033785&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.6/CHANGES.txt Thu Nov 11 00:32:53 2010
@@ -1,6 +1,7 @@
-0.6.7
+dev
  * Update windows .bat files to work outside of main Cassandra
    directory (CASSANDRA-1713)
+ * fix read repair regression from 0.6.7 (CASSANDRA-1727)
 
 
 0.6.7

Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/ConsistencyChecker.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/ConsistencyChecker.java?rev=1033785&r1=1033784&r2=1033785&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/ConsistencyChecker.java (original)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/ConsistencyChecker.java Thu Nov 11 00:32:53 2010
@@ -28,6 +28,7 @@ import java.util.List;
 import java.util.concurrent.LinkedBlockingQueue;
 
 import org.apache.log4j.Logger;
+import org.apache.commons.lang.ArrayUtils;
 import org.apache.commons.lang.StringUtils;
 
 import org.apache.cassandra.cache.ICacheExpungeHook;
@@ -110,7 +111,7 @@ class ConsistencyChecker implements Runn
 
                 if (!Arrays.equals(ColumnFamily.digest(row_.cf), digest))
                 {
-                    IResponseResolver<Row> readResponseResolver = new ReadResponseResolver(table_, replicas_.size());
+                    ReadResponseResolver readResponseResolver = new ReadResponseResolver(table_, replicas_.size());
                     IAsyncCallback responseHandler;
                     if (replicas_.contains(FBUtilities.getLocalAddress()))
                         responseHandler = new DataRepairHandler(row_, replicas_.size(), readResponseResolver);
@@ -141,33 +142,32 @@ class ConsistencyChecker implements Runn
 	static class DataRepairHandler implements IAsyncCallback, ICacheExpungeHook<String, String>
 	{
 		private final Collection<Message> responses_ = new LinkedBlockingQueue<Message>();
-		private final IResponseResolver<Row> readResponseResolver_;
+		private final ReadResponseResolver readResponseResolver_;
 		private final int majority_;
 		
-		DataRepairHandler(int responseCount, IResponseResolver<Row> readResponseResolver)
+		DataRepairHandler(int responseCount, ReadResponseResolver readResponseResolver)
 		{
 			readResponseResolver_ = readResponseResolver;
 			majority_ = (responseCount / 2) + 1;  
 		}
 
-        public DataRepairHandler(Row localRow, int responseCount, IResponseResolver<Row> readResponseResolver) throws IOException
+        public DataRepairHandler(Row localRow, int responseCount, ReadResponseResolver readResponseResolver) throws IOException
         {
             this(responseCount, readResponseResolver);
             // wrap localRow in a response Message so it doesn't need to be special-cased in the resolver
             ReadResponse readResponse = new ReadResponse(localRow);
-            DataOutputBuffer out = new DataOutputBuffer();
-            ReadResponse.serializer().serialize(readResponse, out);
-            byte[] bytes = new byte[out.getLength()];
-            System.arraycopy(out.getData(), 0, bytes, 0, bytes.length);
-            responses_.add(new Message(FBUtilities.getLocalAddress(), StageManager.RESPONSE_STAGE, StorageService.Verb.READ_RESPONSE, bytes));
+            Message fakeMessage = new Message(FBUtilities.getLocalAddress(), StageManager.RESPONSE_STAGE, StorageService.Verb.READ_RESPONSE, ArrayUtils.EMPTY_BYTE_ARRAY);
+            responses_.add(fakeMessage);
+            readResponseResolver_.injectPreProcessed(fakeMessage, readResponse);
         }
 
         // synchronized so the " == majority" is safe
 		public synchronized void response(Message message)
 		{
 			if (logger_.isDebugEnabled())
-			  logger_.debug("Received responses in DataRepairHandler : " + message.toString());
+			  logger_.debug("Received response in DataRepairHandler : " + message.toString());
 			responses_.add(message);
+            readResponseResolver_.preprocess(message);
             if (responses_.size() == majority_)
             {
                 String messageId = message.getMessageId();

Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/ReadResponseResolver.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/ReadResponseResolver.java?rev=1033785&r1=1033784&r2=1033785&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/ReadResponseResolver.java (original)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/ReadResponseResolver.java Thu Nov 11 00:32:53 2010
@@ -201,6 +201,12 @@ public class ReadResponseResolver implem
         }
     }
 
+    /** hack so ConsistencyChecker doesn't have to serialize/deserialize an extra real Message */
+    public void injectPreProcessed(Message message, ReadResponse result)
+    {
+        results.put(message, result);
+    }
+
     public boolean isDataPresent(Collection<Message> responses)
 	{
         int digests = 0;