Posted to commits@cassandra.apache.org by bd...@apache.org on 2018/03/02 01:55:24 UTC

[2/5] cassandra git commit: Refactor read executor and response resolver, abstract read repair

http://git-wip-us.apache.org/repos/asf/cassandra/blob/39807ba4/test/unit/org/apache/cassandra/service/DataResolverTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/DataResolverTest.java b/test/unit/org/apache/cassandra/service/DataResolverTest.java
deleted file mode 100644
index f406582..0000000
--- a/test/unit/org/apache/cassandra/service/DataResolverTest.java
+++ /dev/null
@@ -1,1122 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.cassandra.service;
-
-import java.net.UnknownHostException;
-import java.nio.ByteBuffer;
-import java.util.*;
-
-import com.google.common.collect.Iterators;
-import com.google.common.collect.Sets;
-import org.junit.*;
-
-import org.apache.cassandra.SchemaLoader;
-import org.apache.cassandra.Util;
-import org.apache.cassandra.locator.InetAddressAndPort;
-import org.apache.cassandra.schema.TableMetadata;
-import org.apache.cassandra.schema.ColumnMetadata;
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.cql3.ColumnIdentifier;
-import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.marshal.ByteType;
-import org.apache.cassandra.db.marshal.IntegerType;
-import org.apache.cassandra.db.marshal.MapType;
-import org.apache.cassandra.db.rows.*;
-import org.apache.cassandra.db.marshal.AsciiType;
-import org.apache.cassandra.db.marshal.BytesType;
-import org.apache.cassandra.db.partitions.*;
-import org.apache.cassandra.exceptions.ConfigurationException;
-import org.apache.cassandra.net.*;
-import org.apache.cassandra.schema.KeyspaceParams;
-import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.FBUtilities;
-
-import static org.apache.cassandra.Util.assertClustering;
-import static org.apache.cassandra.Util.assertColumn;
-import static org.apache.cassandra.Util.assertColumns;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.apache.cassandra.db.ClusteringBound.Kind;
-
-public class DataResolverTest
-{
-    public static final String KEYSPACE1 = "DataResolverTest";
-    public static final String CF_STANDARD = "Standard1";
-    public static final String CF_COLLECTION = "Collection1";
-
-    // counter to generate the last byte of the respondent's address in a ReadResponse message
-    private int addressSuffix = 10;
-
-    private DecoratedKey dk;
-    private Keyspace ks;
-    private ColumnFamilyStore cfs;
-    private ColumnFamilyStore cfs2;
-    private TableMetadata cfm;
-    private TableMetadata cfm2;
-    private ColumnMetadata m;
-    private int nowInSec;
-    private ReadCommand command;
-    private MessageRecorder messageRecorder;
-
-    @BeforeClass
-    public static void defineSchema() throws ConfigurationException
-    {
-        DatabaseDescriptor.daemonInitialization();
-
-        TableMetadata.Builder builder1 =
-            TableMetadata.builder(KEYSPACE1, CF_STANDARD)
-                         .addPartitionKeyColumn("key", BytesType.instance)
-                         .addClusteringColumn("col1", AsciiType.instance)
-                         .addRegularColumn("c1", AsciiType.instance)
-                         .addRegularColumn("c2", AsciiType.instance)
-                         .addRegularColumn("one", AsciiType.instance)
-                         .addRegularColumn("two", AsciiType.instance);
-
-        TableMetadata.Builder builder2 =
-            TableMetadata.builder(KEYSPACE1, CF_COLLECTION)
-                         .addPartitionKeyColumn("k", ByteType.instance)
-                         .addRegularColumn("m", MapType.getInstance(IntegerType.instance, IntegerType.instance, true));
-
-        SchemaLoader.prepareServer();
-        SchemaLoader.createKeyspace(KEYSPACE1, KeyspaceParams.simple(1), builder1, builder2);
-    }
-
-    @Before
-    public void setup()
-    {
-        dk = Util.dk("key1");
-        ks = Keyspace.open(KEYSPACE1);
-        cfs = ks.getColumnFamilyStore(CF_STANDARD);
-        cfm = cfs.metadata();
-        cfs2 = ks.getColumnFamilyStore(CF_COLLECTION);
-        cfm2 = cfs2.metadata();
-        m = cfm2.getColumn(new ColumnIdentifier("m", false));
-
-        nowInSec = FBUtilities.nowInSeconds();
-        command = Util.cmd(cfs, dk).withNowInSeconds(nowInSec).build();
-    }
-
-    @Before
-    public void injectMessageSink()
-    {
-        // install an IMessageSink to capture all messages
-        // so we can inspect them during tests
-        messageRecorder = new MessageRecorder();
-        MessagingService.instance().addMessageSink(messageRecorder);
-    }
-
-    @After
-    public void removeMessageSink()
-    {
-        // should be unnecessary, but good housekeeping
-        MessagingService.instance().clearMessageSinks();
-    }
-
-    /**
-     * Checks that the provided data resolver has the expected number of repair futures created.
-     * This method also "releases" those futures by faking replica responses to those repairs, which is necessary,
-     * or every test would time out when closing the result of resolver.resolve(), since it waits on those futures.
-     */
-    private void assertRepairFuture(DataResolver resolver, int expectedRepairs)
-    {
-        assertEquals(expectedRepairs, resolver.repairResults.size());
-
-        // Signal all futures. We pass a completely fake response message, but it doesn't matter as we just want
-        // AsyncOneResponse to signal success, and it only cares about a non-null MessageIn (it collects the payload).
-        for (AsyncOneResponse<?> future : resolver.repairResults)
-            future.response(MessageIn.create(null, null, null, null, -1));
-    }
-
-    @Test
-    public void testResolveNewerSingleRow() throws UnknownHostException
-    {
-        DataResolver resolver = new DataResolver(ks, command, ConsistencyLevel.ALL, 2, System.nanoTime());
-        InetAddressAndPort peer1 = peer();
-        resolver.preprocess(readResponseMessage(peer1, iter(new RowUpdateBuilder(cfm, nowInSec, 0L, dk).clustering("1")
-                                                                                                       .add("c1", "v1")
-                                                                                                       .buildUpdate())));
-        InetAddressAndPort peer2 = peer();
-        resolver.preprocess(readResponseMessage(peer2, iter(new RowUpdateBuilder(cfm, nowInSec, 1L, dk).clustering("1")
-                                                                                                       .add("c1", "v2")
-                                                                                                       .buildUpdate())));
-
-        try (PartitionIterator data = resolver.resolve())
-        {
-            try (RowIterator rows = Iterators.getOnlyElement(data))
-            {
-                Row row = Iterators.getOnlyElement(rows);
-                assertColumns(row, "c1");
-                assertColumn(cfm, row, "c1", "v2", 1);
-            }
-            assertRepairFuture(resolver, 1);
-        }
-
-        assertEquals(1, messageRecorder.sent.size());
-        // peer 1 just needs to repair with the row from peer 2
-        MessageOut msg = getSentMessage(peer1);
-        assertRepairMetadata(msg);
-        assertRepairContainsNoDeletions(msg);
-        assertRepairContainsColumn(msg, "1", "c1", "v2", 1);
-    }
-
-    @Test
-    public void testResolveDisjointSingleRow()
-    {
-        DataResolver resolver = new DataResolver(ks, command, ConsistencyLevel.ALL, 2, System.nanoTime());
-        InetAddressAndPort peer1 = peer();
-        resolver.preprocess(readResponseMessage(peer1, iter(new RowUpdateBuilder(cfm, nowInSec, 0L, dk).clustering("1")
-                                                                                                       .add("c1", "v1")
-                                                                                                       .buildUpdate())));
-
-        InetAddressAndPort peer2 = peer();
-        resolver.preprocess(readResponseMessage(peer2, iter(new RowUpdateBuilder(cfm, nowInSec, 1L, dk).clustering("1")
-                                                                                                       .add("c2", "v2")
-                                                                                                       .buildUpdate())));
-
-        try (PartitionIterator data = resolver.resolve())
-        {
-            try (RowIterator rows = Iterators.getOnlyElement(data))
-            {
-                Row row = Iterators.getOnlyElement(rows);
-                assertColumns(row, "c1", "c2");
-                assertColumn(cfm, row, "c1", "v1", 0);
-                assertColumn(cfm, row, "c2", "v2", 1);
-            }
-            assertRepairFuture(resolver, 2);
-        }
-
-        assertEquals(2, messageRecorder.sent.size());
-        // each peer needs to repair with each other's column
-        MessageOut msg = getSentMessage(peer1);
-        assertRepairMetadata(msg);
-        assertRepairContainsColumn(msg, "1", "c2", "v2", 1);
-
-        msg = getSentMessage(peer2);
-        assertRepairMetadata(msg);
-        assertRepairContainsColumn(msg, "1", "c1", "v1", 0);
-    }
-
-    @Test
-    public void testResolveDisjointMultipleRows() throws UnknownHostException
-    {
-        DataResolver resolver = new DataResolver(ks, command, ConsistencyLevel.ALL, 2, System.nanoTime());
-        InetAddressAndPort peer1 = peer();
-        resolver.preprocess(readResponseMessage(peer1, iter(new RowUpdateBuilder(cfm, nowInSec, 0L, dk).clustering("1")
-                                                                                                       .add("c1", "v1")
-                                                                                                       .buildUpdate())));
-        InetAddressAndPort peer2 = peer();
-        resolver.preprocess(readResponseMessage(peer2, iter(new RowUpdateBuilder(cfm, nowInSec, 1L, dk).clustering("2")
-                                                                                                       .add("c2", "v2")
-                                                                                                       .buildUpdate())));
-
-        try (PartitionIterator data = resolver.resolve())
-        {
-            try (RowIterator rows = data.next())
-            {
-                // We expect the resolved superset to contain both rows
-                Row row = rows.next();
-                assertClustering(cfm, row, "1");
-                assertColumns(row, "c1");
-                assertColumn(cfm, row, "c1", "v1", 0);
-
-                row = rows.next();
-                assertClustering(cfm, row, "2");
-                assertColumns(row, "c2");
-                assertColumn(cfm, row, "c2", "v2", 1);
-
-                assertFalse(rows.hasNext());
-                assertFalse(data.hasNext());
-            }
-            assertRepairFuture(resolver, 2);
-        }
-
-        assertEquals(2, messageRecorder.sent.size());
-        // each peer needs to repair the row from the other
-        MessageOut msg = getSentMessage(peer1);
-        assertRepairMetadata(msg);
-        assertRepairContainsNoDeletions(msg);
-        assertRepairContainsColumn(msg, "2", "c2", "v2", 1);
-
-        msg = getSentMessage(peer2);
-        assertRepairMetadata(msg);
-        assertRepairContainsNoDeletions(msg);
-        assertRepairContainsColumn(msg, "1", "c1", "v1", 0);
-    }
-
-    @Test
-    public void testResolveDisjointMultipleRowsWithRangeTombstones()
-    {
-        DataResolver resolver = new DataResolver(ks, command, ConsistencyLevel.ALL, 4, System.nanoTime());
-
-        RangeTombstone tombstone1 = tombstone("1", "11", 1, nowInSec);
-        RangeTombstone tombstone2 = tombstone("3", "31", 1, nowInSec);
-        PartitionUpdate update = new RowUpdateBuilder(cfm, nowInSec, 1L, dk).addRangeTombstone(tombstone1)
-                                                                                  .addRangeTombstone(tombstone2)
-                                                                                  .buildUpdate();
-
-        InetAddressAndPort peer1 = peer();
-        UnfilteredPartitionIterator iter1 = iter(update);
-        resolver.preprocess(readResponseMessage(peer1, iter1));
-        // not covered by any range tombstone
-        InetAddressAndPort peer2 = peer();
-        UnfilteredPartitionIterator iter2 = iter(new RowUpdateBuilder(cfm, nowInSec, 0L, dk).clustering("0")
-                                                                                  .add("c1", "v0")
-                                                                                  .buildUpdate());
-        resolver.preprocess(readResponseMessage(peer2, iter2));
-        // covered by a range tombstone
-        InetAddressAndPort peer3 = peer();
-        UnfilteredPartitionIterator iter3 = iter(new RowUpdateBuilder(cfm, nowInSec, 0L, dk).clustering("10")
-                                                                                  .add("c2", "v1")
-                                                                                  .buildUpdate());
-        resolver.preprocess(readResponseMessage(peer3, iter3));
-        // range covered by rt, but newer
-        InetAddressAndPort peer4 = peer();
-        UnfilteredPartitionIterator iter4 = iter(new RowUpdateBuilder(cfm, nowInSec, 2L, dk).clustering("3")
-                                                                                  .add("one", "A")
-                                                                                  .buildUpdate());
-        resolver.preprocess(readResponseMessage(peer4, iter4));
-        try (PartitionIterator data = resolver.resolve())
-        {
-            try (RowIterator rows = data.next())
-            {
-                Row row = rows.next();
-                assertClustering(cfm, row, "0");
-                assertColumns(row, "c1");
-                assertColumn(cfm, row, "c1", "v0", 0);
-
-                row = rows.next();
-                assertClustering(cfm, row, "3");
-                assertColumns(row, "one");
-                assertColumn(cfm, row, "one", "A", 2);
-
-                assertFalse(rows.hasNext());
-            }
-            assertRepairFuture(resolver, 4);
-        }
-
-        assertEquals(4, messageRecorder.sent.size());
-        // peer1 needs the rows from peers 2 and 4
-        MessageOut msg = getSentMessage(peer1);
-        assertRepairMetadata(msg);
-        assertRepairContainsNoDeletions(msg);
-        assertRepairContainsColumn(msg, "0", "c1", "v0", 0);
-        assertRepairContainsColumn(msg, "3", "one", "A", 2);
-
-        // peer2 needs to get the row from peer4 and the RTs
-        msg = getSentMessage(peer2);
-        assertRepairMetadata(msg);
-        assertRepairContainsDeletions(msg, null, tombstone1, tombstone2);
-        assertRepairContainsColumn(msg, "3", "one", "A", 2);
-
-        // peer 3 needs both rows and the RTs
-        msg = getSentMessage(peer3);
-        assertRepairMetadata(msg);
-        assertRepairContainsDeletions(msg, null, tombstone1, tombstone2);
-        assertRepairContainsColumn(msg, "0", "c1", "v0", 0);
-        assertRepairContainsColumn(msg, "3", "one", "A", 2);
-
-        // peer4 needs the row from peer2 and the RTs
-        msg = getSentMessage(peer4);
-        assertRepairMetadata(msg);
-        assertRepairContainsDeletions(msg, null, tombstone1, tombstone2);
-        assertRepairContainsColumn(msg, "0", "c1", "v0", 0);
-    }
-
-    @Test
-    public void testResolveWithOneEmpty()
-    {
-        DataResolver resolver = new DataResolver(ks, command, ConsistencyLevel.ALL, 2, System.nanoTime());
-        InetAddressAndPort peer1 = peer();
-        resolver.preprocess(readResponseMessage(peer1, iter(new RowUpdateBuilder(cfm, nowInSec, 1L, dk).clustering("1")
-                                                                                                       .add("c2", "v2")
-                                                                                                       .buildUpdate())));
-        InetAddressAndPort peer2 = peer();
-        resolver.preprocess(readResponseMessage(peer2, EmptyIterators.unfilteredPartition(cfm)));
-
-        try (PartitionIterator data = resolver.resolve())
-        {
-            try (RowIterator rows = Iterators.getOnlyElement(data))
-            {
-                Row row = Iterators.getOnlyElement(rows);
-                assertColumns(row, "c2");
-                assertColumn(cfm, row, "c2", "v2", 1);
-            }
-            assertRepairFuture(resolver, 1);
-        }
-
-        assertEquals(1, messageRecorder.sent.size());
-        // peer 2 needs the row from peer 1
-        MessageOut msg = getSentMessage(peer2);
-        assertRepairMetadata(msg);
-        assertRepairContainsNoDeletions(msg);
-        assertRepairContainsColumn(msg, "1", "c2", "v2", 1);
-    }
-
-    @Test
-    public void testResolveWithBothEmpty()
-    {
-        DataResolver resolver = new DataResolver(ks, command, ConsistencyLevel.ALL, 2, System.nanoTime());
-        resolver.preprocess(readResponseMessage(peer(), EmptyIterators.unfilteredPartition(cfm)));
-        resolver.preprocess(readResponseMessage(peer(), EmptyIterators.unfilteredPartition(cfm)));
-
-        try (PartitionIterator data = resolver.resolve())
-        {
-            assertFalse(data.hasNext());
-            assertRepairFuture(resolver, 0);
-        }
-
-        assertTrue(messageRecorder.sent.isEmpty());
-    }
-
-    @Test
-    public void testResolveDeleted()
-    {
-        DataResolver resolver = new DataResolver(ks, command, ConsistencyLevel.ALL, 2, System.nanoTime());
-        // one response with columns timestamped before a delete in another response
-        InetAddressAndPort peer1 = peer();
-        resolver.preprocess(readResponseMessage(peer1, iter(new RowUpdateBuilder(cfm, nowInSec, 0L, dk).clustering("1")
-                                                                                                       .add("one", "A")
-                                                                                                       .buildUpdate())));
-        InetAddressAndPort peer2 = peer();
-        resolver.preprocess(readResponseMessage(peer2, fullPartitionDelete(cfm, dk, 1, nowInSec)));
-
-        try (PartitionIterator data = resolver.resolve())
-        {
-            assertFalse(data.hasNext());
-            assertRepairFuture(resolver, 1);
-        }
-
-        // peer1 should get the deletion from peer2
-        assertEquals(1, messageRecorder.sent.size());
-        MessageOut msg = getSentMessage(peer1);
-        assertRepairMetadata(msg);
-        assertRepairContainsDeletions(msg, new DeletionTime(1, nowInSec));
-        assertRepairContainsNoColumns(msg);
-    }
-
-    @Test
-    public void testResolveMultipleDeleted()
-    {
-        DataResolver resolver = new DataResolver(ks, command, ConsistencyLevel.ALL, 4, System.nanoTime());
-        // deletes and columns with interleaved timestamps, with an out-of-order return sequence
-        InetAddressAndPort peer1 = peer();
-        resolver.preprocess(readResponseMessage(peer1, fullPartitionDelete(cfm, dk, 0, nowInSec)));
-        // these columns created after the previous deletion
-        InetAddressAndPort peer2 = peer();
-        resolver.preprocess(readResponseMessage(peer2, iter(new RowUpdateBuilder(cfm, nowInSec, 1L, dk).clustering("1")
-                                                                                                       .add("one", "A")
-                                                                                                       .add("two", "A")
-                                                                                                       .buildUpdate())));
-        // this column created after the next delete
-        InetAddressAndPort peer3 = peer();
-        resolver.preprocess(readResponseMessage(peer3, iter(new RowUpdateBuilder(cfm, nowInSec, 3L, dk).clustering("1")
-                                                                                                       .add("two", "B")
-                                                                                                       .buildUpdate())));
-        InetAddressAndPort peer4 = peer();
-        resolver.preprocess(readResponseMessage(peer4, fullPartitionDelete(cfm, dk, 2, nowInSec)));
-
-        try (PartitionIterator data = resolver.resolve())
-        {
-            try (RowIterator rows = Iterators.getOnlyElement(data))
-            {
-                Row row = Iterators.getOnlyElement(rows);
-                assertColumns(row, "two");
-                assertColumn(cfm, row, "two", "B", 3);
-            }
-            assertRepairFuture(resolver, 4);
-        }
-
-        // peer 1 needs to get the partition delete from peer 4 and the row from peer 3
-        assertEquals(4, messageRecorder.sent.size());
-        MessageOut msg = getSentMessage(peer1);
-        assertRepairMetadata(msg);
-        assertRepairContainsDeletions(msg, new DeletionTime(2, nowInSec));
-        assertRepairContainsColumn(msg, "1", "two", "B", 3);
-
-        // peer 2 needs the deletion from peer 4 and the row from peer 3
-        msg = getSentMessage(peer2);
-        assertRepairMetadata(msg);
-        assertRepairContainsDeletions(msg, new DeletionTime(2, nowInSec));
-        assertRepairContainsColumn(msg, "1", "two", "B", 3);
-
-        // peer 3 needs just the deletion from peer 4
-        msg = getSentMessage(peer3);
-        assertRepairMetadata(msg);
-        assertRepairContainsDeletions(msg, new DeletionTime(2, nowInSec));
-        assertRepairContainsNoColumns(msg);
-
-        // peer 4 needs just the row from peer 3
-        msg = getSentMessage(peer4);
-        assertRepairMetadata(msg);
-        assertRepairContainsNoDeletions(msg);
-        assertRepairContainsColumn(msg, "1", "two", "B", 3);
-    }
-
-    @Test
-    public void testResolveRangeTombstonesOnBoundaryRightWins() throws UnknownHostException
-    {
-        resolveRangeTombstonesOnBoundary(1, 2);
-    }
-
-    @Test
-    public void testResolveRangeTombstonesOnBoundaryLeftWins() throws UnknownHostException
-    {
-        resolveRangeTombstonesOnBoundary(2, 1);
-    }
-
-    @Test
-    public void testResolveRangeTombstonesOnBoundarySameTimestamp() throws UnknownHostException
-    {
-        resolveRangeTombstonesOnBoundary(1, 1);
-    }
-
-    /*
-     * We want responses to merge on tombstone boundary. So we'll merge 2 "streams":
-     *   1: [1, 2)(3, 4](5, 6]  2
-     *   2:    [2, 3][4, 5)     1
-     * which tests all combinations of open/close boundaries (open/close, close/open, open/open, close/close).
-     *
-     * Note that, because DataResolver returns a "filtered" iterator, it should resolve into an empty iterator.
-     * However, what should be sent to each source depends on the exact timestamps of each tombstone, and we
-     * test a few combinations.
-     */
-    private void resolveRangeTombstonesOnBoundary(long timestamp1, long timestamp2)
-    {
-        DataResolver resolver = new DataResolver(ks, command, ConsistencyLevel.ALL, 2, System.nanoTime());
-        InetAddressAndPort peer1 = peer();
-        InetAddressAndPort peer2 = peer();
-
-        // 1st "stream"
-        RangeTombstone one_two    = tombstone("1", true , "2", false, timestamp1, nowInSec);
-        RangeTombstone three_four = tombstone("3", false, "4", true , timestamp1, nowInSec);
-        RangeTombstone five_six   = tombstone("5", false, "6", true , timestamp1, nowInSec);
-        UnfilteredPartitionIterator iter1 = iter(new RowUpdateBuilder(cfm, nowInSec, 1L, dk).addRangeTombstone(one_two)
-                                                                                            .addRangeTombstone(three_four)
-                                                                                            .addRangeTombstone(five_six)
-                                                                                            .buildUpdate());
-
-        // 2nd "stream"
-        RangeTombstone two_three = tombstone("2", true, "3", true , timestamp2, nowInSec);
-        RangeTombstone four_five = tombstone("4", true, "5", false, timestamp2, nowInSec);
-        UnfilteredPartitionIterator iter2 = iter(new RowUpdateBuilder(cfm, nowInSec, 1L, dk).addRangeTombstone(two_three)
-                                                                                            .addRangeTombstone(four_five)
-                                                                                            .buildUpdate());
-
-        resolver.preprocess(readResponseMessage(peer1, iter1));
-        resolver.preprocess(readResponseMessage(peer2, iter2));
-
-        // No results, we've only reconciled tombstones.
-        try (PartitionIterator data = resolver.resolve())
-        {
-            assertFalse(data.hasNext());
-            assertRepairFuture(resolver, 2);
-        }
-
-        assertEquals(2, messageRecorder.sent.size());
-
-        MessageOut msg1 = getSentMessage(peer1);
-        assertRepairMetadata(msg1);
-        assertRepairContainsNoColumns(msg1);
-
-        MessageOut msg2 = getSentMessage(peer2);
-        assertRepairMetadata(msg2);
-        assertRepairContainsNoColumns(msg2);
-
-        // Both streams are mostly complementary, so each will roughly get the ranges of the other stream. One subtlety
-        // is around the value "4", however, as it's included by both streams.
-        // So for a given stream, unless the other stream has a strictly higher timestamp, the value 4 will be excluded
-        // from whatever repair range it receives, since the stream already covers it.
-
-        // Message to peer1 contains peer2 ranges
-        assertRepairContainsDeletions(msg1, null, two_three, withExclusiveStartIf(four_five, timestamp1 >= timestamp2));
-
-        // Message to peer2 contains peer1 ranges
-        assertRepairContainsDeletions(msg2, null, one_two, withExclusiveEndIf(three_four, timestamp2 >= timestamp1), five_six);
-    }
-
-    /**
-     * Test cases where a boundary of a source is covered by another source's deletion and the timestamps on one or
-     * both sides of the boundary are equal to the "merged" deletion.
-     * This is a test for CASSANDRA-13237 to make sure we handle this case properly.
-     */
-    @Test
-    public void testRepairRangeTombstoneBoundary() throws UnknownHostException
-    {
-        testRepairRangeTombstoneBoundary(1, 0, 1);
-        messageRecorder.sent.clear();
-        testRepairRangeTombstoneBoundary(1, 1, 0);
-        messageRecorder.sent.clear();
-        testRepairRangeTombstoneBoundary(1, 1, 1);
-    }
-
-    /**
-     * Test for CASSANDRA-13237, checking that we don't fail (and handle correctly) the case where an RT boundary has
-     * the same deletion on both sides (which is useless, but could be created by legacy code pre-CASSANDRA-13237 and
-     * could thus still be sent).
-     */
-    private void testRepairRangeTombstoneBoundary(int timestamp1, int timestamp2, int timestamp3) throws UnknownHostException
-    {
-        DataResolver resolver = new DataResolver(ks, command, ConsistencyLevel.ALL, 2, System.nanoTime());
-        InetAddressAndPort peer1 = peer();
-        InetAddressAndPort peer2 = peer();
-
-        // 1st "stream"
-        RangeTombstone zero_nine = tombstone("0", true, "9", true, timestamp1, nowInSec);
-        UnfilteredPartitionIterator iter1 = iter(new RowUpdateBuilder(cfm, nowInSec, 1L, dk)
-                                                 .addRangeTombstone(zero_nine)
-                                                 .buildUpdate());
-
-        // 2nd "stream" (build more manually to ensure we have the boundary we want)
-        RangeTombstoneBoundMarker open_zero = marker("0", true, true, timestamp2, nowInSec);
-        RangeTombstoneBoundaryMarker boundary_five = boundary("5", false, timestamp2, nowInSec, timestamp3, nowInSec);
-        RangeTombstoneBoundMarker close_nine = marker("9", false, true, timestamp3, nowInSec);
-        UnfilteredPartitionIterator iter2 = iter(dk, open_zero, boundary_five, close_nine);
-
-        resolver.preprocess(readResponseMessage(peer1, iter1));
-        resolver.preprocess(readResponseMessage(peer2, iter2));
-
-        boolean shouldHaveRepair = timestamp1 != timestamp2 || timestamp1 != timestamp3;
-
-        // No results, we've only reconciled tombstones.
-        try (PartitionIterator data = resolver.resolve())
-        {
-            assertFalse(data.hasNext());
-            assertRepairFuture(resolver, shouldHaveRepair ? 1 : 0);
-        }
-
-        assertEquals(shouldHaveRepair ? 1 : 0, messageRecorder.sent.size());
-
-        if (!shouldHaveRepair)
-            return;
-
-        MessageOut msg = getSentMessage(peer2);
-        assertRepairMetadata(msg);
-        assertRepairContainsNoColumns(msg);
-
-        RangeTombstone expected = timestamp1 != timestamp2
-                                  // We've repaired the 1st part
-                                  ? tombstone("0", true, "5", false, timestamp1, nowInSec)
-                                  // We've repaired the 2nd part
-                                  : tombstone("5", true, "9", true, timestamp1, nowInSec);
-        assertRepairContainsDeletions(msg, null, expected);
-    }
-
-    /**
-     * Test for CASSANDRA-13719: tests that having a partition deletion shadow a range tombstone on another source
-     * doesn't trigger an assertion error.
-     */
-    @Test
-    public void testRepairRangeTombstoneWithPartitionDeletion()
-    {
-        DataResolver resolver = new DataResolver(ks, command, ConsistencyLevel.ALL, 2, System.nanoTime());
-        InetAddressAndPort peer1 = peer();
-        InetAddressAndPort peer2 = peer();
-
-        // 1st "stream": just a partition deletion
-        UnfilteredPartitionIterator iter1 = iter(PartitionUpdate.fullPartitionDelete(cfm, dk, 10, nowInSec));
-
-        // 2nd "stream": a range tombstone that is covered by the 1st stream
-        RangeTombstone rt = tombstone("0", true , "10", true, 5, nowInSec);
-        UnfilteredPartitionIterator iter2 = iter(new RowUpdateBuilder(cfm, nowInSec, 1L, dk)
-                                                 .addRangeTombstone(rt)
-                                                 .buildUpdate());
-
-        resolver.preprocess(readResponseMessage(peer1, iter1));
-        resolver.preprocess(readResponseMessage(peer2, iter2));
-
-        // No results, we've only reconciled tombstones.
-        try (PartitionIterator data = resolver.resolve())
-        {
-            assertFalse(data.hasNext());
-            // 2nd stream should get repaired
-            assertRepairFuture(resolver, 1);
-        }
-
-        assertEquals(1, messageRecorder.sent.size());
-
-        MessageOut msg = getSentMessage(peer2);
-        assertRepairMetadata(msg);
-        assertRepairContainsNoColumns(msg);
-
-        assertRepairContainsDeletions(msg, new DeletionTime(10, nowInSec));
-    }
-
-    /**
-     * Additional test for CASSANDRA-13719: tests the case where a partition deletion doesn't shadow a range tombstone.
-     */
-    @Test
-    public void testRepairRangeTombstoneWithPartitionDeletion2()
-    {
-        DataResolver resolver = new DataResolver(ks, command, ConsistencyLevel.ALL, 2, System.nanoTime());
-        InetAddressAndPort peer1 = peer();
-        InetAddressAndPort peer2 = peer();
-
-        // 1st "stream": a partition deletion and a range tombstone
-        RangeTombstone rt1 = tombstone("0", true , "9", true, 11, nowInSec);
-        PartitionUpdate upd1 = new RowUpdateBuilder(cfm, nowInSec, 1L, dk)
-                                                 .addRangeTombstone(rt1)
-                                                 .buildUpdate();
-        ((MutableDeletionInfo)upd1.deletionInfo()).add(new DeletionTime(10, nowInSec));
-        UnfilteredPartitionIterator iter1 = iter(upd1);
-
-        // 2nd "stream": a range tombstone that is covered by the other stream rt
-        RangeTombstone rt2 = tombstone("2", true , "3", true, 11, nowInSec);
-        RangeTombstone rt3 = tombstone("4", true , "5", true, 10, nowInSec);
-        UnfilteredPartitionIterator iter2 = iter(new RowUpdateBuilder(cfm, nowInSec, 1L, dk)
-                                                 .addRangeTombstone(rt2)
-                                                 .addRangeTombstone(rt3)
-                                                 .buildUpdate());
-
-        resolver.preprocess(readResponseMessage(peer1, iter1));
-        resolver.preprocess(readResponseMessage(peer2, iter2));
-
-        // No results, we've only reconciled tombstones.
-        try (PartitionIterator data = resolver.resolve())
-        {
-            assertFalse(data.hasNext());
-            // 2nd stream should get repaired
-            assertRepairFuture(resolver, 1);
-        }
-
-        assertEquals(1, messageRecorder.sent.size());
-
-        MessageOut msg = getSentMessage(peer2);
-        assertRepairMetadata(msg);
-        assertRepairContainsNoColumns(msg);
-
-        // 2nd stream should get both the partition deletion, as well as the part of the 1st stream RT that it misses
-        assertRepairContainsDeletions(msg, new DeletionTime(10, nowInSec),
-                                      tombstone("0", true, "2", false, 11, nowInSec),
-                                      tombstone("3", false, "9", true, 11, nowInSec));
-    }
-
-    // Forces the start to be exclusive if the condition holds
-    private static RangeTombstone withExclusiveStartIf(RangeTombstone rt, boolean condition)
-    {
-        if (!condition)
-            return rt;
-
-        Slice slice = rt.deletedSlice();
-        ClusteringBound newStart = ClusteringBound.create(Kind.EXCL_START_BOUND, slice.start().getRawValues());
-        return new RangeTombstone(Slice.make(newStart, slice.end()), rt.deletionTime());
-    }
-
-    // Forces the end to be exclusive if the condition holds
-    private static RangeTombstone withExclusiveEndIf(RangeTombstone rt, boolean condition)
-    {
-        if (!condition)
-            return rt;
-
-        Slice slice = rt.deletedSlice();
-        ClusteringBound newEnd = ClusteringBound.create(Kind.EXCL_END_BOUND, slice.end().getRawValues());
-        return new RangeTombstone(Slice.make(slice.start(), newEnd), rt.deletionTime());
-    }
-
-    private static ByteBuffer bb(int b)
-    {
-        return ByteBufferUtil.bytes(b);
-    }
-
-    private Cell mapCell(int k, int v, long ts)
-    {
-        return BufferCell.live(m, ts, bb(v), CellPath.create(bb(k)));
-    }
-
-    @Test
-    public void testResolveComplexDelete()
-    {
-        ReadCommand cmd = Util.cmd(cfs2, dk).withNowInSeconds(nowInSec).build();
-        DataResolver resolver = new DataResolver(ks, cmd, ConsistencyLevel.ALL, 2, System.nanoTime());
-
-        long[] ts = {100, 200};
-
-        Row.Builder builder = BTreeRow.unsortedBuilder(nowInSec);
-        builder.newRow(Clustering.EMPTY);
-        builder.addComplexDeletion(m, new DeletionTime(ts[0] - 1, nowInSec));
-        builder.addCell(mapCell(0, 0, ts[0]));
-
-        InetAddressAndPort peer1 = peer();
-        resolver.preprocess(readResponseMessage(peer1, iter(PartitionUpdate.singleRowUpdate(cfm2, dk, builder.build())), cmd));
-
-        builder.newRow(Clustering.EMPTY);
-        DeletionTime expectedCmplxDelete = new DeletionTime(ts[1] - 1, nowInSec);
-        builder.addComplexDeletion(m, expectedCmplxDelete);
-        Cell expectedCell = mapCell(1, 1, ts[1]);
-        builder.addCell(expectedCell);
-
-        InetAddressAndPort peer2 = peer();
-        resolver.preprocess(readResponseMessage(peer2, iter(PartitionUpdate.singleRowUpdate(cfm2, dk, builder.build())), cmd));
-
-        try (PartitionIterator data = resolver.resolve())
-        {
-            try (RowIterator rows = Iterators.getOnlyElement(data))
-            {
-                Row row = Iterators.getOnlyElement(rows);
-                assertColumns(row, "m");
-                Assert.assertNull(row.getCell(m, CellPath.create(bb(0))));
-                Assert.assertNotNull(row.getCell(m, CellPath.create(bb(1))));
-            }
-            assertRepairFuture(resolver, 1);
-        }
-
-        MessageOut<Mutation> msg;
-        msg = getSentMessage(peer1);
-        Iterator<Row> rowIter = msg.payload.getPartitionUpdate(cfm2).iterator();
-        assertTrue(rowIter.hasNext());
-        Row row = rowIter.next();
-        assertFalse(rowIter.hasNext());
-
-        ComplexColumnData cd = row.getComplexColumnData(m);
-
-        assertEquals(Collections.singleton(expectedCell), Sets.newHashSet(cd));
-        assertEquals(expectedCmplxDelete, cd.complexDeletion());
-
-        Assert.assertNull(messageRecorder.sent.get(peer2));
-    }
-
-    @Test
-    public void testResolveDeletedCollection()
-    {
-        ReadCommand cmd = Util.cmd(cfs2, dk).withNowInSeconds(nowInSec).build();
-        DataResolver resolver = new DataResolver(ks, cmd, ConsistencyLevel.ALL, 2, System.nanoTime());
-
-        long[] ts = {100, 200};
-
-        Row.Builder builder = BTreeRow.unsortedBuilder(nowInSec);
-        builder.newRow(Clustering.EMPTY);
-        builder.addComplexDeletion(m, new DeletionTime(ts[0] - 1, nowInSec));
-        builder.addCell(mapCell(0, 0, ts[0]));
-
-        InetAddressAndPort peer1 = peer();
-        resolver.preprocess(readResponseMessage(peer1, iter(PartitionUpdate.singleRowUpdate(cfm2, dk, builder.build())), cmd));
-
-        builder.newRow(Clustering.EMPTY);
-        DeletionTime expectedCmplxDelete = new DeletionTime(ts[1] - 1, nowInSec);
-        builder.addComplexDeletion(m, expectedCmplxDelete);
-
-        InetAddressAndPort peer2 = peer();
-        resolver.preprocess(readResponseMessage(peer2, iter(PartitionUpdate.singleRowUpdate(cfm2, dk, builder.build())), cmd));
-
-        try (PartitionIterator data = resolver.resolve())
-        {
-            assertFalse(data.hasNext());
-            assertRepairFuture(resolver, 1);
-        }
-
-        MessageOut<Mutation> msg;
-        msg = getSentMessage(peer1);
-        Iterator<Row> rowIter = msg.payload.getPartitionUpdate(cfm2).iterator();
-        assertTrue(rowIter.hasNext());
-        Row row = rowIter.next();
-        assertFalse(rowIter.hasNext());
-
-        ComplexColumnData cd = row.getComplexColumnData(m);
-
-        assertEquals(Collections.emptySet(), Sets.newHashSet(cd));
-        assertEquals(expectedCmplxDelete, cd.complexDeletion());
-
-        Assert.assertNull(messageRecorder.sent.get(peer2));
-    }
-
-    @Test
-    public void testResolveNewCollection()
-    {
-        ReadCommand cmd = Util.cmd(cfs2, dk).withNowInSeconds(nowInSec).build();
-        DataResolver resolver = new DataResolver(ks, cmd, ConsistencyLevel.ALL, 2, System.nanoTime());
-
-        long[] ts = {100, 200};
-
-        // map column
-        Row.Builder builder = BTreeRow.unsortedBuilder(nowInSec);
-        builder.newRow(Clustering.EMPTY);
-        DeletionTime expectedCmplxDelete = new DeletionTime(ts[0] - 1, nowInSec);
-        builder.addComplexDeletion(m, expectedCmplxDelete);
-        Cell expectedCell = mapCell(0, 0, ts[0]);
-        builder.addCell(expectedCell);
-
-        // empty map column
-        InetAddressAndPort peer1 = peer();
-        resolver.preprocess(readResponseMessage(peer1, iter(PartitionUpdate.singleRowUpdate(cfm2, dk, builder.build())), cmd));
-
-        InetAddressAndPort peer2 = peer();
-        resolver.preprocess(readResponseMessage(peer2, iter(PartitionUpdate.emptyUpdate(cfm2, dk))));
-
-        try (PartitionIterator data = resolver.resolve())
-        {
-            try (RowIterator rows = Iterators.getOnlyElement(data))
-            {
-                Row row = Iterators.getOnlyElement(rows);
-                assertColumns(row, "m");
-                ComplexColumnData cd = row.getComplexColumnData(m);
-                assertEquals(Collections.singleton(expectedCell), Sets.newHashSet(cd));
-            }
-            assertRepairFuture(resolver, 1);
-        }
-
-        Assert.assertNull(messageRecorder.sent.get(peer1));
-
-        MessageOut<Mutation> msg;
-        msg = getSentMessage(peer2);
-        Iterator<Row> rowIter = msg.payload.getPartitionUpdate(cfm2).iterator();
-        assertTrue(rowIter.hasNext());
-        Row row = rowIter.next();
-        assertFalse(rowIter.hasNext());
-
-        ComplexColumnData cd = row.getComplexColumnData(m);
-
-        assertEquals(Sets.newHashSet(expectedCell), Sets.newHashSet(cd));
-        assertEquals(expectedCmplxDelete, cd.complexDeletion());
-    }
-
-    @Test
-    public void testResolveNewCollectionOverwritingDeleted()
-    {
-        ReadCommand cmd = Util.cmd(cfs2, dk).withNowInSeconds(nowInSec).build();
-        DataResolver resolver = new DataResolver(ks, cmd, ConsistencyLevel.ALL, 2, System.nanoTime());
-
-        long[] ts = {100, 200};
-
-        // cleared map column
-        Row.Builder builder = BTreeRow.unsortedBuilder(nowInSec);
-        builder.newRow(Clustering.EMPTY);
-        builder.addComplexDeletion(m, new DeletionTime(ts[0] - 1, nowInSec));
-
-        InetAddressAndPort peer1 = peer();
-        resolver.preprocess(readResponseMessage(peer1, iter(PartitionUpdate.singleRowUpdate(cfm2, dk, builder.build())), cmd));
-
-        // newer, overwritten map column
-        builder.newRow(Clustering.EMPTY);
-        DeletionTime expectedCmplxDelete = new DeletionTime(ts[1] - 1, nowInSec);
-        builder.addComplexDeletion(m, expectedCmplxDelete);
-        Cell expectedCell = mapCell(1, 1, ts[1]);
-        builder.addCell(expectedCell);
-
-        InetAddressAndPort peer2 = peer();
-        resolver.preprocess(readResponseMessage(peer2, iter(PartitionUpdate.singleRowUpdate(cfm2, dk, builder.build())), cmd));
-
-        try (PartitionIterator data = resolver.resolve())
-        {
-            try (RowIterator rows = Iterators.getOnlyElement(data))
-            {
-                Row row = Iterators.getOnlyElement(rows);
-                assertColumns(row, "m");
-                ComplexColumnData cd = row.getComplexColumnData(m);
-                assertEquals(Collections.singleton(expectedCell), Sets.newHashSet(cd));
-            }
-            assertRepairFuture(resolver, 1);
-        }
-
-        MessageOut<Mutation> msg;
-        msg = getSentMessage(peer1);
-        Row row = Iterators.getOnlyElement(msg.payload.getPartitionUpdate(cfm2).iterator());
-
-        ComplexColumnData cd = row.getComplexColumnData(m);
-
-        assertEquals(Collections.singleton(expectedCell), Sets.newHashSet(cd));
-        assertEquals(expectedCmplxDelete, cd.complexDeletion());
-
-        Assert.assertNull(messageRecorder.sent.get(peer2));
-    }
-
-    private InetAddressAndPort peer()
-    {
-        try
-        {
-            return InetAddressAndPort.getByAddress(new byte[]{ 127, 0, 0, (byte) addressSuffix++ });
-        }
-        catch (UnknownHostException e)
-        {
-            throw new RuntimeException(e);
-        }
-    }
-
-    private MessageOut<Mutation> getSentMessage(InetAddressAndPort target)
-    {
-        MessageOut<Mutation> message = messageRecorder.sent.get(target);
-        assertNotNull(String.format("No repair message was sent to %s", target), message);
-        return message;
-    }
-
-    private void assertRepairContainsDeletions(MessageOut<Mutation> message,
-                                               DeletionTime deletionTime,
-                                               RangeTombstone... rangeTombstones)
-    {
-        PartitionUpdate update = ((Mutation)message.payload).getPartitionUpdates().iterator().next();
-        DeletionInfo deletionInfo = update.deletionInfo();
-        if (deletionTime != null)
-            assertEquals(deletionTime, deletionInfo.getPartitionDeletion());
-
-        assertEquals(rangeTombstones.length, deletionInfo.rangeCount());
-        Iterator<RangeTombstone> ranges = deletionInfo.rangeIterator(false);
-        int i = 0;
-        while (ranges.hasNext())
-        {
-            RangeTombstone expected = rangeTombstones[i++];
-            RangeTombstone actual = ranges.next();
-            String msg = String.format("Expected %s, but got %s", expected.toString(cfm.comparator), actual.toString(cfm.comparator));
-            assertEquals(msg, expected, actual);
-        }
-    }
-
-    private void assertRepairContainsNoDeletions(MessageOut<Mutation> message)
-    {
-        PartitionUpdate update = ((Mutation)message.payload).getPartitionUpdates().iterator().next();
-        assertTrue(update.deletionInfo().isLive());
-    }
-
-    private void assertRepairContainsColumn(MessageOut<Mutation> message,
-                                            String clustering,
-                                            String columnName,
-                                            String value,
-                                            long timestamp)
-    {
-        PartitionUpdate update = ((Mutation)message.payload).getPartitionUpdates().iterator().next();
-        Row row = update.getRow(update.metadata().comparator.make(clustering));
-        assertNotNull(row);
-        assertColumn(cfm, row, columnName, value, timestamp);
-    }
-
-    private void assertRepairContainsNoColumns(MessageOut<Mutation> message)
-    {
-        PartitionUpdate update = ((Mutation)message.payload).getPartitionUpdates().iterator().next();
-        assertFalse(update.iterator().hasNext());
-    }
-
-    private void assertRepairMetadata(MessageOut<Mutation> message)
-    {
-        assertEquals(MessagingService.Verb.READ_REPAIR, message.verb);
-        PartitionUpdate update = ((Mutation)message.payload).getPartitionUpdates().iterator().next();
-        assertEquals(update.metadata().keyspace, cfm.keyspace);
-        assertEquals(update.metadata().name, cfm.name);
-    }
-
-    public MessageIn<ReadResponse> readResponseMessage(InetAddressAndPort from, UnfilteredPartitionIterator partitionIterator)
-    {
-        return readResponseMessage(from, partitionIterator, command);
-    }
-
-    public MessageIn<ReadResponse> readResponseMessage(InetAddressAndPort from, UnfilteredPartitionIterator partitionIterator, ReadCommand cmd)
-    {
-        return MessageIn.create(from,
-                                ReadResponse.createRemoteDataResponse(partitionIterator, cmd),
-                                Collections.emptyMap(),
-                                MessagingService.Verb.REQUEST_RESPONSE,
-                                MessagingService.current_version);
-    }
-
-    private RangeTombstone tombstone(Object start, Object end, long markedForDeleteAt, int localDeletionTime)
-    {
-        return tombstone(start, true, end, true, markedForDeleteAt, localDeletionTime);
-    }
-
-    private RangeTombstone tombstone(Object start, boolean inclusiveStart, Object end, boolean inclusiveEnd, long markedForDeleteAt, int localDeletionTime)
-    {
-        ClusteringBound startBound = rtBound(start, true, inclusiveStart);
-        ClusteringBound endBound = rtBound(end, false, inclusiveEnd);
-        return new RangeTombstone(Slice.make(startBound, endBound), new DeletionTime(markedForDeleteAt, localDeletionTime));
-    }
-
-    private ClusteringBound rtBound(Object value, boolean isStart, boolean inclusive)
-    {
-        ClusteringBound.Kind kind = isStart
-                                  ? (inclusive ? Kind.INCL_START_BOUND : Kind.EXCL_START_BOUND)
-                                  : (inclusive ? Kind.INCL_END_BOUND : Kind.EXCL_END_BOUND);
-
-        return ClusteringBound.create(kind, cfm.comparator.make(value).getRawValues());
-    }
-
-    private ClusteringBoundary rtBoundary(Object value, boolean inclusiveOnEnd)
-    {
-        ClusteringBound.Kind kind = inclusiveOnEnd
-                                  ? Kind.INCL_END_EXCL_START_BOUNDARY
-                                  : Kind.EXCL_END_INCL_START_BOUNDARY;
-        return ClusteringBoundary.create(kind, cfm.comparator.make(value).getRawValues());
-    }
-
-    private RangeTombstoneBoundMarker marker(Object value, boolean isStart, boolean inclusive, long markedForDeleteAt, int localDeletionTime)
-    {
-        return new RangeTombstoneBoundMarker(rtBound(value, isStart, inclusive), new DeletionTime(markedForDeleteAt, localDeletionTime));
-    }
-
-    private RangeTombstoneBoundaryMarker boundary(Object value, boolean inclusiveOnEnd, long markedForDeleteAt1, int localDeletionTime1, long markedForDeleteAt2, int localDeletionTime2)
-    {
-        return new RangeTombstoneBoundaryMarker(rtBoundary(value, inclusiveOnEnd),
-                                                new DeletionTime(markedForDeleteAt1, localDeletionTime1),
-                                                new DeletionTime(markedForDeleteAt2, localDeletionTime2));
-    }
-
-    private UnfilteredPartitionIterator fullPartitionDelete(TableMetadata table, DecoratedKey dk, long timestamp, int nowInSec)
-    {
-        return new SingletonUnfilteredPartitionIterator(PartitionUpdate.fullPartitionDelete(table, dk, timestamp, nowInSec).unfilteredIterator());
-    }
-
-    private static class MessageRecorder implements IMessageSink
-    {
-        Map<InetAddressAndPort, MessageOut> sent = new HashMap<>();
-        public boolean allowOutgoingMessage(MessageOut message, int id, InetAddressAndPort to)
-        {
-            sent.put(to, message);
-            return false;
-        }
-
-        public boolean allowIncomingMessage(MessageIn message, int id)
-        {
-            return false;
-        }
-    }
-
-    private UnfilteredPartitionIterator iter(PartitionUpdate update)
-    {
-        return new SingletonUnfilteredPartitionIterator(update.unfilteredIterator());
-    }
-
-    private UnfilteredPartitionIterator iter(DecoratedKey key, Unfiltered... unfiltereds)
-    {
-        SortedSet<Unfiltered> s = new TreeSet<>(cfm.comparator);
-        Collections.addAll(s, unfiltereds);
-        final Iterator<Unfiltered> iterator = s.iterator();
-
-        UnfilteredRowIterator rowIter = new AbstractUnfilteredRowIterator(cfm,
-                                                                          key,
-                                                                          DeletionTime.LIVE,
-                                                                          cfm.regularAndStaticColumns(),
-                                                                          Rows.EMPTY_STATIC_ROW,
-                                                                          false,
-                                                                          EncodingStats.NO_STATS)
-        {
-            protected Unfiltered computeNext()
-            {
-                return iterator.hasNext() ? iterator.next() : endOfData();
-            }
-        };
-        return new SingletonUnfilteredPartitionIterator(rowIter);
-    }
-}
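
As background for reviewing this removal: the tests above exercised the core
DataResolver contract, i.e. merging divergent replica responses into one view by
last-write-wins timestamps, then sending each replica a repair mutation carrying
only what it was missing. A minimal sketch of that reconciliation rule in plain
Java (hypothetical names, not Cassandra's actual Cell/Row API):

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class ReconcileSketch
    {
        // A cell is a (column, value, timestamp) triple, as in the tests above.
        static final class Cell
        {
            final String column;
            final String value;
            final long timestamp;

            Cell(String column, String value, long timestamp)
            {
                this.column = column;
                this.value = value;
                this.timestamp = timestamp;
            }
        }

        // Merge replica responses column by column, keeping the cell with the
        // highest timestamp, so the resolved row is the newest version of every
        // column (cf. testResolveNewerSingleRow / testResolveDisjointSingleRow).
        static Map<String, Cell> resolve(List<List<Cell>> responses)
        {
            Map<String, Cell> merged = new HashMap<>();
            for (List<Cell> response : responses)
                for (Cell c : response)
                    merged.merge(c.column, c, (a, b) -> a.timestamp >= b.timestamp ? a : b);
            return merged;
        }

        public static void main(String[] args)
        {
            // peer1 answers (c1=v1, ts 0), peer2 answers (c1=v2, ts 1): v2 wins.
            Map<String, Cell> resolved = resolve(List.of(List.of(new Cell("c1", "v1", 0L)),
                                                         List.of(new Cell("c1", "v2", 1L))));
            System.out.println(resolved.get("c1").value); // prints v2
        }
    }

The repair half is then the per-replica difference: whatever the merged result
contains that a replica's own response did not, which is what the
assertRepairContains* helpers verified message by message.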

http://git-wip-us.apache.org/repos/asf/cassandra/blob/39807ba4/test/unit/org/apache/cassandra/service/ReadExecutorTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/ReadExecutorTest.java b/test/unit/org/apache/cassandra/service/ReadExecutorTest.java
deleted file mode 100644
index f21e241..0000000
--- a/test/unit/org/apache/cassandra/service/ReadExecutorTest.java
+++ /dev/null
@@ -1,215 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.service;
-
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-import com.google.common.collect.ImmutableList;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import org.apache.cassandra.SchemaLoader;
-import org.apache.cassandra.Util;
-import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.ConsistencyLevel;
-import org.apache.cassandra.db.Keyspace;
-import org.apache.cassandra.db.SinglePartitionReadCommand;
-import org.apache.cassandra.exceptions.ReadFailureException;
-import org.apache.cassandra.exceptions.ReadTimeoutException;
-import org.apache.cassandra.exceptions.RequestFailureReason;
-import org.apache.cassandra.locator.InetAddressAndPort;
-import org.apache.cassandra.net.MessageOut;
-import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.schema.KeyspaceParams;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
-public class ReadExecutorTest
-{
-    static Keyspace ks;
-    static ColumnFamilyStore cfs;
-    static List<InetAddressAndPort> targets;
-
-    @BeforeClass
-    public static void setUpClass() throws Throwable
-    {
-        SchemaLoader.loadSchema();
-        SchemaLoader.createKeyspace("Foo", KeyspaceParams.simple(3), SchemaLoader.standardCFMD("Foo", "Bar"));
-        ks = Keyspace.open("Foo");
-        cfs = ks.getColumnFamilyStore("Bar");
-        targets = ImmutableList.of(InetAddressAndPort.getByName("127.0.0.255"), InetAddressAndPort.getByName("127.0.0.254"), InetAddressAndPort.getByName("127.0.0.253"));
-        cfs.sampleLatencyNanos = 0;
-    }
-
-    @Before
-    public void resetCounters() throws Throwable
-    {
-        cfs.metric.speculativeInsufficientReplicas.dec(cfs.metric.speculativeInsufficientReplicas.getCount());
-        cfs.metric.speculativeRetries.dec(cfs.metric.speculativeRetries.getCount());
-        cfs.metric.speculativeFailedRetries.dec(cfs.metric.speculativeFailedRetries.getCount());
-    }
-
-    /**
-     * If speculation would have been beneficial but could not be attempted due to a lack of replicas,
-     * count that it occurred.
-     */
-    @Test
-    public void testUnableToSpeculate() throws Throwable
-    {
-        assertEquals(0, cfs.metric.speculativeInsufficientReplicas.getCount());
-        assertEquals(0, ks.metric.speculativeInsufficientReplicas.getCount());
-        AbstractReadExecutor executor = new AbstractReadExecutor.NeverSpeculatingReadExecutor(ks, cfs, new MockSinglePartitionReadCommand(), ConsistencyLevel.LOCAL_QUORUM, targets, System.nanoTime(), true);
-        executor.maybeTryAdditionalReplicas();
-        try
-        {
-            executor.get();
-            fail();
-        }
-        catch (ReadTimeoutException e)
-        {
-            //expected
-        }
-        assertEquals(1, cfs.metric.speculativeInsufficientReplicas.getCount());
-        assertEquals(1, ks.metric.speculativeInsufficientReplicas.getCount());
-
-        //Shouldn't increment
-        executor = new AbstractReadExecutor.NeverSpeculatingReadExecutor(ks, cfs, new MockSinglePartitionReadCommand(), ConsistencyLevel.LOCAL_QUORUM, targets, System.nanoTime(), false);
-        executor.maybeTryAdditionalReplicas();
-        try
-        {
-            executor.get();
-            fail();
-        }
-        catch (ReadTimeoutException e)
-        {
-            //expected
-        }
-        assertEquals(1, cfs.metric.speculativeInsufficientReplicas.getCount());
-        assertEquals(1, ks.metric.speculativeInsufficientReplicas.getCount());
-    }
-
-    /**
-     *  Test that speculation, when it is attempted, is counted, and that when it succeeds
-     *  no failure is counted.
-     */
-    @Test
-    public void testSpeculateSucceeded() throws Throwable
-    {
-        assertEquals(0, cfs.metric.speculativeRetries.getCount());
-        assertEquals(0, cfs.metric.speculativeFailedRetries.getCount());
-        assertEquals(0, ks.metric.speculativeRetries.getCount());
-        assertEquals(0, ks.metric.speculativeFailedRetries.getCount());
-        AbstractReadExecutor executor = new AbstractReadExecutor.SpeculatingReadExecutor(ks, cfs, new MockSinglePartitionReadCommand(TimeUnit.DAYS.toMillis(365)), ConsistencyLevel.LOCAL_QUORUM, targets, System.nanoTime());
-        executor.maybeTryAdditionalReplicas();
-        new Thread()
-        {
-            @Override
-            public void run()
-            {
-                //Failures end the read promptly but don't require mock data to be supplied
-                executor.handler.onFailure(targets.get(0), RequestFailureReason.READ_TOO_MANY_TOMBSTONES);
-                executor.handler.onFailure(targets.get(1), RequestFailureReason.READ_TOO_MANY_TOMBSTONES);
-                executor.handler.condition.signalAll();
-            }
-        }.start();
-
-        try
-        {
-            executor.get();
-            fail();
-        }
-        catch (ReadFailureException e)
-        {
-            //expected
-        }
-        assertEquals(1, cfs.metric.speculativeRetries.getCount());
-        assertEquals(0, cfs.metric.speculativeFailedRetries.getCount());
-        assertEquals(1, ks.metric.speculativeRetries.getCount());
-        assertEquals(0, ks.metric.speculativeFailedRetries.getCount());
-    }
-
-    /**
-     * Test that speculation failure statistics are incremented if speculation occurs
-     * and the read still times out.
-     */
-    @Test
-    public void testSpeculateFailed() throws Throwable
-    {
-        assertEquals(0, cfs.metric.speculativeRetries.getCount());
-        assertEquals(0, cfs.metric.speculativeFailedRetries.getCount());
-        assertEquals(0, ks.metric.speculativeRetries.getCount());
-        assertEquals(0, ks.metric.speculativeFailedRetries.getCount());
-        AbstractReadExecutor executor = new AbstractReadExecutor.SpeculatingReadExecutor(ks, cfs, new MockSinglePartitionReadCommand(), ConsistencyLevel.LOCAL_QUORUM, targets, System.nanoTime());
-        executor.maybeTryAdditionalReplicas();
-        try
-        {
-            executor.get();
-            fail();
-        }
-        catch (ReadTimeoutException e)
-        {
-            //expected
-        }
-        assertEquals(1, cfs.metric.speculativeRetries.getCount());
-        assertEquals(1, cfs.metric.speculativeFailedRetries.getCount());
-        assertEquals(1, ks.metric.speculativeRetries.getCount());
-        assertEquals(1, ks.metric.speculativeFailedRetries.getCount());
-    }
-
-    public static class MockSinglePartitionReadCommand extends SinglePartitionReadCommand
-    {
-        private final long timeout;
-
-        MockSinglePartitionReadCommand()
-        {
-            this(0);
-        }
-
-        MockSinglePartitionReadCommand(long timeout)
-        {
-            super(false, 0, cfs.metadata(), 0, null, null, null, Util.dk("ry@n_luvs_teh_y@nk33z"), null, null);
-            this.timeout = timeout;
-        }
-
-        @Override
-        public long getTimeout()
-        {
-            return timeout;
-        }
-
-        @Override
-        public MessageOut createMessage()
-        {
-            return new MessageOut(MessagingService.Verb.BATCH_REMOVE)
-            {
-                @Override
-                public int serializedSize(int version)
-                {
-                    return 0;
-                }
-            };
-        }
-    }
-
-}
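
For context on the metrics asserted above: the executor tracks a sampled read
latency (cfs.sampleLatencyNanos), and the speculating variant sends one extra
replica request when the first response is slower than that sample. A rough
sketch of the idea in plain Java (hypothetical names and plumbing, not the
actual AbstractReadExecutor implementation):

    import java.util.List;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;
    import java.util.function.Function;

    public class SpeculateSketch
    {
        // Ask the first replica; if it hasn't answered within the sampled latency
        // threshold, also ask a backup and complete with whichever answers first.
        static <T> CompletableFuture<T> readWithSpeculation(List<String> replicas,
                                                            Function<String, CompletableFuture<T>> send,
                                                            long thresholdMillis,
                                                            ScheduledExecutorService timer)
        {
            CompletableFuture<T> result = new CompletableFuture<>();
            send.apply(replicas.get(0)).thenAccept(result::complete);

            if (replicas.size() < 2)
                return result; // no one to speculate against: count "insufficient replicas"

            timer.schedule(() -> {
                if (!result.isDone()) // still waiting: count a speculative retry
                    send.apply(replicas.get(1)).thenAccept(result::complete);
            }, thresholdMillis, TimeUnit.MILLISECONDS);
            return result;
        }

        public static void main(String[] args) throws Exception
        {
            ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
            // Simulated replicas: "slow" never answers, so the 10ms threshold trips
            // and the backup's response completes the read.
            Function<String, CompletableFuture<String>> send = replica ->
                replica.equals("slow") ? new CompletableFuture<>()
                                       : CompletableFuture.completedFuture("row from " + replica);
            String row = readWithSpeculation(List.of("slow", "backup"), send, 10, timer)
                         .get(1, TimeUnit.SECONDS);
            System.out.println(row); // prints: row from backup
            timer.shutdown();
        }
    }

If no extra replica exists, the executor can only wait, which is the
speculativeInsufficientReplicas case tested first; if it speculates and the read
still times out, that is the speculativeFailedRetries case.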

