You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2013/01/16 18:48:20 UTC

[1/2] git commit: Coverage analysis for low CL queries

Coverage analysis for low CL queries

patch by slebresne; reviewed by jbellis for CASSANDRA-4858


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/52671128
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/52671128
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/52671128

Branch: refs/heads/trunk
Commit: 526711288c30a6824a8e29805af7a2b1d72b9129
Parents: 994d262
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Wed Jan 16 18:43:20 2013 +0100
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Wed Jan 16 18:45:32 2013 +0100

----------------------------------------------------------------------
 CHANGES.txt                                        |    1 +
 .../org/apache/cassandra/config/CFMetaData.java    |   12 +
 .../cassandra/config/DatabaseDescriptor.java       |   19 ++
 .../cassandra/config/ReadRepairDecision.java       |   23 ++
 .../org/apache/cassandra/db/ConsistencyLevel.java  |  174 ++++++++++++++-
 .../org/apache/cassandra/dht/AbstractBounds.java   |    2 +
 src/java/org/apache/cassandra/dht/Bounds.java      |    5 +
 .../org/apache/cassandra/dht/ExcludingBounds.java  |    5 +
 .../cassandra/dht/IncludingExcludingBounds.java    |    5 +
 src/java/org/apache/cassandra/dht/Range.java       |    5 +
 .../cassandra/locator/AbstractEndpointSnitch.java  |   24 ++
 .../locator/AbstractReplicationStrategy.java       |   28 ++-
 .../cassandra/locator/DynamicEndpointSnitch.java   |   30 +++
 .../apache/cassandra/locator/IEndpointSnitch.java  |    8 +-
 .../apache/cassandra/locator/LocalStrategy.java    |    1 +
 .../cassandra/locator/NetworkTopologyStrategy.java |    1 +
 .../locator/OldNetworkTopologyStrategy.java        |    1 +
 .../apache/cassandra/locator/SimpleStrategy.java   |    1 +
 .../service/AbstractWriteResponseHandler.java      |   38 ++-
 .../cassandra/service/DatacenterReadCallback.java  |  104 ---------
 .../DatacenterSyncWriteResponseHandler.java        |   45 +----
 .../service/DatacenterWriteResponseHandler.java    |   28 +--
 .../org/apache/cassandra/service/ReadCallback.java |   93 ++-------
 .../org/apache/cassandra/service/StorageProxy.java |  121 ++++++++---
 .../apache/cassandra/service/StorageService.java   |    6 +-
 .../cassandra/service/WriteResponseHandler.java    |   45 +---
 .../ReplicationStrategyEndpointCacheTest.java      |    2 +-
 .../cassandra/locator/SimpleStrategyTest.java      |   10 -
 28 files changed, 481 insertions(+), 356 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/52671128/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index e3e40b6..49b86b9 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -37,6 +37,7 @@
  * Fix inserting empty maps (CASSANDRA-5141)
  * Don't remove tokens from System table for node we know (CASSANDRA-5121)
  * fix streaming progress report for compresed files (CASSANDRA-5130)
+ * Coverage analysis for low-CL queries (CASSANDRA-4858)
 Merged from 1.1:
  * Simplify CompressedRandomAccessReader to work around JDK FD bug (CASSANDRA-5088)
  * Improve handling a changing target throttle rate mid-compaction (CASSANDRA-5087)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/52671128/src/java/org/apache/cassandra/config/CFMetaData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/CFMetaData.java b/src/java/org/apache/cassandra/config/CFMetaData.java
index 5f6c1fe..82d49a9 100644
--- a/src/java/org/apache/cassandra/config/CFMetaData.java
+++ b/src/java/org/apache/cassandra/config/CFMetaData.java
@@ -465,6 +465,18 @@ public final class CFMetaData
         return dcLocalReadRepairChance;
     }
 
+    public ReadRepairDecision newReadRepairDecision()
+    {
+        double chance = FBUtilities.threadLocalRandom().nextDouble();
+        if (getReadRepairChance() > chance)
+            return ReadRepairDecision.GLOBAL;
+
+        if (getDcLocalReadRepair() > chance)
+            return ReadRepairDecision.DC_LOCAL;
+
+        return ReadRepairDecision.NONE;
+    }
+
     public boolean getReplicateOnWrite()
     {
         return replicateOnWrite;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/52671128/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index c9b2d1e..7c33203 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -88,6 +88,7 @@ public class DatabaseDescriptor
     private static IRowCacheProvider rowCacheProvider;
 
     private static String localDC;
+    private static Comparator<InetAddress> localComparator;
 
     /**
      * Inspect the classpath to find storage configuration file
@@ -338,6 +339,19 @@ public class DatabaseDescriptor
             EndpointSnitchInfo.create();
 
             localDC = snitch.getDatacenter(FBUtilities.getBroadcastAddress());
+            localComparator = new Comparator<InetAddress>()
+            {
+                public int compare(InetAddress endpoint1, InetAddress endpoint2)
+                {
+                    boolean local1 = localDC.equals(snitch.getDatacenter(endpoint1));
+                    boolean local2 = localDC.equals(snitch.getDatacenter(endpoint2));
+                    if (local1 && !local2)
+                        return -1;
+                    if (local2 && !local1)
+                        return 1;
+                    return 0;
+                }
+            };
 
             /* Request Scheduler setup */
             requestSchedulerOptions = conf.request_scheduler_options;
@@ -1252,6 +1266,11 @@ public class DatabaseDescriptor
         return localDC;
     }
 
+    public static Comparator<InetAddress> getLocalComparator()
+    {
+        return localComparator;
+    }
+
     public static Config.InternodeCompression internodeCompression()
     {
         return conf.internode_compression;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/52671128/src/java/org/apache/cassandra/config/ReadRepairDecision.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/ReadRepairDecision.java b/src/java/org/apache/cassandra/config/ReadRepairDecision.java
new file mode 100644
index 0000000..1b4b648
--- /dev/null
+++ b/src/java/org/apache/cassandra/config/ReadRepairDecision.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.config;
+
+public enum ReadRepairDecision
+{
+    NONE, GLOBAL, DC_LOCAL;
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/52671128/src/java/org/apache/cassandra/db/ConsistencyLevel.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ConsistencyLevel.java b/src/java/org/apache/cassandra/db/ConsistencyLevel.java
index 53acc5a..56a7896 100644
--- a/src/java/org/apache/cassandra/db/ConsistencyLevel.java
+++ b/src/java/org/apache/cassandra/db/ConsistencyLevel.java
@@ -17,13 +17,27 @@
  */
 package org.apache.cassandra.db;
 
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import com.google.common.collect.Iterables;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.ReadRepairDecision;
 import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.exceptions.UnavailableException;
 import org.apache.cassandra.locator.AbstractReplicationStrategy;
 import org.apache.cassandra.locator.NetworkTopologyStrategy;
 import org.apache.cassandra.transport.ProtocolException;
 
+
 public enum ConsistencyLevel
 {
     ANY         (0),
@@ -35,6 +49,8 @@ public enum ConsistencyLevel
     LOCAL_QUORUM(6),
     EACH_QUORUM (7);
 
+    private static final Logger logger = LoggerFactory.getLogger(ConsistencyLevel.class);
+
     // Used by the binary protocol
     public final int code;
     private static final ConsistencyLevel[] codeIdx;
@@ -64,9 +80,13 @@ public enum ConsistencyLevel
         return codeIdx[code];
     }
 
-    public int blockFor(String table)
+    private int localQuorumFor(Table table, String dc)
+    {
+        return (((NetworkTopologyStrategy) table.getReplicationStrategy()).getReplicationFactor(dc) / 2) + 1;
+    }
+
+    public int blockFor(Table table)
     {
-        NetworkTopologyStrategy strategy = null;
         switch (this)
         {
             case ONE:
@@ -78,23 +98,161 @@ public enum ConsistencyLevel
             case THREE:
                 return 3;
             case QUORUM:
-                return (Table.open(table).getReplicationStrategy().getReplicationFactor() / 2) + 1;
+                return (table.getReplicationStrategy().getReplicationFactor() / 2) + 1;
             case ALL:
-                return Table.open(table).getReplicationStrategy().getReplicationFactor();
+                return table.getReplicationStrategy().getReplicationFactor();
             case LOCAL_QUORUM:
-                strategy = (NetworkTopologyStrategy) Table.open(table).getReplicationStrategy();
-                return (strategy.getReplicationFactor(DatabaseDescriptor.getLocalDataCenter()) / 2) + 1;
+                return localQuorumFor(table, DatabaseDescriptor.getLocalDataCenter());
             case EACH_QUORUM:
-                strategy = (NetworkTopologyStrategy) Table.open(table).getReplicationStrategy();
+                NetworkTopologyStrategy strategy = (NetworkTopologyStrategy) table.getReplicationStrategy();
                 int n = 0;
                 for (String dc : strategy.getDatacenters())
-                    n += (strategy.getReplicationFactor(dc) / 2) + 1;
+                    n += localQuorumFor(table, dc);
                 return n;
             default:
                 throw new UnsupportedOperationException("Invalid consistency level: " + toString());
         }
     }
 
+    private boolean isLocal(InetAddress endpoint)
+    {
+        return DatabaseDescriptor.getLocalDataCenter().equals(DatabaseDescriptor.getEndpointSnitch().getDatacenter(endpoint));
+    }
+
+    private int countLocalEndpoints(Iterable<InetAddress> liveEndpoints)
+    {
+        int count = 0;
+        for (InetAddress endpoint : liveEndpoints)
+            if (isLocal(endpoint))
+                count++;
+        return count;
+    }
+
+    private Map<String, Integer> countPerDCEndpoints(Table table, Iterable<InetAddress> liveEndpoints)
+    {
+        NetworkTopologyStrategy strategy = (NetworkTopologyStrategy) table.getReplicationStrategy();
+
+        Map<String, Integer> dcEndpoints = new HashMap<String, Integer>();
+        for (String dc: strategy.getDatacenters())
+            dcEndpoints.put(dc, 0);
+
+        for (InetAddress endpoint : liveEndpoints)
+        {
+            String dc = DatabaseDescriptor.getEndpointSnitch().getDatacenter(endpoint);
+            dcEndpoints.put(dc, dcEndpoints.get(dc) + 1);
+        }
+        return dcEndpoints;
+    }
+
+    public List<InetAddress> filterForQuery(Table table, List<InetAddress> liveEndpoints)
+    {
+        return filterForQuery(table, liveEndpoints, ReadRepairDecision.NONE);
+    }
+
+    public List<InetAddress> filterForQuery(Table table, List<InetAddress> liveEndpoints, ReadRepairDecision readRepair)
+    {
+        /*
+         * Endpoints are expected to be restricted to live replicas, sorted by snitch preference.
+         * For LOCAL_QUORUM, move local-DC replicas in front first as we need them there whether
+         * we do read repair (since the first replica gets the data read) or not (since we'll take
+         * the blockFor first ones).
+         */
+        if (this == LOCAL_QUORUM)
+            Collections.sort(liveEndpoints, DatabaseDescriptor.getLocalComparator());
+
+        switch (readRepair)
+        {
+            case NONE:
+                return liveEndpoints.subList(0, Math.min(liveEndpoints.size(), blockFor(table)));
+            case GLOBAL:
+                return liveEndpoints;
+            case DC_LOCAL:
+                List<InetAddress> local = new ArrayList<InetAddress>();
+                List<InetAddress> other = new ArrayList<InetAddress>();
+                for (InetAddress add : liveEndpoints)
+                {
+                    if (isLocal(add))
+                        local.add(add);
+                    else
+                        other.add(add);
+                }
+                // check whether blockFor exceeds the number of local endpoints we have
+                int blockFor = blockFor(table);
+                if (local.size() < blockFor)
+                    local.addAll(other.subList(0, Math.min(blockFor - local.size(), other.size())));
+                return local;
+            default:
+                throw new AssertionError();
+        }
+    }
+
+    public boolean isSufficientLiveNodes(Table table, Iterable<InetAddress> liveEndpoints)
+    {
+        switch (this)
+        {
+            case ANY:
+                // local hint is acceptable, and local node is always live
+                return true;
+            case LOCAL_QUORUM:
+                return countLocalEndpoints(liveEndpoints) >= blockFor(table);
+            case EACH_QUORUM:
+                for (Map.Entry<String, Integer> entry : countPerDCEndpoints(table, liveEndpoints).entrySet())
+                {
+                    if (entry.getValue() < localQuorumFor(table, entry.getKey()))
+                        return false;
+                }
+                return true;
+            default:
+                return Iterables.size(liveEndpoints) >= blockFor(table);
+        }
+    }
+
+    public void assureSufficientLiveNodes(Table table, Iterable<InetAddress> liveEndpoints) throws UnavailableException
+    {
+        int blockFor = blockFor(table);
+        switch (this)
+        {
+            case ANY:
+                // local hint is acceptable, and local node is always live
+                break;
+            case LOCAL_QUORUM:
+                int localLive = countLocalEndpoints(liveEndpoints);
+                if (localLive < blockFor)
+                {
+                    if (logger.isDebugEnabled())
+                    {
+                        StringBuilder builder = new StringBuilder("Local replicas [");
+                        for (InetAddress endpoint : liveEndpoints)
+                        {
+                            if (isLocal(endpoint))
+                                builder.append(endpoint).append(",");
+                        }
+                        builder.append("] are insufficient to satisfy LOCAL_QUORUM requirement of ").append(blockFor).append(" live nodes in '").append(DatabaseDescriptor.getLocalDataCenter()).append("'");
+                        logger.debug(builder.toString());
+                    }
+                    throw new UnavailableException(this, blockFor, localLive);
+                }
+                break;
+            case EACH_QUORUM:
+                for (Map.Entry<String, Integer> entry : countPerDCEndpoints(table, liveEndpoints).entrySet())
+                {
+                    int dcBlockFor = localQuorumFor(table, entry.getKey());
+                    int dcLive = entry.getValue();
+                    if (dcLive < dcBlockFor)
+                        throw new UnavailableException(this, dcBlockFor, dcLive);
+                }
+                break;
+            default:
+                int live = Iterables.size(liveEndpoints);
+                if (live < blockFor)
+                {
+                    logger.debug("Live nodes {} do not satisfy ConsistencyLevel ({} required)", Iterables.toString(liveEndpoints), blockFor);
+                    throw new UnavailableException(this, blockFor, live);
+                }
+                break;
+        }
+    }
+
     public void validateForRead(String table) throws InvalidRequestException
     {
         switch (this)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/52671128/src/java/org/apache/cassandra/dht/AbstractBounds.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/AbstractBounds.java b/src/java/org/apache/cassandra/dht/AbstractBounds.java
index b5ce85d..efa886c 100644
--- a/src/java/org/apache/cassandra/dht/AbstractBounds.java
+++ b/src/java/org/apache/cassandra/dht/AbstractBounds.java
@@ -123,6 +123,8 @@ public abstract class AbstractBounds<T extends RingPosition> implements Serializ
      */
     public abstract AbstractBounds<Token> toTokenBounds();
 
+    public abstract AbstractBounds<T> withNewRight(T newRight);
+
     public static class AbstractBoundsSerializer implements IVersionedSerializer<AbstractBounds<?>>
     {
         public void serialize(AbstractBounds<?> range, DataOutput out, int version) throws IOException

http://git-wip-us.apache.org/repos/asf/cassandra/blob/52671128/src/java/org/apache/cassandra/dht/Bounds.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/Bounds.java b/src/java/org/apache/cassandra/dht/Bounds.java
index bf579ae..fedc407 100644
--- a/src/java/org/apache/cassandra/dht/Bounds.java
+++ b/src/java/org/apache/cassandra/dht/Bounds.java
@@ -115,4 +115,9 @@ public class Bounds<T extends RingPosition> extends AbstractBounds<T>
     {
         return (left instanceof RowPosition) ? new Bounds<Token>(((RowPosition)left).getToken(), ((RowPosition)right).getToken(), partitioner) : (Bounds<Token>)this;
     }
+
+    public AbstractBounds<T> withNewRight(T newRight)
+    {
+        return new Bounds<T>(left, newRight);
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/52671128/src/java/org/apache/cassandra/dht/ExcludingBounds.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/ExcludingBounds.java b/src/java/org/apache/cassandra/dht/ExcludingBounds.java
index 7823023..5f46015 100644
--- a/src/java/org/apache/cassandra/dht/ExcludingBounds.java
+++ b/src/java/org/apache/cassandra/dht/ExcludingBounds.java
@@ -104,4 +104,9 @@ public class ExcludingBounds<T extends RingPosition> extends AbstractBounds<T>
     {
         return (left instanceof RowPosition) ? new ExcludingBounds<Token>(((RowPosition)left).getToken(), ((RowPosition)right).getToken(), partitioner) : (ExcludingBounds<Token>)this;
     }
+
+    public AbstractBounds<T> withNewRight(T newRight)
+    {
+        return new ExcludingBounds<T>(left, newRight);
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/52671128/src/java/org/apache/cassandra/dht/IncludingExcludingBounds.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/IncludingExcludingBounds.java b/src/java/org/apache/cassandra/dht/IncludingExcludingBounds.java
index 0b1cb0b..561555b 100644
--- a/src/java/org/apache/cassandra/dht/IncludingExcludingBounds.java
+++ b/src/java/org/apache/cassandra/dht/IncludingExcludingBounds.java
@@ -103,4 +103,9 @@ public class IncludingExcludingBounds<T extends RingPosition> extends AbstractBo
     {
         return (left instanceof RowPosition) ? new IncludingExcludingBounds<Token>(((RowPosition)left).getToken(), ((RowPosition)right).getToken(), partitioner) : (IncludingExcludingBounds<Token>)this;
     }
+
+    public AbstractBounds<T> withNewRight(T newRight)
+    {
+        return new IncludingExcludingBounds<T>(left, newRight);
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/52671128/src/java/org/apache/cassandra/dht/Range.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/Range.java b/src/java/org/apache/cassandra/dht/Range.java
index a750ae2..99286fe 100644
--- a/src/java/org/apache/cassandra/dht/Range.java
+++ b/src/java/org/apache/cassandra/dht/Range.java
@@ -468,4 +468,9 @@ public class Range<T extends RingPosition> extends AbstractBounds<T> implements
     {
         return (left instanceof RowPosition) ? new Range<Token>(((RowPosition)left).getToken(), ((RowPosition)right).getToken(), partitioner) : (Range<Token>)this;
     }
+
+    public AbstractBounds<T> withNewRight(T newRight)
+    {
+        return new Range<T>(left, newRight);
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/52671128/src/java/org/apache/cassandra/locator/AbstractEndpointSnitch.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/AbstractEndpointSnitch.java b/src/java/org/apache/cassandra/locator/AbstractEndpointSnitch.java
index 0976cc8..546d15e 100644
--- a/src/java/org/apache/cassandra/locator/AbstractEndpointSnitch.java
+++ b/src/java/org/apache/cassandra/locator/AbstractEndpointSnitch.java
@@ -20,6 +20,8 @@ package org.apache.cassandra.locator;
 import java.net.InetAddress;
 import java.util.*;
 
+import org.apache.cassandra.config.DatabaseDescriptor;
+
 public abstract class AbstractEndpointSnitch implements IEndpointSnitch
 {
     public abstract int compareEndpoints(InetAddress target, InetAddress a1, InetAddress a2);
@@ -57,4 +59,26 @@ public abstract class AbstractEndpointSnitch implements IEndpointSnitch
     {
         // noop by default
     }
+
+    public boolean isWorthMergingForRangeQuery(List<InetAddress> merged, List<InetAddress> l1, List<InetAddress> l2)
+    {
+        // Querying remote DC is likely to be an order of magnitude slower than
+        // querying locally, so 2 queries to local nodes is likely to still be
+        // faster than 1 query involving remote ones
+        boolean mergedHasRemote = hasRemoteNode(merged);
+        return mergedHasRemote
+             ? hasRemoteNode(l1) || hasRemoteNode(l2)
+             : true;
+    }
+
+    private boolean hasRemoteNode(List<InetAddress> l)
+    {
+        String localDc = DatabaseDescriptor.getLocalDataCenter();
+        for (InetAddress ep : l)
+        {
+            if (!localDc.equals(getDatacenter(ep)))
+                return true;
+        }
+        return false;
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/52671128/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java b/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
index 3c30baa..4b54d94 100644
--- a/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
+++ b/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
@@ -21,12 +21,14 @@ import java.lang.reflect.Constructor;
 import java.net.InetAddress;
 import java.util.*;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.HashMultimap;
 import com.google.common.collect.Multimap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.Table;
 import org.apache.cassandra.db.WriteType;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.RingPosition;
@@ -46,22 +48,25 @@ public abstract class AbstractReplicationStrategy
 {
     private static final Logger logger = LoggerFactory.getLogger(AbstractReplicationStrategy.class);
 
-    public final String table;
+    @VisibleForTesting
+    final String tableName;
+    private Table table;
     public final Map<String, String> configOptions;
     private final TokenMetadata tokenMetadata;
 
     public IEndpointSnitch snitch;
 
-    AbstractReplicationStrategy(String table, TokenMetadata tokenMetadata, IEndpointSnitch snitch, Map<String, String> configOptions)
+    AbstractReplicationStrategy(String tableName, TokenMetadata tokenMetadata, IEndpointSnitch snitch, Map<String, String> configOptions)
     {
-        assert table != null;
+        assert tableName != null;
         assert snitch != null;
         assert tokenMetadata != null;
         this.tokenMetadata = tokenMetadata;
         this.snitch = snitch;
         this.tokenMetadata.register(this);
         this.configOptions = configOptions == null ? Collections.<String, String>emptyMap() : configOptions;
-        this.table = table;
+        this.tableName = tableName;
+        // lazy-initialize table itself since we don't create them until after the replication strategies
     }
 
     private final Map<Token, ArrayList<InetAddress>> cachedEndpoints = new NonBlockingHashMap<Token, ArrayList<InetAddress>>();
@@ -120,13 +125,20 @@ public abstract class AbstractReplicationStrategy
         if (consistency_level == ConsistencyLevel.LOCAL_QUORUM)
         {
             // block for in this context will be localnodes block.
-            return new DatacenterWriteResponseHandler(naturalEndpoints, pendingEndpoints, consistency_level, table, callback, writeType);
+            return new DatacenterWriteResponseHandler(naturalEndpoints, pendingEndpoints, consistency_level, getTable(), callback, writeType);
         }
         else if (consistency_level == ConsistencyLevel.EACH_QUORUM)
         {
-            return new DatacenterSyncWriteResponseHandler(naturalEndpoints, pendingEndpoints, consistency_level, table, callback, writeType);
+            return new DatacenterSyncWriteResponseHandler(naturalEndpoints, pendingEndpoints, consistency_level, getTable(), callback, writeType);
         }
-        return new WriteResponseHandler(naturalEndpoints, pendingEndpoints, consistency_level, table, callback, writeType);
+        return new WriteResponseHandler(naturalEndpoints, pendingEndpoints, consistency_level, getTable(), callback, writeType);
+    }
+
+    private Table getTable()
+    {
+        if (table == null)
+            table = Table.open(tableName);
+        return table;
     }
 
     /**
@@ -266,7 +278,7 @@ public abstract class AbstractReplicationStrategy
         for (String key : configOptions.keySet())
         {
             if (!expectedOptions.contains(key))
-                throw new ConfigurationException(String.format("Unrecognized strategy option {%s} passed to %s for keyspace %s", key, getClass().getSimpleName(), table));
+                throw new ConfigurationException(String.format("Unrecognized strategy option {%s} passed to %s for keyspace %s", key, getClass().getSimpleName(), tableName));
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/52671128/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java b/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java
index 6d03391..40aad20 100644
--- a/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java
+++ b/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java
@@ -306,4 +306,34 @@ public class DynamicEndpointSnitch extends AbstractEndpointSnitch implements ILa
         return StorageService.instance.getSeverity(FBUtilities.getBroadcastAddress());
     }
 
+    public boolean isWorthMergingForRangeQuery(List<InetAddress> merged, List<InetAddress> l1, List<InetAddress> l2)
+    {
+        if (!subsnitch.isWorthMergingForRangeQuery(merged, l1, l2))
+            return false;
+
+        // Make sure we return the subsnitch decision (i.e. true if we're here) if we lack too many scores
+        double maxMerged = maxScore(merged);
+        double maxL1 = maxScore(l1);
+        double maxL2 = maxScore(l2);
+        if (maxMerged < 0 || maxL1 < 0 || maxL2 < 0)
+            return true;
+
+        return maxMerged < maxL1 + maxL2;
+    }
+
+    // Return the max score among the endpoints in the provided list, or -1.0 if no node has a score.
+    private double maxScore(List<InetAddress> endpoints)
+    {
+        double maxScore = -1.0;
+        for (InetAddress endpoint : endpoints)
+        {
+            Double score = scores.get(endpoint);
+            if (score == null)
+                continue;
+
+            if (score > maxScore)
+                maxScore = score;
+        }
+        return maxScore;
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/52671128/src/java/org/apache/cassandra/locator/IEndpointSnitch.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/IEndpointSnitch.java b/src/java/org/apache/cassandra/locator/IEndpointSnitch.java
index 5767d1e..690b84f 100644
--- a/src/java/org/apache/cassandra/locator/IEndpointSnitch.java
+++ b/src/java/org/apache/cassandra/locator/IEndpointSnitch.java
@@ -58,4 +58,10 @@ public interface IEndpointSnitch
      * called after Gossiper instance exists immediately before it starts gossiping
      */
     public void gossiperStarting();
-}
\ No newline at end of file
+
+    /**
+     * Returns whether for a range query doing a query against merged is likely
+     * to be faster than 2 sequential queries, one against l1 followed by one against l2.
+     */
+    public boolean isWorthMergingForRangeQuery(List<InetAddress> merged, List<InetAddress> l1, List<InetAddress> l2);
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/52671128/src/java/org/apache/cassandra/locator/LocalStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/LocalStrategy.java b/src/java/org/apache/cassandra/locator/LocalStrategy.java
index 4aafa01..ab580f1 100644
--- a/src/java/org/apache/cassandra/locator/LocalStrategy.java
+++ b/src/java/org/apache/cassandra/locator/LocalStrategy.java
@@ -23,6 +23,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.cassandra.db.Table;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.dht.RingPosition;
 import org.apache.cassandra.dht.Token;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/52671128/src/java/org/apache/cassandra/locator/NetworkTopologyStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/NetworkTopologyStrategy.java b/src/java/org/apache/cassandra/locator/NetworkTopologyStrategy.java
index 6291da0..ad43197 100644
--- a/src/java/org/apache/cassandra/locator/NetworkTopologyStrategy.java
+++ b/src/java/org/apache/cassandra/locator/NetworkTopologyStrategy.java
@@ -24,6 +24,7 @@ import java.util.Map.Entry;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.db.Table;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.locator.TokenMetadata.Topology;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/52671128/src/java/org/apache/cassandra/locator/OldNetworkTopologyStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/OldNetworkTopologyStrategy.java b/src/java/org/apache/cassandra/locator/OldNetworkTopologyStrategy.java
index f07328b..2e3e3e8 100644
--- a/src/java/org/apache/cassandra/locator/OldNetworkTopologyStrategy.java
+++ b/src/java/org/apache/cassandra/locator/OldNetworkTopologyStrategy.java
@@ -23,6 +23,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.cassandra.db.Table;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.dht.Token;
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/52671128/src/java/org/apache/cassandra/locator/SimpleStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/SimpleStrategy.java b/src/java/org/apache/cassandra/locator/SimpleStrategy.java
index 5d8a723..17d171e 100644
--- a/src/java/org/apache/cassandra/locator/SimpleStrategy.java
+++ b/src/java/org/apache/cassandra/locator/SimpleStrategy.java
@@ -24,6 +24,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.cassandra.db.Table;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.dht.Token;
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/52671128/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java b/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
index 2ad2849..c792819 100644
--- a/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
+++ b/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
@@ -21,17 +21,31 @@ import java.net.InetAddress;
 import java.util.Collection;
 import java.util.concurrent.TimeUnit;
 
+import com.google.common.base.Predicate;
+import com.google.common.collect.Iterables;
+
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.Table;
+import org.apache.cassandra.db.WriteType;
 import org.apache.cassandra.exceptions.*;
+import org.apache.cassandra.gms.FailureDetector;
 import org.apache.cassandra.net.IAsyncCallback;
 import org.apache.cassandra.net.MessageIn;
-import org.apache.cassandra.db.ConsistencyLevel;
-import org.apache.cassandra.db.WriteType;
 import org.apache.cassandra.utils.SimpleCondition;
 
 public abstract class AbstractWriteResponseHandler implements IAsyncCallback
 {
+    private static Predicate<InetAddress> isAlive = new Predicate<InetAddress>()
+    {
+        public boolean apply(InetAddress endpoint)
+        {
+            return FailureDetector.instance.isAlive(endpoint);
+        }
+    };
+
     private final SimpleCondition condition = new SimpleCondition();
+    protected final Table table;
     protected final long startTime;
     protected final Collection<InetAddress> naturalEndpoints;
     protected final ConsistencyLevel consistencyLevel;
@@ -43,14 +57,16 @@ public abstract class AbstractWriteResponseHandler implements IAsyncCallback
      * @param pendingEndpoints
      * @param callback A callback to be called when the write is successful.
      */
-    protected AbstractWriteResponseHandler(Collection<InetAddress> naturalEndpoints,
+    protected AbstractWriteResponseHandler(Table table,
+                                           Collection<InetAddress> naturalEndpoints,
                                            Collection<InetAddress> pendingEndpoints,
                                            ConsistencyLevel consistencyLevel,
                                            Runnable callback,
                                            WriteType writeType)
     {
+        this.table = table;
         this.pendingEndpoints = pendingEndpoints;
-        startTime = System.currentTimeMillis();
+        this.startTime = System.currentTimeMillis();
         this.consistencyLevel = consistencyLevel;
         this.naturalEndpoints = naturalEndpoints;
         this.callback = callback;
@@ -72,22 +88,18 @@ public abstract class AbstractWriteResponseHandler implements IAsyncCallback
         }
 
         if (!success)
-            throw new WriteTimeoutException(writeType, consistencyLevel, ackCount(), blockFor());
+            throw new WriteTimeoutException(writeType, consistencyLevel, ackCount(), consistencyLevel.blockFor(table) + pendingEndpoints.size());
     }
 
     protected abstract int ackCount();
 
-    protected int blockFor()
-    {
-        return blockForCL() + pendingEndpoints.size();
-    }
-
-    protected abstract int blockForCL();
-
     /** null message means "response from local write" */
     public abstract void response(MessageIn msg);
 
-    public abstract void assureSufficientLiveNodes() throws UnavailableException;
+    public void assureSufficientLiveNodes() throws UnavailableException
+    {
+        consistencyLevel.assureSufficientLiveNodes(table, Iterables.filter(Iterables.concat(naturalEndpoints, pendingEndpoints), isAlive));
+    }
 
     protected void signal()
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/52671128/src/java/org/apache/cassandra/service/DatacenterReadCallback.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/DatacenterReadCallback.java b/src/java/org/apache/cassandra/service/DatacenterReadCallback.java
deleted file mode 100644
index e1ae652..0000000
--- a/src/java/org/apache/cassandra/service/DatacenterReadCallback.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.service;
-
-import java.net.InetAddress;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.List;
-
-import org.apache.cassandra.db.ConsistencyLevel;
-import org.apache.cassandra.exceptions.UnavailableException;
-import org.apache.cassandra.net.MessageIn;
-import org.apache.thrift.protocol.TMessage;
-
-/**
- * Datacenter Quorum response handler blocks for a quorum of responses from the local DC
- */
-public class DatacenterReadCallback<TMessage, TResolved> extends ReadCallback<TMessage, TResolved>
-{
-    private static final Comparator<InetAddress> localComparator = new Comparator<InetAddress>()
-    {
-        public int compare(InetAddress endpoint1, InetAddress endpoint2)
-        {
-            boolean local1 = localdc.equals(snitch.getDatacenter(endpoint1));
-            boolean local2 = localdc.equals(snitch.getDatacenter(endpoint2));
-            if (local1 && !local2)
-                return -1;
-            if (local2 && !local1)
-                return 1;
-            return 0;
-        }
-    };
-
-    public DatacenterReadCallback(IResponseResolver<TMessage, TResolved> resolver, ConsistencyLevel consistencyLevel, IReadCommand command, List<InetAddress> endpoints)
-    {
-        super(resolver, consistencyLevel, command, endpoints);
-    }
-
-    protected DatacenterReadCallback(IResponseResolver<TMessage, TResolved> resolver, ConsistencyLevel consistencyLevel, int blockfor, IReadCommand command, List<InetAddress> endpoints)
-    {
-        super(resolver, consistencyLevel, blockfor, command, endpoints);
-    }
-
-    @Override
-    public ReadCallback<TMessage, TResolved> withNewResolver(IResponseResolver<TMessage, TResolved> newResolver)
-    {
-        return new DatacenterReadCallback(newResolver, consistencyLevel, blockfor, command, endpoints);
-    }
-
-    @Override
-    protected void sortForConsistencyLevel(List<InetAddress> endpoints)
-    {
-        Collections.sort(endpoints, localComparator);
-    }
-
-    @Override
-    protected boolean waitingFor(MessageIn message)
-    {
-        return localdc.equals(snitch.getDatacenter(message.from));
-    }
-
-    @Override
-    public void assureSufficientLiveNodes() throws UnavailableException
-    {
-        int localEndpoints = 0;
-        for (InetAddress endpoint : endpoints)
-        {
-            if (localdc.equals(snitch.getDatacenter(endpoint)))
-                localEndpoints++;
-        }
-
-        if (localEndpoints < blockfor)
-        {
-            if (logger.isDebugEnabled())
-            {
-                StringBuilder builder = new StringBuilder("Local replicas [");
-                for (InetAddress endpoint : endpoints)
-                {
-                    if (localdc.equals(snitch.getDatacenter(endpoint)))
-                        builder.append(endpoint).append(",");
-                }
-                builder.append("] are insufficient to satisfy LOCAL_QUORUM requirement of ").append(blockfor).append(" live nodes in '").append(localdc).append("'");
-                logger.debug(builder.toString());
-            }
-
-            throw new UnavailableException(consistencyLevel, blockfor, localEndpoints);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/52671128/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java b/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java
index 995b5bf..55e833d 100644
--- a/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java
+++ b/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java
@@ -43,29 +43,21 @@ public class DatacenterSyncWriteResponseHandler extends AbstractWriteResponseHan
 {
     private static final IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
 
-    private static final String localdc;
-    static
-    {
-        localdc = snitch.getDatacenter(FBUtilities.getBroadcastAddress());
-    }
-
-    private final String table;
     private final NetworkTopologyStrategy strategy;
     private final HashMap<String, AtomicInteger> responses = new HashMap<String, AtomicInteger>();
 
     public DatacenterSyncWriteResponseHandler(Collection<InetAddress> naturalEndpoints,
                                               Collection<InetAddress> pendingEndpoints,
                                               ConsistencyLevel consistencyLevel,
-                                              String table,
+                                              Table table,
                                               Runnable callback,
                                               WriteType writeType)
     {
         // Response is been managed by the map so make it 1 for the superclass.
-        super(naturalEndpoints, pendingEndpoints, consistencyLevel, callback, writeType);
+        super(table, naturalEndpoints, pendingEndpoints, consistencyLevel, callback, writeType);
         assert consistencyLevel == ConsistencyLevel.EACH_QUORUM;
 
-        this.table = table;
-        strategy = (NetworkTopologyStrategy) Table.open(table).getReplicationStrategy();
+        strategy = (NetworkTopologyStrategy) table.getReplicationStrategy();
 
         for (String dc : strategy.getDatacenters())
         {
@@ -77,7 +69,7 @@ public class DatacenterSyncWriteResponseHandler extends AbstractWriteResponseHan
     public void response(MessageIn message)
     {
         String dataCenter = message == null
-                            ? localdc
+                            ? DatabaseDescriptor.getLocalDataCenter()
                             : snitch.getDatacenter(message.from);
 
         responses.get(dataCenter).getAndDecrement();
@@ -92,11 +84,6 @@ public class DatacenterSyncWriteResponseHandler extends AbstractWriteResponseHan
         signal();
     }
 
-    protected int blockForCL()
-    {
-        return consistencyLevel.blockFor(table);
-    }
-
     protected int ackCount()
     {
         int n = 0;
@@ -109,30 +96,6 @@ public class DatacenterSyncWriteResponseHandler extends AbstractWriteResponseHan
         return n;
     }
 
-    public void assureSufficientLiveNodes() throws UnavailableException
-    {
-        Map<String, AtomicInteger> dcEndpoints = new HashMap<String, AtomicInteger>();
-        for (String dc: strategy.getDatacenters())
-            dcEndpoints.put(dc, new AtomicInteger());
-
-        for (InetAddress destination : Iterables.concat(naturalEndpoints, pendingEndpoints))
-        {
-            if (FailureDetector.instance.isAlive(destination))
-            {
-                // figure out the destination dc
-                String destinationDC = snitch.getDatacenter(destination);
-                dcEndpoints.get(destinationDC).incrementAndGet();
-            }
-        }
-
-        // Throw exception if any of the DC doesn't have livenodes to accept write.
-        for (String dc: strategy.getDatacenters())
-        {
-            if (dcEndpoints.get(dc).get() < responses.get(dc).get())
-                throw new UnavailableException(consistencyLevel, responses.get(dc).get(), dcEndpoints.get(dc).get());
-        }
-    }
-
     public boolean isLatencyForSnitch()
     {
         return false;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/52671128/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java b/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java
index cc76231..3fa674f 100644
--- a/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java
+++ b/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java
@@ -23,6 +23,7 @@ import java.util.Collection;
 import com.google.common.collect.Iterables;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.Table;
 import org.apache.cassandra.exceptions.UnavailableException;
 import org.apache.cassandra.gms.FailureDetector;
 import org.apache.cassandra.locator.IEndpointSnitch;
@@ -38,16 +39,10 @@ public class DatacenterWriteResponseHandler extends WriteResponseHandler
 {
     private static final IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
 
-    private static final String localdc;
-    static
-    {
-        localdc = snitch.getDatacenter(FBUtilities.getBroadcastAddress());
-    }
-
     public DatacenterWriteResponseHandler(Collection<InetAddress> naturalEndpoints,
                                           Collection<InetAddress> pendingEndpoints,
                                           ConsistencyLevel consistencyLevel,
-                                          String table,
+                                          Table table,
                                           Runnable callback,
                                           WriteType writeType)
     {
@@ -58,27 +53,10 @@ public class DatacenterWriteResponseHandler extends WriteResponseHandler
     @Override
     public void response(MessageIn message)
     {
-        if (message == null || localdc.equals(snitch.getDatacenter(message.from)))
+        if (message == null || DatabaseDescriptor.getLocalDataCenter().equals(snitch.getDatacenter(message.from)))
         {
             if (responses.decrementAndGet() == 0)
                 signal();
         }
     }
-
-    @Override
-    public void assureSufficientLiveNodes() throws UnavailableException
-    {
-        int liveNodes = 0;
-        for (InetAddress destination : Iterables.concat(naturalEndpoints, pendingEndpoints))
-        {
-            if (localdc.equals(snitch.getDatacenter(destination)) && FailureDetector.instance.isAlive(destination))
-                liveNodes++;
-        }
-
-        if (liveNodes < responses.get())
-        {
-            throw new UnavailableException(consistencyLevel, responses.get(), liveNodes);
-        }
-    }
-
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/52671128/src/java/org/apache/cassandra/service/ReadCallback.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ReadCallback.java b/src/java/org/apache/cassandra/service/ReadCallback.java
index e12859a..f1ca96e 100644
--- a/src/java/org/apache/cassandra/service/ReadCallback.java
+++ b/src/java/org/apache/cassandra/service/ReadCallback.java
@@ -34,9 +34,9 @@ import org.apache.cassandra.concurrent.StageManager;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.ReadCommand;
+import org.apache.cassandra.db.Table;
 import org.apache.cassandra.exceptions.ReadTimeoutException;
 import org.apache.cassandra.exceptions.UnavailableException;
-import org.apache.cassandra.locator.IEndpointSnitch;
 import org.apache.cassandra.net.IAsyncCallback;
 import org.apache.cassandra.net.MessageIn;
 import org.apache.cassandra.net.MessageOut;
@@ -46,43 +46,34 @@ import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.SimpleCondition;
 import org.apache.cassandra.utils.WrappedRunnable;
 
-import com.google.common.collect.Lists;
-
 public class ReadCallback<TMessage, TResolved> implements IAsyncCallback<TMessage>
 {
     protected static final Logger logger = LoggerFactory.getLogger( ReadCallback.class );
 
-    protected static final IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
-    protected static final String localdc = snitch.getDatacenter(FBUtilities.getBroadcastAddress());
-
     public final IResponseResolver<TMessage, TResolved> resolver;
-    protected final SimpleCondition condition = new SimpleCondition();
+    private final SimpleCondition condition = new SimpleCondition();
     private final long startTime;
-    protected final int blockfor;
+    private final int blockfor;
     final List<InetAddress> endpoints;
-    protected final IReadCommand command;
-    protected final ConsistencyLevel consistencyLevel;
-    protected final AtomicInteger received = new AtomicInteger(0);
+    private final IReadCommand command;
+    private final ConsistencyLevel consistencyLevel;
+    private final AtomicInteger received = new AtomicInteger(0);
+    private final Table table; // TODO push this into ConsistencyLevel?
 
     /**
      * Constructor when response count has to be calculated and blocked for.
      */
-    public ReadCallback(IResponseResolver<TMessage, TResolved> resolver, ConsistencyLevel consistencyLevel, IReadCommand command, List<InetAddress> endpoints)
+    public ReadCallback(IResponseResolver<TMessage, TResolved> resolver, ConsistencyLevel consistencyLevel, IReadCommand command, List<InetAddress> filteredEndpoints)
     {
-        this.command = command;
-        this.blockfor = consistencyLevel.blockFor(command.getKeyspace());
-        this.resolver = resolver;
-        this.startTime = System.currentTimeMillis();
-        this.consistencyLevel = consistencyLevel;
-        sortForConsistencyLevel(endpoints);
-        this.endpoints = filterEndpoints(endpoints);
+        this(resolver, consistencyLevel, consistencyLevel.blockFor(Table.open(command.getKeyspace())), command, Table.open(command.getKeyspace()), filteredEndpoints);
         if (logger.isTraceEnabled())
             logger.trace(String.format("Blockfor is %s; setting up requests to %s", blockfor, StringUtils.join(this.endpoints, ",")));
     }
 
-    protected ReadCallback(IResponseResolver<TMessage, TResolved> resolver, ConsistencyLevel consistencyLevel, int blockfor, IReadCommand command, List<InetAddress> endpoints)
+    private ReadCallback(IResponseResolver<TMessage, TResolved> resolver, ConsistencyLevel consistencyLevel, int blockfor, IReadCommand command, Table table, List<InetAddress> endpoints)
     {
         this.command = command;
+        this.table = table;
         this.blockfor = blockfor;
         this.consistencyLevel = consistencyLevel;
         this.resolver = resolver;
@@ -92,54 +83,7 @@ public class ReadCallback<TMessage, TResolved> implements IAsyncCallback<TMessag
 
     public ReadCallback<TMessage, TResolved> withNewResolver(IResponseResolver<TMessage, TResolved> newResolver)
     {
-        return new ReadCallback(newResolver, consistencyLevel, blockfor, command, endpoints);
-    }
-
-    /**
-     * Endpoints is already restricted to live replicas, sorted by snitch preference.  This is a hook for
-     * DatacenterReadCallback to move local-DC replicas to the front of the list.  We need this both
-     * when doing read repair (because the first replica gets the data read) and otherwise (because
-     * only the first 1..blockfor replicas will get digest reads).
-     */
-    protected void sortForConsistencyLevel(List<InetAddress> endpoints)
-    {
-        // no-op except in DRC
-    }
-
-    private List<InetAddress> filterEndpoints(List<InetAddress> ep)
-    {
-        if (resolver instanceof RowDigestResolver)
-        {
-            assert command instanceof ReadCommand : command;
-            String table = ((RowDigestResolver) resolver).table;
-            String columnFamily = ((ReadCommand) command).getColumnFamilyName();
-            CFMetaData cfmd = Schema.instance.getTableMetaData(table).get(columnFamily);
-            double chance = FBUtilities.threadLocalRandom().nextDouble();
-
-            // if global repair then just return all the ep's
-            if (cfmd.getReadRepairChance() > chance)
-                return ep;
-
-            // if local repair then just return localDC ep's
-            if (cfmd.getDcLocalReadRepair() > chance)
-            {
-                List<InetAddress> local = Lists.newArrayList();
-                List<InetAddress> other = Lists.newArrayList();
-                for (InetAddress add : ep)
-                {
-                    if (snitch.getDatacenter(add).equals(localdc))
-                        local.add(add);
-                    else
-                        other.add(add);
-                }
-                // check if blockfor more than we have localep's
-                if (local.size() < blockfor)
-                    local.addAll(other.subList(0, Math.min(blockfor - local.size(), other.size())));
-                return local;
-            }
-        }
-        // we don't read repair on range scans
-        return ep.subList(0, Math.min(ep.size(), blockfor));
+        return new ReadCallback(newResolver, consistencyLevel, blockfor, command, table, endpoints);
     }
 
     public TResolved get() throws ReadTimeoutException, DigestMismatchException, IOException
@@ -177,9 +121,11 @@ public class ReadCallback<TMessage, TResolved> implements IAsyncCallback<TMessag
     /**
      * @return true if the message counts towards the blockfor threshold
      */
-    protected boolean waitingFor(MessageIn message)
+    private boolean waitingFor(MessageIn message)
     {
-        return true;
+        return consistencyLevel == ConsistencyLevel.LOCAL_QUORUM
+             ? DatabaseDescriptor.getLocalDataCenter().equals(DatabaseDescriptor.getEndpointSnitch().getDatacenter(message.from))
+             : true;
     }
 
     public void response(TMessage result)
@@ -207,12 +153,7 @@ public class ReadCallback<TMessage, TResolved> implements IAsyncCallback<TMessag
 
     public void assureSufficientLiveNodes() throws UnavailableException
     {
-        if (endpoints.size() < blockfor)
-        {
-            logger.debug("Live nodes {} do not satisfy ConsistencyLevel ({} required)",
-                         StringUtils.join(endpoints, ", "), blockfor);
-            throw new UnavailableException(consistencyLevel, blockfor, endpoints.size());
-        }
+        consistencyLevel.assureSufficientLiveNodes(table, endpoints);
     }
 
     public boolean isLatencyForSnitch()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/52671128/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 0fb7ec4..df09171 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -37,6 +37,7 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.concurrent.StageManager;
+import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.db.*;
@@ -310,7 +311,7 @@ public class StorageProxy implements StorageProxyMBean
         AbstractWriteResponseHandler handler = new WriteResponseHandler(endpoints,
                                                                         Collections.<InetAddress>emptyList(),
                                                                         ConsistencyLevel.ONE,
-                                                                        Table.SYSTEM_KS,
+                                                                        Table.open(Table.SYSTEM_KS),
                                                                         null,
                                                                         WriteType.BATCH_LOG);
         updateBatchlog(rm, endpoints, handler);
@@ -321,7 +322,12 @@ public class StorageProxy implements StorageProxyMBean
     {
         RowMutation rm = new RowMutation(Table.SYSTEM_KS, UUIDType.instance.decompose(uuid));
         rm.delete(new QueryPath(SystemTable.BATCHLOG_CF), FBUtilities.timestampMicros());
-        AbstractWriteResponseHandler handler = new WriteResponseHandler(endpoints, Collections.<InetAddress>emptyList(), ConsistencyLevel.ANY, Table.SYSTEM_KS, null, WriteType.SIMPLE);
+        AbstractWriteResponseHandler handler = new WriteResponseHandler(endpoints,
+                                                                        Collections.<InetAddress>emptyList(),
+                                                                        ConsistencyLevel.ANY,
+                                                                        Table.open(Table.SYSTEM_KS),
+                                                                        null,
+                                                                        WriteType.SIMPLE);
         updateBatchlog(rm, endpoints, handler);
     }
 
@@ -716,8 +722,9 @@ public class StorageProxy implements StorageProxyMBean
      * is unclear we want to mix those latencies with read latencies, so this
      * may be a bit involved.
      */
-    private static InetAddress findSuitableEndpoint(String table, ByteBuffer key, String localDataCenter, ConsistencyLevel cl) throws UnavailableException
+    private static InetAddress findSuitableEndpoint(String tableName, ByteBuffer key, String localDataCenter, ConsistencyLevel cl) throws UnavailableException
     {
+        Table table = Table.open(tableName);
         IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
         List<InetAddress> endpoints = StorageService.instance.getLiveNaturalEndpoints(table, key);
         if (endpoints.isEmpty())
@@ -869,21 +876,22 @@ public class StorageProxy implements StorageProxyMBean
             for (int i = 0; i < commands.size(); i++)
             {
                 ReadCommand command = commands.get(i);
+                Table table = Table.open(command.getKeyspace());
                 assert !command.isDigestQuery();
                 logger.trace("Command/ConsistencyLevel is {}/{}", command, consistency_level);
 
-                List<InetAddress> endpoints = StorageService.instance.getLiveNaturalEndpoints(command.table,
-                                                                                              command.key);
-                DatabaseDescriptor.getEndpointSnitch().sortByProximity(FBUtilities.getBroadcastAddress(), endpoints);
+                List<InetAddress> endpoints = getLiveSortedEndpoints(table, command.key);
+                CFMetaData cfm = Schema.instance.getCFMetaData(command.getKeyspace(), command.getColumnFamilyName());
+                endpoints = consistency_level.filterForQuery(table, endpoints, cfm.newReadRepairDecision());
 
                 RowDigestResolver resolver = new RowDigestResolver(command.table, command.key);
-                ReadCallback<ReadResponse, Row> handler = getReadCallback(resolver, command, consistency_level, endpoints);
+                ReadCallback<ReadResponse, Row> handler = new ReadCallback(resolver, consistency_level, command, endpoints);
                 handler.assureSufficientLiveNodes();
-                assert !handler.endpoints.isEmpty();
+                assert !endpoints.isEmpty();
                 readCallbacks[i] = handler;
 
                 // The data-request message is sent to dataPoint, the node that will actually get the data for us
-                InetAddress dataPoint = handler.endpoints.get(0);
+                InetAddress dataPoint = endpoints.get(0);
                 if (dataPoint.equals(FBUtilities.getBroadcastAddress()) && OPTIMIZE_LOCAL_REQUESTS)
                 {
                     logger.trace("reading data locally");
@@ -895,14 +903,14 @@ public class StorageProxy implements StorageProxyMBean
                     MessagingService.instance().sendRR(command.createMessage(), dataPoint, handler);
                 }
 
-                if (handler.endpoints.size() == 1)
+                if (endpoints.size() == 1)
                     continue;
 
                 // send the other endpoints a digest request
                 ReadCommand digestCommand = command.copy();
                 digestCommand.setDigestQuery(true);
                 MessageOut message = null;
-                for (InetAddress digestPoint : handler.endpoints.subList(1, handler.endpoints.size()))
+                for (InetAddress digestPoint : endpoints.subList(1, endpoints.size()))
                 {
                     if (digestPoint.equals(FBUtilities.getBroadcastAddress()) && OPTIMIZE_LOCAL_REQUESTS)
                     {
@@ -996,7 +1004,7 @@ public class StorageProxy implements StorageProxyMBean
                     }
                     catch (TimeoutException e)
                     {
-                        int blockFor = consistency_level.blockFor(command.getKeyspace());
+                        int blockFor = consistency_level.blockFor(Table.open(command.getKeyspace()));
                         throw new ReadTimeoutException(consistency_level, blockFor, blockFor, true);
                     }
 
@@ -1071,13 +1079,28 @@ public class StorageProxy implements StorageProxyMBean
         }
     }
 
-    static <TMessage, TResolved> ReadCallback<TMessage, TResolved> getReadCallback(IResponseResolver<TMessage, TResolved> resolver, IReadCommand command, ConsistencyLevel consistencyLevel, List<InetAddress> endpoints)
+    private static List<InetAddress> getLiveSortedEndpoints(Table table, ByteBuffer key)
     {
-        if (consistencyLevel == ConsistencyLevel.LOCAL_QUORUM || consistencyLevel == ConsistencyLevel.EACH_QUORUM)
-        {
-            return new DatacenterReadCallback(resolver, consistencyLevel, command, endpoints);
-        }
-        return new ReadCallback(resolver, consistencyLevel, command, endpoints);
+        return getLiveSortedEndpoints(table, StorageService.instance.getPartitioner().decorateKey(key));
+    }
+
+    private static List<InetAddress> getLiveSortedEndpoints(Table table, RingPosition pos)
+    {
+        List<InetAddress> liveEndpoints = StorageService.instance.getLiveNaturalEndpoints(table, pos);
+        DatabaseDescriptor.getEndpointSnitch().sortByProximity(FBUtilities.getBroadcastAddress(), liveEndpoints);
+        return liveEndpoints;
+    }
+
+    private static List<InetAddress> intersection(List<InetAddress> l1, List<InetAddress> l2)
+    {
+        // Note: we don't use Guava Sets.intersection() for 3 reasons:
+        //   1) retainAll would be inefficient if l1 and l2 are large but in practice both are the replicas for a range and
+        //   so will be very small (< RF). In that case, retainAll is in fact more efficient.
+        //   2) we do ultimately need a list so converting everything to sets doesn't make sense
+        //   3) l1 and l2 are sorted by proximity. The use of retainAll maintains that sorting in the result, while using sets wouldn't.
+        List<InetAddress> inter = new ArrayList<InetAddress>(l1);
+        inter.retainAll(l2);
+        return inter;
     }
 
     public static List<Row> getRangeSlice(RangeSliceCommand command, ConsistencyLevel consistency_level)
@@ -1086,6 +1109,8 @@ public class StorageProxy implements StorageProxyMBean
         Tracing.trace("Determining replicas to query");
         logger.trace("Command/ConsistencyLevel is {}/{}", command.toString(), consistency_level);
         long startTime = System.nanoTime();
+
+        Table table = Table.open(command.keyspace);
         List<Row> rows;
         // now scan until we have enough results
         try
@@ -1095,8 +1120,51 @@ public class StorageProxy implements StorageProxyMBean
             int cql3RowCount = 0;
             rows = new ArrayList<Row>();
             List<AbstractBounds<RowPosition>> ranges = getRestrictedRanges(command.range);
-            for (AbstractBounds<RowPosition> range : ranges)
+            int i = 0;
+            AbstractBounds<RowPosition> nextRange = null;
+            List<InetAddress> nextEndpoints = null;
+            List<InetAddress> nextFilteredEndpoints = null;
+            while (i < ranges.size())
             {
+                AbstractBounds<RowPosition> range = nextRange == null
+                                                  ? ranges.get(i)
+                                                  : nextRange;
+                List<InetAddress> liveEndpoints = nextEndpoints == null
+                                                ? getLiveSortedEndpoints(table, range.right)
+                                                : nextEndpoints;
+                List<InetAddress> filteredEndpoints = nextFilteredEndpoints == null
+                                                    ? consistency_level.filterForQuery(table, liveEndpoints)
+                                                    : nextFilteredEndpoints;
+                ++i;
+
+                // getRestrictedRanges has broken the queried range into per-[vnode] token ranges, but this doesn't take
+                // the replication factor into account. If the intersection of live endpoints for 2 consecutive ranges
+                // still meets the CL requirements, then we can merge both ranges into the same RangeSliceCommand.
+                while (i < ranges.size())
+                {
+                    nextRange = ranges.get(i);
+                    nextEndpoints = getLiveSortedEndpoints(table, nextRange.right);
+                    nextFilteredEndpoints = consistency_level.filterForQuery(table, nextEndpoints); // replicas of the *next* range
+
+                    List<InetAddress> merged = intersection(liveEndpoints, nextEndpoints);
+
+                    // Check if there are enough endpoints for the merge to be possible.
+                    if (!consistency_level.isSufficientLiveNodes(table, merged))
+                        break;
+
+                    List<InetAddress> filteredMerged = consistency_level.filterForQuery(table, merged);
+
+                    // Estimate whether merging will be a win or not (on break, the next* values seed the next outer iteration)
+                    if (!DatabaseDescriptor.getEndpointSnitch().isWorthMergingForRangeQuery(filteredMerged, filteredEndpoints, nextFilteredEndpoints))
+                        break;
+
+                    // If we get there, merge this range and the next one
+                    range = range.withNewRight(nextRange.right);
+                    liveEndpoints = merged;
+                    filteredEndpoints = filteredMerged;
+                    ++i;
+                }
+
                 RangeSliceCommand nodeCmd = new RangeSliceCommand(command.keyspace,
                                                                   command.column_family,
                                                                   command.super_column,
@@ -1107,16 +1175,13 @@ public class StorageProxy implements StorageProxyMBean
                                                                   command.countCQL3Rows,
                                                                   command.isPaging);
 
-                List<InetAddress> liveEndpoints = StorageService.instance.getLiveNaturalEndpoints(nodeCmd.keyspace, range.right);
-                DatabaseDescriptor.getEndpointSnitch().sortByProximity(FBUtilities.getBroadcastAddress(), liveEndpoints);
-
                 // collect replies and resolve according to consistency level
                 RangeSliceResponseResolver resolver = new RangeSliceResponseResolver(nodeCmd.keyspace);
-                ReadCallback<RangeSliceReply, Iterable<Row>> handler = getReadCallback(resolver, nodeCmd, consistency_level, liveEndpoints);
+                ReadCallback<RangeSliceReply, Iterable<Row>> handler = new ReadCallback(resolver, consistency_level, nodeCmd, filteredEndpoints);
                 handler.assureSufficientLiveNodes();
-                resolver.setSources(handler.endpoints);
-                if (handler.endpoints.size() == 1
-                    && handler.endpoints.get(0).equals(FBUtilities.getBroadcastAddress())
+                resolver.setSources(filteredEndpoints);
+                if (filteredEndpoints.size() == 1
+                    && filteredEndpoints.get(0).equals(FBUtilities.getBroadcastAddress())
                     && OPTIMIZE_LOCAL_REQUESTS)
                 {
                     logger.trace("reading data locally");
@@ -1125,7 +1190,7 @@ public class StorageProxy implements StorageProxyMBean
                 else
                 {
                     MessageOut<RangeSliceCommand> message = nodeCmd.createMessage();
-                    for (InetAddress endpoint : handler.endpoints)
+                    for (InetAddress endpoint : filteredEndpoints)
                     {
                         MessagingService.instance().sendRR(message, endpoint, handler);
                         logger.trace("reading {} from {}", nodeCmd, endpoint);
@@ -1147,7 +1212,7 @@ public class StorageProxy implements StorageProxyMBean
                 {
                     logger.debug("Range slice timeout: {}", ex.toString());
                     // We actually got all response at that point
-                    int blockFor = consistency_level.blockFor(command.keyspace);
+                    int blockFor = consistency_level.blockFor(table);
                     throw new ReadTimeoutException(consistency_level, blockFor, blockFor, true);
                 }
                 catch (DigestMismatchException e)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/52671128/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index eb62032..e57cfe4 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -2491,14 +2491,14 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
      * @param key key for which we need to find the endpoint
      * @return the endpoint responsible for this key
      */
-    public List<InetAddress> getLiveNaturalEndpoints(String table, ByteBuffer key)
+    public List<InetAddress> getLiveNaturalEndpoints(Table table, ByteBuffer key)
     {
         return getLiveNaturalEndpoints(table, getPartitioner().decorateKey(key));
     }
 
-    public List<InetAddress> getLiveNaturalEndpoints(String table, RingPosition pos)
+    public List<InetAddress> getLiveNaturalEndpoints(Table table, RingPosition pos)
     {
-        List<InetAddress> endpoints = Table.open(table).getReplicationStrategy().getNaturalEndpoints(pos);
+        List<InetAddress> endpoints = table.getReplicationStrategy().getNaturalEndpoints(pos);
         List<InetAddress> liveEps = new ArrayList<InetAddress>(endpoints.size());
 
         for (InetAddress endpoint : endpoints)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/52671128/src/java/org/apache/cassandra/service/WriteResponseHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/WriteResponseHandler.java b/src/java/org/apache/cassandra/service/WriteResponseHandler.java
index ab7223a..1c229d5 100644
--- a/src/java/org/apache/cassandra/service/WriteResponseHandler.java
+++ b/src/java/org/apache/cassandra/service/WriteResponseHandler.java
@@ -27,6 +27,7 @@ import com.google.common.collect.Iterables;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.db.Table;
 import org.apache.cassandra.exceptions.UnavailableException;
 import org.apache.cassandra.gms.FailureDetector;
 import org.apache.cassandra.net.MessageIn;
@@ -41,32 +42,26 @@ public class WriteResponseHandler extends AbstractWriteResponseHandler
     protected static final Logger logger = LoggerFactory.getLogger(WriteResponseHandler.class);
 
     protected final AtomicInteger responses;
-    private final int blockFor;
 
     public WriteResponseHandler(Collection<InetAddress> writeEndpoints,
                                 Collection<InetAddress> pendingEndpoints,
                                 ConsistencyLevel consistencyLevel,
-                                String table,
+                                Table table,
                                 Runnable callback,
                                 WriteType writeType)
     {
-        super(writeEndpoints, pendingEndpoints, consistencyLevel, callback, writeType);
-        blockFor = consistencyLevel.blockFor(table);
-        responses = new AtomicInteger(blockFor);
+        super(table, writeEndpoints, pendingEndpoints, consistencyLevel, callback, writeType);
+        responses = new AtomicInteger(consistencyLevel.blockFor(table));
     }
 
     public WriteResponseHandler(InetAddress endpoint, WriteType writeType, Runnable callback)
     {
-        super(Arrays.asList(endpoint), Collections.<InetAddress>emptyList(), ConsistencyLevel.ALL, callback, writeType);
-        blockFor = 1;
-        responses = new AtomicInteger(1);
+        this(Arrays.asList(endpoint), Collections.<InetAddress>emptyList(), ConsistencyLevel.ONE, null, callback, writeType);
     }
 
     public WriteResponseHandler(InetAddress endpoint, WriteType writeType)
     {
-        super(Arrays.asList(endpoint), Collections.<InetAddress>emptyList(), ConsistencyLevel.ALL, null, writeType);
-        blockFor = 1;
-        responses = new AtomicInteger(1);
+        this(endpoint, writeType, null);
     }
 
     public void response(MessageIn m)
@@ -77,33 +72,7 @@ public class WriteResponseHandler extends AbstractWriteResponseHandler
 
     protected int ackCount()
     {
-        return blockFor - responses.get();
-    }
-
-    protected int blockForCL()
-    {
-        return blockFor;
-    }
-
-    public void assureSufficientLiveNodes() throws UnavailableException
-    {
-        if (consistencyLevel == ConsistencyLevel.ANY)
-        {
-            // local hint is acceptable, and local node is always live
-            return;
-        }
-
-        // count destinations that are part of the desired target set
-        int liveNodes = 0;
-        for (InetAddress destination : Iterables.concat(naturalEndpoints, pendingEndpoints))
-        {
-            if (FailureDetector.instance.isAlive(destination))
-                liveNodes++;
-        }
-        if (liveNodes < blockFor)
-        {
-            throw new UnavailableException(consistencyLevel, blockFor, liveNodes);
-        }
+        return consistencyLevel.blockFor(table) - responses.get();
     }
 
     public boolean isLatencyForSnitch()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/52671128/test/unit/org/apache/cassandra/locator/ReplicationStrategyEndpointCacheTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/locator/ReplicationStrategyEndpointCacheTest.java b/test/unit/org/apache/cassandra/locator/ReplicationStrategyEndpointCacheTest.java
index bbb382b..8212468 100644
--- a/test/unit/org/apache/cassandra/locator/ReplicationStrategyEndpointCacheTest.java
+++ b/test/unit/org/apache/cassandra/locator/ReplicationStrategyEndpointCacheTest.java
@@ -164,7 +164,7 @@ public class ReplicationStrategyEndpointCacheTest extends SchemaLoader
     private AbstractReplicationStrategy getStrategyWithNewTokenMetadata(AbstractReplicationStrategy strategy, TokenMetadata newTmd) throws ConfigurationException
     {
         return AbstractReplicationStrategy.createReplicationStrategy(
-                strategy.table,
+                strategy.tableName,
                 strategy.getClass().getName(),
                 newTmd,
                 strategy.snitch,

http://git-wip-us.apache.org/repos/asf/cassandra/blob/52671128/test/unit/org/apache/cassandra/locator/SimpleStrategyTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/locator/SimpleStrategyTest.java b/test/unit/org/apache/cassandra/locator/SimpleStrategyTest.java
index 5f108ee..a457df8 100644
--- a/test/unit/org/apache/cassandra/locator/SimpleStrategyTest.java
+++ b/test/unit/org/apache/cassandra/locator/SimpleStrategyTest.java
@@ -165,16 +165,6 @@ public class SimpleStrategyTest extends SchemaLoader
         StorageServiceAccessor.setTokenMetadata(oldTmd);
     }
 
-    private AbstractReplicationStrategy getStrategyWithNewTokenMetadata(AbstractReplicationStrategy strategy, TokenMetadata newTmd) throws ConfigurationException
-    {
-        return AbstractReplicationStrategy.createReplicationStrategy(
-                strategy.table,
-                strategy.getClass().getName(),
-                newTmd,
-                strategy.snitch,
-                strategy.configOptions);
-    }
-
     private AbstractReplicationStrategy getStrategy(String table, TokenMetadata tmd) throws ConfigurationException
     {
         KSMetaData ksmd = Schema.instance.getKSMetaData(table);