Posted to commits@cassandra.apache.org by sl...@apache.org on 2013/01/07 11:14:46 UTC

[1/2] git commit: Ensure CL guarantees on digest mismatch

Ensure CL guarantees on digest mismatch

patch by slebresne; reviewed by jbellis for CASSANDRA-5113


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/3d787b78
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/3d787b78
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/3d787b78

Branch: refs/heads/trunk
Commit: 3d787b78c155773edcf29af8290ef1bea62a4206
Parents: af8a477
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Mon Jan 7 11:11:45 2013 +0100
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Mon Jan 7 11:11:45 2013 +0100

----------------------------------------------------------------------
 CHANGES.txt                                        |    1 +
 src/java/org/apache/cassandra/db/ReadCommand.java  |    4 +-
 .../apache/cassandra/db/SliceFromReadCommand.java  |    6 +-
 .../cassandra/service/AsyncRepairCallback.java     |    4 +-
 .../cassandra/service/DatacenterReadCallback.java  |   13 +-
 .../service/RangeSliceResponseResolver.java        |    4 +-
 .../org/apache/cassandra/service/ReadCallback.java |   26 ++-
 .../apache/cassandra/service/RepairCallback.java   |   86 -------
 .../apache/cassandra/service/RowDataResolver.java  |  182 +++++++++++++++
 .../cassandra/service/RowDigestResolver.java       |    2 +-
 .../cassandra/service/RowRepairResolver.java       |  182 ---------------
 .../org/apache/cassandra/service/StorageProxy.java |   16 +-
 .../apache/cassandra/service/RowResolverTest.java  |   20 +-
 13 files changed, 247 insertions(+), 299 deletions(-)
----------------------------------------------------------------------
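
In short: the removed RepairCallback below was content to resolve the post-mismatch data reads with whichever replicas happened to reply before the timeout, which could silently weaken the requested consistency level. The patch instead re-runs the read through the normal ReadCallback machinery (see withNewResolver), so the full-data round still blocks for the number of responses the consistency level demands. A rough, self-contained sketch of that idea -- the class and method names here are illustrative only, not Cassandra's:

    import java.util.List;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;

    // Illustrative stand-in for ReadCallback: it blocks for exactly `blockfor` replies,
    // and a retry handler built from it keeps the same blockfor and endpoint list.
    final class BlockingReadHandler
    {
        private final int blockfor;             // derived from the consistency level
        private final List<String> endpoints;   // replicas, already filtered and sorted
        private final CountDownLatch latch;

        BlockingReadHandler(int blockfor, List<String> endpoints)
        {
            this.blockfor = blockfor;
            this.endpoints = endpoints;
            this.latch = new CountDownLatch(blockfor);
        }

        // Analogue of withNewResolver(): same blockfor, same endpoints, fresh state
        // for the second (full data) round after a digest mismatch.
        BlockingReadHandler forDataRetry()
        {
            return new BlockingReadHandler(blockfor, endpoints);
        }

        void onResponse()
        {
            latch.countDown();
        }

        void await(long timeoutMillis) throws InterruptedException, TimeoutException
        {
            // Unlike the removed RepairCallback, too few replies is an error rather than
            // "repair whoever answered", which is what preserves the CL guarantee.
            if (!latch.await(timeoutMillis, TimeUnit.MILLISECONDS))
                throw new TimeoutException("fewer than " + blockfor + " of " + endpoints.size()
                                           + " replicas responded in time");
        }
    }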


http://git-wip-us.apache.org/repos/asf/cassandra/blob/3d787b78/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 2ec66c9..95aad22 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -20,6 +20,7 @@
  * cqlsh: Add default limit to SELECT statements (CASSANDRA-4972)
  * cqlsh: fix DESCRIBE for 1.1 cfs in CQL3 (CASSANDRA-5101)
  * Correctly gossip with nodes >= 1.1.7 (CASSANDRA-5102)
+ * Ensure CL guarantees on digest mismatch (CASSANDRA-5113)
 Merged from 1.1:
  * Pig: correctly decode row keys in widerow mode (CASSANDRA-5098)
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3d787b78/src/java/org/apache/cassandra/db/ReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadCommand.java b/src/java/org/apache/cassandra/db/ReadCommand.java
index f3494e5..6c364cb 100644
--- a/src/java/org/apache/cassandra/db/ReadCommand.java
+++ b/src/java/org/apache/cassandra/db/ReadCommand.java
@@ -32,7 +32,7 @@ import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.service.IReadCommand;
-import org.apache.cassandra.service.RepairCallback;
+import org.apache.cassandra.service.RowDataResolver;
 import org.apache.cassandra.utils.IFilter;
 
 
@@ -94,7 +94,7 @@ public abstract class ReadCommand implements IReadCommand
     }
 
     // maybeGenerateRetryCommand is used to generate a retry for short reads
-    public ReadCommand maybeGenerateRetryCommand(RepairCallback handler, Row row)
+    public ReadCommand maybeGenerateRetryCommand(RowDataResolver resolver, Row row)
     {
         return null;
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3d787b78/src/java/org/apache/cassandra/db/SliceFromReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SliceFromReadCommand.java b/src/java/org/apache/cassandra/db/SliceFromReadCommand.java
index d52826b..8a08a42 100644
--- a/src/java/org/apache/cassandra/db/SliceFromReadCommand.java
+++ b/src/java/org/apache/cassandra/db/SliceFromReadCommand.java
@@ -30,7 +30,7 @@ import org.apache.cassandra.db.filter.QueryFilter;
 import org.apache.cassandra.db.filter.QueryPath;
 import org.apache.cassandra.db.filter.SliceQueryFilter;
 import org.apache.cassandra.io.IVersionedSerializer;
-import org.apache.cassandra.service.RepairCallback;
+import org.apache.cassandra.service.RowDataResolver;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.thrift.ColumnParent;
 import org.apache.cassandra.utils.ByteBufferUtil;
@@ -71,9 +71,9 @@ public class SliceFromReadCommand extends ReadCommand
     }
 
     @Override
-    public ReadCommand maybeGenerateRetryCommand(RepairCallback handler, Row row)
+    public ReadCommand maybeGenerateRetryCommand(RowDataResolver resolver, Row row)
     {
-        int maxLiveColumns = handler.getMaxLiveCount();
+        int maxLiveColumns = resolver.getMaxLiveCount();
 
         int count = filter.count;
         assert maxLiveColumns <= count;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3d787b78/src/java/org/apache/cassandra/service/AsyncRepairCallback.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/AsyncRepairCallback.java b/src/java/org/apache/cassandra/service/AsyncRepairCallback.java
index 675f61c..63b7df3 100644
--- a/src/java/org/apache/cassandra/service/AsyncRepairCallback.java
+++ b/src/java/org/apache/cassandra/service/AsyncRepairCallback.java
@@ -28,11 +28,11 @@ import org.apache.cassandra.utils.WrappedRunnable;
 
 public class AsyncRepairCallback implements IAsyncCallback
 {
-    private final RowRepairResolver repairResolver;
+    private final RowDataResolver repairResolver;
     private final int blockfor;
     protected final AtomicInteger received = new AtomicInteger(0);
 
-    public AsyncRepairCallback(RowRepairResolver repairResolver, int blockfor)
+    public AsyncRepairCallback(RowDataResolver repairResolver, int blockfor)
     {
         this.repairResolver = repairResolver;
         this.blockfor = blockfor;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3d787b78/src/java/org/apache/cassandra/service/DatacenterReadCallback.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/DatacenterReadCallback.java b/src/java/org/apache/cassandra/service/DatacenterReadCallback.java
index d125553..e1ae652 100644
--- a/src/java/org/apache/cassandra/service/DatacenterReadCallback.java
+++ b/src/java/org/apache/cassandra/service/DatacenterReadCallback.java
@@ -46,11 +46,22 @@ public class DatacenterReadCallback<TMessage, TResolved> extends ReadCallback<TM
         }
     };
 
-    public DatacenterReadCallback(IResponseResolver resolver, ConsistencyLevel consistencyLevel, IReadCommand command, List<InetAddress> endpoints)
+    public DatacenterReadCallback(IResponseResolver<TMessage, TResolved> resolver, ConsistencyLevel consistencyLevel, IReadCommand command, List<InetAddress> endpoints)
     {
         super(resolver, consistencyLevel, command, endpoints);
     }
 
+    protected DatacenterReadCallback(IResponseResolver<TMessage, TResolved> resolver, ConsistencyLevel consistencyLevel, int blockfor, IReadCommand command, List<InetAddress> endpoints)
+    {
+        super(resolver, consistencyLevel, blockfor, command, endpoints);
+    }
+
+    @Override
+    public ReadCallback<TMessage, TResolved> withNewResolver(IResponseResolver<TMessage, TResolved> newResolver)
+    {
+        return new DatacenterReadCallback(newResolver, consistencyLevel, blockfor, command, endpoints);
+    }
+
     @Override
     protected void sortForConsistencyLevel(List<InetAddress> endpoints)
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3d787b78/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java b/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java
index 0d24fbf..1dfd01e 100644
--- a/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java
+++ b/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java
@@ -138,7 +138,7 @@ public class RangeSliceResponseResolver implements IResponseResolver<RangeSliceR
         protected Row getReduced()
         {
             ColumnFamily resolved = versions.size() > 1
-                                  ? RowRepairResolver.resolveSuperset(versions)
+                                  ? RowDataResolver.resolveSuperset(versions)
                                   : versions.get(0);
             if (versions.size() < sources.size())
             {
@@ -154,7 +154,7 @@ public class RangeSliceResponseResolver implements IResponseResolver<RangeSliceR
             }
             // resolved can be null even if versions doesn't have all nulls because of the call to removeDeleted in resolveSuperSet
             if (resolved != null)
-                repairResults.addAll(RowRepairResolver.scheduleRepairs(resolved, table, key, versions, versionSources));
+                repairResults.addAll(RowDataResolver.scheduleRepairs(resolved, table, key, versions, versionSources));
             versions.clear();
             versionSources.clear();
             return new Row(key, resolved);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3d787b78/src/java/org/apache/cassandra/service/ReadCallback.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ReadCallback.java b/src/java/org/apache/cassandra/service/ReadCallback.java
index 8df2e10..e12859a 100644
--- a/src/java/org/apache/cassandra/service/ReadCallback.java
+++ b/src/java/org/apache/cassandra/service/ReadCallback.java
@@ -60,7 +60,7 @@ public class ReadCallback<TMessage, TResolved> implements IAsyncCallback<TMessag
     private final long startTime;
     protected final int blockfor;
     final List<InetAddress> endpoints;
-    private final IReadCommand command;
+    protected final IReadCommand command;
     protected final ConsistencyLevel consistencyLevel;
     protected final AtomicInteger received = new AtomicInteger(0);
 
@@ -75,11 +75,26 @@ public class ReadCallback<TMessage, TResolved> implements IAsyncCallback<TMessag
         this.startTime = System.currentTimeMillis();
         this.consistencyLevel = consistencyLevel;
         sortForConsistencyLevel(endpoints);
-        this.endpoints = resolver instanceof RowRepairResolver ? endpoints : filterEndpoints(endpoints);
+        this.endpoints = filterEndpoints(endpoints);
         if (logger.isTraceEnabled())
             logger.trace(String.format("Blockfor is %s; setting up requests to %s", blockfor, StringUtils.join(this.endpoints, ",")));
     }
 
+    protected ReadCallback(IResponseResolver<TMessage, TResolved> resolver, ConsistencyLevel consistencyLevel, int blockfor, IReadCommand command, List<InetAddress> endpoints)
+    {
+        this.command = command;
+        this.blockfor = blockfor;
+        this.consistencyLevel = consistencyLevel;
+        this.resolver = resolver;
+        this.startTime = System.currentTimeMillis();
+        this.endpoints = endpoints;
+    }
+
+    public ReadCallback<TMessage, TResolved> withNewResolver(IResponseResolver<TMessage, TResolved> newResolver)
+    {
+        return new ReadCallback(newResolver, consistencyLevel, blockfor, command, endpoints);
+    }
+
     /**
      * Endpoints is already restricted to live replicas, sorted by snitch preference.  This is a hook for
      * DatacenterReadCallback to move local-DC replicas to the front of the list.  We need this both
@@ -209,17 +224,22 @@ public class ReadCallback<TMessage, TResolved> implements IAsyncCallback<TMessag
     {
         protected void runMayThrow() throws IOException
         {
+            // If the resolver is a RowDigestResolver, we need to do a full data read if there is a mismatch.
+            // Otherwise, resolve will send the repairs directly if needs be (and in that case we should never
+            // get a digest mismatch)
             try
             {
                 resolver.resolve();
             }
             catch (DigestMismatchException e)
             {
+                assert resolver instanceof RowDigestResolver;
+
                 if (logger.isDebugEnabled())
                     logger.debug("Digest mismatch:", e);
 
                 ReadCommand readCommand = (ReadCommand) command;
-                final RowRepairResolver repairResolver = new RowRepairResolver(readCommand.table, readCommand.key, readCommand.filter());
+                final RowDataResolver repairResolver = new RowDataResolver(readCommand.table, readCommand.key, readCommand.filter());
                 IAsyncCallback repairHandler = new AsyncRepairCallback(repairResolver, endpoints.size());
 
                 MessageOut<ReadCommand> message = ((ReadCommand) command).createMessage();
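
As the comment in the runnable above notes, only a digest resolver can raise a mismatch: it merely compares hashes, so any disagreement forces a second round of full data reads that RowDataResolver can actually repair from. A toy illustration of that check (MD5 stands in for ColumnFamily.digest(); nothing here is the real resolver API):

    import java.security.MessageDigest;
    import java.security.NoSuchAlgorithmException;
    import java.util.Arrays;
    import java.util.List;

    final class DigestCheckSketch
    {
        // A digest-only round can detect disagreement but cannot repair it; repairing
        // requires the full data reads done through RowDataResolver.
        static boolean digestsMatch(List<byte[]> digests)
        {
            byte[] first = digests.get(0);
            for (byte[] digest : digests)
                if (!Arrays.equals(first, digest))
                    return false;
            return true;
        }

        static byte[] digestOf(byte[] serializedRow) throws NoSuchAlgorithmException
        {
            // Stand-in for ColumnFamily.digest(); MD5 is used here only for the sketch.
            return MessageDigest.getInstance("MD5").digest(serializedRow);
        }
    }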

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3d787b78/src/java/org/apache/cassandra/service/RepairCallback.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/RepairCallback.java b/src/java/org/apache/cassandra/service/RepairCallback.java
deleted file mode 100644
index 9388328..0000000
--- a/src/java/org/apache/cassandra/service/RepairCallback.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.service;
-
-import java.io.IOException;
-import java.net.InetAddress;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.Row;
-import org.apache.cassandra.net.IAsyncCallback;
-import org.apache.cassandra.net.MessageIn;
-import org.apache.cassandra.utils.SimpleCondition;
-
-public class RepairCallback implements IAsyncCallback
-{
-    public final RowRepairResolver resolver;
-    private final List<InetAddress> endpoints;
-    private final SimpleCondition condition = new SimpleCondition();
-    private final long startTime;
-    protected final AtomicInteger received = new AtomicInteger(0);
-
-    /**
-     * The main difference between this and ReadCallback is, ReadCallback has a ConsistencyLevel
-     * it needs to achieve.  Repair on the other hand is happy to repair whoever replies within the timeout.
-     *
-     * (The other main difference of course is, this is only created once we know we have a digest
-     * mismatch, and we're going to do full-data reads from everyone -- that is, this is the final
-     * stage in the read process.)
-     */
-    public RepairCallback(RowRepairResolver resolver, List<InetAddress> endpoints)
-    {
-        this.resolver = resolver;
-        this.endpoints = endpoints;
-        this.startTime = System.currentTimeMillis();
-    }
-
-    public Row get() throws DigestMismatchException, IOException
-    {
-        long timeout = DatabaseDescriptor.getWriteRpcTimeout() - (System.currentTimeMillis() - startTime);
-        try
-        {
-            condition.await(timeout, TimeUnit.MILLISECONDS);
-        }
-        catch (InterruptedException ex)
-        {
-            throw new AssertionError(ex);
-        }
-
-        return received.get() > 1 ? resolver.resolve() : null;
-    }
-
-    public void response(MessageIn message)
-    {
-        resolver.preprocess(message);
-        if (received.incrementAndGet() == endpoints.size())
-            condition.signal();
-    }
-
-    public boolean isLatencyForSnitch()
-    {
-        return true;
-    }
-
-    public int getMaxLiveCount()
-    {
-        return resolver.getMaxLiveCount();
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3d787b78/src/java/org/apache/cassandra/service/RowDataResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/RowDataResolver.java b/src/java/org/apache/cassandra/service/RowDataResolver.java
new file mode 100644
index 0000000..5545293
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/RowDataResolver.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.service;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import com.google.common.collect.Iterables;
+
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.columniterator.IdentityQueryFilter;
+import org.apache.cassandra.db.filter.IDiskAtomFilter;
+import org.apache.cassandra.db.filter.QueryFilter;
+import org.apache.cassandra.db.filter.QueryPath;
+import org.apache.cassandra.db.filter.SliceQueryFilter;
+import org.apache.cassandra.net.IAsyncResult;
+import org.apache.cassandra.net.MessageIn;
+import org.apache.cassandra.net.MessageOut;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.utils.CloseableIterator;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.IFilter;
+
+public class RowDataResolver extends AbstractRowResolver
+{
+    private int maxLiveCount = 0;
+    public List<IAsyncResult> repairResults = Collections.emptyList();
+    private final IDiskAtomFilter filter;
+
+    public RowDataResolver(String table, ByteBuffer key, IDiskAtomFilter qFilter)
+    {
+        super(key, table);
+        this.filter = qFilter;
+    }
+
+    /*
+    * This method handles the following scenario:
+    *
+    * there was a mismatch on the initial read, so we redid the digest requests
+    * as full data reads.  In this case we need to compute the most recent version
+    * of each column, and send diffs to out-of-date replicas.
+    */
+    public Row resolve() throws DigestMismatchException, IOException
+    {
+        if (logger.isDebugEnabled())
+            logger.debug("resolving " + replies.size() + " responses");
+        long startTime = System.currentTimeMillis();
+
+        ColumnFamily resolved;
+        if (replies.size() > 1)
+        {
+            List<ColumnFamily> versions = new ArrayList<ColumnFamily>(replies.size());
+            List<InetAddress> endpoints = new ArrayList<InetAddress>(replies.size());
+
+            for (MessageIn<ReadResponse> message : replies)
+            {
+                ReadResponse response = message.payload;
+                ColumnFamily cf = response.row().cf;
+                assert !response.isDigestQuery() : "Received digest response to repair read from " + message.from;
+                versions.add(cf);
+                endpoints.add(message.from);
+
+                // compute maxLiveCount to prevent short reads -- see https://issues.apache.org/jira/browse/CASSANDRA-2643
+                int liveCount = cf == null ? 0 : filter.getLiveCount(cf);
+                if (liveCount > maxLiveCount)
+                    maxLiveCount = liveCount;
+            }
+
+            resolved = resolveSuperset(versions);
+            if (logger.isDebugEnabled())
+                logger.debug("versions merged");
+
+            // send updates to any replica that was missing part of the full row
+            // (resolved can be null even if not all versions are null, because of the call to removeDeleted in resolveSuperset)
+            if (resolved != null)
+                repairResults = scheduleRepairs(resolved, table, key, versions, endpoints);
+        }
+        else
+        {
+            resolved = replies.iterator().next().payload.row().cf;
+        }
+
+        if (logger.isDebugEnabled())
+            logger.debug("resolve: " + (System.currentTimeMillis() - startTime) + " ms.");
+
+        return new Row(key, resolved);
+    }
+
+    /**
+     * For each row version, compare with resolved (the superset of all row versions);
+     * if it is missing anything, send a mutation to the endpoint it came from.
+     */
+    public static List<IAsyncResult> scheduleRepairs(ColumnFamily resolved, String table, DecoratedKey key, List<ColumnFamily> versions, List<InetAddress> endpoints)
+    {
+        List<IAsyncResult> results = new ArrayList<IAsyncResult>(versions.size());
+
+        for (int i = 0; i < versions.size(); i++)
+        {
+            ColumnFamily diffCf = ColumnFamily.diff(versions.get(i), resolved);
+            if (diffCf == null) // no repair needs to happen
+                continue;
+
+            // create and send the row mutation message based on the diff
+            RowMutation rowMutation = new RowMutation(table, key.key);
+            rowMutation.add(diffCf);
+            MessageOut repairMessage;
+            // use a separate verb here because we don't want these to get the white glove hint-
+            // on-timeout behavior that a "real" mutation gets
+            repairMessage = rowMutation.createMessage(MessagingService.Verb.READ_REPAIR);
+            results.add(MessagingService.instance().sendRR(repairMessage, endpoints.get(i)));
+        }
+
+        return results;
+    }
+
+    static ColumnFamily resolveSuperset(Iterable<ColumnFamily> versions)
+    {
+        assert Iterables.size(versions) > 0;
+
+        ColumnFamily resolved = null;
+        for (ColumnFamily cf : versions)
+        {
+            if (cf == null)
+                continue;
+
+            if (resolved == null)
+                resolved = cf.cloneMeShallow();
+            else
+                resolved.delete(cf);
+        }
+        if (resolved == null)
+            return null;
+
+        // mimic the collectCollatedColumn + removeDeleted path that getColumnFamily takes.
+        // this will handle removing columns and subcolumns that are suppressed by a row or
+        // supercolumn tombstone.
+        QueryFilter filter = new QueryFilter(null, new QueryPath(resolved.metadata().cfName), new IdentityQueryFilter());
+        List<CloseableIterator<IColumn>> iters = new ArrayList<CloseableIterator<IColumn>>();
+        for (ColumnFamily version : versions)
+        {
+            if (version == null)
+                continue;
+            iters.add(FBUtilities.closeableIterator(version.iterator()));
+        }
+        filter.collateColumns(resolved, iters, Integer.MIN_VALUE);
+        return ColumnFamilyStore.removeDeleted(resolved, Integer.MIN_VALUE);
+    }
+
+    public Row getData() throws IOException
+    {
+        return replies.iterator().next().payload.row();
+    }
+
+    public boolean isDataPresent()
+    {
+        return !replies.isEmpty();
+    }
+
+    public int getMaxLiveCount()
+    {
+        return maxLiveCount;
+    }
+}
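
The resolve-and-repair flow in the new class boils down to: merge every replica's version into a superset, then send each replica whatever part of the superset it is missing. A toy model of just that, using plain maps with last-write-wins cells instead of ColumnFamily (no tombstone handling; names are illustrative only):

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    final class ToyResolverSketch
    {
        static final class Cell
        {
            final long timestamp;
            final String value;

            Cell(long timestamp, String value)
            {
                this.timestamp = timestamp;
                this.value = value;
            }
        }

        // Stand-in for resolveSuperset(): merge every replica's version, keeping the
        // newest cell per column (tombstones and counters are ignored in this toy).
        static Map<String, Cell> resolveSuperset(List<Map<String, Cell>> versions)
        {
            Map<String, Cell> resolved = new HashMap<String, Cell>();
            for (Map<String, Cell> version : versions)
            {
                if (version == null)
                    continue;
                for (Map.Entry<String, Cell> e : version.entrySet())
                {
                    Cell current = resolved.get(e.getKey());
                    if (current == null || current.timestamp < e.getValue().timestamp)
                        resolved.put(e.getKey(), e.getValue());
                }
            }
            return resolved;
        }

        // What one replica is missing relative to the superset; a non-empty result is
        // roughly what scheduleRepairs() ships to that replica as a READ_REPAIR mutation.
        static Map<String, Cell> missingFrom(Map<String, Cell> version, Map<String, Cell> resolved)
        {
            Map<String, Cell> diff = new HashMap<String, Cell>(resolved);
            if (version == null)
                return diff;
            for (Map.Entry<String, Cell> e : version.entrySet())
            {
                Cell merged = diff.get(e.getKey());
                if (merged != null && merged.timestamp <= e.getValue().timestamp)
                    diff.remove(e.getKey());
            }
            return diff;
        }
    }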

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3d787b78/src/java/org/apache/cassandra/service/RowDigestResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/RowDigestResolver.java b/src/java/org/apache/cassandra/service/RowDigestResolver.java
index e0e262b..eeccbeb 100644
--- a/src/java/org/apache/cassandra/service/RowDigestResolver.java
+++ b/src/java/org/apache/cassandra/service/RowDigestResolver.java
@@ -95,7 +95,7 @@ public class RowDigestResolver extends AbstractRowResolver
         // with the data response. If there is a mismatch then throw an exception so that read repair can happen.
         //
         // It's important to note that we do not consider the possibility of multiple data responses --
-        // that can only happen when we're doing the repair post-mismatch, and will be handled by RowRepairResolver.
+        // that can only happen when we're doing the repair post-mismatch, and will be handled by RowDataResolver.
         if (digest != null)
         {
             ByteBuffer digest2 = ColumnFamily.digest(data);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3d787b78/src/java/org/apache/cassandra/service/RowRepairResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/RowRepairResolver.java b/src/java/org/apache/cassandra/service/RowRepairResolver.java
deleted file mode 100644
index 21cf5ab..0000000
--- a/src/java/org/apache/cassandra/service/RowRepairResolver.java
+++ /dev/null
@@ -1,182 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.service;
-
-import java.io.IOException;
-import java.net.InetAddress;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-import com.google.common.collect.Iterables;
-
-import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.columniterator.IdentityQueryFilter;
-import org.apache.cassandra.db.filter.IDiskAtomFilter;
-import org.apache.cassandra.db.filter.QueryFilter;
-import org.apache.cassandra.db.filter.QueryPath;
-import org.apache.cassandra.db.filter.SliceQueryFilter;
-import org.apache.cassandra.net.IAsyncResult;
-import org.apache.cassandra.net.MessageIn;
-import org.apache.cassandra.net.MessageOut;
-import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.utils.CloseableIterator;
-import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.IFilter;
-
-public class RowRepairResolver extends AbstractRowResolver
-{
-    private int maxLiveCount = 0;
-    public List<IAsyncResult> repairResults = Collections.emptyList();
-    private final IDiskAtomFilter filter;
-
-    public RowRepairResolver(String table, ByteBuffer key, IDiskAtomFilter qFilter)
-    {
-        super(key, table);
-        this.filter = qFilter;
-    }
-
-    /*
-    * This method handles the following scenario:
-    *
-    * there was a mismatch on the initial read, so we redid the digest requests
-    * as full data reads.  In this case we need to compute the most recent version
-    * of each column, and send diffs to out-of-date replicas.
-    */
-    public Row resolve() throws DigestMismatchException, IOException
-    {
-        if (logger.isDebugEnabled())
-            logger.debug("resolving " + replies.size() + " responses");
-        long startTime = System.currentTimeMillis();
-
-        ColumnFamily resolved;
-        if (replies.size() > 1)
-        {
-            List<ColumnFamily> versions = new ArrayList<ColumnFamily>(replies.size());
-            List<InetAddress> endpoints = new ArrayList<InetAddress>(replies.size());
-
-            for (MessageIn<ReadResponse> message : replies)
-            {
-                ReadResponse response = message.payload;
-                ColumnFamily cf = response.row().cf;
-                assert !response.isDigestQuery() : "Received digest response to repair read from " + message.from;
-                versions.add(cf);
-                endpoints.add(message.from);
-
-                // compute maxLiveCount to prevent short reads -- see https://issues.apache.org/jira/browse/CASSANDRA-2643
-                int liveCount = cf == null ? 0 : filter.getLiveCount(cf);
-                if (liveCount > maxLiveCount)
-                    maxLiveCount = liveCount;
-            }
-
-            resolved = resolveSuperset(versions);
-            if (logger.isDebugEnabled())
-                logger.debug("versions merged");
-
-            // send updates to any replica that was missing part of the full row
-            // (resolved can be null even if versions doesn't have all nulls because of the call to removeDeleted in resolveSuperSet)
-            if (resolved != null)
-                repairResults = scheduleRepairs(resolved, table, key, versions, endpoints);
-        }
-        else
-        {
-            resolved = replies.iterator().next().payload.row().cf;
-        }
-
-        if (logger.isDebugEnabled())
-            logger.debug("resolve: " + (System.currentTimeMillis() - startTime) + " ms.");
-
-        return new Row(key, resolved);
-    }
-
-    /**
-     * For each row version, compare with resolved (the superset of all row versions);
-     * if it is missing anything, send a mutation to the endpoint it come from.
-     */
-    public static List<IAsyncResult> scheduleRepairs(ColumnFamily resolved, String table, DecoratedKey key, List<ColumnFamily> versions, List<InetAddress> endpoints)
-    {
-        List<IAsyncResult> results = new ArrayList<IAsyncResult>(versions.size());
-
-        for (int i = 0; i < versions.size(); i++)
-        {
-            ColumnFamily diffCf = ColumnFamily.diff(versions.get(i), resolved);
-            if (diffCf == null) // no repair needs to happen
-                continue;
-
-            // create and send the row mutation message based on the diff
-            RowMutation rowMutation = new RowMutation(table, key.key);
-            rowMutation.add(diffCf);
-            MessageOut repairMessage;
-            // use a separate verb here because we don't want these to be get the white glove hint-
-            // on-timeout behavior that a "real" mutation gets
-            repairMessage = rowMutation.createMessage(MessagingService.Verb.READ_REPAIR);
-            results.add(MessagingService.instance().sendRR(repairMessage, endpoints.get(i)));
-        }
-
-        return results;
-    }
-
-    static ColumnFamily resolveSuperset(Iterable<ColumnFamily> versions)
-    {
-        assert Iterables.size(versions) > 0;
-
-        ColumnFamily resolved = null;
-        for (ColumnFamily cf : versions)
-        {
-            if (cf == null)
-                continue;
-
-            if (resolved == null)
-                resolved = cf.cloneMeShallow();
-            else
-                resolved.delete(cf);
-        }
-        if (resolved == null)
-            return null;
-
-        // mimic the collectCollatedColumn + removeDeleted path that getColumnFamily takes.
-        // this will handle removing columns and subcolumns that are supressed by a row or
-        // supercolumn tombstone.
-        QueryFilter filter = new QueryFilter(null, new QueryPath(resolved.metadata().cfName), new IdentityQueryFilter());
-        List<CloseableIterator<IColumn>> iters = new ArrayList<CloseableIterator<IColumn>>();
-        for (ColumnFamily version : versions)
-        {
-            if (version == null)
-                continue;
-            iters.add(FBUtilities.closeableIterator(version.iterator()));
-        }
-        filter.collateColumns(resolved, iters, Integer.MIN_VALUE);
-        return ColumnFamilyStore.removeDeleted(resolved, Integer.MIN_VALUE);
-    }
-
-    public Row getData() throws IOException
-    {
-        throw new UnsupportedOperationException();
-    }
-
-    public boolean isDataPresent()
-    {
-        throw new UnsupportedOperationException();
-    }
-
-    public int getMaxLiveCount()
-    {
-        return maxLiveCount;
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3d787b78/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index fe427af..0fb7ec4 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -923,7 +923,7 @@ public class StorageProxy implements StorageProxyMBean
 
             // read results and make a second pass for any digest mismatches
             List<ReadCommand> repairCommands = null;
-            List<RepairCallback> repairResponseHandlers = null;
+            List<ReadCallback<ReadResponse, Row>> repairResponseHandlers = null;
             for (int i = 0; i < commands.size(); i++)
             {
                 ReadCallback<ReadResponse, Row> handler = readCallbacks[i];
@@ -946,13 +946,14 @@ public class StorageProxy implements StorageProxyMBean
                 catch (DigestMismatchException ex)
                 {
                     logger.debug("Digest mismatch: {}", ex.toString());
-                    RowRepairResolver resolver = new RowRepairResolver(command.table, command.key, command.filter());
-                    RepairCallback repairHandler = new RepairCallback(resolver, handler.endpoints);
+                    // Do a full data read to resolve the correct response (and repair nodes if need be)
+                    RowDataResolver resolver = new RowDataResolver(command.table, command.key, command.filter());
+                    ReadCallback<ReadResponse, Row> repairHandler = handler.withNewResolver(resolver);
 
                     if (repairCommands == null)
                     {
                         repairCommands = new ArrayList<ReadCommand>();
-                        repairResponseHandlers = new ArrayList<RepairCallback>();
+                        repairResponseHandlers = new ArrayList<ReadCallback<ReadResponse, Row>>();
                     }
                     repairCommands.add(command);
                     repairResponseHandlers.add(repairHandler);
@@ -974,7 +975,7 @@ public class StorageProxy implements StorageProxyMBean
                 for (int i = 0; i < repairCommands.size(); i++)
                 {
                     ReadCommand command = repairCommands.get(i);
-                    RepairCallback handler = repairResponseHandlers.get(i);
+                    ReadCallback<ReadResponse, Row> handler = repairResponseHandlers.get(i);
 
                     Row row;
                     try
@@ -986,11 +987,12 @@ public class StorageProxy implements StorageProxyMBean
                         throw new AssertionError(e); // full data requested from each node here, no digests should be sent
                     }
 
+                    RowDataResolver resolver = (RowDataResolver)handler.resolver;
                     try
                     {
                         // wait for the repair writes to be acknowledged, to minimize impact on any replica that's
                         // behind on writes in case the out-of-sync row is read multiple times in quick succession
-                        FBUtilities.waitOnFutures(handler.resolver.repairResults, DatabaseDescriptor.getWriteRpcTimeout());
+                        FBUtilities.waitOnFutures(resolver.repairResults, DatabaseDescriptor.getWriteRpcTimeout());
                     }
                     catch (TimeoutException e)
                     {
@@ -999,7 +1001,7 @@ public class StorageProxy implements StorageProxyMBean
                     }
 
                     // retry any potential short reads
-                    ReadCommand retryCommand = command.maybeGenerateRetryCommand(handler, row);
+                    ReadCommand retryCommand = command.maybeGenerateRetryCommand(resolver, row);
                     if (retryCommand != null)
                     {
                         logger.debug("Issuing retry for read command");

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3d787b78/test/unit/org/apache/cassandra/service/RowResolverTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/RowResolverTest.java b/test/unit/org/apache/cassandra/service/RowResolverTest.java
index 3c530f1..2cc7860 100644
--- a/test/unit/org/apache/cassandra/service/RowResolverTest.java
+++ b/test/unit/org/apache/cassandra/service/RowResolverTest.java
@@ -46,7 +46,7 @@ public class RowResolverTest extends SchemaLoader
         ColumnFamily cf2 = ColumnFamily.create("Keyspace1", "Standard1");
         cf2.addColumn(column("c1", "v2", 1));
 
-        ColumnFamily resolved = RowRepairResolver.resolveSuperset(Arrays.asList(cf1, cf2));
+        ColumnFamily resolved = RowDataResolver.resolveSuperset(Arrays.asList(cf1, cf2));
         assertColumns(resolved, "c1");
         assertColumns(ColumnFamily.diff(cf1, resolved), "c1");
         assertNull(ColumnFamily.diff(cf2, resolved));
@@ -61,7 +61,7 @@ public class RowResolverTest extends SchemaLoader
         ColumnFamily cf2 = ColumnFamily.create("Keyspace1", "Standard1");
         cf2.addColumn(column("c2", "v2", 1));
 
-        ColumnFamily resolved = RowRepairResolver.resolveSuperset(Arrays.asList(cf1, cf2));
+        ColumnFamily resolved = RowDataResolver.resolveSuperset(Arrays.asList(cf1, cf2));
         assertColumns(resolved, "c1", "c2");
         assertColumns(ColumnFamily.diff(cf1, resolved), "c2");
         assertColumns(ColumnFamily.diff(cf2, resolved), "c1");
@@ -73,7 +73,7 @@ public class RowResolverTest extends SchemaLoader
         ColumnFamily cf2 = ColumnFamily.create("Keyspace1", "Standard1");
         cf2.addColumn(column("c2", "v2", 1));
 
-        ColumnFamily resolved = RowRepairResolver.resolveSuperset(Arrays.asList(null, cf2));
+        ColumnFamily resolved = RowDataResolver.resolveSuperset(Arrays.asList(null, cf2));
         assertColumns(resolved, "c2");
         assertColumns(ColumnFamily.diff(null, resolved), "c2");
         assertNull(ColumnFamily.diff(cf2, resolved));
@@ -85,7 +85,7 @@ public class RowResolverTest extends SchemaLoader
         ColumnFamily cf1 = ColumnFamily.create("Keyspace1", "Standard1");
         cf1.addColumn(column("c1", "v1", 0));
 
-        ColumnFamily resolved = RowRepairResolver.resolveSuperset(Arrays.asList(cf1, null));
+        ColumnFamily resolved = RowDataResolver.resolveSuperset(Arrays.asList(cf1, null));
         assertColumns(resolved, "c1");
         assertNull(ColumnFamily.diff(cf1, resolved));
         assertColumns(ColumnFamily.diff(null, resolved), "c1");
@@ -94,7 +94,7 @@ public class RowResolverTest extends SchemaLoader
     @Test
     public void testResolveSupersetNullBoth()
     {
-        assertNull(RowRepairResolver.resolveSuperset(Arrays.<ColumnFamily>asList(null, null)));
+        assertNull(RowDataResolver.resolveSuperset(Arrays.<ColumnFamily>asList(null, null)));
     }
 
     @Test
@@ -107,7 +107,7 @@ public class RowResolverTest extends SchemaLoader
         ColumnFamily cf2 = ColumnFamily.create("Keyspace1", "Standard1");
         cf2.delete(new DeletionInfo(1L, (int) (System.currentTimeMillis() / 1000)));
 
-        ColumnFamily resolved = RowRepairResolver.resolveSuperset(Arrays.asList(cf1, cf2));
+        ColumnFamily resolved = RowDataResolver.resolveSuperset(Arrays.asList(cf1, cf2));
         // no columns in the cf
         assertColumns(resolved);
         assertTrue(resolved.isMarkedForDelete());
@@ -119,7 +119,7 @@ public class RowResolverTest extends SchemaLoader
         ColumnFamily scf2 = ColumnFamily.create("Keyspace1", "Super1");
         scf2.delete(new DeletionInfo(1L, (int) (System.currentTimeMillis() / 1000)));
 
-        ColumnFamily superResolved = RowRepairResolver.resolveSuperset(Arrays.asList(scf1, scf2));
+        ColumnFamily superResolved = RowDataResolver.resolveSuperset(Arrays.asList(scf1, scf2));
         // no columns in the cf
         assertColumns(superResolved);
         assertTrue(superResolved.isMarkedForDelete());
@@ -138,7 +138,7 @@ public class RowResolverTest extends SchemaLoader
         ColumnFamily scf2 = ColumnFamily.create("Keyspace1", "Super1");
         scf2.delete(new DeletionInfo(2L, (int) (System.currentTimeMillis() / 1000)));
 
-        ColumnFamily superResolved = RowRepairResolver.resolveSuperset(Arrays.asList(scf1, scf2));
+        ColumnFamily superResolved = RowDataResolver.resolveSuperset(Arrays.asList(scf1, scf2));
         // no columns in the cf
         assertColumns(superResolved);
         assertTrue(superResolved.isMarkedForDelete());
@@ -165,7 +165,7 @@ public class RowResolverTest extends SchemaLoader
         ColumnFamily cf4 = ColumnFamily.create("Keyspace1", "Standard1");
         cf4.delete(new DeletionInfo(2L, (int) (System.currentTimeMillis() / 1000)));
 
-        ColumnFamily resolved = RowRepairResolver.resolveSuperset(Arrays.asList(cf1, cf2, cf3, cf4));
+        ColumnFamily resolved = RowDataResolver.resolveSuperset(Arrays.asList(cf1, cf2, cf3, cf4));
         // will have deleted marker and one column
         assertColumns(resolved, "two");
         assertColumn(resolved, "two", "B", 3);
@@ -188,7 +188,7 @@ public class RowResolverTest extends SchemaLoader
         ColumnFamily scf4 = ColumnFamily.create("Keyspace1", "Super1");
         scf4.delete(new DeletionInfo(2L, (int) (System.currentTimeMillis() / 1000)));
 
-        ColumnFamily superResolved = RowRepairResolver.resolveSuperset(Arrays.asList(scf1, scf2, scf3, scf4));
+        ColumnFamily superResolved = RowDataResolver.resolveSuperset(Arrays.asList(scf1, scf2, scf3, scf4));
         // will have deleted marker and two super cols
         assertColumns(superResolved, "super1", "super2");