You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2010/11/11 01:32:54 UTC
svn commit: r1033785 - in /cassandra/branches/cassandra-0.6: CHANGES.txt
src/java/org/apache/cassandra/service/ConsistencyChecker.java
src/java/org/apache/cassandra/service/ReadResponseResolver.java
Author: jbellis
Date: Thu Nov 11 00:32:53 2010
New Revision: 1033785
URL: http://svn.apache.org/viewvc?rev=1033785&view=rev
Log:
fix read repair regression from 0.6.7
patch by jbellis; reviewed by brandonwilliams for CASSANDRA-1727
Modified:
cassandra/branches/cassandra-0.6/CHANGES.txt
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/ConsistencyChecker.java
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/ReadResponseResolver.java
Modified: cassandra/branches/cassandra-0.6/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/CHANGES.txt?rev=1033785&r1=1033784&r2=1033785&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.6/CHANGES.txt Thu Nov 11 00:32:53 2010
@@ -1,6 +1,7 @@
-0.6.7
+dev
* Update windows .bat files to work outside of main Cassandra
directory (CASSANDRA-1713)
+ * fix read repair regression from 0.6.7 (CASSANDRA-1727)
0.6.7
Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/ConsistencyChecker.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/ConsistencyChecker.java?rev=1033785&r1=1033784&r2=1033785&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/ConsistencyChecker.java (original)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/ConsistencyChecker.java Thu Nov 11 00:32:53 2010
@@ -28,6 +28,7 @@ import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.log4j.Logger;
+import org.apache.commons.lang.ArrayUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.cassandra.cache.ICacheExpungeHook;
@@ -110,7 +111,7 @@ class ConsistencyChecker implements Runn
if (!Arrays.equals(ColumnFamily.digest(row_.cf), digest))
{
- IResponseResolver<Row> readResponseResolver = new ReadResponseResolver(table_, replicas_.size());
+ ReadResponseResolver readResponseResolver = new ReadResponseResolver(table_, replicas_.size());
IAsyncCallback responseHandler;
if (replicas_.contains(FBUtilities.getLocalAddress()))
responseHandler = new DataRepairHandler(row_, replicas_.size(), readResponseResolver);
@@ -141,33 +142,32 @@ class ConsistencyChecker implements Runn
static class DataRepairHandler implements IAsyncCallback, ICacheExpungeHook<String, String>
{
private final Collection<Message> responses_ = new LinkedBlockingQueue<Message>();
- private final IResponseResolver<Row> readResponseResolver_;
+ private final ReadResponseResolver readResponseResolver_;
private final int majority_;
- DataRepairHandler(int responseCount, IResponseResolver<Row> readResponseResolver)
+ DataRepairHandler(int responseCount, ReadResponseResolver readResponseResolver)
{
readResponseResolver_ = readResponseResolver;
majority_ = (responseCount / 2) + 1;
}
- public DataRepairHandler(Row localRow, int responseCount, IResponseResolver<Row> readResponseResolver) throws IOException
+ public DataRepairHandler(Row localRow, int responseCount, ReadResponseResolver readResponseResolver) throws IOException
{
this(responseCount, readResponseResolver);
// wrap localRow in a response Message so it doesn't need to be special-cased in the resolver
ReadResponse readResponse = new ReadResponse(localRow);
- DataOutputBuffer out = new DataOutputBuffer();
- ReadResponse.serializer().serialize(readResponse, out);
- byte[] bytes = new byte[out.getLength()];
- System.arraycopy(out.getData(), 0, bytes, 0, bytes.length);
- responses_.add(new Message(FBUtilities.getLocalAddress(), StageManager.RESPONSE_STAGE, StorageService.Verb.READ_RESPONSE, bytes));
+ Message fakeMessage = new Message(FBUtilities.getLocalAddress(), StageManager.RESPONSE_STAGE, StorageService.Verb.READ_RESPONSE, ArrayUtils.EMPTY_BYTE_ARRAY);
+ responses_.add(fakeMessage);
+ readResponseResolver_.injectPreProcessed(fakeMessage, readResponse);
}
// synchronized so the " == majority" is safe
public synchronized void response(Message message)
{
if (logger_.isDebugEnabled())
- logger_.debug("Received responses in DataRepairHandler : " + message.toString());
+ logger_.debug("Received response in DataRepairHandler : " + message.toString());
responses_.add(message);
+ readResponseResolver_.preprocess(message);
if (responses_.size() == majority_)
{
String messageId = message.getMessageId();
Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/ReadResponseResolver.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/ReadResponseResolver.java?rev=1033785&r1=1033784&r2=1033785&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/ReadResponseResolver.java (original)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/ReadResponseResolver.java Thu Nov 11 00:32:53 2010
@@ -201,6 +201,12 @@ public class ReadResponseResolver implem
}
}
+ /** hack so ConsistencyChecker doesn't have to serialize/deserialize an extra real Message */
+ public void injectPreProcessed(Message message, ReadResponse result)
+ {
+ results.put(message, result);
+ }
+
public boolean isDataPresent(Collection<Message> responses)
{
int digests = 0;