Posted to commits@cassandra.apache.org by aw...@apache.org on 2018/09/01 01:35:54 UTC

[02/18] cassandra git commit: Transient Replication and Cheap Quorums

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/service/StorageServiceTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/StorageServiceTest.java b/test/unit/org/apache/cassandra/service/StorageServiceTest.java
new file mode 100644
index 0000000..9d5c324
--- /dev/null
+++ b/test/unit/org/apache/cassandra/service/StorageServiceTest.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service;
+
+import org.apache.cassandra.locator.EndpointsByReplica;
+import org.apache.cassandra.locator.ReplicaCollection;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.dht.RandomPartitioner;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.locator.AbstractEndpointSnitch;
+import org.apache.cassandra.locator.AbstractReplicationStrategy;
+import org.apache.cassandra.locator.IEndpointSnitch;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.Replica;
+import org.apache.cassandra.locator.ReplicaMultimap;
+import org.apache.cassandra.locator.SimpleStrategy;
+import org.apache.cassandra.locator.TokenMetadata;
+
+import static org.junit.Assert.assertEquals;
+
+public class StorageServiceTest
+{
+    static InetAddressAndPort aAddress;
+    static InetAddressAndPort bAddress;
+    static InetAddressAndPort cAddress;
+    static InetAddressAndPort dAddress;
+    static InetAddressAndPort eAddress;
+
+    @BeforeClass
+    public static void setUpClass() throws Exception
+    {
+        aAddress = InetAddressAndPort.getByName("127.0.0.1");
+        bAddress = InetAddressAndPort.getByName("127.0.0.2");
+        cAddress = InetAddressAndPort.getByName("127.0.0.3");
+        dAddress = InetAddressAndPort.getByName("127.0.0.4");
+        eAddress = InetAddressAndPort.getByName("127.0.0.5");
+    }
+
+    private static final Token threeToken = new RandomPartitioner.BigIntegerToken("3");
+    private static final Token sixToken = new RandomPartitioner.BigIntegerToken("6");
+    private static final Token nineToken = new RandomPartitioner.BigIntegerToken("9");
+    private static final Token elevenToken = new RandomPartitioner.BigIntegerToken("11");
+    private static final Token oneToken = new RandomPartitioner.BigIntegerToken("1");
+
+    Range<Token> aRange = new Range<>(oneToken, threeToken);
+    Range<Token> bRange = new Range<>(threeToken, sixToken);
+    Range<Token> cRange = new Range<>(sixToken, nineToken);
+    Range<Token> dRange = new Range<>(nineToken, elevenToken);
+    Range<Token> eRange = new Range<>(elevenToken, oneToken);
+
+    @Before
+    public void setUp()
+    {
+        DatabaseDescriptor.daemonInitialization();
+        DatabaseDescriptor.setTransientReplicationEnabledUnsafe(true);
+        IEndpointSnitch snitch = new AbstractEndpointSnitch()
+        {
+            public int compareEndpoints(InetAddressAndPort target, Replica r1, Replica r2)
+            {
+                return 0;
+            }
+
+            public String getRack(InetAddressAndPort endpoint)
+            {
+                return "R1";
+            }
+
+            public String getDatacenter(InetAddressAndPort endpoint)
+            {
+                return "DC1";
+            }
+        };
+
+        DatabaseDescriptor.setEndpointSnitch(snitch);
+    }
+
+    private AbstractReplicationStrategy simpleStrategy(TokenMetadata tmd)
+    {
+        return new SimpleStrategy("MoveTransientTest",
+                                  tmd,
+                                  DatabaseDescriptor.getEndpointSnitch(),
+                                  com.google.common.collect.ImmutableMap.of("replication_factor", "3/1"));
+    }
+
+    public static <K, C extends ReplicaCollection<? extends C>>  void assertMultimapEqualsIgnoreOrder(ReplicaMultimap<K, C> a, ReplicaMultimap<K, C> b)
+    {
+        if (!a.keySet().equals(b.keySet()))
+            assertEquals(a, b);
+        for (K key : a.keySet())
+        {
+            C ac = a.get(key);
+            C bc = b.get(key);
+            if (ac.size() != bc.size())
+                assertEquals(a, b);
+            for (Replica r : ac)
+            {
+                if (!bc.contains(r))
+                    assertEquals(a, b);
+            }
+        }
+    }
+
+    @Test
+    public void testGetChangedReplicasForLeaving() throws Exception
+    {
+        TokenMetadata tmd = new TokenMetadata();
+        tmd.updateNormalToken(threeToken, aAddress);
+        tmd.updateNormalToken(sixToken, bAddress);
+        tmd.updateNormalToken(nineToken, cAddress);
+        tmd.updateNormalToken(elevenToken, dAddress);
+        tmd.updateNormalToken(oneToken, eAddress);
+
+        tmd.addLeavingEndpoint(aAddress);
+
+        AbstractReplicationStrategy strat = simpleStrategy(tmd);
+
+        EndpointsByReplica result = StorageService.getChangedReplicasForLeaving("StorageServiceTest", aAddress, tmd, strat);
+        System.out.println(result);
+        EndpointsByReplica.Mutable expectedResult = new EndpointsByReplica.Mutable();
+        expectedResult.put(new Replica(aAddress, aRange, true), new Replica(cAddress, new Range<>(oneToken, sixToken), true));
+        expectedResult.put(new Replica(aAddress, aRange, true), new Replica(dAddress, new Range<>(oneToken, sixToken), false));
+        expectedResult.put(new Replica(aAddress, eRange, true), new Replica(bAddress, eRange, true));
+        expectedResult.put(new Replica(aAddress, eRange, true), new Replica(cAddress, eRange, false));
+        expectedResult.put(new Replica(aAddress, dRange, false), new Replica(bAddress, dRange, false));
+        assertMultimapEqualsIgnoreOrder(result, expectedResult.asImmutableView());
+    }
+}
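
A note on the "3/1" replication factor used above: it declares three
replicas per token range, one of which is transient, meaning it holds
only data not yet repaired to the full replicas. The boolean in the
Replica constructor exercised by the expected results is what separates
the two kinds. A minimal standalone sketch, using only APIs that appear
in this test (addresses and tokens are arbitrary):

    import java.net.UnknownHostException;

    import org.apache.cassandra.dht.RandomPartitioner;
    import org.apache.cassandra.dht.Range;
    import org.apache.cassandra.dht.Token;
    import org.apache.cassandra.locator.InetAddressAndPort;
    import org.apache.cassandra.locator.Replica;

    public class ReplicaKindsSketch
    {
        public static void main(String[] args) throws UnknownHostException
        {
            InetAddressAndPort node = InetAddressAndPort.getByName("127.0.0.1");
            Range<Token> range = new Range<>(new RandomPartitioner.BigIntegerToken("1"),
                                             new RandomPartitioner.BigIntegerToken("3"));

            // full == true: this replica stores all data for the range
            Replica full = new Replica(node, range, true);
            // full == false: transient, stores only unrepaired data
            Replica trans = new Replica(node, range, false);

            System.out.println(full + " / " + trans);
        }
    }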

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/service/WriteResponseHandlerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/WriteResponseHandlerTest.java b/test/unit/org/apache/cassandra/service/WriteResponseHandlerTest.java
index f8567e8..cf1e06a 100644
--- a/test/unit/org/apache/cassandra/service/WriteResponseHandlerTest.java
+++ b/test/unit/org/apache/cassandra/service/WriteResponseHandlerTest.java
@@ -20,12 +20,14 @@ package org.apache.cassandra.service;
 
 
 import java.net.InetAddress;
-import java.util.Collection;
-import java.util.List;
+import java.net.UnknownHostException;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 
-import com.google.common.collect.ImmutableList;
+import com.google.common.base.Predicates;
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.locator.EndpointsForToken;
+import org.apache.cassandra.locator.ReplicaLayout;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -38,9 +40,13 @@ import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.db.WriteType;
 import org.apache.cassandra.locator.IEndpointSnitch;
 import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.Replica;
+import org.apache.cassandra.locator.ReplicaCollection;
+import org.apache.cassandra.locator.ReplicaUtils;
 import org.apache.cassandra.locator.TokenMetadata;
 import org.apache.cassandra.net.MessageIn;
 import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.utils.ByteBufferUtil;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
@@ -49,12 +55,26 @@ public class WriteResponseHandlerTest
 {
     static Keyspace ks;
     static ColumnFamilyStore cfs;
-    static List<InetAddressAndPort> targets;
+    static EndpointsForToken targets;
+    static EndpointsForToken pending;
+
+    private static Replica full(String name)
+    {
+        try
+        {
+            return ReplicaUtils.full(InetAddressAndPort.getByName(name));
+        }
+        catch (UnknownHostException e)
+        {
+            throw new AssertionError(e);
+        }
+    }
 
     @BeforeClass
     public static void setUpClass() throws Throwable
     {
         SchemaLoader.loadSchema();
+        DatabaseDescriptor.setPartitionerUnsafe(Murmur3Partitioner.instance);
         // Register peers with expected DC for NetworkTopologyStrategy.
         TokenMetadata metadata = StorageService.instance.getTokenMetadata();
         metadata.clearUnsafe();
@@ -77,17 +97,12 @@ public class WriteResponseHandlerTest
                     return "datacenter2";
             }
 
-            public List<InetAddressAndPort> getSortedListByProximity(InetAddressAndPort address, Collection<InetAddressAndPort> unsortedAddress)
-            {
-                return null;
-            }
-
-            public void sortByProximity(InetAddressAndPort address, List<InetAddressAndPort> addresses)
+            public <C extends ReplicaCollection<? extends C>> C sortedByProximity(InetAddressAndPort address, C replicas)
             {
-
+                return replicas;
             }
 
-            public int compareEndpoints(InetAddressAndPort target, InetAddressAndPort a1, InetAddressAndPort a2)
+            public int compareEndpoints(InetAddressAndPort target, Replica a1, Replica a2)
             {
                 return 0;
             }
@@ -97,7 +112,7 @@ public class WriteResponseHandlerTest
 
             }
 
-            public boolean isWorthMergingForRangeQuery(List<InetAddressAndPort> merged, List<InetAddressAndPort> l1, List<InetAddressAndPort> l2)
+            public boolean isWorthMergingForRangeQuery(ReplicaCollection<?> merged, ReplicaCollection<?> l1, ReplicaCollection<?> l2)
             {
                 return false;
             }
@@ -106,8 +121,10 @@ public class WriteResponseHandlerTest
         SchemaLoader.createKeyspace("Foo", KeyspaceParams.nts("datacenter1", 3, "datacenter2", 3), SchemaLoader.standardCFMD("Foo", "Bar"));
         ks = Keyspace.open("Foo");
         cfs = ks.getColumnFamilyStore("Bar");
-        targets = ImmutableList.of(InetAddressAndPort.getByName("127.1.0.255"), InetAddressAndPort.getByName("127.1.0.254"), InetAddressAndPort.getByName("127.1.0.253"),
-                                   InetAddressAndPort.getByName("127.2.0.255"), InetAddressAndPort.getByName("127.2.0.254"), InetAddressAndPort.getByName("127.2.0.253"));
+        targets = EndpointsForToken.of(DatabaseDescriptor.getPartitioner().getToken(ByteBufferUtil.bytes(0)),
+                                       full("127.1.0.255"), full("127.1.0.254"), full("127.1.0.253"),
+                                       full("127.2.0.255"), full("127.2.0.254"), full("127.2.0.253"));
+        pending = EndpointsForToken.empty(DatabaseDescriptor.getPartitioner().getToken(ByteBufferUtil.bytes(0)));
     }
 
     @Before
@@ -197,7 +214,6 @@ public class WriteResponseHandlerTest
     @Test
     public void failedIdealCLIncrementsStat() throws Throwable
     {
-
         AbstractWriteResponseHandler awr = createWriteResponseHandler(ConsistencyLevel.LOCAL_QUORUM, ConsistencyLevel.EACH_QUORUM);
 
         //Succeed in local DC
@@ -220,16 +236,12 @@ public class WriteResponseHandlerTest
 
     private static AbstractWriteResponseHandler createWriteResponseHandler(ConsistencyLevel cl, ConsistencyLevel ideal, long queryStartTime)
     {
-        return ks.getReplicationStrategy().getWriteResponseHandler(targets, ImmutableList.of(), cl, new Runnable() {
-            public void run()
-            {
-
-            }
-        }, WriteType.SIMPLE, queryStartTime, ideal);
+        return ks.getReplicationStrategy().getWriteResponseHandler(ReplicaLayout.forWriteWithDownNodes(ks, cl, targets.token(), targets, pending),
+                                                                   null, WriteType.SIMPLE, queryStartTime, ideal);
     }
 
     private static MessageIn createDummyMessage(int target)
     {
-        return MessageIn.create(targets.get(target), null, null,  null, 0, 0L);
+        return MessageIn.create(targets.get(target).endpoint(), null, null,  null, 0, 0L);
     }
 }
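
The mechanical change running through this file, bare InetAddressAndPort
lists becoming token-scoped Replica collections, is easier to see in
isolation. A sketch under the same assumptions as setUpClass() above
(partitioner configured, the ReplicaUtils test helper available):

    import java.net.UnknownHostException;

    import org.apache.cassandra.config.DatabaseDescriptor;
    import org.apache.cassandra.dht.Token;
    import org.apache.cassandra.locator.EndpointsForToken;
    import org.apache.cassandra.locator.InetAddressAndPort;
    import org.apache.cassandra.locator.Replica;
    import org.apache.cassandra.locator.ReplicaUtils;
    import org.apache.cassandra.utils.ByteBufferUtil;

    public class EndpointsSketch
    {
        // Builds the equivalent of the old ImmutableList-of-addresses targets,
        // with every endpoint wrapped as a full Replica bound to one token.
        static EndpointsForToken targetsFor(String... names) throws UnknownHostException
        {
            Token token = DatabaseDescriptor.getPartitioner().getToken(ByteBufferUtil.bytes(0));
            Replica[] replicas = new Replica[names.length];
            for (int i = 0; i < names.length; i++)
                replicas[i] = ReplicaUtils.full(InetAddressAndPort.getByName(names[i]));
            return EndpointsForToken.of(token, replicas);
        }
        // Where a bare address is still needed (see createDummyMessage above),
        // unwrap it with targets.get(i).endpoint().
    }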

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/service/WriteResponseHandlerTransientTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/WriteResponseHandlerTransientTest.java b/test/unit/org/apache/cassandra/service/WriteResponseHandlerTransientTest.java
new file mode 100644
index 0000000..c19e65e
--- /dev/null
+++ b/test/unit/org/apache/cassandra/service/WriteResponseHandlerTransientTest.java
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.Set;
+import java.util.UUID;
+import java.util.function.Predicate;
+
+import com.google.common.base.Predicates;
+import com.google.common.collect.Sets;
+
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.locator.EndpointsForToken;
+import org.apache.cassandra.locator.ReplicaLayout;
+import org.apache.cassandra.locator.EndpointsForRange;
+import org.apache.cassandra.locator.ReplicaUtils;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.exceptions.UnavailableException;
+import org.apache.cassandra.locator.IEndpointSnitch;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.Replica;
+import org.apache.cassandra.locator.ReplicaCollection;
+import org.apache.cassandra.locator.TokenMetadata;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+import static org.apache.cassandra.locator.Replica.fullReplica;
+import static org.apache.cassandra.locator.Replica.transientReplica;
+import static org.apache.cassandra.locator.ReplicaUtils.FULL_RANGE;
+import static org.apache.cassandra.locator.ReplicaUtils.full;
+import static org.apache.cassandra.locator.ReplicaUtils.trans;
+
+public class WriteResponseHandlerTransientTest
+{
+    static Keyspace ks;
+    static ColumnFamilyStore cfs;
+
+    static final InetAddressAndPort EP1;
+    static final InetAddressAndPort EP2;
+    static final InetAddressAndPort EP3;
+    static final InetAddressAndPort EP4;
+    static final InetAddressAndPort EP5;
+    static final InetAddressAndPort EP6;
+
+    static final String DC1 = "datacenter1";
+    static final String DC2 = "datacenter2";
+    static Token dummy;
+    static
+    {
+        try
+        {
+            EP1 = InetAddressAndPort.getByName("127.1.0.1");
+            EP2 = InetAddressAndPort.getByName("127.1.0.2");
+            EP3 = InetAddressAndPort.getByName("127.1.0.3");
+            EP4 = InetAddressAndPort.getByName("127.2.0.4");
+            EP5 = InetAddressAndPort.getByName("127.2.0.5");
+            EP6 = InetAddressAndPort.getByName("127.2.0.6");
+        }
+        catch (UnknownHostException e)
+        {
+            throw new AssertionError(e);
+        }
+    }
+
+    @BeforeClass
+    public static void setupClass() throws Throwable
+    {
+        SchemaLoader.loadSchema();
+        DatabaseDescriptor.setTransientReplicationEnabledUnsafe(true);
+        DatabaseDescriptor.setPartitionerUnsafe(Murmur3Partitioner.instance);
+
+        // Register peers with expected DC for NetworkTopologyStrategy.
+        TokenMetadata metadata = StorageService.instance.getTokenMetadata();
+        metadata.clearUnsafe();
+        metadata.updateHostId(UUID.randomUUID(), InetAddressAndPort.getByName("127.1.0.1"));
+        metadata.updateHostId(UUID.randomUUID(), InetAddressAndPort.getByName("127.2.0.1"));
+
+        DatabaseDescriptor.setEndpointSnitch(new IEndpointSnitch()
+        {
+            public String getRack(InetAddressAndPort endpoint)
+            {
+                return null;
+            }
+
+            public String getDatacenter(InetAddressAndPort endpoint)
+            {
+                byte[] address = endpoint.address.getAddress();
+                if (address[1] == 1)
+                    return DC1;
+                else
+                    return DC2;
+            }
+
+            public <C extends ReplicaCollection<? extends C>> C sortedByProximity(InetAddressAndPort address, C unsortedAddress)
+            {
+                return unsortedAddress;
+            }
+
+            public int compareEndpoints(InetAddressAndPort target, Replica a1, Replica a2)
+            {
+                return 0;
+            }
+
+            public void gossiperStarting()
+            {
+
+            }
+
+            public boolean isWorthMergingForRangeQuery(ReplicaCollection<?> merged, ReplicaCollection<?> l1, ReplicaCollection<?> l2)
+            {
+                return false;
+            }
+        });
+
+        DatabaseDescriptor.setBroadcastAddress(InetAddress.getByName("127.1.0.1"));
+        SchemaLoader.createKeyspace("ks", KeyspaceParams.nts(DC1, "3/1", DC2, "3/1"), SchemaLoader.standardCFMD("ks", "tbl"));
+        ks = Keyspace.open("ks");
+        cfs = ks.getColumnFamilyStore("tbl");
+        dummy = DatabaseDescriptor.getPartitioner().getToken(ByteBufferUtil.bytes(0));
+    }
+
+    @Ignore("Throws unavailable for quorum as written")
+    @Test
+    public void checkPendingReplicasAreNotFiltered()
+    {
+        EndpointsForToken natural = EndpointsForToken.of(dummy.getToken(), full(EP1), full(EP2), trans(EP3));
+        EndpointsForToken pending = EndpointsForToken.of(dummy.getToken(), full(EP4), full(EP5), trans(EP6));
+        ReplicaLayout.ForToken replicaLayout = ReplicaLayout.forWrite(ks, ConsistencyLevel.QUORUM, dummy.getToken(), 2, natural, pending, Predicates.alwaysTrue());
+
+        Assert.assertEquals(EndpointsForRange.of(full(EP4), full(EP5), trans(EP6)), replicaLayout.pending());
+    }
+
+    private static ReplicaLayout.ForToken expected(EndpointsForToken all, EndpointsForToken selected)
+    {
+        return new ReplicaLayout.ForToken(ks, ConsistencyLevel.QUORUM, dummy.getToken(), all, EndpointsForToken.empty(dummy.getToken()), selected);
+    }
+
+    private static ReplicaLayout.ForToken getSpeculationContext(EndpointsForToken replicas, int blockFor, Predicate<InetAddressAndPort> livePredicate)
+    {
+        return ReplicaLayout.forWrite(ks, ConsistencyLevel.QUORUM, dummy.getToken(), blockFor, replicas, EndpointsForToken.empty(dummy.getToken()), livePredicate);
+    }
+
+    private static void assertSpeculationReplicas(ReplicaLayout.ForToken expected, EndpointsForToken replicas, int blockFor, Predicate<InetAddressAndPort> livePredicate)
+    {
+        ReplicaLayout.ForToken actual = getSpeculationContext(replicas, blockFor, livePredicate);
+        Assert.assertEquals(expected.natural(), actual.natural());
+        Assert.assertEquals(expected.selected(), actual.selected());
+    }
+
+    private static Predicate<InetAddressAndPort> dead(InetAddressAndPort... endpoints)
+    {
+        Set<InetAddressAndPort> deadSet = Sets.newHashSet(endpoints);
+        return ep -> !deadSet.contains(ep);
+    }
+
+    private static EndpointsForToken replicas(Replica... rr)
+    {
+        return EndpointsForToken.of(dummy.getToken(), rr);
+    }
+
+    @Ignore("Throws unavailable for quorum as written")
+    @Test
+    public void checkSpeculationContext()
+    {
+        EndpointsForToken all = replicas(full(EP1), full(EP2), trans(EP3));
+        // in the happy path, the transient replica should be classified as a backup
+        assertSpeculationReplicas(expected(all,
+                                           replicas(full(EP1), full(EP2))),
+                                  replicas(full(EP1), full(EP2), trans(EP3)),
+                                  2, dead());
+
+        // if one of the full replicas is dead, the remaining live replicas (transient included) should all be in the initial contacts
+        assertSpeculationReplicas(expected(all,
+                                           replicas(full(EP1), trans(EP3))),
+                                  replicas(full(EP1), full(EP2), trans(EP3)),
+                                  2, dead(EP2));
+
+        // block only for 1 full replica, use transient as backups
+        assertSpeculationReplicas(expected(all,
+                                           replicas(full(EP1))),
+                                  replicas(full(EP1), full(EP2), trans(EP3)),
+                                  1, dead(EP2));
+    }
+
+    @Test (expected = UnavailableException.class)
+    public void noFullReplicas()
+    {
+        getSpeculationContext(replicas(full(EP1), trans(EP2), trans(EP3)), 2, dead(EP1));
+    }
+
+    @Test (expected = UnavailableException.class)
+    public void notEnoughTransientReplicas()
+    {
+        getSpeculationContext(replicas(full(EP1), trans(EP2), trans(EP3)), 2, dead(EP2, EP3));
+    }
+}
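
These tests model failure purely as a liveness Predicate handed to
ReplicaLayout.forWrite rather than by mutating gossip state. A sketch of
the pattern, reusing the static fixture of the test above (ks, dummy,
EP1..EP3); note that the two speculation tests carrying this expectation
are @Ignore'd as written:

    package org.apache.cassandra.service;

    import java.util.Set;
    import java.util.function.Predicate;

    import com.google.common.collect.Sets;

    import org.apache.cassandra.db.ConsistencyLevel;
    import org.apache.cassandra.locator.EndpointsForToken;
    import org.apache.cassandra.locator.InetAddressAndPort;
    import org.apache.cassandra.locator.ReplicaLayout;

    import static org.apache.cassandra.locator.ReplicaUtils.full;
    import static org.apache.cassandra.locator.ReplicaUtils.trans;

    // Hypothetical companion sketch, same package as the test above.
    class SpeculationSketch extends WriteResponseHandlerTransientTest
    {
        ReplicaLayout.ForToken layoutWithEp2Down()
        {
            Set<InetAddressAndPort> down = Sets.newHashSet(EP2);
            Predicate<InetAddressAndPort> isAlive = ep -> !down.contains(ep);

            EndpointsForToken natural =
                EndpointsForToken.of(dummy.getToken(), full(EP1), full(EP2), trans(EP3));

            // Per the @Ignore'd expectation above, selected() would come back as
            // [full(EP1), trans(EP3)]: with a full replica down, the transient
            // replica is promoted from backup into the initial contacts.
            return ReplicaLayout.forWrite(ks,
                                          ConsistencyLevel.QUORUM,
                                          dummy.getToken(),
                                          2,                                         // blockFor
                                          natural,
                                          EndpointsForToken.empty(dummy.getToken()), // no pending
                                          isAlive);
        }
    }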

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/service/reads/AbstractReadResponseTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/reads/AbstractReadResponseTest.java b/test/unit/org/apache/cassandra/service/reads/AbstractReadResponseTest.java
new file mode 100644
index 0000000..c6f2232
--- /dev/null
+++ b/test/unit/org/apache/cassandra/service/reads/AbstractReadResponseTest.java
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.reads;
+
+import java.net.UnknownHostException;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.Util;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.cql3.ColumnIdentifier;
+import org.apache.cassandra.db.ClusteringBound;
+import org.apache.cassandra.db.ClusteringBoundary;
+import org.apache.cassandra.db.ClusteringPrefix;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.DeletionTime;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.RangeTombstone;
+import org.apache.cassandra.db.ReadCommand;
+import org.apache.cassandra.db.ReadResponse;
+import org.apache.cassandra.db.Slice;
+import org.apache.cassandra.db.marshal.AsciiType;
+import org.apache.cassandra.db.marshal.ByteType;
+import org.apache.cassandra.db.marshal.BytesType;
+import org.apache.cassandra.db.marshal.IntegerType;
+import org.apache.cassandra.db.marshal.MapType;
+import org.apache.cassandra.db.partitions.PartitionIterator;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.db.partitions.SingletonUnfilteredPartitionIterator;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators;
+import org.apache.cassandra.db.rows.AbstractUnfilteredRowIterator;
+import org.apache.cassandra.db.rows.EncodingStats;
+import org.apache.cassandra.db.rows.RangeTombstoneBoundMarker;
+import org.apache.cassandra.db.rows.RangeTombstoneBoundaryMarker;
+import org.apache.cassandra.db.rows.RowIterator;
+import org.apache.cassandra.db.rows.Rows;
+import org.apache.cassandra.db.rows.Unfiltered;
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.net.MessageIn;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.schema.ColumnMetadata;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
+
+/**
+ * Base class for testing various components which deal with read responses
+ */
+@Ignore
+public abstract class AbstractReadResponseTest
+{
+    public static final String KEYSPACE1 = "DataResolverTest";
+    public static final String CF_STANDARD = "Standard1";
+    public static final String CF_COLLECTION = "Collection1";
+
+    public static Keyspace ks;
+    public static ColumnFamilyStore cfs;
+    public static ColumnFamilyStore cfs2;
+    public static TableMetadata cfm;
+    public static TableMetadata cfm2;
+    public static ColumnMetadata m;
+
+    public static DecoratedKey dk;
+    static int nowInSec;
+
+    static final InetAddressAndPort EP1;
+    static final InetAddressAndPort EP2;
+    static final InetAddressAndPort EP3;
+
+    static
+    {
+        try
+        {
+            EP1 = InetAddressAndPort.getByName("127.0.0.1");
+            EP2 = InetAddressAndPort.getByName("127.0.0.2");
+            EP3 = InetAddressAndPort.getByName("127.0.0.3");
+        }
+        catch (UnknownHostException e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @BeforeClass
+    public static void setupClass() throws Throwable
+    {
+        DatabaseDescriptor.daemonInitialization();
+        DatabaseDescriptor.setPartitionerUnsafe(Murmur3Partitioner.instance);
+
+        TableMetadata.Builder builder1 =
+        TableMetadata.builder(KEYSPACE1, CF_STANDARD)
+                     .addPartitionKeyColumn("key", BytesType.instance)
+                     .addClusteringColumn("col1", AsciiType.instance)
+                     .addRegularColumn("c1", AsciiType.instance)
+                     .addRegularColumn("c2", AsciiType.instance)
+                     .addRegularColumn("one", AsciiType.instance)
+                     .addRegularColumn("two", AsciiType.instance);
+
+        TableMetadata.Builder builder2 =
+        TableMetadata.builder(KEYSPACE1, CF_COLLECTION)
+                     .addPartitionKeyColumn("k", ByteType.instance)
+                     .addRegularColumn("m", MapType.getInstance(IntegerType.instance, IntegerType.instance, true));
+
+        SchemaLoader.prepareServer();
+        SchemaLoader.createKeyspace(KEYSPACE1, KeyspaceParams.simple(1), builder1, builder2);
+
+        ks = Keyspace.open(KEYSPACE1);
+        cfs = ks.getColumnFamilyStore(CF_STANDARD);
+        cfm = cfs.metadata();
+        cfs2 = ks.getColumnFamilyStore(CF_COLLECTION);
+        cfm2 = cfs2.metadata();
+        m = cfm2.getColumn(new ColumnIdentifier("m", false));
+    }
+
+    @Before
+    public void setUp() throws Exception
+    {
+        dk = Util.dk("key1");
+        nowInSec = FBUtilities.nowInSeconds();
+    }
+
+    static void assertPartitionsEqual(RowIterator l, RowIterator r)
+    {
+        try (RowIterator left = l; RowIterator right = r)
+        {
+            Assert.assertTrue(Util.sameContent(left, right));
+        }
+    }
+
+    static void assertPartitionsEqual(UnfilteredRowIterator left, UnfilteredRowIterator right)
+    {
+        Assert.assertTrue(Util.sameContent(left, right));
+    }
+
+    static void assertPartitionsEqual(UnfilteredPartitionIterator left, UnfilteredPartitionIterator right)
+    {
+        while (left.hasNext())
+        {
+            Assert.assertTrue(right.hasNext());
+            assertPartitionsEqual(left.next(), right.next());
+        }
+        Assert.assertFalse(right.hasNext());
+    }
+
+    static void assertPartitionsEqual(PartitionIterator l, PartitionIterator r)
+    {
+        try (PartitionIterator left = l; PartitionIterator right = r)
+        {
+            while (left.hasNext())
+            {
+                Assert.assertTrue(right.hasNext());
+                assertPartitionsEqual(left.next(), right.next());
+            }
+            Assert.assertFalse(right.hasNext());
+        }
+    }
+
+    static void consume(PartitionIterator i)
+    {
+        try (PartitionIterator iterator = i)
+        {
+            while (iterator.hasNext())
+            {
+                try (RowIterator rows = iterator.next())
+                {
+                    while (rows.hasNext())
+                        rows.next();
+                }
+            }
+        }
+    }
+
+    static PartitionIterator filter(UnfilteredPartitionIterator iter)
+    {
+        return UnfilteredPartitionIterators.filter(iter, nowInSec);
+    }
+
+    static DecoratedKey dk(String k)
+    {
+        return cfs.decorateKey(ByteBufferUtil.bytes(k));
+    }
+
+    static DecoratedKey dk(int k)
+    {
+        return dk(Integer.toString(k));
+    }
+
+    static MessageIn<ReadResponse> response(ReadCommand command, InetAddressAndPort from, UnfilteredPartitionIterator data, boolean digest)
+    {
+        ReadResponse response = digest ? ReadResponse.createDigestResponse(data, command) : ReadResponse.createDataResponse(data, command);
+        return MessageIn.create(from, response, Collections.emptyMap(), MessagingService.Verb.READ, MessagingService.current_version);
+    }
+
+    static MessageIn<ReadResponse> response(ReadCommand command, InetAddressAndPort from, UnfilteredPartitionIterator data)
+    {
+        return response(command, from, data, false);
+    }
+
+    public RangeTombstone tombstone(Object start, Object end, long markedForDeleteAt, int localDeletionTime)
+    {
+        return tombstone(start, true, end, true, markedForDeleteAt, localDeletionTime);
+    }
+
+    public RangeTombstone tombstone(Object start, boolean inclusiveStart, Object end, boolean inclusiveEnd, long markedForDeleteAt, int localDeletionTime)
+    {
+        ClusteringBound startBound = rtBound(start, true, inclusiveStart);
+        ClusteringBound endBound = rtBound(end, false, inclusiveEnd);
+        return new RangeTombstone(Slice.make(startBound, endBound), new DeletionTime(markedForDeleteAt, localDeletionTime));
+    }
+
+    public ClusteringBound rtBound(Object value, boolean isStart, boolean inclusive)
+    {
+        ClusteringBound.Kind kind = isStart
+                                    ? (inclusive ? ClusteringPrefix.Kind.INCL_START_BOUND : ClusteringPrefix.Kind.EXCL_START_BOUND)
+                                    : (inclusive ? ClusteringPrefix.Kind.INCL_END_BOUND : ClusteringPrefix.Kind.EXCL_END_BOUND);
+
+        return ClusteringBound.create(kind, cfm.comparator.make(value).getRawValues());
+    }
+
+    public ClusteringBoundary rtBoundary(Object value, boolean inclusiveOnEnd)
+    {
+        ClusteringBound.Kind kind = inclusiveOnEnd
+                                    ? ClusteringPrefix.Kind.INCL_END_EXCL_START_BOUNDARY
+                                    : ClusteringPrefix.Kind.EXCL_END_INCL_START_BOUNDARY;
+        return ClusteringBoundary.create(kind, cfm.comparator.make(value).getRawValues());
+    }
+
+    public RangeTombstoneBoundMarker marker(Object value, boolean isStart, boolean inclusive, long markedForDeleteAt, int localDeletionTime)
+    {
+        return new RangeTombstoneBoundMarker(rtBound(value, isStart, inclusive), new DeletionTime(markedForDeleteAt, localDeletionTime));
+    }
+
+    public RangeTombstoneBoundaryMarker boundary(Object value, boolean inclusiveOnEnd, long markedForDeleteAt1, int localDeletionTime1, long markedForDeleteAt2, int localDeletionTime2)
+    {
+        return new RangeTombstoneBoundaryMarker(rtBoundary(value, inclusiveOnEnd),
+                                                new DeletionTime(markedForDeleteAt1, localDeletionTime1),
+                                                new DeletionTime(markedForDeleteAt2, localDeletionTime2));
+    }
+
+    public UnfilteredPartitionIterator fullPartitionDelete(TableMetadata table, DecoratedKey dk, long timestamp, int nowInSec)
+    {
+        return new SingletonUnfilteredPartitionIterator(PartitionUpdate.fullPartitionDelete(table, dk, timestamp, nowInSec).unfilteredIterator());
+    }
+
+    public UnfilteredPartitionIterator iter(PartitionUpdate update)
+    {
+        return new SingletonUnfilteredPartitionIterator(update.unfilteredIterator());
+    }
+
+    public UnfilteredPartitionIterator iter(DecoratedKey key, Unfiltered... unfiltereds)
+    {
+        SortedSet<Unfiltered> s = new TreeSet<>(cfm.comparator);
+        Collections.addAll(s, unfiltereds);
+        final Iterator<Unfiltered> iterator = s.iterator();
+
+        UnfilteredRowIterator rowIter = new AbstractUnfilteredRowIterator(cfm,
+                                                                          key,
+                                                                          DeletionTime.LIVE,
+                                                                          cfm.regularAndStaticColumns(),
+                                                                          Rows.EMPTY_STATIC_ROW,
+                                                                          false,
+                                                                          EncodingStats.NO_STATS)
+        {
+            protected Unfiltered computeNext()
+            {
+                return iterator.hasNext() ? iterator.next() : endOfData();
+            }
+        };
+        return new SingletonUnfilteredPartitionIterator(rowIter);
+    }
+}
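
The response(...) overloads above replace the per-test
readResponseMessage(...) helpers; the boolean picks between
ReadResponse.createDataResponse and createDigestResponse. A hypothetical
subclass sketch showing both (command construction mirrors
DataResolverTest below):

    package org.apache.cassandra.service.reads;

    import org.apache.cassandra.Util;
    import org.apache.cassandra.db.ReadCommand;
    import org.apache.cassandra.db.ReadResponse;
    import org.apache.cassandra.db.partitions.PartitionUpdate;
    import org.apache.cassandra.net.MessageIn;

    // EP1/EP2, cfs, dk, nowInSec and the response()/iter() helpers all
    // come from AbstractReadResponseTest above.
    class ResponseSketch extends AbstractReadResponseTest
    {
        void sketch(PartitionUpdate update)
        {
            ReadCommand command = Util.cmd(cfs, dk).withNowInSeconds(nowInSec).build();

            // Data response: carries the partition content itself.
            MessageIn<ReadResponse> data = response(command, EP1, iter(update));

            // Digest response: carries only a hash of the same content, so a
            // resolver can compare replicas without shipping full partitions.
            MessageIn<ReadResponse> digest = response(command, EP2, iter(update), true);
        }
    }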

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/service/reads/DataResolverTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/reads/DataResolverTest.java b/test/unit/org/apache/cassandra/service/reads/DataResolverTest.java
index ac8ed0b..abec25d 100644
--- a/test/unit/org/apache/cassandra/service/reads/DataResolverTest.java
+++ b/test/unit/org/apache/cassandra/service/reads/DataResolverTest.java
@@ -19,114 +19,101 @@ package org.apache.cassandra.service.reads;
 
 import java.net.UnknownHostException;
 import java.nio.ByteBuffer;
-import java.util.*;
+import java.util.Collections;
+import java.util.Iterator;
 
 import com.google.common.collect.Iterators;
 import com.google.common.collect.Sets;
-import org.junit.*;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
 
-import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.Util;
+import org.apache.cassandra.db.Clustering;
+import org.apache.cassandra.db.ClusteringBound;
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.DeletionInfo;
+import org.apache.cassandra.db.DeletionTime;
+import org.apache.cassandra.db.EmptyIterators;
+import org.apache.cassandra.db.MutableDeletionInfo;
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.db.RangeTombstone;
+import org.apache.cassandra.db.ReadCommand;
+import org.apache.cassandra.db.RowUpdateBuilder;
+import org.apache.cassandra.db.Slice;
+import org.apache.cassandra.db.partitions.PartitionIterator;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
+import org.apache.cassandra.db.rows.BTreeRow;
+import org.apache.cassandra.db.rows.BufferCell;
+import org.apache.cassandra.db.rows.Cell;
+import org.apache.cassandra.db.rows.CellPath;
+import org.apache.cassandra.db.rows.ComplexColumnData;
+import org.apache.cassandra.db.rows.RangeTombstoneBoundMarker;
+import org.apache.cassandra.db.rows.RangeTombstoneBoundaryMarker;
+import org.apache.cassandra.db.rows.Row;
+import org.apache.cassandra.db.rows.RowIterator;
+import org.apache.cassandra.locator.EndpointsForRange;
 import org.apache.cassandra.locator.InetAddressAndPort;
-import org.apache.cassandra.schema.TableMetadata;
-import org.apache.cassandra.schema.ColumnMetadata;
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.cql3.ColumnIdentifier;
-import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.marshal.ByteType;
-import org.apache.cassandra.db.marshal.IntegerType;
-import org.apache.cassandra.db.marshal.MapType;
-import org.apache.cassandra.db.rows.*;
-import org.apache.cassandra.db.marshal.AsciiType;
-import org.apache.cassandra.db.marshal.BytesType;
-import org.apache.cassandra.db.partitions.*;
-import org.apache.cassandra.exceptions.ConfigurationException;
-import org.apache.cassandra.net.*;
-import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.locator.ReplicaLayout;
+import org.apache.cassandra.locator.ReplicaUtils;
 import org.apache.cassandra.service.reads.repair.TestableReadRepair;
 import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.FBUtilities;
 
 import static org.apache.cassandra.Util.assertClustering;
 import static org.apache.cassandra.Util.assertColumn;
 import static org.apache.cassandra.Util.assertColumns;
+import static org.apache.cassandra.db.ClusteringBound.Kind;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
-import static org.apache.cassandra.db.ClusteringBound.Kind;
 
-public class DataResolverTest
+public class DataResolverTest extends AbstractReadResponseTest
 {
     public static final String KEYSPACE1 = "DataResolverTest";
     public static final String CF_STANDARD = "Standard1";
-    public static final String CF_COLLECTION = "Collection1";
-
-    // counter to generate the last byte of the respondent's address in a ReadResponse message
-    private int addressSuffix = 10;
-
-    private DecoratedKey dk;
-    private Keyspace ks;
-    private ColumnFamilyStore cfs;
-    private ColumnFamilyStore cfs2;
-    private TableMetadata cfm;
-    private TableMetadata cfm2;
-    private ColumnMetadata m;
-    private int nowInSec;
+
     private ReadCommand command;
     private TestableReadRepair readRepair;
 
-    @BeforeClass
-    public static void defineSchema() throws ConfigurationException
-    {
-        DatabaseDescriptor.daemonInitialization();
-
-        TableMetadata.Builder builder1 =
-            TableMetadata.builder(KEYSPACE1, CF_STANDARD)
-                         .addPartitionKeyColumn("key", BytesType.instance)
-                         .addClusteringColumn("col1", AsciiType.instance)
-                         .addRegularColumn("c1", AsciiType.instance)
-                         .addRegularColumn("c2", AsciiType.instance)
-                         .addRegularColumn("one", AsciiType.instance)
-                         .addRegularColumn("two", AsciiType.instance);
-
-        TableMetadata.Builder builder2 =
-            TableMetadata.builder(KEYSPACE1, CF_COLLECTION)
-                         .addPartitionKeyColumn("k", ByteType.instance)
-                         .addRegularColumn("m", MapType.getInstance(IntegerType.instance, IntegerType.instance, true));
-
-        SchemaLoader.prepareServer();
-        SchemaLoader.createKeyspace(KEYSPACE1, KeyspaceParams.simple(1), builder1, builder2);
-    }
-
     @Before
     public void setup()
     {
-        dk = Util.dk("key1");
-        ks = Keyspace.open(KEYSPACE1);
-        cfs = ks.getColumnFamilyStore(CF_STANDARD);
-        cfm = cfs.metadata();
-        cfs2 = ks.getColumnFamilyStore(CF_COLLECTION);
-        cfm2 = cfs2.metadata();
-        m = cfm2.getColumn(new ColumnIdentifier("m", false));
-
-        nowInSec = FBUtilities.nowInSeconds();
         command = Util.cmd(cfs, dk).withNowInSeconds(nowInSec).build();
         readRepair = new TestableReadRepair(command, ConsistencyLevel.QUORUM);
     }
 
-    @Test
-    public void testResolveNewerSingleRow() throws UnknownHostException
+    private static EndpointsForRange makeReplicas(int num)
     {
-        DataResolver resolver = new DataResolver(ks, command, ConsistencyLevel.ALL, 2, System.nanoTime(), readRepair);
-        InetAddressAndPort peer1 = peer();
-        resolver.preprocess(readResponseMessage(peer1, iter(new RowUpdateBuilder(cfm, nowInSec, 0L, dk).clustering("1")
-                                                                                                       .add("c1", "v1")
-                                                                                                       .buildUpdate())));
-        InetAddressAndPort peer2 = peer();
-        resolver.preprocess(readResponseMessage(peer2, iter(new RowUpdateBuilder(cfm, nowInSec, 1L, dk).clustering("1")
-                                                                                                       .add("c1", "v2")
-                                                                                                       .buildUpdate())));
+        EndpointsForRange.Builder replicas = EndpointsForRange.builder(ReplicaUtils.FULL_RANGE, num);
+        for (int i = 0; i < num; i++)
+        {
+            try
+            {
+                replicas.add(ReplicaUtils.full(InetAddressAndPort.getByAddress(new byte[]{ 127, 0, 0, (byte) (i + 1) })));
+            }
+            catch (UnknownHostException e)
+            {
+                throw new AssertionError(e);
+            }
+        }
+        return replicas.build();
+    }
+
+    @Test
+    public void testResolveNewerSingleRow()
+    {
+        EndpointsForRange replicas = makeReplicas(2);
+        DataResolver resolver = new DataResolver(command, plan(replicas, ConsistencyLevel.ALL), readRepair, System.nanoTime());
+        InetAddressAndPort peer1 = replicas.get(0).endpoint();
+        resolver.preprocess(response(command, peer1, iter(new RowUpdateBuilder(cfm, nowInSec, 0L, dk).clustering("1")
+                                                                                                     .add("c1", "v1")
+                                                                                                     .buildUpdate()), false));
+        InetAddressAndPort peer2 = replicas.get(1).endpoint();
+        resolver.preprocess(response(command, peer2, iter(new RowUpdateBuilder(cfm, nowInSec, 1L, dk).clustering("1")
+                                                                                                     .add("c1", "v2")
+                                                                                                     .buildUpdate()), false));
 
         try(PartitionIterator data = resolver.resolve())
         {
@@ -149,16 +136,17 @@ public class DataResolverTest
     @Test
     public void testResolveDisjointSingleRow()
     {
-        DataResolver resolver = new DataResolver(ks, command, ConsistencyLevel.ALL, 2, System.nanoTime(), readRepair);
-        InetAddressAndPort peer1 = peer();
-        resolver.preprocess(readResponseMessage(peer1, iter(new RowUpdateBuilder(cfm, nowInSec, 0L, dk).clustering("1")
-                                                                                                       .add("c1", "v1")
-                                                                                                       .buildUpdate())));
+        EndpointsForRange replicas = makeReplicas(2);
+        DataResolver resolver = new DataResolver(command, plan(replicas, ConsistencyLevel.ALL), readRepair, System.nanoTime());
+        InetAddressAndPort peer1 = replicas.get(0).endpoint();
+        resolver.preprocess(response(command, peer1, iter(new RowUpdateBuilder(cfm, nowInSec, 0L, dk).clustering("1")
+                                                                                                     .add("c1", "v1")
+                                                                                                     .buildUpdate())));
 
-        InetAddressAndPort peer2 = peer();
-        resolver.preprocess(readResponseMessage(peer2, iter(new RowUpdateBuilder(cfm, nowInSec, 1L, dk).clustering("1")
-                                                                                                       .add("c2", "v2")
-                                                                                                       .buildUpdate())));
+        InetAddressAndPort peer2 = replicas.get(1).endpoint();
+        resolver.preprocess(response(command, peer2, iter(new RowUpdateBuilder(cfm, nowInSec, 1L, dk).clustering("1")
+                                                                                                     .add("c2", "v2")
+                                                                                                     .buildUpdate())));
 
         try(PartitionIterator data = resolver.resolve())
         {
@@ -185,15 +173,16 @@ public class DataResolverTest
     @Test
     public void testResolveDisjointMultipleRows() throws UnknownHostException
     {
-        DataResolver resolver = new DataResolver(ks, command, ConsistencyLevel.ALL, 2, System.nanoTime(), readRepair);
-        InetAddressAndPort peer1 = peer();
-        resolver.preprocess(readResponseMessage(peer1, iter(new RowUpdateBuilder(cfm, nowInSec, 0L, dk).clustering("1")
-                                                                                                       .add("c1", "v1")
-                                                                                                       .buildUpdate())));
-        InetAddressAndPort peer2 = peer();
-        resolver.preprocess(readResponseMessage(peer2, iter(new RowUpdateBuilder(cfm, nowInSec, 1L, dk).clustering("2")
-                                                                                                       .add("c2", "v2")
-                                                                                                       .buildUpdate())));
+        EndpointsForRange replicas = makeReplicas(2);
+        DataResolver resolver = new DataResolver(command, plan(replicas, ConsistencyLevel.ALL), readRepair, System.nanoTime());
+        InetAddressAndPort peer1 = replicas.get(0).endpoint();
+        resolver.preprocess(response(command, peer1, iter(new RowUpdateBuilder(cfm, nowInSec, 0L, dk).clustering("1")
+                                                                                                     .add("c1", "v1")
+                                                                                                     .buildUpdate())));
+        InetAddressAndPort peer2 = replicas.get(1).endpoint();
+        resolver.preprocess(response(command, peer2, iter(new RowUpdateBuilder(cfm, nowInSec, 1L, dk).clustering("2")
+                                                                                                     .add("c2", "v2")
+                                                                                                     .buildUpdate())));
 
         try (PartitionIterator data = resolver.resolve())
         {
@@ -231,37 +220,38 @@ public class DataResolverTest
     @Test
     public void testResolveDisjointMultipleRowsWithRangeTombstones()
     {
-        DataResolver resolver = new DataResolver(ks, command, ConsistencyLevel.ALL, 4, System.nanoTime(), readRepair);
+        EndpointsForRange replicas = makeReplicas(4);
+        DataResolver resolver = new DataResolver(command, plan(replicas, ConsistencyLevel.ALL), readRepair, System.nanoTime());
 
         RangeTombstone tombstone1 = tombstone("1", "11", 1, nowInSec);
         RangeTombstone tombstone2 = tombstone("3", "31", 1, nowInSec);
         PartitionUpdate update = new RowUpdateBuilder(cfm, nowInSec, 1L, dk).addRangeTombstone(tombstone1)
-                                                                                  .addRangeTombstone(tombstone2)
-                                                                                  .buildUpdate();
+                                                                            .addRangeTombstone(tombstone2)
+                                                                            .buildUpdate();
 
-        InetAddressAndPort peer1 = peer();
+        InetAddressAndPort peer1 = replicas.get(0).endpoint();
         UnfilteredPartitionIterator iter1 = iter(new RowUpdateBuilder(cfm, nowInSec, 1L, dk).addRangeTombstone(tombstone1)
-                                                                                  .addRangeTombstone(tombstone2)
-                                                                                  .buildUpdate());
-        resolver.preprocess(readResponseMessage(peer1, iter1));
+                                                                                            .addRangeTombstone(tombstone2)
+                                                                                            .buildUpdate());
+        resolver.preprocess(response(command, peer1, iter1));
         // not covered by any range tombstone
-        InetAddressAndPort peer2 = peer();
+        InetAddressAndPort peer2 = replicas.get(1).endpoint();
         UnfilteredPartitionIterator iter2 = iter(new RowUpdateBuilder(cfm, nowInSec, 0L, dk).clustering("0")
-                                                                                  .add("c1", "v0")
-                                                                                  .buildUpdate());
-        resolver.preprocess(readResponseMessage(peer2, iter2));
+                                                                                            .add("c1", "v0")
+                                                                                            .buildUpdate());
+        resolver.preprocess(response(command, peer2, iter2));
         // covered by a range tombstone
-        InetAddressAndPort peer3 = peer();
+        InetAddressAndPort peer3 = replicas.get(2).endpoint();
         UnfilteredPartitionIterator iter3 = iter(new RowUpdateBuilder(cfm, nowInSec, 0L, dk).clustering("10")
-                                                                                  .add("c2", "v1")
-                                                                                  .buildUpdate());
-        resolver.preprocess(readResponseMessage(peer3, iter3));
+                                                                                            .add("c2", "v1")
+                                                                                            .buildUpdate());
+        resolver.preprocess(response(command, peer3, iter3));
         // range covered by rt, but newer
-        InetAddressAndPort peer4 = peer();
+        InetAddressAndPort peer4 = replicas.get(3).endpoint();
         UnfilteredPartitionIterator iter4 = iter(new RowUpdateBuilder(cfm, nowInSec, 2L, dk).clustering("3")
-                                                                                  .add("one", "A")
-                                                                                  .buildUpdate());
-        resolver.preprocess(readResponseMessage(peer4, iter4));
+                                                                                            .add("one", "A")
+                                                                                            .buildUpdate());
+        resolver.preprocess(response(command, peer4, iter4));
         try (PartitionIterator data = resolver.resolve())
         {
             try (RowIterator rows = data.next())
@@ -311,13 +301,14 @@ public class DataResolverTest
     @Test
     public void testResolveWithOneEmpty()
     {
-        DataResolver resolver = new DataResolver(ks, command, ConsistencyLevel.ALL, 2, System.nanoTime(), readRepair);
-        InetAddressAndPort peer1 = peer();
-        resolver.preprocess(readResponseMessage(peer1, iter(new RowUpdateBuilder(cfm, nowInSec, 1L, dk).clustering("1")
-                                                                                                       .add("c2", "v2")
-                                                                                                       .buildUpdate())));
-        InetAddressAndPort peer2 = peer();
-        resolver.preprocess(readResponseMessage(peer2, EmptyIterators.unfilteredPartition(cfm)));
+        EndpointsForRange replicas = makeReplicas(2);
+        DataResolver resolver = new DataResolver(command, plan(replicas, ConsistencyLevel.ALL), readRepair, System.nanoTime());
+        InetAddressAndPort peer1 = replicas.get(0).endpoint();
+        resolver.preprocess(response(command, peer1, iter(new RowUpdateBuilder(cfm, nowInSec, 1L, dk).clustering("1")
+                                                                                                     .add("c2", "v2")
+                                                                                                     .buildUpdate())));
+        InetAddressAndPort peer2 = replicas.get(1).endpoint();
+        resolver.preprocess(response(command, peer2, EmptyIterators.unfilteredPartition(cfm)));
 
         try(PartitionIterator data = resolver.resolve())
         {
@@ -340,10 +331,11 @@ public class DataResolverTest
     @Test
     public void testResolveWithBothEmpty()
     {
+        EndpointsForRange replicas = makeReplicas(2);
         TestableReadRepair readRepair = new TestableReadRepair(command, ConsistencyLevel.QUORUM);
-        DataResolver resolver = new DataResolver(ks, command, ConsistencyLevel.ALL, 2, System.nanoTime(), readRepair);
-        resolver.preprocess(readResponseMessage(peer(), EmptyIterators.unfilteredPartition(cfm)));
-        resolver.preprocess(readResponseMessage(peer(), EmptyIterators.unfilteredPartition(cfm)));
+        DataResolver resolver = new DataResolver(command, plan(replicas, ConsistencyLevel.ALL), readRepair, System.nanoTime());
+        resolver.preprocess(response(command, replicas.get(0).endpoint(), EmptyIterators.unfilteredPartition(cfm)));
+        resolver.preprocess(response(command, replicas.get(1).endpoint(), EmptyIterators.unfilteredPartition(cfm)));
 
         try(PartitionIterator data = resolver.resolve())
         {
@@ -356,14 +348,15 @@ public class DataResolverTest
     @Test
     public void testResolveDeleted()
     {
-        DataResolver resolver = new DataResolver(ks, command, ConsistencyLevel.ALL, 2, System.nanoTime(), readRepair);
+        EndpointsForRange replicas = makeReplicas(2);
+        DataResolver resolver = new DataResolver(command, plan(replicas, ConsistencyLevel.ALL), readRepair, System.nanoTime());
         // one response with columns timestamped before a delete in another response
-        InetAddressAndPort peer1 = peer();
-        resolver.preprocess(readResponseMessage(peer1, iter(new RowUpdateBuilder(cfm, nowInSec, 0L, dk).clustering("1")
-                                                                                                       .add("one", "A")
-                                                                                                       .buildUpdate())));
-        InetAddressAndPort peer2 = peer();
-        resolver.preprocess(readResponseMessage(peer2, fullPartitionDelete(cfm, dk, 1, nowInSec)));
+        InetAddressAndPort peer1 = replicas.get(0).endpoint();
+        resolver.preprocess(response(command, peer1, iter(new RowUpdateBuilder(cfm, nowInSec, 0L, dk).clustering("1")
+                                                                                                     .add("one", "A")
+                                                                                                     .buildUpdate())));
+        InetAddressAndPort peer2 = replicas.get(1).endpoint();
+        resolver.preprocess(response(command, peer2, fullPartitionDelete(cfm, dk, 1, nowInSec)));
 
         try (PartitionIterator data = resolver.resolve())
         {
@@ -381,23 +374,24 @@ public class DataResolverTest
     @Test
     public void testResolveMultipleDeleted()
     {
-        DataResolver resolver = new DataResolver(ks, command, ConsistencyLevel.ALL, 4, System.nanoTime(), readRepair);
+        EndpointsForRange replicas = makeReplicas(4);
+        DataResolver resolver = new DataResolver(command, plan(replicas, ConsistencyLevel.ALL), readRepair, System.nanoTime());
         // deletes and columns with interleaved timestamp, with out of order return sequence
-        InetAddressAndPort peer1 = peer();
-        resolver.preprocess(readResponseMessage(peer1, fullPartitionDelete(cfm, dk, 0, nowInSec)));
+        InetAddressAndPort peer1 = replicas.get(0).endpoint();
+        resolver.preprocess(response(command, peer1, fullPartitionDelete(cfm, dk, 0, nowInSec)));
         // these columns created after the previous deletion
-        InetAddressAndPort peer2 = peer();
-        resolver.preprocess(readResponseMessage(peer2, iter(new RowUpdateBuilder(cfm, nowInSec, 1L, dk).clustering("1")
-                                                                                                       .add("one", "A")
-                                                                                                       .add("two", "A")
-                                                                                                       .buildUpdate())));
+        InetAddressAndPort peer2 = replicas.get(1).endpoint();
+        resolver.preprocess(response(command, peer2, iter(new RowUpdateBuilder(cfm, nowInSec, 1L, dk).clustering("1")
+                                                                                                     .add("one", "A")
+                                                                                                     .add("two", "A")
+                                                                                                     .buildUpdate())));
         //this column created after the next delete
-        InetAddressAndPort peer3 = peer();
-        resolver.preprocess(readResponseMessage(peer3, iter(new RowUpdateBuilder(cfm, nowInSec, 3L, dk).clustering("1")
-                                                                                                       .add("two", "B")
-                                                                                                       .buildUpdate())));
-        InetAddressAndPort peer4 = peer();
-        resolver.preprocess(readResponseMessage(peer4, fullPartitionDelete(cfm, dk, 2, nowInSec)));
+        InetAddressAndPort peer3 = replicas.get(2).endpoint();
+        resolver.preprocess(response(command, peer3, iter(new RowUpdateBuilder(cfm, nowInSec, 3L, dk).clustering("1")
+                                                                                                     .add("two", "B")
+                                                                                                     .buildUpdate())));
+        InetAddressAndPort peer4 = replicas.get(3).endpoint();
+        resolver.preprocess(response(command, peer4, fullPartitionDelete(cfm, dk, 2, nowInSec)));
 
         try(PartitionIterator data = resolver.resolve())
         {
@@ -465,9 +459,10 @@ public class DataResolverTest
      */
     private void resolveRangeTombstonesOnBoundary(long timestamp1, long timestamp2)
     {
-        DataResolver resolver = new DataResolver(ks, command, ConsistencyLevel.ALL, 2, System.nanoTime(), readRepair);
-        InetAddressAndPort peer1 = peer();
-        InetAddressAndPort peer2 = peer();
+        EndpointsForRange replicas = makeReplicas(2);
+        DataResolver resolver = new DataResolver(command, plan(replicas, ConsistencyLevel.ALL), readRepair, System.nanoTime());
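+        // the two replicas return range tombstones whose bounds meet on shared boundary values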
+        InetAddressAndPort peer1 = replicas.get(0).endpoint();
+        InetAddressAndPort peer2 = replicas.get(1).endpoint();
 
         // 1st "stream"
         RangeTombstone one_two    = tombstone("1", true , "2", false, timestamp1, nowInSec);
@@ -485,8 +480,8 @@ public class DataResolverTest
                                                                                             .addRangeTombstone(four_five)
                                                                                             .buildUpdate());
 
-        resolver.preprocess(readResponseMessage(peer1, iter1));
-        resolver.preprocess(readResponseMessage(peer2, iter2));
+        resolver.preprocess(response(command, peer1, iter1));
+        resolver.preprocess(response(command, peer2, iter2));
 
         // No results, we've only reconciled tombstones.
         try (PartitionIterator data = resolver.resolve())
@@ -538,9 +533,10 @@ public class DataResolverTest
      */
     private void testRepairRangeTombstoneBoundary(int timestamp1, int timestamp2, int timestamp3) throws UnknownHostException
     {
-        DataResolver resolver = new DataResolver(ks, command, ConsistencyLevel.ALL, 2, System.nanoTime(), readRepair);
-        InetAddressAndPort peer1 = peer();
-        InetAddressAndPort peer2 = peer();
+        EndpointsForRange replicas = makeReplicas(2);
+        DataResolver resolver = new DataResolver(command, plan(replicas, ConsistencyLevel.ALL), readRepair, System.nanoTime());
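+        // whether repair is expected depends on the three timestamps (see shouldHaveRepair below)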
+        InetAddressAndPort peer1 = replicas.get(0).endpoint();
+        InetAddressAndPort peer2 = replicas.get(1).endpoint();
 
         // 1st "stream"
         RangeTombstone one_nine = tombstone("0", true , "9", true, timestamp1, nowInSec);
@@ -554,8 +550,8 @@ public class DataResolverTest
         RangeTombstoneBoundMarker close_nine = marker("9", false, true, timestamp3, nowInSec);
         UnfilteredPartitionIterator iter2 = iter(dk, open_one, boundary_five, close_nine);
 
-        resolver.preprocess(readResponseMessage(peer1, iter1));
-        resolver.preprocess(readResponseMessage(peer2, iter2));
+        resolver.preprocess(response(command, peer1, iter1));
+        resolver.preprocess(response(command, peer2, iter2));
 
         boolean shouldHaveRepair = timestamp1 != timestamp2 || timestamp1 != timestamp3;
 
@@ -589,9 +585,10 @@ public class DataResolverTest
     @Test
     public void testRepairRangeTombstoneWithPartitionDeletion()
     {
-        DataResolver resolver = new DataResolver(ks, command, ConsistencyLevel.ALL, 2, System.nanoTime(), readRepair);
-        InetAddressAndPort peer1 = peer();
-        InetAddressAndPort peer2 = peer();
+        EndpointsForRange replicas = makeReplicas(2);
+        DataResolver resolver = new DataResolver(command, plan(replicas, ConsistencyLevel.ALL), readRepair, System.nanoTime());
+        InetAddressAndPort peer1 = replicas.get(0).endpoint();
+        InetAddressAndPort peer2 = replicas.get(1).endpoint();
 
         // 1st "stream": just a partition deletion
         UnfilteredPartitionIterator iter1 = iter(PartitionUpdate.fullPartitionDelete(cfm, dk, 10, nowInSec));
@@ -602,8 +599,8 @@ public class DataResolverTest
                                                  .addRangeTombstone(rt)
                                                  .buildUpdate());
 
-        resolver.preprocess(readResponseMessage(peer1, iter1));
-        resolver.preprocess(readResponseMessage(peer2, iter2));
+        resolver.preprocess(response(command, peer1, iter1));
+        resolver.preprocess(response(command, peer2, iter2));
 
         // No results, we've only reconciled tombstones.
         try (PartitionIterator data = resolver.resolve())
@@ -627,15 +624,16 @@ public class DataResolverTest
     @Test
     public void testRepairRangeTombstoneWithPartitionDeletion2()
     {
-        DataResolver resolver = new DataResolver(ks, command, ConsistencyLevel.ALL, 2, System.nanoTime(), readRepair);
-        InetAddressAndPort peer1 = peer();
-        InetAddressAndPort peer2 = peer();
+        EndpointsForRange replicas = makeReplicas(2);
+        DataResolver resolver = new DataResolver(command, plan(replicas, ConsistencyLevel.ALL), readRepair, System.nanoTime());
+        InetAddressAndPort peer1 = replicas.get(0).endpoint();
+        InetAddressAndPort peer2 = replicas.get(1).endpoint();
 
         // 1st "stream": a partition deletion and a range tombstone
         RangeTombstone rt1 = tombstone("0", true , "9", true, 11, nowInSec);
         PartitionUpdate upd1 = new RowUpdateBuilder(cfm, nowInSec, 1L, dk)
-                                                 .addRangeTombstone(rt1)
-                                                 .buildUpdate();
+                               .addRangeTombstone(rt1)
+                               .buildUpdate();
         ((MutableDeletionInfo)upd1.deletionInfo()).add(new DeletionTime(10, nowInSec));
         UnfilteredPartitionIterator iter1 = iter(upd1);
 
@@ -647,8 +645,8 @@ public class DataResolverTest
                                                  .addRangeTombstone(rt3)
                                                  .buildUpdate());
 
-        resolver.preprocess(readResponseMessage(peer1, iter1));
-        resolver.preprocess(readResponseMessage(peer2, iter2));
+        resolver.preprocess(response(command, peer1, iter1));
+        resolver.preprocess(response(command, peer2, iter2));
 
         // No results, we've only reconciled tombstones.
         try (PartitionIterator data = resolver.resolve())
@@ -678,8 +676,8 @@ public class DataResolverTest
         Slice slice = rt.deletedSlice();
         ClusteringBound newStart = ClusteringBound.create(Kind.EXCL_START_BOUND, slice.start().getRawValues());
         return condition
-             ? new RangeTombstone(Slice.make(newStart, slice.end()), rt.deletionTime())
-             : rt;
+               ? new RangeTombstone(Slice.make(newStart, slice.end()), rt.deletionTime())
+               : rt;
     }
 
     // Forces the end to be exclusive if the condition holds
@@ -691,8 +689,8 @@ public class DataResolverTest
         Slice slice = rt.deletedSlice();
         ClusteringBound newEnd = ClusteringBound.create(Kind.EXCL_END_BOUND, slice.end().getRawValues());
         return condition
-             ? new RangeTombstone(Slice.make(slice.start(), newEnd), rt.deletionTime())
-             : rt;
+               ? new RangeTombstone(Slice.make(slice.start(), newEnd), rt.deletionTime())
+               : rt;
     }
 
     private static ByteBuffer bb(int b)
@@ -708,9 +706,10 @@ public class DataResolverTest
     @Test
     public void testResolveComplexDelete()
     {
+        EndpointsForRange replicas = makeReplicas(2);
         ReadCommand cmd = Util.cmd(cfs2, dk).withNowInSeconds(nowInSec).build();
         TestableReadRepair readRepair = new TestableReadRepair(cmd, ConsistencyLevel.QUORUM);
-        DataResolver resolver = new DataResolver(ks, cmd, ConsistencyLevel.ALL, 2, System.nanoTime(), readRepair);
+        DataResolver resolver = new DataResolver(cmd, plan(replicas, ConsistencyLevel.ALL), readRepair, System.nanoTime());
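+        // two updates to the same complex (map) column; the later complex deletion and cell should win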
 
         long[] ts = {100, 200};
 
@@ -719,8 +718,8 @@ public class DataResolverTest
         builder.addComplexDeletion(m, new DeletionTime(ts[0] - 1, nowInSec));
         builder.addCell(mapCell(0, 0, ts[0]));
 
-        InetAddressAndPort peer1 = peer();
-        resolver.preprocess(readResponseMessage(peer1, iter(PartitionUpdate.singleRowUpdate(cfm2, dk, builder.build())), cmd));
+        InetAddressAndPort peer1 = replicas.get(0).endpoint();
+        resolver.preprocess(response(cmd, peer1, iter(PartitionUpdate.singleRowUpdate(cfm2, dk, builder.build()))));
 
         builder.newRow(Clustering.EMPTY);
         DeletionTime expectedCmplxDelete = new DeletionTime(ts[1] - 1, nowInSec);
@@ -728,8 +727,8 @@ public class DataResolverTest
         Cell expectedCell = mapCell(1, 1, ts[1]);
         builder.addCell(expectedCell);
 
-        InetAddressAndPort peer2 = peer();
-        resolver.preprocess(readResponseMessage(peer2, iter(PartitionUpdate.singleRowUpdate(cfm2, dk, builder.build())), cmd));
+        InetAddressAndPort peer2 = replicas.get(1).endpoint();
+        resolver.preprocess(response(cmd, peer2, iter(PartitionUpdate.singleRowUpdate(cfm2, dk, builder.build()))));
 
         try(PartitionIterator data = resolver.resolve())
         {
@@ -742,7 +741,6 @@ public class DataResolverTest
             }
         }
 
-
         Mutation mutation = readRepair.getForEndpoint(peer1);
         Iterator<Row> rowIter = mutation.getPartitionUpdate(cfm2).iterator();
         assertTrue(rowIter.hasNext());
@@ -760,9 +758,10 @@ public class DataResolverTest
     @Test
     public void testResolveDeletedCollection()
     {
+        EndpointsForRange replicas = makeReplicas(2);
         ReadCommand cmd = Util.cmd(cfs2, dk).withNowInSeconds(nowInSec).build();
         TestableReadRepair readRepair = new TestableReadRepair(cmd, ConsistencyLevel.QUORUM);
-        DataResolver resolver = new DataResolver(ks, cmd, ConsistencyLevel.ALL, 2, System.nanoTime(), readRepair);
+        DataResolver resolver = new DataResolver(cmd, plan(replicas, ConsistencyLevel.ALL), readRepair, System.nanoTime());
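+        // the second replica's newer complex deletion should supersede the first replica's map contents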
 
         long[] ts = {100, 200};
 
@@ -771,15 +770,15 @@ public class DataResolverTest
         builder.addComplexDeletion(m, new DeletionTime(ts[0] - 1, nowInSec));
         builder.addCell(mapCell(0, 0, ts[0]));
 
-        InetAddressAndPort peer1 = peer();
-        resolver.preprocess(readResponseMessage(peer1, iter(PartitionUpdate.singleRowUpdate(cfm2, dk, builder.build())), cmd));
+        InetAddressAndPort peer1 = replicas.get(0).endpoint();
+        resolver.preprocess(response(cmd, peer1, iter(PartitionUpdate.singleRowUpdate(cfm2, dk, builder.build()))));
 
         builder.newRow(Clustering.EMPTY);
         DeletionTime expectedCmplxDelete = new DeletionTime(ts[1] - 1, nowInSec);
         builder.addComplexDeletion(m, expectedCmplxDelete);
 
-        InetAddressAndPort peer2 = peer();
-        resolver.preprocess(readResponseMessage(peer2, iter(PartitionUpdate.singleRowUpdate(cfm2, dk, builder.build())), cmd));
+        InetAddressAndPort peer2 = replicas.get(1).endpoint();
+        resolver.preprocess(response(cmd, peer2, iter(PartitionUpdate.singleRowUpdate(cfm2, dk, builder.build()))));
 
         try(PartitionIterator data = resolver.resolve())
         {
@@ -803,9 +802,10 @@ public class DataResolverTest
     @Test
     public void testResolveNewCollection()
     {
+        EndpointsForRange replicas = makeReplicas(2);
         ReadCommand cmd = Util.cmd(cfs2, dk).withNowInSeconds(nowInSec).build();
         TestableReadRepair readRepair = new TestableReadRepair(cmd, ConsistencyLevel.QUORUM);
-        DataResolver resolver = new DataResolver(ks, cmd, ConsistencyLevel.ALL, 2, System.nanoTime(), readRepair);
+        DataResolver resolver = new DataResolver(cmd, plan(replicas, ConsistencyLevel.ALL), readRepair, System.nanoTime());
 
         long[] ts = {100, 200};
 
@@ -818,11 +818,11 @@ public class DataResolverTest
         builder.addCell(expectedCell);
 
         // empty map column
-        InetAddressAndPort peer1 = peer();
-        resolver.preprocess(readResponseMessage(peer1, iter(PartitionUpdate.singleRowUpdate(cfm2, dk, builder.build())), cmd));
+        InetAddressAndPort peer1 = replicas.get(0).endpoint();
+        resolver.preprocess(response(cmd, peer1, iter(PartitionUpdate.singleRowUpdate(cfm2, dk, builder.build()))));
 
-        InetAddressAndPort peer2 = peer();
-        resolver.preprocess(readResponseMessage(peer2, iter(PartitionUpdate.emptyUpdate(cfm2, dk))));
+        InetAddressAndPort peer2 = replicas.get(1).endpoint();
+        resolver.preprocess(response(cmd, peer2, iter(PartitionUpdate.emptyUpdate(cfm2, dk))));
 
         try(PartitionIterator data = resolver.resolve())
         {
@@ -852,9 +852,10 @@ public class DataResolverTest
     @Test
     public void testResolveNewCollectionOverwritingDeleted()
     {
+        EndpointsForRange replicas = makeReplicas(2);
         ReadCommand cmd = Util.cmd(cfs2, dk).withNowInSeconds(nowInSec).build();
         TestableReadRepair readRepair = new TestableReadRepair(cmd, ConsistencyLevel.QUORUM);
-        DataResolver resolver = new DataResolver(ks, cmd, ConsistencyLevel.ALL, 2, System.nanoTime(), readRepair);
+        DataResolver resolver = new DataResolver(cmd, plan(replicas, ConsistencyLevel.ALL), readRepair, System.nanoTime());
 
         long[] ts = {100, 200};
 
@@ -863,8 +864,8 @@ public class DataResolverTest
         builder.newRow(Clustering.EMPTY);
         builder.addComplexDeletion(m, new DeletionTime(ts[0] - 1, nowInSec));
 
-        InetAddressAndPort peer1 = peer();
-        resolver.preprocess(readResponseMessage(peer1, iter(PartitionUpdate.singleRowUpdate(cfm2, dk, builder.build())), cmd));
+        InetAddressAndPort peer1 = replicas.get(0).endpoint();
+        resolver.preprocess(response(cmd, peer1, iter(PartitionUpdate.singleRowUpdate(cfm2, dk, builder.build()))));
 
         // newer, overwritten map column
         builder.newRow(Clustering.EMPTY);
@@ -873,8 +874,8 @@ public class DataResolverTest
         Cell expectedCell = mapCell(1, 1, ts[1]);
         builder.addCell(expectedCell);
 
-        InetAddressAndPort peer2 = peer();
-        resolver.preprocess(readResponseMessage(peer2, iter(PartitionUpdate.singleRowUpdate(cfm2, dk, builder.build())), cmd));
+        InetAddressAndPort peer2 = replicas.get(1).endpoint();
+        resolver.preprocess(response(cmd, peer2, iter(PartitionUpdate.singleRowUpdate(cfm2, dk, builder.build()))));
 
         try(PartitionIterator data = resolver.resolve())
         {
@@ -897,18 +898,6 @@ public class DataResolverTest
         Assert.assertNull(readRepair.sent.get(peer2));
     }
 
-    private InetAddressAndPort peer()
-    {
-        try
-        {
-            return InetAddressAndPort.getByAddress(new byte[]{ 127, 0, 0, (byte) addressSuffix++ });
-        }
-        catch (UnknownHostException e)
-        {
-            throw new RuntimeException(e);
-        }
-    }
-
     private void assertRepairContainsDeletions(Mutation mutation,
                                                DeletionTime deletionTime,
                                                RangeTombstone...rangeTombstones)
@@ -961,91 +950,8 @@ public class DataResolverTest
         assertEquals(update.metadata().name, cfm.name);
     }
 
-
-    public MessageIn<ReadResponse> readResponseMessage(InetAddressAndPort from, UnfilteredPartitionIterator partitionIterator)
-    {
-        return readResponseMessage(from, partitionIterator, command);
-
-    }
-    public MessageIn<ReadResponse> readResponseMessage(InetAddressAndPort from, UnfilteredPartitionIterator partitionIterator, ReadCommand cmd)
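+    // assumes ReplicaLayout.ForRange(keyspace, consistencyLevel, range, natural, selected); the tests use the same endpoints for both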
+    private ReplicaLayout.ForRange plan(EndpointsForRange replicas, ConsistencyLevel consistencyLevel)
     {
-        return MessageIn.create(from,
-                                ReadResponse.createRemoteDataResponse(partitionIterator, cmd),
-                                Collections.EMPTY_MAP,
-                                MessagingService.Verb.REQUEST_RESPONSE,
-                                MessagingService.current_version);
-    }
-
-    private RangeTombstone tombstone(Object start, Object end, long markedForDeleteAt, int localDeletionTime)
-    {
-        return tombstone(start, true, end, true, markedForDeleteAt, localDeletionTime);
-    }
-
-    private RangeTombstone tombstone(Object start, boolean inclusiveStart, Object end, boolean inclusiveEnd, long markedForDeleteAt, int localDeletionTime)
-    {
-        ClusteringBound startBound = rtBound(start, true, inclusiveStart);
-        ClusteringBound endBound = rtBound(end, false, inclusiveEnd);
-        return new RangeTombstone(Slice.make(startBound, endBound), new DeletionTime(markedForDeleteAt, localDeletionTime));
-    }
-
-    private ClusteringBound rtBound(Object value, boolean isStart, boolean inclusive)
-    {
-        ClusteringBound.Kind kind = isStart
-                                  ? (inclusive ? Kind.INCL_START_BOUND : Kind.EXCL_START_BOUND)
-                                  : (inclusive ? Kind.INCL_END_BOUND : Kind.EXCL_END_BOUND);
-
-        return ClusteringBound.create(kind, cfm.comparator.make(value).getRawValues());
-    }
-
-    private ClusteringBoundary rtBoundary(Object value, boolean inclusiveOnEnd)
-    {
-        ClusteringBound.Kind kind = inclusiveOnEnd
-                                  ? Kind.INCL_END_EXCL_START_BOUNDARY
-                                  : Kind.EXCL_END_INCL_START_BOUNDARY;
-        return ClusteringBoundary.create(kind, cfm.comparator.make(value).getRawValues());
-    }
-
-    private RangeTombstoneBoundMarker marker(Object value, boolean isStart, boolean inclusive, long markedForDeleteAt, int localDeletionTime)
-    {
-        return new RangeTombstoneBoundMarker(rtBound(value, isStart, inclusive), new DeletionTime(markedForDeleteAt, localDeletionTime));
-    }
-
-    private RangeTombstoneBoundaryMarker boundary(Object value, boolean inclusiveOnEnd, long markedForDeleteAt1, int localDeletionTime1, long markedForDeleteAt2, int localDeletionTime2)
-    {
-        return new RangeTombstoneBoundaryMarker(rtBoundary(value, inclusiveOnEnd),
-                                                new DeletionTime(markedForDeleteAt1, localDeletionTime1),
-                                                new DeletionTime(markedForDeleteAt2, localDeletionTime2));
-    }
-
-    private UnfilteredPartitionIterator fullPartitionDelete(TableMetadata table, DecoratedKey dk, long timestamp, int nowInSec)
-    {
-        return new SingletonUnfilteredPartitionIterator(PartitionUpdate.fullPartitionDelete(table, dk, timestamp, nowInSec).unfilteredIterator());
-    }
-
-    private UnfilteredPartitionIterator iter(PartitionUpdate update)
-    {
-        return new SingletonUnfilteredPartitionIterator(update.unfilteredIterator());
-    }
-
-    private UnfilteredPartitionIterator iter(DecoratedKey key, Unfiltered... unfiltereds)
-    {
-        SortedSet<Unfiltered> s = new TreeSet<>(cfm.comparator);
-        Collections.addAll(s, unfiltereds);
-        final Iterator<Unfiltered> iterator = s.iterator();
-
-        UnfilteredRowIterator rowIter = new AbstractUnfilteredRowIterator(cfm,
-                                                                          key,
-                                                                          DeletionTime.LIVE,
-                                                                          cfm.regularAndStaticColumns(),
-                                                                          Rows.EMPTY_STATIC_ROW,
-                                                                          false,
-                                                                          EncodingStats.NO_STATS)
-        {
-            protected Unfiltered computeNext()
-            {
-                return iterator.hasNext() ? iterator.next() : endOfData();
-            }
-        };
-        return new SingletonUnfilteredPartitionIterator(rowIter);
+        return new ReplicaLayout.ForRange(ks, consistencyLevel, ReplicaUtils.FULL_BOUNDS, replicas, replicas);
     }
-}
+}
\ No newline at end of file
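
Note: the response(...) helper these hunks call is introduced earlier in this
patch and is not shown here. A minimal sketch of its presumed shape, adapted
from the removed readResponseMessage(...) above (the name, signature, and
body are assumptions, not the committed code):

    private MessageIn<ReadResponse> response(ReadCommand cmd,
                                             InetAddressAndPort from,
                                             UnfilteredPartitionIterator partitionIterator)
    {
        // presumably wraps the iterator as a remote data response from the
        // given peer, exactly as the removed readResponseMessage(...) did
        return MessageIn.create(from,
                                ReadResponse.createRemoteDataResponse(partitionIterator, cmd),
                                Collections.emptyMap(),
                                MessagingService.Verb.REQUEST_RESPONSE,
                                MessagingService.current_version);
    }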

