You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2011/02/15 21:50:22 UTC

svn commit: r1071046 - in /cassandra/branches/cassandra-0.7.2: ./ src/java/org/apache/cassandra/concurrent/ src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/dht/ src/java/org/apache/cassandra/net/ src/java/org/apache/cassandra/service/ t...

Author: jbellis
Date: Tue Feb 15 20:50:21 2011
New Revision: 1071046

URL: http://svn.apache.org/viewvc?rev=1071046&view=rev
Log:
revert #2069

Added:
    cassandra/branches/cassandra-0.7.2/test/unit/org/apache/cassandra/service/ReadResponseResolverTest.java
      - copied, changed from r1071027, cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/service/ReadResponseResolverTest.java
Removed:
    cassandra/branches/cassandra-0.7.2/src/java/org/apache/cassandra/service/AbstractRowResolver.java
    cassandra/branches/cassandra-0.7.2/src/java/org/apache/cassandra/service/AsyncRepairCallback.java
    cassandra/branches/cassandra-0.7.2/src/java/org/apache/cassandra/service/IReadCommand.java
    cassandra/branches/cassandra-0.7.2/src/java/org/apache/cassandra/service/RowDigestResolver.java
    cassandra/branches/cassandra-0.7.2/src/java/org/apache/cassandra/service/RowRepairResolver.java
    cassandra/branches/cassandra-0.7.2/test/unit/org/apache/cassandra/service/RowResolverTest.java
Modified:
    cassandra/branches/cassandra-0.7.2/CHANGES.txt
    cassandra/branches/cassandra-0.7.2/src/java/org/apache/cassandra/concurrent/Stage.java
    cassandra/branches/cassandra-0.7.2/src/java/org/apache/cassandra/concurrent/StageManager.java
    cassandra/branches/cassandra-0.7.2/src/java/org/apache/cassandra/db/RangeSliceCommand.java
    cassandra/branches/cassandra-0.7.2/src/java/org/apache/cassandra/db/ReadCommand.java
    cassandra/branches/cassandra-0.7.2/src/java/org/apache/cassandra/dht/BootStrapper.java
    cassandra/branches/cassandra-0.7.2/src/java/org/apache/cassandra/net/AsyncResult.java
    cassandra/branches/cassandra-0.7.2/src/java/org/apache/cassandra/net/IMessageCallback.java
    cassandra/branches/cassandra-0.7.2/src/java/org/apache/cassandra/net/MessagingService.java
    cassandra/branches/cassandra-0.7.2/src/java/org/apache/cassandra/service/DatacenterReadCallback.java
    cassandra/branches/cassandra-0.7.2/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java
    cassandra/branches/cassandra-0.7.2/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java
    cassandra/branches/cassandra-0.7.2/src/java/org/apache/cassandra/service/ReadCallback.java
    cassandra/branches/cassandra-0.7.2/src/java/org/apache/cassandra/service/ReadResponseResolver.java
    cassandra/branches/cassandra-0.7.2/src/java/org/apache/cassandra/service/RepairCallback.java
    cassandra/branches/cassandra-0.7.2/src/java/org/apache/cassandra/service/StorageProxy.java
    cassandra/branches/cassandra-0.7.2/src/java/org/apache/cassandra/service/TruncateResponseHandler.java
    cassandra/branches/cassandra-0.7.2/src/java/org/apache/cassandra/service/WriteResponseHandler.java
    cassandra/branches/cassandra-0.7.2/test/unit/org/apache/cassandra/service/ConsistencyLevelTest.java

Modified: cassandra/branches/cassandra-0.7.2/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7.2/CHANGES.txt?rev=1071046&r1=1071045&r2=1071046&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7.2/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.7.2/CHANGES.txt Tue Feb 15 20:50:21 2011
@@ -2,7 +2,6 @@
  * copy DecoratedKey.key when inserting into caches to avoid retaining
    a reference to the underlying buffer (CASSANDRA-2102)
  * format subcolumn names with subcomparator (CASSANDRA-2136)
- * lower-latency read repair (CASSANDRA-2069)
  * fix column bloom filter deserialization (CASSANDRA-2165)
 
 

Modified: cassandra/branches/cassandra-0.7.2/src/java/org/apache/cassandra/concurrent/Stage.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7.2/src/java/org/apache/cassandra/concurrent/Stage.java?rev=1071046&r1=1071045&r2=1071046&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7.2/src/java/org/apache/cassandra/concurrent/Stage.java (original)
+++ cassandra/branches/cassandra-0.7.2/src/java/org/apache/cassandra/concurrent/Stage.java Tue Feb 15 20:50:21 2011
@@ -31,8 +31,7 @@ public enum Stage
     ANTI_ENTROPY,
     MIGRATION,
     MISC,
-    INTERNAL_RESPONSE,
-    READ_REPAIR;
+    INTERNAL_RESPONSE;
 
     public String getJmxType()
     {
@@ -48,7 +47,6 @@ public enum Stage
             case MUTATION:
             case READ:
             case REQUEST_RESPONSE:
-            case READ_REPAIR:
                 return "request";
             default:
                 throw new AssertionError("Unknown stage " + this);

Modified: cassandra/branches/cassandra-0.7.2/src/java/org/apache/cassandra/concurrent/StageManager.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7.2/src/java/org/apache/cassandra/concurrent/StageManager.java?rev=1071046&r1=1071045&r2=1071046&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7.2/src/java/org/apache/cassandra/concurrent/StageManager.java (original)
+++ cassandra/branches/cassandra-0.7.2/src/java/org/apache/cassandra/concurrent/StageManager.java Tue Feb 15 20:50:21 2011
@@ -50,7 +50,6 @@ public class StageManager
         stages.put(Stage.ANTI_ENTROPY, new JMXEnabledThreadPoolExecutor(Stage.ANTI_ENTROPY));
         stages.put(Stage.MIGRATION, new JMXEnabledThreadPoolExecutor(Stage.MIGRATION));
         stages.put(Stage.MISC, new JMXEnabledThreadPoolExecutor(Stage.MISC));
-        stages.put(Stage.READ_REPAIR, multiThreadedStage(Stage.READ_REPAIR, Runtime.getRuntime().availableProcessors()));
     }
 
     private static ThreadPoolExecutor multiThreadedStage(Stage stage, int numThreads)

Modified: cassandra/branches/cassandra-0.7.2/src/java/org/apache/cassandra/db/RangeSliceCommand.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7.2/src/java/org/apache/cassandra/db/RangeSliceCommand.java?rev=1071046&r1=1071045&r2=1071046&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7.2/src/java/org/apache/cassandra/db/RangeSliceCommand.java (original)
+++ cassandra/branches/cassandra-0.7.2/src/java/org/apache/cassandra/db/RangeSliceCommand.java Tue Feb 15 20:50:21 2011
@@ -47,7 +47,6 @@ import org.apache.cassandra.dht.Abstract
 import org.apache.cassandra.io.ICompactSerializer;
 import org.apache.cassandra.io.util.DataOutputBuffer;
 import org.apache.cassandra.net.Message;
-import org.apache.cassandra.service.IReadCommand;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.thrift.ColumnParent;
 import org.apache.cassandra.thrift.SlicePredicate;
@@ -57,7 +56,7 @@ import org.apache.thrift.TDeserializer;
 import org.apache.thrift.TSerializer;
 import org.apache.cassandra.thrift.TBinaryProtocol;
 
-public class RangeSliceCommand implements IReadCommand
+public class RangeSliceCommand
 {
     private static final RangeSliceCommandSerializer serializer = new RangeSliceCommandSerializer();
     
@@ -114,11 +113,6 @@ public class RangeSliceCommand implement
         ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
         return serializer.deserialize(new DataInputStream(bis));
     }
-
-    public String getKeyspace()
-    {
-        return keyspace;
-    }
 }
 
 class RangeSliceCommandSerializer implements ICompactSerializer<RangeSliceCommand>

Modified: cassandra/branches/cassandra-0.7.2/src/java/org/apache/cassandra/db/ReadCommand.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7.2/src/java/org/apache/cassandra/db/ReadCommand.java?rev=1071046&r1=1071045&r2=1071046&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7.2/src/java/org/apache/cassandra/db/ReadCommand.java (original)
+++ cassandra/branches/cassandra-0.7.2/src/java/org/apache/cassandra/db/ReadCommand.java Tue Feb 15 20:50:21 2011
@@ -30,12 +30,11 @@ import org.apache.cassandra.db.filter.Qu
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.io.ICompactSerializer;
 import org.apache.cassandra.net.Message;
-import org.apache.cassandra.service.IReadCommand;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.FBUtilities;
 
 
-public abstract class ReadCommand implements IReadCommand
+public abstract class ReadCommand
 {
     public static final byte CMD_TYPE_GET_SLICE_BY_NAMES = 1;
     public static final byte CMD_TYPE_GET_SLICE = 2;
@@ -92,11 +91,6 @@ public abstract class ReadCommand implem
     {
         return ColumnFamily.getComparatorFor(table, getColumnFamilyName(), queryPath.superColumnName);
     }
-
-    public String getKeyspace()
-    {
-        return table;
-    }
 }
 
 class ReadCommandSerializer implements ICompactSerializer<ReadCommand>

Modified: cassandra/branches/cassandra-0.7.2/src/java/org/apache/cassandra/dht/BootStrapper.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7.2/src/java/org/apache/cassandra/dht/BootStrapper.java?rev=1071046&r1=1071045&r2=1071046&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7.2/src/java/org/apache/cassandra/dht/BootStrapper.java (original)
+++ cassandra/branches/cassandra-0.7.2/src/java/org/apache/cassandra/dht/BootStrapper.java Tue Feb 15 20:50:21 2011
@@ -282,10 +282,5 @@ public class BootStrapper
             token = StorageService.getPartitioner().getTokenFactory().fromString(new String(msg.getMessageBody(), Charsets.UTF_8));
             condition.signalAll();
         }
-
-        public boolean isLatencyForSnitch()
-        {
-            return false;
-        }
     }
 }

Modified: cassandra/branches/cassandra-0.7.2/src/java/org/apache/cassandra/net/AsyncResult.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7.2/src/java/org/apache/cassandra/net/AsyncResult.java?rev=1071046&r1=1071045&r2=1071046&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7.2/src/java/org/apache/cassandra/net/AsyncResult.java (original)
+++ cassandra/branches/cassandra-0.7.2/src/java/org/apache/cassandra/net/AsyncResult.java Tue Feb 15 20:50:21 2011
@@ -96,11 +96,6 @@ class AsyncResult implements IAsyncResul
         }        
     }
 
-    public boolean isLatencyForSnitch()
-    {
-        return false;
-    }
-
     public InetAddress getFrom()
     {
         return from;

Modified: cassandra/branches/cassandra-0.7.2/src/java/org/apache/cassandra/net/IMessageCallback.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7.2/src/java/org/apache/cassandra/net/IMessageCallback.java?rev=1071046&r1=1071045&r2=1071046&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7.2/src/java/org/apache/cassandra/net/IMessageCallback.java (original)
+++ cassandra/branches/cassandra-0.7.2/src/java/org/apache/cassandra/net/IMessageCallback.java Tue Feb 15 20:50:21 2011
@@ -23,9 +23,4 @@ package org.apache.cassandra.net;
 
 public interface IMessageCallback
 {
-    /**
-     * @return true if this callback is on the read path and its latency should be
-     * given as input to the dynamic snitch.
-     */
-    public boolean isLatencyForSnitch();
 }

Modified: cassandra/branches/cassandra-0.7.2/src/java/org/apache/cassandra/net/MessagingService.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7.2/src/java/org/apache/cassandra/net/MessagingService.java?rev=1071046&r1=1071045&r2=1071046&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7.2/src/java/org/apache/cassandra/net/MessagingService.java (original)
+++ cassandra/branches/cassandra-0.7.2/src/java/org/apache/cassandra/net/MessagingService.java Tue Feb 15 20:50:21 2011
@@ -138,7 +138,7 @@ public final class MessagingService impl
      */
     public void maybeAddLatency(IMessageCallback cb, InetAddress address, double latency)
     {
-        if (cb.isLatencyForSnitch())
+        if (cb instanceof ReadCallback || cb instanceof AsyncResult)
             addLatency(address, latency);
     }
 

Modified: cassandra/branches/cassandra-0.7.2/src/java/org/apache/cassandra/service/DatacenterReadCallback.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7.2/src/java/org/apache/cassandra/service/DatacenterReadCallback.java?rev=1071046&r1=1071045&r2=1071046&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7.2/src/java/org/apache/cassandra/service/DatacenterReadCallback.java (original)
+++ cassandra/branches/cassandra-0.7.2/src/java/org/apache/cassandra/service/DatacenterReadCallback.java Tue Feb 15 20:50:21 2011
@@ -22,12 +22,12 @@ package org.apache.cassandra.service;
 
 
 import java.net.InetAddress;
-import java.util.List;
+import java.util.Collection;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.ReadResponse;
 import org.apache.cassandra.db.Table;
+import org.apache.cassandra.db.ReadResponse;
 import org.apache.cassandra.locator.IEndpointSnitch;
 import org.apache.cassandra.locator.NetworkTopologyStrategy;
 import org.apache.cassandra.net.Message;
@@ -44,12 +44,12 @@ public class DatacenterReadCallback<T> e
     private static final String localdc = snitch.getDatacenter(FBUtilities.getLocalAddress());
     private AtomicInteger localResponses;
     
-    public DatacenterReadCallback(IResponseResolver resolver, ConsistencyLevel consistencyLevel, IReadCommand command, List<InetAddress> endpoints)
+    public DatacenterReadCallback(IResponseResolver<T> resolver, ConsistencyLevel consistencyLevel, String table)
     {
-        super(resolver, consistencyLevel, command, endpoints);
+        super(resolver, consistencyLevel, table);
         localResponses = new AtomicInteger(blockfor);
     }
-
+    
     @Override
     public void response(Message message)
     {
@@ -68,15 +68,14 @@ public class DatacenterReadCallback<T> e
     @Override
     public void response(ReadResponse result)
     {
-        ((RowDigestResolver) resolver).injectPreProcessed(result);
+        ((ReadResponseResolver) resolver).injectPreProcessed(result);
 
         int n = localResponses.decrementAndGet();
+
         if (n == 0 && resolver.isDataPresent())
         {
             condition.signal();
         }
-
-        maybeResolveForRepair();
     }
     
     @Override
@@ -87,7 +86,7 @@ public class DatacenterReadCallback<T> e
 	}
 
     @Override
-    public void assureSufficientLiveNodes() throws UnavailableException
+    public void assureSufficientLiveNodes(Collection<InetAddress> endpoints) throws UnavailableException
     {
         int localEndpoints = 0;
         for (InetAddress endpoint : endpoints)

Modified: cassandra/branches/cassandra-0.7.2/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7.2/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java?rev=1071046&r1=1071045&r2=1071046&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7.2/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java (original)
+++ cassandra/branches/cassandra-0.7.2/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java Tue Feb 15 20:50:21 2011
@@ -115,9 +115,4 @@ public class DatacenterSyncWriteResponse
                 throw new UnavailableException();
         }
     }
-
-    public boolean isLatencyForSnitch()
-    {
-        return false;
-    }
 }

Modified: cassandra/branches/cassandra-0.7.2/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7.2/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java?rev=1071046&r1=1071045&r2=1071046&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7.2/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java (original)
+++ cassandra/branches/cassandra-0.7.2/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java Tue Feb 15 20:50:21 2011
@@ -49,6 +49,7 @@ public class RangeSliceResponseResolver 
 
     public RangeSliceResponseResolver(String table, List<InetAddress> sources)
     {
+        assert sources.size() > 0;
         this.sources = sources;
         this.table = table;
     }
@@ -102,8 +103,8 @@ public class RangeSliceResponseResolver 
 
             protected Row getReduced()
             {
-                ColumnFamily resolved = RowRepairResolver.resolveSuperset(versions);
-                RowRepairResolver.maybeScheduleRepairs(resolved, table, key, versions, versionSources);
+                ColumnFamily resolved = ReadResponseResolver.resolveSuperset(versions);
+                ReadResponseResolver.maybeScheduleRepairs(resolved, table, key, versions, versionSources);
                 versions.clear();
                 versionSources.clear();
                 return new Row(key, resolved);

Modified: cassandra/branches/cassandra-0.7.2/src/java/org/apache/cassandra/service/ReadCallback.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7.2/src/java/org/apache/cassandra/service/ReadCallback.java?rev=1071046&r1=1071045&r2=1071046&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7.2/src/java/org/apache/cassandra/service/ReadCallback.java (original)
+++ cassandra/branches/cassandra-0.7.2/src/java/org/apache/cassandra/service/ReadCallback.java Tue Feb 15 20:50:21 2011
@@ -20,20 +20,14 @@ package org.apache.cassandra.service;
 
 import java.io.IOException;
 import java.net.InetAddress;
-import java.util.List;
-import java.util.Random;
+import java.util.Collection;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
-import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.concurrent.Stage;
-import org.apache.cassandra.concurrent.StageManager;
-import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.ReadCommand;
 import org.apache.cassandra.db.ReadResponse;
 import org.apache.cassandra.db.Table;
 import org.apache.cassandra.net.IAsyncCallback;
@@ -42,61 +36,28 @@ import org.apache.cassandra.net.Messagin
 import org.apache.cassandra.thrift.ConsistencyLevel;
 import org.apache.cassandra.thrift.UnavailableException;
 import org.apache.cassandra.utils.SimpleCondition;
-import org.apache.cassandra.utils.WrappedRunnable;
 
 public class ReadCallback<T> implements IAsyncCallback
 {
     protected static final Logger logger = LoggerFactory.getLogger( ReadCallback.class );
 
-    private static final ThreadLocal<Random> random = new ThreadLocal<Random>()
-    {
-        @Override
-        protected Random initialValue()
-        {
-            return new Random();
-        }
-    };
-
     public final IResponseResolver<T> resolver;
     protected final SimpleCondition condition = new SimpleCondition();
     private final long startTime;
     protected final int blockfor;
-    final List<InetAddress> endpoints;
-    private final IReadCommand command;
-
+    
     /**
      * Constructor when response count has to be calculated and blocked for.
      */
-    public ReadCallback(IResponseResolver<T> resolver, ConsistencyLevel consistencyLevel, IReadCommand command, List<InetAddress> endpoints)
+    public ReadCallback(IResponseResolver<T> resolver, ConsistencyLevel consistencyLevel, String table)
     {
-        this.command = command;
-        this.blockfor = determineBlockFor(consistencyLevel, command.getKeyspace());
+        this.blockfor = determineBlockFor(consistencyLevel, table);
         this.resolver = resolver;
         this.startTime = System.currentTimeMillis();
-        boolean repair = randomlyReadRepair();
-        this.endpoints = repair || resolver instanceof RowRepairResolver
-                       ? endpoints
-                       : endpoints.subList(0, Math.min(endpoints.size(), blockfor)); // min so as to not throw exception until assureSufficient is called
-
-        if (logger.isDebugEnabled())
-            logger.debug(String.format("Blockfor/repair is %s/%s; setting up requests to %s",
-                                       blockfor, repair, StringUtils.join(this.endpoints, ",")));
+
+        logger.debug("ReadCallback blocking for {} responses", blockfor);
     }
     
-    private boolean randomlyReadRepair()
-    {
-        if (resolver instanceof RowDigestResolver)
-        {
-            assert command instanceof ReadCommand : command;
-            String table = ((RowDigestResolver) resolver).table;
-            String columnFamily = ((ReadCommand) command).getColumnFamilyName();
-            CFMetaData cfmd = DatabaseDescriptor.getTableMetaData(table).get(columnFamily);
-            return cfmd.getReadRepairChance() > random.get().nextDouble();
-        }
-        // we don't read repair on range scans
-        return false;
-    }
-
     public T get() throws TimeoutException, DigestMismatchException, IOException
     {
         long timeout = DatabaseDescriptor.getRpcTimeout() - (System.currentTimeMillis() - startTime);
@@ -124,42 +85,21 @@ public class ReadCallback<T> implements 
     public void response(Message message)
     {
         resolver.preprocess(message);
-        assert resolver.getMessageCount() <= endpoints.size();
         if (resolver.getMessageCount() < blockfor)
             return;
         if (resolver.isDataPresent())
-        {
             condition.signal();
-            maybeResolveForRepair();
-        }
     }
 
     public void response(ReadResponse result)
     {
-        ((RowDigestResolver) resolver).injectPreProcessed(result);
-        assert resolver.getMessageCount() <= endpoints.size();
+        ((ReadResponseResolver) resolver).injectPreProcessed(result);
         if (resolver.getMessageCount() < blockfor)
             return;
         if (resolver.isDataPresent())
-        {
             condition.signal();
-            maybeResolveForRepair();
-        }
     }
-
-    /**
-     * Check digests in the background on the Repair stage if we've received replies
-     * too all the requests we sent.
-     */
-    protected void maybeResolveForRepair()
-    {
-        if (blockfor < endpoints.size() && resolver.getMessageCount() == endpoints.size())
-        {
-            assert resolver.isDataPresent();
-            StageManager.getStage(Stage.READ_REPAIR).execute(new AsyncRepairRunner());
-        }
-    }
-
+    
     public int determineBlockFor(ConsistencyLevel consistencyLevel, String table)
     {
         switch (consistencyLevel)
@@ -176,38 +116,9 @@ public class ReadCallback<T> implements 
         }
     }
 
-    public void assureSufficientLiveNodes() throws UnavailableException
+    public void assureSufficientLiveNodes(Collection<InetAddress> endpoints) throws UnavailableException
     {
         if (endpoints.size() < blockfor)
             throw new UnavailableException();
     }
-
-    public boolean isLatencyForSnitch()
-    {
-        return true;
-    }
-
-    private class AsyncRepairRunner extends WrappedRunnable
-    {
-        protected void runMayThrow() throws IOException
-        {
-            try
-            {
-                resolver.resolve();
-            }
-            catch (DigestMismatchException e)
-            {
-                if (logger.isDebugEnabled())
-                    logger.debug("Digest mismatch:", e);
-
-                ReadCommand readCommand = (ReadCommand) command;
-                final RowRepairResolver repairResolver = new RowRepairResolver(readCommand.table, readCommand.key);
-                IAsyncCallback repairHandler = new AsyncRepairCallback(repairResolver, endpoints.size());
-
-                Message messageRepair = readCommand.makeReadMessage();
-                for (InetAddress endpoint : endpoints)
-                    MessagingService.instance().sendRR(messageRepair, endpoint, repairHandler);
-            }
-        }
-    }
 }

Modified: cassandra/branches/cassandra-0.7.2/src/java/org/apache/cassandra/service/ReadResponseResolver.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7.2/src/java/org/apache/cassandra/service/ReadResponseResolver.java?rev=1071046&r1=1071045&r2=1071046&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7.2/src/java/org/apache/cassandra/service/ReadResponseResolver.java (original)
+++ cassandra/branches/cassandra-0.7.2/src/java/org/apache/cassandra/service/ReadResponseResolver.java Tue Feb 15 20:50:21 2011
@@ -0,0 +1,257 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOError;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.commons.lang.ArrayUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.utils.FBUtilities;
+import org.cliffc.high_scale_lib.NonBlockingHashMap;
+
+/**
+ * Turns ReadResponse messages into Row objects, resolving to the most recent
+ * version and setting up read repairs as necessary.
+ */
+public class ReadResponseResolver implements IResponseResolver<Row>
+{
+	private static Logger logger_ = LoggerFactory.getLogger(ReadResponseResolver.class);
+    private final String table;
+    private final ConcurrentMap<Message, ReadResponse> results = new NonBlockingHashMap<Message, ReadResponse>();
+    private DecoratedKey key;
+    private ByteBuffer digest;
+    private static final Message FAKE_MESSAGE = new Message(FBUtilities.getLocalAddress(), StorageService.Verb.INTERNAL_RESPONSE, ArrayUtils.EMPTY_BYTE_ARRAY);;
+
+    public ReadResponseResolver(String table, ByteBuffer key)
+    {
+        this.table = table;
+        this.key = StorageService.getPartitioner().decorateKey(key);
+    }
+    
+    public Row getData() throws IOException
+    {
+        for (Map.Entry<Message, ReadResponse> entry : results.entrySet())
+        {
+            ReadResponse result = entry.getValue();
+            if (!result.isDigestQuery())
+                return result.row();
+        }
+
+        throw new AssertionError("getData should not be invoked when no data is present");
+    }
+
+    /*
+     * This method handles three different scenarios:
+     *
+     * 1a)we're handling the initial read, of data from the closest replica + digests
+     *    from the rest.  In this case we check the digests against each other,
+     *    throw an exception if there is a mismatch, otherwise return the data row.
+     *
+     * 1b)we're checking additional digests that arrived after the minimum to handle
+     *    the requested ConsistencyLevel, i.e. asynchronous read repair check
+     *
+     * 2) there was a mismatch on the initial read (1a or 1b), so we redid the digest requests
+     *    as full data reads.  In this case we need to compute the most recent version
+     *    of each column, and send diffs to out-of-date replicas.
+     */
+    public Row resolve() throws DigestMismatchException, IOException
+    {
+        if (logger_.isDebugEnabled())
+            logger_.debug("resolving " + results.size() + " responses");
+
+        long startTime = System.currentTimeMillis();
+		List<ColumnFamily> versions = new ArrayList<ColumnFamily>();
+		List<InetAddress> endpoints = new ArrayList<InetAddress>();
+
+        // case 1: validate digests against each other; throw immediately on mismatch.
+        // also, collects data results into versions/endpoints lists.
+        //
+        // results are cleared as we process them, to avoid unnecessary duplication of work
+        // when resolve() is called a second time for read repair on responses that were not
+        // necessary to satisfy ConsistencyLevel.
+        for (Map.Entry<Message, ReadResponse> entry : results.entrySet())
+        {
+            ReadResponse result = entry.getValue();
+            Message message = entry.getKey();
+            if (result.isDigestQuery())
+            {
+                if (digest == null)
+                {
+                    digest = result.digest();
+                }
+                else
+                {
+                    ByteBuffer digest2 = result.digest();
+                    if (!digest.equals(digest2))
+                        throw new DigestMismatchException(key, digest, digest2);
+                }
+            }
+            else
+            {
+                versions.add(result.row().cf);
+                endpoints.add(message.getFrom());
+            }
+
+            results.remove(message);
+        }
+
+		// If there was a digest query compare it with all the data digests
+		// If there is a mismatch then throw an exception so that read repair can happen.
+        //
+        // It's important to note that we do not compare the digests of multiple data responses --
+        // if we are in that situation we know there was a previous mismatch and now we're doing a repair,
+        // so our job is now case 2: figure out what the most recent version is and update everyone to that version.
+        if (digest != null)
+        {
+            for (ColumnFamily cf : versions)
+            {
+                ByteBuffer digest2 = ColumnFamily.digest(cf);
+                if (!digest.equals(digest2))
+                    throw new DigestMismatchException(key, digest, digest2);
+            }
+            if (logger_.isDebugEnabled())
+                logger_.debug("digests verified");
+        }
+
+        ColumnFamily resolved;
+        if (versions.size() > 1)
+        {
+            resolved = resolveSuperset(versions);
+            if (logger_.isDebugEnabled())
+                logger_.debug("versions merged");
+            maybeScheduleRepairs(resolved, table, key, versions, endpoints);
+        }
+        else
+        {
+            resolved = versions.get(0);
+        }
+
+        if (logger_.isDebugEnabled())
+            logger_.debug("resolve: " + (System.currentTimeMillis() - startTime) + " ms.");
+		return new Row(key, resolved);
+	}
+
+    /**
+     * For each row version, compare with resolved (the superset of all row versions);
+     * if it is missing anything, send a mutation to the endpoint it came from.
+     */
+    public static void maybeScheduleRepairs(ColumnFamily resolved, String table, DecoratedKey key, List<ColumnFamily> versions, List<InetAddress> endpoints)
+    {
+        for (int i = 0; i < versions.size(); i++)
+        {
+            ColumnFamily diffCf = ColumnFamily.diff(versions.get(i), resolved);
+            if (diffCf == null) // no repair needs to happen
+                continue;
+
+            // create and send the row mutation message based on the diff
+            RowMutation rowMutation = new RowMutation(table, key.key);
+            rowMutation.add(diffCf);
+            Message repairMessage;
+            try
+            {
+                repairMessage = rowMutation.makeRowMutationMessage(StorageService.Verb.READ_REPAIR);
+            }
+            catch (IOException e)
+            {
+                throw new IOError(e);
+            }
+            MessagingService.instance().sendOneWay(repairMessage, endpoints.get(i));
+        }
+    }
+
+    static ColumnFamily resolveSuperset(List<ColumnFamily> versions)
+    {
+        assert versions.size() > 0;
+
+        ColumnFamily resolved = null;
+        for (ColumnFamily cf : versions)
+        {
+            if (cf != null)
+            {
+                resolved = cf.cloneMe();
+                break;
+            }
+        }
+        if (resolved == null)
+            return null;
+
+        for (ColumnFamily cf : versions)
+            resolved.resolve(cf);
+
+        return resolved;
+    }
+
+    public void preprocess(Message message)
+    {
+        byte[] body = message.getMessageBody();
+        ByteArrayInputStream bufIn = new ByteArrayInputStream(body);
+        try
+        {
+            ReadResponse result = ReadResponse.serializer().deserialize(new DataInputStream(bufIn));
+            if (logger_.isDebugEnabled())
+                logger_.debug("Preprocessed {} response", result.isDigestQuery() ? "digest" : "data");
+            results.put(message, result);
+        }
+        catch (IOException e)
+        {
+            throw new IOError(e);
+        }
+    }
+
+    /** hack so local reads don't force de/serialization of an extra real Message */
+    public void injectPreProcessed(ReadResponse result)
+    {
+        assert results.get(FAKE_MESSAGE) == null; // should only be one local reply
+        results.put(FAKE_MESSAGE, result);
+    }
+
+    public boolean isDataPresent()
+	{
+        for (ReadResponse result : results.values())
+        {
+            if (!result.isDigestQuery())
+                return true;
+        }
+        return false;
+    }
+
+    public Iterable<Message> getMessages()
+    {
+        return results.keySet();
+    }
+
+    public int getMessageCount()
+    {
+        return results.size();
+    }
+}

Modified: cassandra/branches/cassandra-0.7.2/src/java/org/apache/cassandra/service/RepairCallback.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7.2/src/java/org/apache/cassandra/service/RepairCallback.java?rev=1071046&r1=1071045&r2=1071046&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7.2/src/java/org/apache/cassandra/service/RepairCallback.java (original)
+++ cassandra/branches/cassandra-0.7.2/src/java/org/apache/cassandra/service/RepairCallback.java Tue Feb 15 20:50:21 2011
@@ -39,14 +39,6 @@ public class RepairCallback<T> implement
     private final SimpleCondition condition = new SimpleCondition();
     private final long startTime;
 
-    /**
-     * The main difference between this and ReadCallback is, ReadCallback has a ConsistencyLevel
-     * it needs to achieve.  Repair on the other hand is happy to repair whoever replies within the timeout.
-     *
-     * (The other main difference of course is, this is only created once we know we have a digest
-     * mismatch, and we're going to do full-data reads from everyone -- that is, this is the final
-     * stage in the read process.)
-     */
     public RepairCallback(IResponseResolver<T> resolver, List<InetAddress> endpoints)
     {
         this.resolver = resolver;
@@ -54,6 +46,10 @@ public class RepairCallback<T> implement
         this.startTime = System.currentTimeMillis();
     }
 
+    /**
+     * The main difference between this and ReadCallback is that ReadCallback has a ConsistencyLevel
+     * it needs to achieve, whereas Repair is happy to repair whoever replies within the timeout.
+     */
     public T get() throws TimeoutException, DigestMismatchException, IOException
     {
         long timeout = DatabaseDescriptor.getRpcTimeout() - (System.currentTimeMillis() - startTime);
@@ -75,9 +71,4 @@ public class RepairCallback<T> implement
         if (resolver.getMessageCount() == endpoints.size())
             condition.signal();
     }
-
-    public boolean isLatencyForSnitch()
-    {
-        return true;
-    }
 }

Modified: cassandra/branches/cassandra-0.7.2/src/java/org/apache/cassandra/service/StorageProxy.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7.2/src/java/org/apache/cassandra/service/StorageProxy.java?rev=1071046&r1=1071045&r2=1071046&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7.2/src/java/org/apache/cassandra/service/StorageProxy.java (original)
+++ cassandra/branches/cassandra-0.7.2/src/java/org/apache/cassandra/service/StorageProxy.java Tue Feb 15 20:50:21 2011
@@ -36,6 +36,7 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.concurrent.StageManager;
+import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.filter.QueryFilter;
@@ -58,6 +59,17 @@ public class StorageProxy implements Sto
 {
     private static final Logger logger = LoggerFactory.getLogger(StorageProxy.class);
 
+    private static ScheduledExecutorService repairExecutor = new ScheduledThreadPoolExecutor(1); // TODO JMX-enable this
+
+    private static final ThreadLocal<Random> random = new ThreadLocal<Random>()
+    {
+        @Override
+        protected Random initialValue()
+        {
+            return new Random();
+        }
+    };
+
     // mbean stuff
     private static final LatencyTracker readStats = new LatencyTracker();
     private static final LatencyTracker rangeStats = new LatencyTracker();
@@ -66,8 +78,6 @@ public class StorageProxy implements Sto
     private static int maxHintWindow = DatabaseDescriptor.getMaxHintWindow();
     private static final String UNREACHABLE = "UNREACHABLE";
 
-    public static final StorageProxy instance = new StorageProxy();
-
     private StorageProxy() {}
     static
     {
@@ -313,55 +323,66 @@ public class StorageProxy implements Sto
     private static List<Row> fetchRows(List<ReadCommand> commands, ConsistencyLevel consistency_level) throws IOException, UnavailableException, TimeoutException
     {
         List<ReadCallback<Row>> readCallbacks = new ArrayList<ReadCallback<Row>>();
+        List<List<InetAddress>> commandEndpoints = new ArrayList<List<InetAddress>>();
         List<Row> rows = new ArrayList<Row>();
+        Set<ReadCommand> repairs = new HashSet<ReadCommand>();
 
         // send out read requests
         for (ReadCommand command: commands)
         {
             assert !command.isDigestQuery();
-            logger.debug("Command/ConsistencyLevel is {}/{}", command, consistency_level);
 
             List<InetAddress> endpoints = StorageService.instance.getLiveNaturalEndpoints(command.table, command.key);
             DatabaseDescriptor.getEndpointSnitch().sortByProximity(FBUtilities.getLocalAddress(), endpoints);
 
-            RowDigestResolver resolver = new RowDigestResolver(command.table, command.key);
-            ReadCallback<Row> handler = getReadCallback(resolver, command, consistency_level, endpoints);
-            handler.assureSufficientLiveNodes();
-            assert !handler.endpoints.isEmpty();
+            ReadResponseResolver resolver = new ReadResponseResolver(command.table, command.key);
+            ReadCallback<Row> handler = getReadCallback(resolver, command.table, consistency_level);
+            handler.assureSufficientLiveNodes(endpoints);
 
+            // when read repair is chosen, keep every endpoint (noting the command if there are more than blockfor); otherwise cut the endpoints list down to the ones required to satisfy ConsistencyLevel
+            if (randomlyReadRepair(command))
+            {
+                if (endpoints.size() > handler.blockfor)
+                    repairs.add(command);
+            }
+            else
+            {
+                endpoints = endpoints.subList(0, handler.blockfor);
+            }
+            
             // The data-request message is sent to dataPoint, the node that will actually get
             // the data for us. The other replicas are only sent a digest query.
             ReadCommand digestCommand = null;
-            if (handler.endpoints.size() > 1)
+            if (endpoints.size() > 1)
             {
                 digestCommand = command.copy();
                 digestCommand.setDigestQuery(true);
             }
 
-            InetAddress dataPoint = handler.endpoints.get(0);
+            InetAddress dataPoint = endpoints.get(0);
             if (dataPoint.equals(FBUtilities.getLocalAddress()))
             {
                 if (logger.isDebugEnabled())
-                    logger.debug("reading data locally");
+                    logger.debug("reading data for " + command + " locally");
                 StageManager.getStage(Stage.READ).execute(new LocalReadRunnable(command, handler));
             }
             else
             {
                 Message message = command.makeReadMessage();
                 if (logger.isDebugEnabled())
-                    logger.debug("reading data from " + dataPoint);
+                    logger.debug("reading data for " + command + " from " + dataPoint);
                 MessagingService.instance().sendRR(message, dataPoint, handler);
             }
 
             // We lazy-construct the digest Message object since it may not be necessary if we
             // are doing a local digest read, or no digest reads at all.
             Message digestMessage = null;
-            for (InetAddress digestPoint : handler.endpoints.subList(1, handler.endpoints.size()))
+            for (InetAddress digestPoint : endpoints.subList(1, endpoints.size()))
             {
                 if (digestPoint.equals(FBUtilities.getLocalAddress()))
                 {
                     if (logger.isDebugEnabled())
-                        logger.debug("reading digest locally");
+                        logger.debug("reading digest for " + command + " locally");
                     StageManager.getStage(Stage.READ).execute(new LocalReadRunnable(digestCommand, handler));
                 }
                 else
@@ -369,45 +390,44 @@ public class StorageProxy implements Sto
                     if (digestMessage == null)
                         digestMessage = digestCommand.makeReadMessage();
                     if (logger.isDebugEnabled())
-                        logger.debug("reading digest for from " + digestPoint);
+                        logger.debug("reading digest for " + command + " from " + digestPoint);
                     MessagingService.instance().sendRR(digestMessage, digestPoint, handler);
                 }
             }
 
             readCallbacks.add(handler);
+            commandEndpoints.add(endpoints);
         }
 
         // read results and make a second pass for any digest mismatches
         List<RepairCallback<Row>> repairResponseHandlers = null;
         for (int i = 0; i < commands.size(); i++)
         {
-            ReadCallback<Row> handler = readCallbacks.get(i);
+            ReadCallback<Row> readCallback = readCallbacks.get(i);
             Row row;
             ReadCommand command = commands.get(i);
+            List<InetAddress> endpoints = commandEndpoints.get(i);
             try
             {
                 long startTime2 = System.currentTimeMillis();
-                row = handler.get(); // CL.ONE is special cased here to ignore digests even if some have arrived
+                row = readCallback.get(); // CL.ONE is special cased here to ignore digests even if some have arrived
                 if (row != null)
                     rows.add(row);
 
                 if (logger.isDebugEnabled())
                     logger.debug("Read: " + (System.currentTimeMillis() - startTime2) + " ms.");
+
+                if (repairs.contains(command))
+                    repairExecutor.schedule(new RepairRunner(readCallback.resolver, command, endpoints), DatabaseDescriptor.getRpcTimeout(), TimeUnit.MILLISECONDS);
             }
             catch (DigestMismatchException ex)
             {
                 if (logger.isDebugEnabled())
                     logger.debug("Digest mismatch:", ex);
-
-                RowRepairResolver resolver = new RowRepairResolver(command.table, command.key);
-                RepairCallback<Row> repairHandler = new RepairCallback<Row>(resolver, handler.endpoints);
-                Message messageRepair = command.makeReadMessage();
-                for (InetAddress endpoint : handler.endpoints)
-                    MessagingService.instance().sendRR(messageRepair, endpoint, repairHandler);
-
+                RepairCallback<Row> handler = repair(command, endpoints);
                 if (repairResponseHandlers == null)
                     repairResponseHandlers = new ArrayList<RepairCallback<Row>>();
-                repairResponseHandlers.add(repairHandler);
+                repairResponseHandlers.add(handler);
             }
         }
 
@@ -456,13 +476,24 @@ public class StorageProxy implements Sto
         }
     }
     
-    static <T> ReadCallback<T> getReadCallback(IResponseResolver<T> resolver, IReadCommand command, ConsistencyLevel consistencyLevel, List<InetAddress> endpoints)
+    static <T> ReadCallback<T> getReadCallback(IResponseResolver<T> resolver, String table, ConsistencyLevel consistencyLevel)
     {
         if (consistencyLevel.equals(ConsistencyLevel.LOCAL_QUORUM) || consistencyLevel.equals(ConsistencyLevel.EACH_QUORUM))
         {
-            return new DatacenterReadCallback(resolver, consistencyLevel, command, endpoints);
+            return new DatacenterReadCallback(resolver, consistencyLevel, table);
         }
-        return new ReadCallback(resolver, consistencyLevel, command, endpoints);
+        return new ReadCallback(resolver, consistencyLevel, table);
+    }
+
+    private static RepairCallback<Row> repair(ReadCommand command, List<InetAddress> endpoints)
+    throws IOException
+    {
+        ReadResponseResolver resolver = new ReadResponseResolver(command.table, command.key);
+        RepairCallback<Row> handler = new RepairCallback<Row>(resolver, endpoints);
+        Message messageRepair = command.makeReadMessage();
+        for (InetAddress endpoint : endpoints)
+            MessagingService.instance().sendRR(messageRepair, endpoint, handler);
+        return handler;
     }
 
     /*
@@ -514,14 +545,16 @@ public class StorageProxy implements Sto
 
                     // collect replies and resolve according to consistency level
                     RangeSliceResponseResolver resolver = new RangeSliceResponseResolver(command.keyspace, liveEndpoints);
-                    ReadCallback<List<Row>> handler = getReadCallback(resolver, command, consistency_level, liveEndpoints);
-                    handler.assureSufficientLiveNodes();
+                    AbstractReplicationStrategy rs = Table.open(command.keyspace).getReplicationStrategy();
+                    ReadCallback<List<Row>> handler = getReadCallback(resolver, command.keyspace, consistency_level);
+                    // TODO bail early if live endpoints can't satisfy requested consistency level
                     for (InetAddress endpoint : liveEndpoints)
                     {
                         MessagingService.instance().sendRR(message, endpoint, handler);
                         if (logger.isDebugEnabled())
                             logger.debug("reading " + c2 + " from " + endpoint);
                     }
+                    // TODO read repair on remaining replicas?
 
                     // if we're done, great, otherwise, move to the next range
                     try 
@@ -574,11 +607,6 @@ public class StorageProxy implements Sto
                 versions.put(message.getFrom(), theirVersion);
                 latch.countDown();
             }
-
-            public boolean isLatencyForSnitch()
-            {
-                return false;
-            }
         };
         // an empty message acts as a request to the SchemaCheckVerbHandler.
         for (InetAddress endpoint : liveHosts)
@@ -671,6 +699,12 @@ public class StorageProxy implements Sto
 
         return ranges;
     }
+    
+    private static boolean randomlyReadRepair(ReadCommand command)
+    {
+        CFMetaData cfmd = DatabaseDescriptor.getTableMetaData(command.table).get(command.getColumnFamilyName());
+        return cfmd.getReadRepairChance() > random.get().nextDouble();
+    }
 
     public long getReadOperations()
     {
@@ -747,7 +781,7 @@ public class StorageProxy implements Sto
         return writeStats.getRecentLatencyHistogramMicros();
     }
 
-    public static List<Row> scan(final String keyspace, String column_family, IndexClause index_clause, SlicePredicate column_predicate, ConsistencyLevel consistency_level)
+    public static List<Row> scan(String keyspace, String column_family, IndexClause index_clause, SlicePredicate column_predicate, ConsistencyLevel consistency_level)
     throws IOException, TimeoutException, UnavailableException
     {
         IPartitioner p = StorageService.getPartitioner();
@@ -765,16 +799,12 @@ public class StorageProxy implements Sto
 
             // collect replies and resolve according to consistency level
             RangeSliceResponseResolver resolver = new RangeSliceResponseResolver(keyspace, liveEndpoints);
-            IReadCommand iCommand = new IReadCommand()
-            {
-                public String getKeyspace()
-                {
-                    return keyspace;
-                }
-            };
-            ReadCallback<List<Row>> handler = getReadCallback(resolver, iCommand, consistency_level, liveEndpoints);
-            handler.assureSufficientLiveNodes();
-
+            ReadCallback<List<Row>> handler = getReadCallback(resolver, keyspace, consistency_level);
+            
+            // bail early if live endpoints can't satisfy requested consistency level
+            if(handler.blockfor > liveEndpoints.size())
+                throw new UnavailableException();
+            
             IndexScanCommand command = new IndexScanCommand(keyspace, column_family, index_clause, column_predicate, range);
             Message message = command.getMessage();
             for (InetAddress endpoint : liveEndpoints)
@@ -882,4 +912,40 @@ public class StorageProxy implements Sto
     {
         return !Gossiper.instance.getUnreachableMembers().isEmpty();
     }
+
+    private static class RepairRunner extends WrappedRunnable
+    {
+        private final IResponseResolver<Row> resolver;
+        private final ReadCommand command;
+        private final List<InetAddress> endpoints;
+
+        public RepairRunner(IResponseResolver<Row> resolver, ReadCommand command, List<InetAddress> endpoints)
+        {
+            this.resolver = resolver;
+            this.command = command;
+            this.endpoints = endpoints;
+        }
+
+        protected void runMayThrow() throws IOException
+        {
+            try
+            {
+                resolver.resolve();
+            }
+            catch (DigestMismatchException e)
+            {
+                if (logger.isDebugEnabled())
+                    logger.debug("Digest mismatch:", e);
+                final RepairCallback<Row> callback = repair(command, endpoints);
+                Runnable runnable = new WrappedRunnable()
+                {
+                    public void runMayThrow() throws DigestMismatchException, IOException, TimeoutException
+                    {
+                        callback.get();
+                    }
+                };
+                repairExecutor.schedule(runnable, DatabaseDescriptor.getRpcTimeout(), TimeUnit.MILLISECONDS);
+            }
+        }
+    }
 }

Modified: cassandra/branches/cassandra-0.7.2/src/java/org/apache/cassandra/service/TruncateResponseHandler.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7.2/src/java/org/apache/cassandra/service/TruncateResponseHandler.java?rev=1071046&r1=1071045&r2=1071046&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7.2/src/java/org/apache/cassandra/service/TruncateResponseHandler.java (original)
+++ cassandra/branches/cassandra-0.7.2/src/java/org/apache/cassandra/service/TruncateResponseHandler.java Tue Feb 15 20:50:21 2011
@@ -73,9 +73,4 @@ public class TruncateResponseHandler imp
         if (responses.get() >= responseCount)
             condition.signal();
     }
-
-    public boolean isLatencyForSnitch()
-    {
-        return false;
-    }
 }

Modified: cassandra/branches/cassandra-0.7.2/src/java/org/apache/cassandra/service/WriteResponseHandler.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7.2/src/java/org/apache/cassandra/service/WriteResponseHandler.java?rev=1071046&r1=1071045&r2=1071046&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7.2/src/java/org/apache/cassandra/service/WriteResponseHandler.java (original)
+++ cassandra/branches/cassandra-0.7.2/src/java/org/apache/cassandra/service/WriteResponseHandler.java Tue Feb 15 20:50:21 2011
@@ -121,9 +121,4 @@ public class WriteResponseHandler extend
             throw new UnavailableException();
         }
     }
-
-    public boolean isLatencyForSnitch()
-    {
-        return false;
-    }
 }

Modified: cassandra/branches/cassandra-0.7.2/test/unit/org/apache/cassandra/service/ConsistencyLevelTest.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7.2/test/unit/org/apache/cassandra/service/ConsistencyLevelTest.java?rev=1071046&r1=1071045&r2=1071046&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7.2/test/unit/org/apache/cassandra/service/ConsistencyLevelTest.java (original)
+++ cassandra/branches/cassandra-0.7.2/test/unit/org/apache/cassandra/service/ConsistencyLevelTest.java Tue Feb 15 20:50:21 2011
@@ -71,7 +71,7 @@ public class ConsistencyLevelTest extend
 
         AbstractReplicationStrategy strategy;
 
-        for (final String table : DatabaseDescriptor.getNonSystemTables())
+        for (String table : DatabaseDescriptor.getNonSystemTables())
         {
             strategy = getStrategy(table, tmd);
             StorageService.calculatePendingRanges(strategy, table);
@@ -96,15 +96,7 @@ public class ConsistencyLevelTest extend
 
                     IWriteResponseHandler writeHandler = strategy.getWriteResponseHandler(hosts, hintedNodes, c);
 
-                    IReadCommand command = new IReadCommand()
-                    {
-                        public String getKeyspace()
-                        {
-                            return table;
-                        }
-                    };
-                    RowRepairResolver resolver = new RowRepairResolver(table, ByteBufferUtil.bytes("foo"));
-                    ReadCallback<Row> readHandler = StorageProxy.getReadCallback(resolver, command, c, new ArrayList<InetAddress>(hintedNodes.keySet()));
+                    ReadCallback<Row> readHandler = StorageProxy.getReadCallback(new ReadResponseResolver(table, ByteBufferUtil.bytes("foo")), table, c);
 
                     boolean isWriteUnavailable = false;
                     boolean isReadUnavailable = false;
@@ -119,7 +111,7 @@ public class ConsistencyLevelTest extend
 
                     try
                     {
-                        readHandler.assureSufficientLiveNodes();
+                        readHandler.assureSufficientLiveNodes(hintedNodes.asMap().keySet());
                     }
                     catch (UnavailableException e)
                     {

Copied: cassandra/branches/cassandra-0.7.2/test/unit/org/apache/cassandra/service/ReadResponseResolverTest.java (from r1071027, cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/service/ReadResponseResolverTest.java)
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7.2/test/unit/org/apache/cassandra/service/ReadResponseResolverTest.java?p2=cassandra/branches/cassandra-0.7.2/test/unit/org/apache/cassandra/service/ReadResponseResolverTest.java&p1=cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/service/ReadResponseResolverTest.java&r1=1071027&r2=1071046&rev=1071046&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/service/ReadResponseResolverTest.java (original)
+++ cassandra/branches/cassandra-0.7.2/test/unit/org/apache/cassandra/service/ReadResponseResolverTest.java Tue Feb 15 20:50:21 2011
@@ -0,0 +1,96 @@
+package org.apache.cassandra.service;
+/*
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * 
+ */
+
+
+import java.util.Arrays;
+
+import org.apache.cassandra.SchemaLoader;
+import org.junit.Test;
+
+import org.apache.cassandra.db.ColumnFamily;
+
+import static org.apache.cassandra.db.TableTest.assertColumns;
+import static org.apache.cassandra.Util.column;
+import static junit.framework.Assert.assertNull;
+
+public class ReadResponseResolverTest extends SchemaLoader
+{
+    @Test
+    public void testResolveSupersetNewer()
+    {
+        ColumnFamily cf1 = ColumnFamily.create("Keyspace1", "Standard1");
+        cf1.addColumn(column("c1", "v1", 0));
+
+        ColumnFamily cf2 = ColumnFamily.create("Keyspace1", "Standard1");
+        cf2.addColumn(column("c1", "v2", 1));
+
+        ColumnFamily resolved = ReadResponseResolver.resolveSuperset(Arrays.asList(cf1, cf2));
+        assertColumns(resolved, "c1");
+        assertColumns(ColumnFamily.diff(cf1, resolved), "c1");
+        assertNull(ColumnFamily.diff(cf2, resolved));
+    }
+
+    @Test
+    public void testResolveSupersetDisjoint()
+    {
+        ColumnFamily cf1 = ColumnFamily.create("Keyspace1", "Standard1");
+        cf1.addColumn(column("c1", "v1", 0));
+
+        ColumnFamily cf2 = ColumnFamily.create("Keyspace1", "Standard1");
+        cf2.addColumn(column("c2", "v2", 1));
+
+        ColumnFamily resolved = ReadResponseResolver.resolveSuperset(Arrays.asList(cf1, cf2));
+        assertColumns(resolved, "c1", "c2");
+        assertColumns(ColumnFamily.diff(cf1, resolved), "c2");
+        assertColumns(ColumnFamily.diff(cf2, resolved), "c1");
+    }
+
+    @Test
+    public void testResolveSupersetNullOne()
+    {
+        ColumnFamily cf2 = ColumnFamily.create("Keyspace1", "Standard1");
+        cf2.addColumn(column("c2", "v2", 1));
+
+        ColumnFamily resolved = ReadResponseResolver.resolveSuperset(Arrays.asList(null, cf2));
+        assertColumns(resolved, "c2");
+        assertColumns(ColumnFamily.diff(null, resolved), "c2");
+        assertNull(ColumnFamily.diff(cf2, resolved));
+    }
+
+    @Test
+    public void testResolveSupersetNullTwo()
+    {
+        ColumnFamily cf1 = ColumnFamily.create("Keyspace1", "Standard1");
+        cf1.addColumn(column("c1", "v1", 0));
+
+        ColumnFamily resolved = ReadResponseResolver.resolveSuperset(Arrays.asList(cf1, null));
+        assertColumns(resolved, "c1");
+        assertNull(ColumnFamily.diff(cf1, resolved));
+        assertColumns(ColumnFamily.diff(null, resolved), "c1");
+    }
+
+    @Test
+    public void testResolveSupersetNullBoth()
+    {
+        assertNull(ReadResponseResolver.resolveSuperset(Arrays.<ColumnFamily>asList(null, null)));
+    }
+}