You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2011/02/15 18:16:53 UTC

svn commit: r1070977 - in /cassandra/branches/cassandra-0.7: ./ src/java/org/apache/cassandra/concurrent/ src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/dht/ src/java/org/apache/cassandra/net/ src/java/org/apache/cassandra/service/ tes...

Author: jbellis
Date: Tue Feb 15 17:16:52 2011
New Revision: 1070977

URL: http://svn.apache.org/viewvc?rev=1070977&view=rev
Log:
no more waiting for RPC_TIMEOUT before finishing RR
patch by jbellis; reviewed by brandonwilliams for CASSANDRA-2069

Added:
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/AbstractRowResolver.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/AsyncRepairCallback.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/IReadCommand.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/RowDigestResolver.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/RowRepairResolver.java
    cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/service/RowResolverTest.java
Modified:
    cassandra/branches/cassandra-0.7/CHANGES.txt
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/concurrent/Stage.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/concurrent/StageManager.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/RangeSliceCommand.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/ReadCommand.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/dht/BootStrapper.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/AsyncResult.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/IMessageCallback.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/MessagingService.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/DatacenterReadCallback.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/ReadCallback.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/ReadResponseResolver.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/RepairCallback.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/TruncateResponseHandler.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/WriteResponseHandler.java
    cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/service/ConsistencyLevelTest.java
    cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/service/ReadResponseResolverTest.java

Modified: cassandra/branches/cassandra-0.7/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/CHANGES.txt?rev=1070977&r1=1070976&r2=1070977&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.7/CHANGES.txt Tue Feb 15 17:16:52 2011
@@ -2,6 +2,7 @@
  * copy DecoratedKey.key when inserting into caches to avoid retaining
    a reference to the underlying buffer (CASSANDRA-2102)
  * format subcolumn names with subcomparator (CASSANDRA-2136)
+ * lower-latency read repair (CASSANDRA-2069)
 
 
 0.7.1

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/concurrent/Stage.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/concurrent/Stage.java?rev=1070977&r1=1070976&r2=1070977&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/concurrent/Stage.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/concurrent/Stage.java Tue Feb 15 17:16:52 2011
@@ -31,7 +31,8 @@ public enum Stage
     ANTI_ENTROPY,
     MIGRATION,
     MISC,
-    INTERNAL_RESPONSE;
+    INTERNAL_RESPONSE,
+    READ_REPAIR;
 
     public String getJmxType()
     {
@@ -47,6 +48,7 @@ public enum Stage
             case MUTATION:
             case READ:
             case REQUEST_RESPONSE:
+            case READ_REPAIR:
                 return "request";
             default:
                 throw new AssertionError("Unknown stage " + this);

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/concurrent/StageManager.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/concurrent/StageManager.java?rev=1070977&r1=1070976&r2=1070977&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/concurrent/StageManager.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/concurrent/StageManager.java Tue Feb 15 17:16:52 2011
@@ -50,6 +50,7 @@ public class StageManager
         stages.put(Stage.ANTI_ENTROPY, new JMXEnabledThreadPoolExecutor(Stage.ANTI_ENTROPY));
         stages.put(Stage.MIGRATION, new JMXEnabledThreadPoolExecutor(Stage.MIGRATION));
         stages.put(Stage.MISC, new JMXEnabledThreadPoolExecutor(Stage.MISC));
+        stages.put(Stage.READ_REPAIR, multiThreadedStage(Stage.READ_REPAIR, Runtime.getRuntime().availableProcessors()));
     }
 
     private static ThreadPoolExecutor multiThreadedStage(Stage stage, int numThreads)

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/RangeSliceCommand.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/RangeSliceCommand.java?rev=1070977&r1=1070976&r2=1070977&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/RangeSliceCommand.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/RangeSliceCommand.java Tue Feb 15 17:16:52 2011
@@ -47,6 +47,7 @@ import org.apache.cassandra.dht.Abstract
 import org.apache.cassandra.io.ICompactSerializer;
 import org.apache.cassandra.io.util.DataOutputBuffer;
 import org.apache.cassandra.net.Message;
+import org.apache.cassandra.service.IReadCommand;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.thrift.ColumnParent;
 import org.apache.cassandra.thrift.SlicePredicate;
@@ -56,7 +57,7 @@ import org.apache.thrift.TDeserializer;
 import org.apache.thrift.TSerializer;
 import org.apache.cassandra.thrift.TBinaryProtocol;
 
-public class RangeSliceCommand
+public class RangeSliceCommand implements IReadCommand
 {
     private static final RangeSliceCommandSerializer serializer = new RangeSliceCommandSerializer();
     
@@ -113,6 +114,12 @@ public class RangeSliceCommand
         ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
         return serializer.deserialize(new DataInputStream(bis));
     }
+
+    @Override
+    public String getKeyspace()
+    {
+        return keyspace;
+    }
 }
 
 class RangeSliceCommandSerializer implements ICompactSerializer<RangeSliceCommand>

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/ReadCommand.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/ReadCommand.java?rev=1070977&r1=1070976&r2=1070977&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/ReadCommand.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/ReadCommand.java Tue Feb 15 17:16:52 2011
@@ -30,11 +30,12 @@ import org.apache.cassandra.db.filter.Qu
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.io.ICompactSerializer;
 import org.apache.cassandra.net.Message;
+import org.apache.cassandra.service.IReadCommand;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.FBUtilities;
 
 
-public abstract class ReadCommand
+public abstract class ReadCommand implements IReadCommand
 {
     public static final byte CMD_TYPE_GET_SLICE_BY_NAMES = 1;
     public static final byte CMD_TYPE_GET_SLICE = 2;
@@ -91,6 +92,12 @@ public abstract class ReadCommand
     {
         return ColumnFamily.getComparatorFor(table, getColumnFamilyName(), queryPath.superColumnName);
     }
+
+    @Override
+    public String getKeyspace()
+    {
+        return table;
+    }
 }
 
 class ReadCommandSerializer implements ICompactSerializer<ReadCommand>

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/dht/BootStrapper.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/dht/BootStrapper.java?rev=1070977&r1=1070976&r2=1070977&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/dht/BootStrapper.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/dht/BootStrapper.java Tue Feb 15 17:16:52 2011
@@ -282,5 +282,11 @@ public class BootStrapper
             token = StorageService.getPartitioner().getTokenFactory().fromString(new String(msg.getMessageBody(), Charsets.UTF_8));
             condition.signalAll();
         }
+
+        @Override
+        public boolean isLatencyForSnitch()
+        {
+            return false;
+        }
     }
 }

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/AsyncResult.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/AsyncResult.java?rev=1070977&r1=1070976&r2=1070977&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/AsyncResult.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/AsyncResult.java Tue Feb 15 17:16:52 2011
@@ -96,6 +96,12 @@ class AsyncResult implements IAsyncResul
         }        
     }
 
+    @Override
+    public boolean isLatencyForSnitch()
+    {
+        return false;
+    }
+
     public InetAddress getFrom()
     {
         return from;

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/IMessageCallback.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/IMessageCallback.java?rev=1070977&r1=1070976&r2=1070977&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/IMessageCallback.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/IMessageCallback.java Tue Feb 15 17:16:52 2011
@@ -23,4 +23,9 @@ package org.apache.cassandra.net;
 
 public interface IMessageCallback
 {
+    /**
+     * @return true if this callback is on the read path and its latency should be
+     * given as input to the dynamic snitch.
+     */
+    public boolean isLatencyForSnitch();
 }

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/MessagingService.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/MessagingService.java?rev=1070977&r1=1070976&r2=1070977&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/MessagingService.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/MessagingService.java Tue Feb 15 17:16:52 2011
@@ -138,7 +138,7 @@ public final class MessagingService impl
      */
     public void maybeAddLatency(IMessageCallback cb, InetAddress address, double latency)
     {
-        if (cb instanceof ReadCallback || cb instanceof AsyncResult)
+        if (cb.isLatencyForSnitch())
             addLatency(address, latency);
     }
 

Added: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/AbstractRowResolver.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/AbstractRowResolver.java?rev=1070977&view=auto
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/AbstractRowResolver.java (added)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/AbstractRowResolver.java Tue Feb 15 17:16:52 2011
@@ -0,0 +1,90 @@
+package org.apache.cassandra.service;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOError;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.commons.lang.ArrayUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.ReadResponse;
+import org.apache.cassandra.db.Row;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.utils.FBUtilities;
+import org.cliffc.high_scale_lib.NonBlockingHashMap;
+
+/**
+ * Common state for resolvers of single-row reads: collects the deserialized
+ * ReadResponse of each reply, keyed by the Message that carried it.
+ */
+public abstract class AbstractRowResolver implements IResponseResolver<Row>
+{
+    protected static final Logger logger = LoggerFactory.getLogger(AbstractRowResolver.class);
+
+    /** Stand-in key for the local reply, which never travels as a real Message. */
+    private static final Message FAKE_MESSAGE = new Message(FBUtilities.getLocalAddress(), StorageService.Verb.INTERNAL_RESPONSE, ArrayUtils.EMPTY_BYTE_ARRAY);
+
+    protected final String table;
+    /** Replies recorded so far; a concurrent map because callbacks may record replies from different threads. */
+    protected final ConcurrentMap<Message, ReadResponse> replies = new NonBlockingHashMap<Message, ReadResponse>();
+    protected final DecoratedKey key;
+
+    /**
+     * @param key   raw row key; decorated here with the configured partitioner
+     * @param table keyspace the row is read from
+     */
+    public AbstractRowResolver(ByteBuffer key, String table)
+    {
+        this.key = StorageService.getPartitioner().decorateKey(key);
+        this.table = table;
+    }
+
+    /**
+     * Deserializes the ReadResponse carried by {@code message} and records it.
+     *
+     * @throws IOError if the message body cannot be deserialized
+     */
+    public void preprocess(Message message)
+    {
+        byte[] body = message.getMessageBody();
+        ByteArrayInputStream bufIn = new ByteArrayInputStream(body);
+        try
+        {
+            ReadResponse result = ReadResponse.serializer().deserialize(new DataInputStream(bufIn));
+            if (logger.isDebugEnabled())
+                logger.debug("Preprocessed {} response", result.isDigestQuery() ? "digest" : "data");
+            replies.put(message, result);
+        }
+        catch (IOException e)
+        {
+            throw new IOError(e);
+        }
+    }
+
+    /**
+     * Records a locally-computed reply directly: a hack so local reads don't force
+     * de/serialization of an extra real Message. At most one local reply is expected.
+     */
+    public void injectPreProcessed(ReadResponse result)
+    {
+        assert replies.get(FAKE_MESSAGE) == null; // should only be one local reply
+        replies.put(FAKE_MESSAGE, result);
+    }
+
+    /** @return live view of the messages recorded so far, including the local placeholder if one was injected */
+    public Iterable<Message> getMessages()
+    {
+        return replies.keySet();
+    }
+
+    /** @return number of replies recorded so far, counting an injected local reply */
+    public int getMessageCount()
+    {
+        return replies.size();
+    }
+}

Added: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/AsyncRepairCallback.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/AsyncRepairCallback.java?rev=1070977&view=auto
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/AsyncRepairCallback.java (added)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/AsyncRepairCallback.java Tue Feb 15 17:16:52 2011
@@ -0,0 +1,58 @@
+package org.apache.cassandra.service;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.cassandra.concurrent.Stage;
+import org.apache.cassandra.concurrent.StageManager;
+import org.apache.cassandra.net.IAsyncCallback;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.utils.WrappedRunnable;
+
+/**
+ * Collects replies to repair reads and, once {@code count} of them have arrived,
+ * runs {@link RowRepairResolver#resolve()} on the READ_REPAIR stage.
+ */
+public class AsyncRepairCallback implements IAsyncCallback
+{
+    private final RowRepairResolver repairResolver;
+    private final int count;
+    // Replies are counted here rather than via repairResolver.getMessageCount():
+    // two replies arriving together could both observe the final count and
+    // schedule resolve() twice, whereas incrementAndGet yields each value once.
+    private final AtomicInteger received = new AtomicInteger(0);
+
+    /**
+     * @param repairResolver resolver that accumulates and reconciles the replies
+     * @param count          number of replies to wait for before resolving
+     */
+    public AsyncRepairCallback(RowRepairResolver repairResolver, int count)
+    {
+        this.repairResolver = repairResolver;
+        this.count = count;
+    }
+
+    @Override
+    public void response(Message message)
+    {
+        // record the reply before counting it, so the thread that reaches count
+        // sees every reply already preprocessed
+        repairResolver.preprocess(message);
+        if (received.incrementAndGet() == count)
+        {
+            StageManager.getStage(Stage.READ_REPAIR).execute(new WrappedRunnable()
+            {
+                protected void runMayThrow() throws DigestMismatchException, IOException
+                {
+                    repairResolver.resolve();
+                }
+            });
+        }
+    }
+
+    @Override
+    public boolean isLatencyForSnitch()
+    {
+        return true;
+    }
+}

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/DatacenterReadCallback.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/DatacenterReadCallback.java?rev=1070977&r1=1070976&r2=1070977&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/DatacenterReadCallback.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/DatacenterReadCallback.java Tue Feb 15 17:16:52 2011
@@ -22,12 +22,12 @@ package org.apache.cassandra.service;
 
 
 import java.net.InetAddress;
-import java.util.Collection;
+import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.Table;
 import org.apache.cassandra.db.ReadResponse;
+import org.apache.cassandra.db.Table;
 import org.apache.cassandra.locator.IEndpointSnitch;
 import org.apache.cassandra.locator.NetworkTopologyStrategy;
 import org.apache.cassandra.net.Message;
@@ -44,12 +44,12 @@ public class DatacenterReadCallback<T> e
     private static final String localdc = snitch.getDatacenter(FBUtilities.getLocalAddress());
     private AtomicInteger localResponses;
     
-    public DatacenterReadCallback(IResponseResolver<T> resolver, ConsistencyLevel consistencyLevel, String table)
+    public DatacenterReadCallback(IResponseResolver resolver, ConsistencyLevel consistencyLevel, IReadCommand command, List<InetAddress> endpoints)
     {
-        super(resolver, consistencyLevel, table);
+        super(resolver, consistencyLevel, command, endpoints);
         localResponses = new AtomicInteger(blockfor);
     }
-    
+
     @Override
     public void response(Message message)
     {
@@ -68,14 +68,15 @@ public class DatacenterReadCallback<T> e
     @Override
     public void response(ReadResponse result)
     {
-        ((ReadResponseResolver) resolver).injectPreProcessed(result);
+        ((RowDigestResolver) resolver).injectPreProcessed(result);
 
         int n = localResponses.decrementAndGet();
-
         if (n == 0 && resolver.isDataPresent())
         {
             condition.signal();
         }
+
+        maybeResolveForRepair();
     }
     
     @Override
@@ -86,7 +87,7 @@ public class DatacenterReadCallback<T> e
 	}
 
     @Override
-    public void assureSufficientLiveNodes(Collection<InetAddress> endpoints) throws UnavailableException
+    public void assureSufficientLiveNodes() throws UnavailableException
     {
         int localEndpoints = 0;
         for (InetAddress endpoint : endpoints)

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java?rev=1070977&r1=1070976&r2=1070977&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java Tue Feb 15 17:16:52 2011
@@ -115,4 +115,10 @@ public class DatacenterSyncWriteResponse
                 throw new UnavailableException();
         }
     }
+
+    @Override
+    public boolean isLatencyForSnitch()
+    {
+        return false;
+    }
 }

Added: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/IReadCommand.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/IReadCommand.java?rev=1070977&view=auto
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/IReadCommand.java (added)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/IReadCommand.java Tue Feb 15 17:16:52 2011
@@ -0,0 +1,11 @@
+package org.apache.cassandra.service;
+
+/**
+ * Minimal view of a read request, single-row (ReadCommand) or range (RangeSliceCommand),
+ * used by ReadCallback to work out how many replies to block for.
+ */
+public interface IReadCommand
+{
+    /** @return the name of the keyspace (table) this command reads from */
+    public String getKeyspace();
+}

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java?rev=1070977&r1=1070976&r2=1070977&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java Tue Feb 15 17:16:52 2011
@@ -49,7 +49,6 @@ public class RangeSliceResponseResolver 
 
     public RangeSliceResponseResolver(String table, List<InetAddress> sources)
     {
-        assert sources.size() > 0;
         this.sources = sources;
         this.table = table;
     }
@@ -103,8 +102,8 @@ public class RangeSliceResponseResolver 
 
             protected Row getReduced()
             {
-                ColumnFamily resolved = ReadResponseResolver.resolveSuperset(versions);
-                ReadResponseResolver.maybeScheduleRepairs(resolved, table, key, versions, versionSources);
+                ColumnFamily resolved = RowRepairResolver.resolveSuperset(versions);
+                RowRepairResolver.maybeScheduleRepairs(resolved, table, key, versions, versionSources);
                 versions.clear();
                 versionSources.clear();
                 return new Row(key, resolved);

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/ReadCallback.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/ReadCallback.java?rev=1070977&r1=1070976&r2=1070977&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/ReadCallback.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/ReadCallback.java Tue Feb 15 17:16:52 2011
@@ -20,14 +20,21 @@ package org.apache.cassandra.service;
 
 import java.io.IOException;
 import java.net.InetAddress;
-import java.util.Collection;
+import java.util.List;
+import java.util.Random;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
 
+import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.concurrent.Stage;
+import org.apache.cassandra.concurrent.StageManager;
+import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.ReadCommand;
 import org.apache.cassandra.db.ReadResponse;
 import org.apache.cassandra.db.Table;
 import org.apache.cassandra.net.IAsyncCallback;
@@ -36,28 +43,63 @@ import org.apache.cassandra.net.Messagin
 import org.apache.cassandra.thrift.ConsistencyLevel;
 import org.apache.cassandra.thrift.UnavailableException;
 import org.apache.cassandra.utils.SimpleCondition;
+import org.apache.cassandra.utils.WrappedRunnable;
 
 public class ReadCallback<T> implements IAsyncCallback
 {
     protected static final Logger logger = LoggerFactory.getLogger( ReadCallback.class );
 
+    private static final ThreadLocal<Random> random = new ThreadLocal<Random>()
+    {
+        @Override
+        protected Random initialValue()
+        {
+            return new Random();
+        }
+    };
+
     public final IResponseResolver<T> resolver;
     protected final SimpleCondition condition = new SimpleCondition();
     private final long startTime;
     protected final int blockfor;
-    
+    final List<InetAddress> endpoints;
+    private final IReadCommand command;
+    // set when the background digest check is submitted, so it is submitted at most once
+    private final AtomicBoolean repairScheduled = new AtomicBoolean(false);
+
     /**
      * Constructor when response count has to be calculated and blocked for.
      */
-    public ReadCallback(IResponseResolver<T> resolver, ConsistencyLevel consistencyLevel, String table)
+    public ReadCallback(IResponseResolver<T> resolver, ConsistencyLevel consistencyLevel, IReadCommand command, List<InetAddress> endpoints)
     {
-        this.blockfor = determineBlockFor(consistencyLevel, table);
+        this.command = command;
+        this.blockfor = determineBlockFor(consistencyLevel, command.getKeyspace());
         this.resolver = resolver;
         this.startTime = System.currentTimeMillis();
-
-        logger.debug("ReadCallback blocking for {} responses", blockfor);
+        boolean repair = randomlyReadRepair();
+        this.endpoints = repair || resolver instanceof RowRepairResolver
+                       ? endpoints
+                       : endpoints.subList(0, Math.min(endpoints.size(), blockfor)); // min so as to not throw exception until assureSufficient is called
+
+        if (logger.isDebugEnabled())
+            logger.debug(String.format("Blockfor/repair is %s/%s; setting up requests to %s",
+                                       blockfor, repair, StringUtils.join(this.endpoints, ",")));
     }
     
+    private boolean randomlyReadRepair()
+    {
+        if (resolver instanceof RowDigestResolver)
+        {
+            assert command instanceof ReadCommand : command;
+            String table = ((RowDigestResolver) resolver).table;
+            String columnFamily = ((ReadCommand) command).getColumnFamilyName();
+            CFMetaData cfmd = DatabaseDescriptor.getTableMetaData(table).get(columnFamily);
+            return cfmd.getReadRepairChance() > random.get().nextDouble();
+        }
+        // we don't read repair on range scans
+        return false;
+    }
+
     public T get() throws TimeoutException, DigestMismatchException, IOException
     {
         long timeout = DatabaseDescriptor.getRpcTimeout() - (System.currentTimeMillis() - startTime);
@@ -85,21 +127,46 @@ public class ReadCallback<T> implements
     public void response(Message message)
     {
         resolver.preprocess(message);
+        assert resolver.getMessageCount() <= endpoints.size();
         if (resolver.getMessageCount() < blockfor)
             return;
         if (resolver.isDataPresent())
+        {
             condition.signal();
+            maybeResolveForRepair();
+        }
     }
 
     public void response(ReadResponse result)
     {
-        ((ReadResponseResolver) resolver).injectPreProcessed(result);
+        ((RowDigestResolver) resolver).injectPreProcessed(result);
+        assert resolver.getMessageCount() <= endpoints.size();
         if (resolver.getMessageCount() < blockfor)
             return;
         if (resolver.isDataPresent())
+        {
             condition.signal();
+            maybeResolveForRepair();
+        }
     }
-    
+
+    /**
+     * Check digests in the background on the READ_REPAIR stage once we have received
+     * replies to all the requests we sent.  Responses can arrive concurrently, so more
+     * than one thread may see the final reply count; repairScheduled lets only one of
+     * them submit the AsyncRepairRunner.
+     */
+    protected void maybeResolveForRepair()
+    {
+        if (blockfor < endpoints.size()
+            && resolver.getMessageCount() == endpoints.size()
+            && repairScheduled.compareAndSet(false, true))
+        {
+            assert resolver.isDataPresent();
+            StageManager.getStage(Stage.READ_REPAIR).execute(new AsyncRepairRunner());
+        }
+    }
+
     public int determineBlockFor(ConsistencyLevel consistencyLevel, String table)
     {
         switch (consistencyLevel)
@@ -116,9 +183,43 @@ public class ReadCallback<T> implements
         }
     }
 
-    public void assureSufficientLiveNodes(Collection<InetAddress> endpoints) throws UnavailableException
+    public void assureSufficientLiveNodes() throws UnavailableException
     {
         if (endpoints.size() < blockfor)
             throw new UnavailableException();
     }
+
+    @Override
+    public boolean isLatencyForSnitch()
+    {
+        return true;
+    }
+
+    /**
+     * Resolves the collected replies off the response path; on a digest mismatch it
+     * re-sends the read command to every endpoint and repairs via AsyncRepairCallback.
+     */
+    private class AsyncRepairRunner extends WrappedRunnable
+    {
+        protected void runMayThrow() throws IOException
+        {
+            try
+            {
+                resolver.resolve();
+            }
+            catch (DigestMismatchException e)
+            {
+                if (logger.isDebugEnabled())
+                    logger.debug("Digest mismatch:", e);
+
+                ReadCommand readCommand = (ReadCommand) command;
+                final RowRepairResolver repairResolver = new RowRepairResolver(readCommand.table, readCommand.key);
+                IAsyncCallback repairHandler = new AsyncRepairCallback(repairResolver, endpoints.size());
+
+                Message messageRepair = readCommand.makeReadMessage();
+                for (InetAddress endpoint : endpoints)
+                    MessagingService.instance().sendRR(messageRepair, endpoint, repairHandler);
+            }
+        }
+    }
 }

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/ReadResponseResolver.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/ReadResponseResolver.java?rev=1070977&r1=1070976&r2=1070977&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/ReadResponseResolver.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/ReadResponseResolver.java Tue Feb 15 17:16:52 2011
@@ -1,257 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.service;
-
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
-import java.io.IOError;
-import java.io.IOException;
-import java.net.InetAddress;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentMap;
-
-import org.apache.commons.lang.ArrayUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.db.*;
-import org.apache.cassandra.net.Message;
-import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.utils.FBUtilities;
-import org.cliffc.high_scale_lib.NonBlockingHashMap;
-
-/**
- * Turns ReadResponse messages into Row objects, resolving to the most recent
- * version and setting up read repairs as necessary.
- */
-public class ReadResponseResolver implements IResponseResolver<Row>
-{
-	private static Logger logger_ = LoggerFactory.getLogger(ReadResponseResolver.class);
-    private final String table;
-    private final ConcurrentMap<Message, ReadResponse> results = new NonBlockingHashMap<Message, ReadResponse>();
-    private DecoratedKey key;
-    private ByteBuffer digest;
-    private static final Message FAKE_MESSAGE = new Message(FBUtilities.getLocalAddress(), StorageService.Verb.INTERNAL_RESPONSE, ArrayUtils.EMPTY_BYTE_ARRAY);;
-
-    public ReadResponseResolver(String table, ByteBuffer key)
-    {
-        this.table = table;
-        this.key = StorageService.getPartitioner().decorateKey(key);
-    }
-    
-    public Row getData() throws IOException
-    {
-        for (Map.Entry<Message, ReadResponse> entry : results.entrySet())
-        {
-            ReadResponse result = entry.getValue();
-            if (!result.isDigestQuery())
-                return result.row();
-        }
-
-        throw new AssertionError("getData should not be invoked when no data is present");
-    }
-
-    /*
-     * This method handles three different scenarios:
-     *
-     * 1a)we're handling the initial read, of data from the closest replica + digests
-     *    from the rest.  In this case we check the digests against each other,
-     *    throw an exception if there is a mismatch, otherwise return the data row.
-     *
-     * 1b)we're checking additional digests that arrived after the minimum to handle
-     *    the requested ConsistencyLevel, i.e. asynchronouse read repair check
-     *
-     * 2) there was a mismatch on the initial read (1a or 1b), so we redid the digest requests
-     *    as full data reads.  In this case we need to compute the most recent version
-     *    of each column, and send diffs to out-of-date replicas.
-     */
-    public Row resolve() throws DigestMismatchException, IOException
-    {
-        if (logger_.isDebugEnabled())
-            logger_.debug("resolving " + results.size() + " responses");
-
-        long startTime = System.currentTimeMillis();
-		List<ColumnFamily> versions = new ArrayList<ColumnFamily>();
-		List<InetAddress> endpoints = new ArrayList<InetAddress>();
-
-        // case 1: validate digests against each other; throw immediately on mismatch.
-        // also, collects data results into versions/endpoints lists.
-        //
-        // results are cleared as we process them, to avoid unnecessary duplication of work
-        // when resolve() is called a second time for read repair on responses that were not
-        // necessary to satisfy ConsistencyLevel.
-        for (Map.Entry<Message, ReadResponse> entry : results.entrySet())
-        {
-            ReadResponse result = entry.getValue();
-            Message message = entry.getKey();
-            if (result.isDigestQuery())
-            {
-                if (digest == null)
-                {
-                    digest = result.digest();
-                }
-                else
-                {
-                    ByteBuffer digest2 = result.digest();
-                    if (!digest.equals(digest2))
-                        throw new DigestMismatchException(key, digest, digest2);
-                }
-            }
-            else
-            {
-                versions.add(result.row().cf);
-                endpoints.add(message.getFrom());
-            }
-
-            results.remove(message);
-        }
-
-		// If there was a digest query compare it with all the data digests
-		// If there is a mismatch then throw an exception so that read repair can happen.
-        //
-        // It's important to note that we do not compare the digests of multiple data responses --
-        // if we are in that situation we know there was a previous mismatch and now we're doing a repair,
-        // so our job is now case 2: figure out what the most recent version is and update everyone to that version.
-        if (digest != null)
-        {
-            for (ColumnFamily cf : versions)
-            {
-                ByteBuffer digest2 = ColumnFamily.digest(cf);
-                if (!digest.equals(digest2))
-                    throw new DigestMismatchException(key, digest, digest2);
-            }
-            if (logger_.isDebugEnabled())
-                logger_.debug("digests verified");
-        }
-
-        ColumnFamily resolved;
-        if (versions.size() > 1)
-        {
-            resolved = resolveSuperset(versions);
-            if (logger_.isDebugEnabled())
-                logger_.debug("versions merged");
-            maybeScheduleRepairs(resolved, table, key, versions, endpoints);
-        }
-        else
-        {
-            resolved = versions.get(0);
-        }
-
-        if (logger_.isDebugEnabled())
-            logger_.debug("resolve: " + (System.currentTimeMillis() - startTime) + " ms.");
-		return new Row(key, resolved);
-	}
-
-    /**
-     * For each row version, compare with resolved (the superset of all row versions);
-     * if it is missing anything, send a mutation to the endpoint it come from.
-     */
-    public static void maybeScheduleRepairs(ColumnFamily resolved, String table, DecoratedKey key, List<ColumnFamily> versions, List<InetAddress> endpoints)
-    {
-        for (int i = 0; i < versions.size(); i++)
-        {
-            ColumnFamily diffCf = ColumnFamily.diff(versions.get(i), resolved);
-            if (diffCf == null) // no repair needs to happen
-                continue;
-
-            // create and send the row mutation message based on the diff
-            RowMutation rowMutation = new RowMutation(table, key.key);
-            rowMutation.add(diffCf);
-            Message repairMessage;
-            try
-            {
-                repairMessage = rowMutation.makeRowMutationMessage(StorageService.Verb.READ_REPAIR);
-            }
-            catch (IOException e)
-            {
-                throw new IOError(e);
-            }
-            MessagingService.instance().sendOneWay(repairMessage, endpoints.get(i));
-        }
-    }
-
-    static ColumnFamily resolveSuperset(List<ColumnFamily> versions)
-    {
-        assert versions.size() > 0;
-
-        ColumnFamily resolved = null;
-        for (ColumnFamily cf : versions)
-        {
-            if (cf != null)
-            {
-                resolved = cf.cloneMe();
-                break;
-            }
-        }
-        if (resolved == null)
-            return null;
-
-        for (ColumnFamily cf : versions)
-            resolved.resolve(cf);
-
-        return resolved;
-    }
-
-    public void preprocess(Message message)
-    {
-        byte[] body = message.getMessageBody();
-        ByteArrayInputStream bufIn = new ByteArrayInputStream(body);
-        try
-        {
-            ReadResponse result = ReadResponse.serializer().deserialize(new DataInputStream(bufIn));
-            if (logger_.isDebugEnabled())
-                logger_.debug("Preprocessed {} response", result.isDigestQuery() ? "digest" : "data");
-            results.put(message, result);
-        }
-        catch (IOException e)
-        {
-            throw new IOError(e);
-        }
-    }
-
-    /** hack so local reads don't force de/serialization of an extra real Message */
-    public void injectPreProcessed(ReadResponse result)
-    {
-        assert results.get(FAKE_MESSAGE) == null; // should only be one local reply
-        results.put(FAKE_MESSAGE, result);
-    }
-
-    public boolean isDataPresent()
-	{
-        for (ReadResponse result : results.values())
-        {
-            if (!result.isDigestQuery())
-                return true;
-        }
-        return false;
-    }
-
-    public Iterable<Message> getMessages()
-    {
-        return results.keySet();
-    }
-
-    public int getMessageCount()
-    {
-        return results.size();
-    }
-}

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/RepairCallback.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/RepairCallback.java?rev=1070977&r1=1070976&r2=1070977&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/RepairCallback.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/RepairCallback.java Tue Feb 15 17:16:52 2011
@@ -39,6 +39,14 @@ public class RepairCallback<T> implement
     private final SimpleCondition condition = new SimpleCondition();
     private final long startTime;
 
+    /**
+     * The main difference between this and ReadCallback is, ReadCallback has a ConsistencyLevel
+     * it needs to achieve.  Repair on the other hand is happy to repair whoever replies within the timeout.
+     *
+     * (The other main difference of course is, this is only created once we know we have a digest
+     * mismatch, and we're going to do full-data reads from everyone -- that is, this is the final
+     * stage in the read process.)
+     */
     public RepairCallback(IResponseResolver<T> resolver, List<InetAddress> endpoints)
     {
         this.resolver = resolver;
@@ -46,10 +54,6 @@ public class RepairCallback<T> implement
         this.startTime = System.currentTimeMillis();
     }
 
-    /**
-     * The main difference between this and ReadCallback is, ReadCallback has a ConsistencyLevel
-     * it needs to achieve.  Repair on the other hand is happy to repair whoever replies within the timeout.
-     */
     public T get() throws TimeoutException, DigestMismatchException, IOException
     {
         long timeout = DatabaseDescriptor.getRpcTimeout() - (System.currentTimeMillis() - startTime);
@@ -71,4 +75,13 @@ public class RepairCallback<T> implement
         if (resolver.getMessageCount() == endpoints.size())
             condition.signal();
     }
+
+    /**
+     * Always true.  NOTE(review): presumably tells MessagingService to report reply
+     * latencies for this callback to the endpoint snitch -- confirm in IMessageCallback.
+     */
+    public boolean isLatencyForSnitch()
+    {
+        return true;
+    }
 }

Added: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/RowDigestResolver.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/RowDigestResolver.java?rev=1070977&view=auto
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/RowDigestResolver.java (added)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/RowDigestResolver.java Tue Feb 15 17:16:52 2011
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.cassandra.db.ColumnFamily;
+import org.apache.cassandra.db.ReadResponse;
+import org.apache.cassandra.db.Row;
+
+/**
+ * Resolves the replies to an initial row read: one full-data response (sent to the
+ * closest replica) plus digest responses from the remaining replicas.  All digests
+ * must agree with each other and with the digest of the data; on any disagreement a
+ * DigestMismatchException is thrown so the caller can repair via RowRepairResolver.
+ */
+public class RowDigestResolver extends AbstractRowResolver
+{
+    public RowDigestResolver(String table, ByteBuffer key)
+    {
+        super(key, table);
+    }
+
+    /**
+     * Returns the row carried by the full-data (non-digest) reply.
+     *
+     * @throws AssertionError if only digest replies have been received; check isDataPresent() first
+     */
+    public Row getData() throws IOException
+    {
+        for (ReadResponse response : replies.values())
+        {
+            if (!response.isDigestQuery())
+                return response.row();
+        }
+
+        throw new AssertionError("getData should not be invoked when no data is present");
+    }
+
+    /*
+     * This method handles two different scenarios:
+     *
+     * 1a) we're handling the initial read, of data from the closest replica + digests
+     *     from the rest.  In this case we check the digests against each other and
+     *     against the data, throw an exception if there is a mismatch, otherwise
+     *     return the data row.
+     *
+     * 1b) we're checking additional digests that arrived after the minimum to handle
+     *     the requested ConsistencyLevel, i.e. an asynchronous read repair check.
+     */
+    public Row resolve() throws DigestMismatchException, IOException
+    {
+        if (logger.isDebugEnabled())
+            logger.debug("resolving " + replies.size() + " responses");
+
+        long startTime = System.currentTimeMillis();
+
+        // Validate digests against each other, throwing immediately on a mismatch, and
+        // remember the full-data response.  Replies are not removed as they are processed,
+        // so calling resolve() again re-checks everything received so far.
+        ColumnFamily data = null;
+        ByteBuffer digest = null;
+        for (ReadResponse response : replies.values())
+        {
+            if (response.isDigestQuery())
+            {
+                ByteBuffer responseDigest = response.digest();
+                if (digest == null)
+                    digest = responseDigest;
+                else if (!digest.equals(responseDigest))
+                    throw new DigestMismatchException(key, digest, responseDigest);
+            }
+            else
+            {
+                data = response.row().cf;
+            }
+        }
+
+        // If any digests were received, the data must hash to the same digest; otherwise
+        // the replicas disagree and the caller must fall back to a full-data read repair.
+        if (digest != null)
+        {
+            ByteBuffer dataDigest = ColumnFamily.digest(data);
+            if (!digest.equals(dataDigest))
+                throw new DigestMismatchException(key, digest, dataDigest);
+            if (logger.isDebugEnabled())
+                logger.debug("digests verified");
+        }
+
+        if (logger.isDebugEnabled())
+            logger.debug("resolve: " + (System.currentTimeMillis() - startTime) + " ms.");
+        return new Row(key, data);
+    }
+
+    /** Returns true if at least one full-data (non-digest) reply has been received. */
+    public boolean isDataPresent()
+    {
+        for (ReadResponse result : replies.values())
+        {
+            if (!result.isDigestQuery())
+                return true;
+        }
+        return false;
+    }
+}

Added: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/RowRepairResolver.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/RowRepairResolver.java?rev=1070977&view=auto
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/RowRepairResolver.java (added)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/RowRepairResolver.java Tue Feb 15 17:16:52 2011
@@ -0,0 +1,161 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service;
+
+import java.io.IOError;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
+
+/**
+ * Resolves the full-data replies requested after a digest mismatch: merges every
+ * replica's version of the row into a single superset and sends each out-of-date
+ * replica a READ_REPAIR mutation containing whatever it is missing.
+ */
+public class RowRepairResolver extends AbstractRowResolver
+{
+    public RowRepairResolver(String table, ByteBuffer key)
+    {
+        super(key, table);
+    }
+
+    /*
+     * This method handles the following scenario:
+     *
+     * there was a mismatch on the initial read (1a or 1b), so we redid the digest requests
+     * as full data reads.  In this case we need to compute the most recent version
+     * of each column, and send diffs to out-of-date replicas.
+     */
+    public Row resolve() throws DigestMismatchException, IOException
+    {
+        if (logger.isDebugEnabled())
+            logger.debug("resolving " + replies.size() + " responses");
+
+        long startTime = System.currentTimeMillis();
+
+        // Collect each replica's version of the row.  versions and endpoints are kept
+        // index-aligned so that a repair can be sent back to the replica a version came from.
+        // Only full-data replies are expected here.
+        List<ColumnFamily> versions = new ArrayList<ColumnFamily>(replies.size());
+        List<InetAddress> endpoints = new ArrayList<InetAddress>(replies.size());
+        for (Map.Entry<Message, ReadResponse> entry : replies.entrySet())
+        {
+            ReadResponse response = entry.getValue();
+            assert !response.isDigestQuery();
+            versions.add(response.row().cf);
+            endpoints.add(entry.getKey().getFrom());
+        }
+
+        ColumnFamily resolved;
+        if (versions.size() > 1)
+        {
+            resolved = resolveSuperset(versions);
+            if (logger.isDebugEnabled())
+                logger.debug("versions merged");
+            maybeScheduleRepairs(resolved, table, key, versions, endpoints);
+        }
+        else
+        {
+            // a single version has nothing to be reconciled against
+            resolved = versions.get(0);
+        }
+
+        if (logger.isDebugEnabled())
+            logger.debug("resolve: " + (System.currentTimeMillis() - startTime) + " ms.");
+        return new Row(key, resolved);
+    }
+
+    /**
+     * For each row version, compare with resolved (the superset of all row versions);
+     * if it is missing anything, send a READ_REPAIR mutation to the endpoint it came from.
+     *
+     * @param resolved  the merged superset of all versions
+     * @param versions  per-replica row versions, index-aligned with {@code endpoints}
+     * @param endpoints the replica each entry of {@code versions} was read from
+     */
+    public static void maybeScheduleRepairs(ColumnFamily resolved, String table, DecoratedKey key, List<ColumnFamily> versions, List<InetAddress> endpoints)
+    {
+        for (int i = 0; i < versions.size(); i++)
+        {
+            ColumnFamily diffCf = ColumnFamily.diff(versions.get(i), resolved);
+            if (diffCf == null) // no repair needs to happen
+                continue;
+
+            // create and send the row mutation message based on the diff
+            RowMutation rowMutation = new RowMutation(table, key.key);
+            rowMutation.add(diffCf);
+            Message repairMessage;
+            try
+            {
+                repairMessage = rowMutation.makeRowMutationMessage(StorageService.Verb.READ_REPAIR);
+            }
+            catch (IOException e)
+            {
+                throw new IOError(e);
+            }
+            MessagingService.instance().sendOneWay(repairMessage, endpoints.get(i));
+        }
+    }
+
+    /**
+     * Merges all versions into a copy of the first non-null version.
+     *
+     * @return the merged column family, or null if every version is null
+     */
+    static ColumnFamily resolveSuperset(List<ColumnFamily> versions)
+    {
+        assert versions.size() > 0;
+
+        ColumnFamily resolved = null;
+        for (ColumnFamily cf : versions)
+        {
+            if (cf != null)
+            {
+                resolved = cf.cloneMe();
+                break;
+            }
+        }
+        if (resolved == null)
+            return null;
+
+        for (ColumnFamily cf : versions)
+            resolved.resolve(cf);
+
+        return resolved;
+    }
+
+    /** Not supported: this resolver only merges full-data replies via resolve(). */
+    public Row getData() throws IOException
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    /** Not supported: this resolver only merges full-data replies via resolve(). */
+    public boolean isDataPresent()
+    {
+        throw new UnsupportedOperationException();
+    }
+}

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java?rev=1070977&r1=1070976&r2=1070977&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java Tue Feb 15 17:16:52 2011
@@ -36,7 +36,6 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.concurrent.StageManager;
-import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.filter.QueryFilter;
@@ -59,17 +58,6 @@ public class StorageProxy implements Sto
 {
     private static final Logger logger = LoggerFactory.getLogger(StorageProxy.class);
 
-    private static ScheduledExecutorService repairExecutor = new ScheduledThreadPoolExecutor(1); // TODO JMX-enable this
-
-    private static final ThreadLocal<Random> random = new ThreadLocal<Random>()
-    {
-        @Override
-        protected Random initialValue()
-        {
-            return new Random();
-        }
-    };
-
     // mbean stuff
     private static final LatencyTracker readStats = new LatencyTracker();
     private static final LatencyTracker rangeStats = new LatencyTracker();
@@ -78,6 +66,8 @@ public class StorageProxy implements Sto
     private static int maxHintWindow = DatabaseDescriptor.getMaxHintWindow();
     private static final String UNREACHABLE = "UNREACHABLE";
 
+    public static final StorageProxy instance = new StorageProxy();
+
     private StorageProxy() {}
     static
     {
@@ -323,66 +313,55 @@ public class StorageProxy implements Sto
     private static List<Row> fetchRows(List<ReadCommand> commands, ConsistencyLevel consistency_level) throws IOException, UnavailableException, TimeoutException
     {
         List<ReadCallback<Row>> readCallbacks = new ArrayList<ReadCallback<Row>>();
-        List<List<InetAddress>> commandEndpoints = new ArrayList<List<InetAddress>>();
         List<Row> rows = new ArrayList<Row>();
-        Set<ReadCommand> repairs = new HashSet<ReadCommand>();
 
         // send out read requests
         for (ReadCommand command: commands)
         {
             assert !command.isDigestQuery();
+            logger.debug("Command/ConsistencyLevel is {}/{}", command, consistency_level);
 
             List<InetAddress> endpoints = StorageService.instance.getLiveNaturalEndpoints(command.table, command.key);
             DatabaseDescriptor.getEndpointSnitch().sortByProximity(FBUtilities.getLocalAddress(), endpoints);
 
-            ReadResponseResolver resolver = new ReadResponseResolver(command.table, command.key);
-            ReadCallback<Row> handler = getReadCallback(resolver, command.table, consistency_level);
-            handler.assureSufficientLiveNodes(endpoints);
+            RowDigestResolver resolver = new RowDigestResolver(command.table, command.key);
+            ReadCallback<Row> handler = getReadCallback(resolver, command, consistency_level, endpoints);
+            handler.assureSufficientLiveNodes();
+            assert !handler.endpoints.isEmpty();
 
-            // if we're not going to read repair, cut the endpoints list down to the ones required to satisfy ConsistencyLevel
-            if (randomlyReadRepair(command))
-            {
-                if (endpoints.size() > handler.blockfor)
-                    repairs.add(command);
-            }
-            else
-            {
-                endpoints = endpoints.subList(0, handler.blockfor);
-            }
-            
             // The data-request message is sent to dataPoint, the node that will actually get
             // the data for us. The other replicas are only sent a digest query.
             ReadCommand digestCommand = null;
-            if (endpoints.size() > 1)
+            if (handler.endpoints.size() > 1)
             {
                 digestCommand = command.copy();
                 digestCommand.setDigestQuery(true);
             }
 
-            InetAddress dataPoint = endpoints.get(0);
+            InetAddress dataPoint = handler.endpoints.get(0);
             if (dataPoint.equals(FBUtilities.getLocalAddress()))
             {
                 if (logger.isDebugEnabled())
-                    logger.debug("reading data for " + command + " locally");
+                    logger.debug("reading data locally");
                 StageManager.getStage(Stage.READ).execute(new LocalReadRunnable(command, handler));
             }
             else
             {
                 Message message = command.makeReadMessage();
                 if (logger.isDebugEnabled())
-                    logger.debug("reading data for " + command + " from " + dataPoint);
+                    logger.debug("reading data from " + dataPoint);
                 MessagingService.instance().sendRR(message, dataPoint, handler);
             }
 
             // We lazy-construct the digest Message object since it may not be necessary if we
             // are doing a local digest read, or no digest reads at all.
             Message digestMessage = null;
-            for (InetAddress digestPoint : endpoints.subList(1, endpoints.size()))
+            for (InetAddress digestPoint : handler.endpoints.subList(1, handler.endpoints.size()))
             {
                 if (digestPoint.equals(FBUtilities.getLocalAddress()))
                 {
                     if (logger.isDebugEnabled())
-                        logger.debug("reading digest for " + command + " locally");
+                        logger.debug("reading digest locally");
                     StageManager.getStage(Stage.READ).execute(new LocalReadRunnable(digestCommand, handler));
                 }
                 else
@@ -390,44 +369,45 @@ public class StorageProxy implements Sto
                     if (digestMessage == null)
                         digestMessage = digestCommand.makeReadMessage();
                     if (logger.isDebugEnabled())
-                        logger.debug("reading digest for " + command + " from " + digestPoint);
+                        logger.debug("reading digest from " + digestPoint);
                     MessagingService.instance().sendRR(digestMessage, digestPoint, handler);
                 }
             }
 
             readCallbacks.add(handler);
-            commandEndpoints.add(endpoints);
         }
 
         // read results and make a second pass for any digest mismatches
         List<RepairCallback<Row>> repairResponseHandlers = null;
         for (int i = 0; i < commands.size(); i++)
         {
-            ReadCallback<Row> readCallback = readCallbacks.get(i);
+            ReadCallback<Row> handler = readCallbacks.get(i);
             Row row;
             ReadCommand command = commands.get(i);
-            List<InetAddress> endpoints = commandEndpoints.get(i);
             try
             {
                 long startTime2 = System.currentTimeMillis();
-                row = readCallback.get(); // CL.ONE is special cased here to ignore digests even if some have arrived
+                row = handler.get(); // CL.ONE is special cased here to ignore digests even if some have arrived
                 if (row != null)
                     rows.add(row);
 
                 if (logger.isDebugEnabled())
                     logger.debug("Read: " + (System.currentTimeMillis() - startTime2) + " ms.");
-
-                if (repairs.contains(command))
-                    repairExecutor.schedule(new RepairRunner(readCallback.resolver, command, endpoints), DatabaseDescriptor.getRpcTimeout(), TimeUnit.MILLISECONDS);
             }
             catch (DigestMismatchException ex)
             {
                 if (logger.isDebugEnabled())
                     logger.debug("Digest mismatch:", ex);
-                RepairCallback<Row> handler = repair(command, endpoints);
+
+                RowRepairResolver resolver = new RowRepairResolver(command.table, command.key);
+                RepairCallback<Row> repairHandler = new RepairCallback<Row>(resolver, handler.endpoints);
+                Message messageRepair = command.makeReadMessage();
+                for (InetAddress endpoint : handler.endpoints)
+                    MessagingService.instance().sendRR(messageRepair, endpoint, repairHandler);
+
                 if (repairResponseHandlers == null)
                     repairResponseHandlers = new ArrayList<RepairCallback<Row>>();
-                repairResponseHandlers.add(handler);
+                repairResponseHandlers.add(repairHandler);
             }
         }
 
@@ -476,24 +456,13 @@ public class StorageProxy implements Sto
         }
     }
     
-    static <T> ReadCallback<T> getReadCallback(IResponseResolver<T> resolver, String table, ConsistencyLevel consistencyLevel)
+    static <T> ReadCallback<T> getReadCallback(IResponseResolver<T> resolver, IReadCommand command, ConsistencyLevel consistencyLevel, List<InetAddress> endpoints)
     {
         if (consistencyLevel.equals(ConsistencyLevel.LOCAL_QUORUM) || consistencyLevel.equals(ConsistencyLevel.EACH_QUORUM))
         {
-            return new DatacenterReadCallback(resolver, consistencyLevel, table);
+            return new DatacenterReadCallback(resolver, consistencyLevel, command, endpoints);
         }
-        return new ReadCallback(resolver, consistencyLevel, table);
-    }
-
-    private static RepairCallback<Row> repair(ReadCommand command, List<InetAddress> endpoints)
-    throws IOException
-    {
-        ReadResponseResolver resolver = new ReadResponseResolver(command.table, command.key);
-        RepairCallback<Row> handler = new RepairCallback<Row>(resolver, endpoints);
-        Message messageRepair = command.makeReadMessage();
-        for (InetAddress endpoint : endpoints)
-            MessagingService.instance().sendRR(messageRepair, endpoint, handler);
-        return handler;
+        return new ReadCallback<T>(resolver, consistencyLevel, command, endpoints);
     }
 
     /*
@@ -545,16 +514,14 @@ public class StorageProxy implements Sto
 
                     // collect replies and resolve according to consistency level
                     RangeSliceResponseResolver resolver = new RangeSliceResponseResolver(command.keyspace, liveEndpoints);
-                    AbstractReplicationStrategy rs = Table.open(command.keyspace).getReplicationStrategy();
-                    ReadCallback<List<Row>> handler = getReadCallback(resolver, command.keyspace, consistency_level);
-                    // TODO bail early if live endpoints can't satisfy requested consistency level
+                    ReadCallback<List<Row>> handler = getReadCallback(resolver, command, consistency_level, liveEndpoints);
+                    handler.assureSufficientLiveNodes();
                     for (InetAddress endpoint : liveEndpoints)
                     {
                         MessagingService.instance().sendRR(message, endpoint, handler);
                         if (logger.isDebugEnabled())
                             logger.debug("reading " + c2 + " from " + endpoint);
                     }
-                    // TODO read repair on remaining replicas?
 
                     // if we're done, great, otherwise, move to the next range
                     try 
@@ -607,6 +574,11 @@ public class StorageProxy implements Sto
                 versions.put(message.getFrom(), theirVersion);
                 latch.countDown();
             }
+            // Replies here are schema-version answers, not read data; returning false presumably keeps them out of snitch latency tracking (confirm in IMessageCallback).
+            public boolean isLatencyForSnitch()
+            {
+                return false;
+            }
         };
         // an empty message acts as a request to the SchemaCheckVerbHandler.
         for (InetAddress endpoint : liveHosts)
@@ -699,12 +671,6 @@ public class StorageProxy implements Sto
 
         return ranges;
     }
-    
-    private static boolean randomlyReadRepair(ReadCommand command)
-    {
-        CFMetaData cfmd = DatabaseDescriptor.getTableMetaData(command.table).get(command.getColumnFamilyName());
-        return cfmd.getReadRepairChance() > random.get().nextDouble();
-    }
 
     public long getReadOperations()
     {
@@ -781,7 +747,7 @@ public class StorageProxy implements Sto
         return writeStats.getRecentLatencyHistogramMicros();
     }
 
-    public static List<Row> scan(String keyspace, String column_family, IndexClause index_clause, SlicePredicate column_predicate, ConsistencyLevel consistency_level)
+    public static List<Row> scan(final String keyspace, String column_family, IndexClause index_clause, SlicePredicate column_predicate, ConsistencyLevel consistency_level)
     throws IOException, TimeoutException, UnavailableException
     {
         IPartitioner p = StorageService.getPartitioner();
@@ -799,12 +765,16 @@ public class StorageProxy implements Sto
 
             // collect replies and resolve according to consistency level
             RangeSliceResponseResolver resolver = new RangeSliceResponseResolver(keyspace, liveEndpoints);
-            ReadCallback<List<Row>> handler = getReadCallback(resolver, keyspace, consistency_level);
-            
-            // bail early if live endpoints can't satisfy requested consistency level
-            if(handler.blockfor > liveEndpoints.size())
-                throw new UnavailableException();
-            
+            IReadCommand iCommand = new IReadCommand()
+            {
+                public String getKeyspace()
+                {
+                    return keyspace;
+                }
+            };
+            ReadCallback<List<Row>> handler = getReadCallback(resolver, iCommand, consistency_level, liveEndpoints);
+            handler.assureSufficientLiveNodes();
+
             IndexScanCommand command = new IndexScanCommand(keyspace, column_family, index_clause, column_predicate, range);
             Message message = command.getMessage();
             for (InetAddress endpoint : liveEndpoints)
@@ -912,40 +882,4 @@ public class StorageProxy implements Sto
     {
         return !Gossiper.instance.getUnreachableMembers().isEmpty();
     }
-
-    private static class RepairRunner extends WrappedRunnable
-    {
-        private final IResponseResolver<Row> resolver;
-        private final ReadCommand command;
-        private final List<InetAddress> endpoints;
-
-        public RepairRunner(IResponseResolver<Row> resolver, ReadCommand command, List<InetAddress> endpoints)
-        {
-            this.resolver = resolver;
-            this.command = command;
-            this.endpoints = endpoints;
-        }
-
-        protected void runMayThrow() throws IOException
-        {
-            try
-            {
-                resolver.resolve();
-            }
-            catch (DigestMismatchException e)
-            {
-                if (logger.isDebugEnabled())
-                    logger.debug("Digest mismatch:", e);
-                final RepairCallback<Row> callback = repair(command, endpoints);
-                Runnable runnable = new WrappedRunnable()
-                {
-                    public void runMayThrow() throws DigestMismatchException, IOException, TimeoutException
-                    {
-                        callback.get();
-                    }
-                };
-                repairExecutor.schedule(runnable, DatabaseDescriptor.getRpcTimeout(), TimeUnit.MILLISECONDS);
-            }
-        }
-    }
 }

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/TruncateResponseHandler.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/TruncateResponseHandler.java?rev=1070977&r1=1070976&r2=1070977&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/TruncateResponseHandler.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/TruncateResponseHandler.java Tue Feb 15 17:16:52 2011
@@ -73,4 +73,9 @@ public class TruncateResponseHandler imp
         if (responses.get() >= responseCount)
             condition.signal();
     }
+    // Truncate acks are not read replies; returning false presumably excludes them from snitch latency tracking (confirm in IMessageCallback).
+    public boolean isLatencyForSnitch()
+    {
+        return false;
+    }
 }

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/WriteResponseHandler.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/WriteResponseHandler.java?rev=1070977&r1=1070976&r2=1070977&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/WriteResponseHandler.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/WriteResponseHandler.java Tue Feb 15 17:16:52 2011
@@ -121,4 +121,9 @@ public class WriteResponseHandler extend
             throw new UnavailableException();
         }
     }
+    // Write acks are not read replies; returning false presumably excludes them from snitch latency tracking (confirm in IMessageCallback).
+    public boolean isLatencyForSnitch()
+    {
+        return false;
+    }
 }

Modified: cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/service/ConsistencyLevelTest.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/service/ConsistencyLevelTest.java?rev=1070977&r1=1070976&r2=1070977&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/service/ConsistencyLevelTest.java (original)
+++ cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/service/ConsistencyLevelTest.java Tue Feb 15 17:16:52 2011
@@ -71,7 +71,7 @@ public class ConsistencyLevelTest extend
 
         AbstractReplicationStrategy strategy;
 
-        for (String table : DatabaseDescriptor.getNonSystemTables())
+        for (final String table : DatabaseDescriptor.getNonSystemTables())
         {
             strategy = getStrategy(table, tmd);
             StorageService.calculatePendingRanges(strategy, table);
@@ -96,7 +96,15 @@ public class ConsistencyLevelTest extend
 
                     IWriteResponseHandler writeHandler = strategy.getWriteResponseHandler(hosts, hintedNodes, c);
 
-                    ReadCallback<Row> readHandler = StorageProxy.getReadCallback(new ReadResponseResolver(table, ByteBufferUtil.bytes("foo")), table, c);
+                    IReadCommand command = new IReadCommand()
+                    {
+                        public String getKeyspace()
+                        {
+                            return table;
+                        }
+                    };
+                    RowRepairResolver resolver = new RowRepairResolver(table, ByteBufferUtil.bytes("foo"));
+                    ReadCallback<Row> readHandler = StorageProxy.getReadCallback(resolver, command, c, new ArrayList<InetAddress>(hintedNodes.keySet()));
 
                     boolean isWriteUnavailable = false;
                     boolean isReadUnavailable = false;
@@ -111,7 +119,7 @@ public class ConsistencyLevelTest extend
 
                     try
                     {
-                        readHandler.assureSufficientLiveNodes(hintedNodes.asMap().keySet());
+                        readHandler.assureSufficientLiveNodes();
                     }
                     catch (UnavailableException e)
                     {

Modified: cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/service/ReadResponseResolverTest.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/service/ReadResponseResolverTest.java?rev=1070977&r1=1070976&r2=1070977&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/service/ReadResponseResolverTest.java (original)
+++ cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/service/ReadResponseResolverTest.java Tue Feb 15 17:16:52 2011
@@ -1,96 +0,0 @@
-package org.apache.cassandra.service;
-/*
- * 
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- * 
- */
-
-
-import java.util.Arrays;
-
-import org.apache.cassandra.SchemaLoader;
-import org.junit.Test;
-
-import org.apache.cassandra.db.ColumnFamily;
-
-import static org.apache.cassandra.db.TableTest.assertColumns;
-import static org.apache.cassandra.Util.column;
-import static junit.framework.Assert.assertNull;
-
-public class ReadResponseResolverTest extends SchemaLoader
-{
-    @Test
-    public void testResolveSupersetNewer()
-    {
-        ColumnFamily cf1 = ColumnFamily.create("Keyspace1", "Standard1");
-        cf1.addColumn(column("c1", "v1", 0));
-
-        ColumnFamily cf2 = ColumnFamily.create("Keyspace1", "Standard1");
-        cf2.addColumn(column("c1", "v2", 1));
-
-        ColumnFamily resolved = ReadResponseResolver.resolveSuperset(Arrays.asList(cf1, cf2));
-        assertColumns(resolved, "c1");
-        assertColumns(ColumnFamily.diff(cf1, resolved), "c1");
-        assertNull(ColumnFamily.diff(cf2, resolved));
-    }
-
-    @Test
-    public void testResolveSupersetDisjoint()
-    {
-        ColumnFamily cf1 = ColumnFamily.create("Keyspace1", "Standard1");
-        cf1.addColumn(column("c1", "v1", 0));
-
-        ColumnFamily cf2 = ColumnFamily.create("Keyspace1", "Standard1");
-        cf2.addColumn(column("c2", "v2", 1));
-
-        ColumnFamily resolved = ReadResponseResolver.resolveSuperset(Arrays.asList(cf1, cf2));
-        assertColumns(resolved, "c1", "c2");
-        assertColumns(ColumnFamily.diff(cf1, resolved), "c2");
-        assertColumns(ColumnFamily.diff(cf2, resolved), "c1");
-    }
-
-    @Test
-    public void testResolveSupersetNullOne()
-    {
-        ColumnFamily cf2 = ColumnFamily.create("Keyspace1", "Standard1");
-        cf2.addColumn(column("c2", "v2", 1));
-
-        ColumnFamily resolved = ReadResponseResolver.resolveSuperset(Arrays.asList(null, cf2));
-        assertColumns(resolved, "c2");
-        assertColumns(ColumnFamily.diff(null, resolved), "c2");
-        assertNull(ColumnFamily.diff(cf2, resolved));
-    }
-
-    @Test
-    public void testResolveSupersetNullTwo()
-    {
-        ColumnFamily cf1 = ColumnFamily.create("Keyspace1", "Standard1");
-        cf1.addColumn(column("c1", "v1", 0));
-
-        ColumnFamily resolved = ReadResponseResolver.resolveSuperset(Arrays.asList(cf1, null));
-        assertColumns(resolved, "c1");
-        assertNull(ColumnFamily.diff(cf1, resolved));
-        assertColumns(ColumnFamily.diff(null, resolved), "c1");
-    }
-
-    @Test
-    public void testResolveSupersetNullBoth()
-    {
-        assertNull(ReadResponseResolver.resolveSuperset(Arrays.<ColumnFamily>asList(null, null)));
-    }
-}

Added: cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/service/RowResolverTest.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/service/RowResolverTest.java?rev=1070977&view=auto
==============================================================================
--- cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/service/RowResolverTest.java (added)
+++ cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/service/RowResolverTest.java Tue Feb 15 17:16:52 2011
@@ -0,0 +1,96 @@
+package org.apache.cassandra.service;
+/*
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * 
+ */
+
+
+import java.util.Arrays;
+
+import org.apache.cassandra.SchemaLoader;
+import org.junit.Test;
+
+import org.apache.cassandra.db.ColumnFamily;
+
+import static org.apache.cassandra.db.TableTest.assertColumns;
+import static org.apache.cassandra.Util.column;
+import static junit.framework.Assert.assertNull;
+/** Tests RowRepairResolver.resolveSuperset and the per-replica ColumnFamily.diff against its result. */
+public class RowResolverTest extends SchemaLoader
+{
+    @Test
+    public void testResolveSupersetNewer()
+    {
+        ColumnFamily cf1 = ColumnFamily.create("Keyspace1", "Standard1");
+        cf1.addColumn(column("c1", "v1", 0));
+        // cf2 holds the same column c1 with a newer timestamp (1 vs 0)
+        ColumnFamily cf2 = ColumnFamily.create("Keyspace1", "Standard1");
+        cf2.addColumn(column("c1", "v2", 1));
+        // only cf1 is stale, so only cf1's diff against the resolved row is non-null
+        ColumnFamily resolved = RowRepairResolver.resolveSuperset(Arrays.asList(cf1, cf2));
+        assertColumns(resolved, "c1");
+        assertColumns(ColumnFamily.diff(cf1, resolved), "c1");
+        assertNull(ColumnFamily.diff(cf2, resolved));
+    }
+
+    @Test
+    public void testResolveSupersetDisjoint()
+    {
+        ColumnFamily cf1 = ColumnFamily.create("Keyspace1", "Standard1");
+        cf1.addColumn(column("c1", "v1", 0));
+        // cf1 and cf2 hold different columns (c1 vs c2)
+        ColumnFamily cf2 = ColumnFamily.create("Keyspace1", "Standard1");
+        cf2.addColumn(column("c2", "v2", 1));
+        // resolved row contains both columns, so each input's diff is the column it lacks
+        ColumnFamily resolved = RowRepairResolver.resolveSuperset(Arrays.asList(cf1, cf2));
+        assertColumns(resolved, "c1", "c2");
+        assertColumns(ColumnFamily.diff(cf1, resolved), "c2");
+        assertColumns(ColumnFamily.diff(cf2, resolved), "c1");
+    }
+
+    @Test
+    public void testResolveSupersetNullOne()
+    {
+        ColumnFamily cf2 = ColumnFamily.create("Keyspace1", "Standard1");
+        cf2.addColumn(column("c2", "v2", 1));
+        // null first entry: presumably a replica with no data for this row
+        ColumnFamily resolved = RowRepairResolver.resolveSuperset(Arrays.asList(null, cf2));
+        assertColumns(resolved, "c2");
+        assertColumns(ColumnFamily.diff(null, resolved), "c2");
+        assertNull(ColumnFamily.diff(cf2, resolved));
+    }
+
+    @Test
+    public void testResolveSupersetNullTwo()
+    {
+        ColumnFamily cf1 = ColumnFamily.create("Keyspace1", "Standard1");
+        cf1.addColumn(column("c1", "v1", 0));
+        // null in the second position; diff(null, resolved) is expected to be the whole resolved row
+        ColumnFamily resolved = RowRepairResolver.resolveSuperset(Arrays.asList(cf1, null));
+        assertColumns(resolved, "c1");
+        assertNull(ColumnFamily.diff(cf1, resolved));
+        assertColumns(ColumnFamily.diff(null, resolved), "c1");
+    }
+
+    @Test
+    public void testResolveSupersetNullBoth()
+    {
+        assertNull(RowRepairResolver.resolveSuperset(Arrays.<ColumnFamily>asList(null, null)));
+    }
+}