Posted to commits@cassandra.apache.org by bd...@apache.org on 2018/03/02 01:55:23 UTC
[1/5] cassandra git commit: Refactor read executor and response resolver, abstract read repair
Repository: cassandra
Updated Branches:
refs/heads/trunk 4a50f4467 -> 39807ba48
http://git-wip-us.apache.org/repos/asf/cassandra/blob/39807ba4/test/unit/org/apache/cassandra/service/reads/DataResolverTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/reads/DataResolverTest.java b/test/unit/org/apache/cassandra/service/reads/DataResolverTest.java
new file mode 100644
index 0000000..4d4d398
--- /dev/null
+++ b/test/unit/org/apache/cassandra/service/reads/DataResolverTest.java
@@ -0,0 +1,1057 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.service.reads;
+
+import java.net.UnknownHostException;
+import java.nio.ByteBuffer;
+import java.util.*;
+
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import org.junit.*;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.Util;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.service.reads.DataResolver;
+import org.apache.cassandra.service.reads.repair.BlockingReadRepair;
+import org.apache.cassandra.service.reads.repair.NoopReadRepair;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.schema.ColumnMetadata;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.cql3.ColumnIdentifier;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.marshal.ByteType;
+import org.apache.cassandra.db.marshal.IntegerType;
+import org.apache.cassandra.db.marshal.MapType;
+import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.db.marshal.AsciiType;
+import org.apache.cassandra.db.marshal.BytesType;
+import org.apache.cassandra.db.partitions.*;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.net.*;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.service.reads.repair.ReadRepair;
+import org.apache.cassandra.service.reads.repair.RepairListener;
+import org.apache.cassandra.service.reads.repair.TestableReadRepair;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
+
+import static org.apache.cassandra.Util.assertClustering;
+import static org.apache.cassandra.Util.assertColumn;
+import static org.apache.cassandra.Util.assertColumns;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.apache.cassandra.db.ClusteringBound.Kind;
+
+public class DataResolverTest
+{
+ public static final String KEYSPACE1 = "DataResolverTest";
+ public static final String CF_STANDARD = "Standard1";
+ public static final String CF_COLLECTION = "Collection1";
+
+ // counter to generate the last byte of the respondent's address in a ReadResponse message
+ private int addressSuffix = 10;
+
+ private DecoratedKey dk;
+ private Keyspace ks;
+ private ColumnFamilyStore cfs;
+ private ColumnFamilyStore cfs2;
+ private TableMetadata cfm;
+ private TableMetadata cfm2;
+ private ColumnMetadata m;
+ private int nowInSec;
+ private ReadCommand command;
+ private TestableReadRepair readRepair;
+
+ @BeforeClass
+ public static void defineSchema() throws ConfigurationException
+ {
+ DatabaseDescriptor.daemonInitialization();
+
+ TableMetadata.Builder builder1 =
+ TableMetadata.builder(KEYSPACE1, CF_STANDARD)
+ .addPartitionKeyColumn("key", BytesType.instance)
+ .addClusteringColumn("col1", AsciiType.instance)
+ .addRegularColumn("c1", AsciiType.instance)
+ .addRegularColumn("c2", AsciiType.instance)
+ .addRegularColumn("one", AsciiType.instance)
+ .addRegularColumn("two", AsciiType.instance);
+
+ TableMetadata.Builder builder2 =
+ TableMetadata.builder(KEYSPACE1, CF_COLLECTION)
+ .addPartitionKeyColumn("k", ByteType.instance)
+ .addRegularColumn("m", MapType.getInstance(IntegerType.instance, IntegerType.instance, true));
+
+ SchemaLoader.prepareServer();
+ SchemaLoader.createKeyspace(KEYSPACE1, KeyspaceParams.simple(1), builder1, builder2);
+ }
+
+ @Before
+ public void setup()
+ {
+ dk = Util.dk("key1");
+ ks = Keyspace.open(KEYSPACE1);
+ cfs = ks.getColumnFamilyStore(CF_STANDARD);
+ cfm = cfs.metadata();
+ cfs2 = ks.getColumnFamilyStore(CF_COLLECTION);
+ cfm2 = cfs2.metadata();
+ m = cfm2.getColumn(new ColumnIdentifier("m", false));
+
+ nowInSec = FBUtilities.nowInSeconds();
+ command = Util.cmd(cfs, dk).withNowInSeconds(nowInSec).build();
+ readRepair = new TestableReadRepair(command);
+ }
+
+ @Test
+ public void testResolveNewerSingleRow() throws UnknownHostException
+ {
+ DataResolver resolver = new DataResolver(ks, command, ConsistencyLevel.ALL, 2, System.nanoTime(), readRepair);
+ InetAddressAndPort peer1 = peer();
+ resolver.preprocess(readResponseMessage(peer1, iter(new RowUpdateBuilder(cfm, nowInSec, 0L, dk).clustering("1")
+ .add("c1", "v1")
+ .buildUpdate())));
+ InetAddressAndPort peer2 = peer();
+ resolver.preprocess(readResponseMessage(peer2, iter(new RowUpdateBuilder(cfm, nowInSec, 1L, dk).clustering("1")
+ .add("c1", "v2")
+ .buildUpdate())));
+
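+ // both responses carry the same row; resolution should keep the cell with the higher timestamp ("v2" at 1L)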
+ try(PartitionIterator data = resolver.resolve())
+ {
+ try (RowIterator rows = Iterators.getOnlyElement(data))
+ {
+ Row row = Iterators.getOnlyElement(rows);
+ assertColumns(row, "c1");
+ assertColumn(cfm, row, "c1", "v2", 1);
+ }
+ }
+
+ assertEquals(1, readRepair.sent.size());
+ // peer 1 just needs to repair with the row from peer 2
+ Mutation mutation = readRepair.getForEndpoint(peer1);
+ assertRepairMetadata(mutation);
+ assertRepairContainsNoDeletions(mutation);
+ assertRepairContainsColumn(mutation, "1", "c1", "v2", 1);
+ }
+
+ @Test
+ public void testResolveDisjointSingleRow()
+ {
+ DataResolver resolver = new DataResolver(ks, command, ConsistencyLevel.ALL, 2, System.nanoTime(), readRepair);
+ InetAddressAndPort peer1 = peer();
+ resolver.preprocess(readResponseMessage(peer1, iter(new RowUpdateBuilder(cfm, nowInSec, 0L, dk).clustering("1")
+ .add("c1", "v1")
+ .buildUpdate())));
+
+ InetAddressAndPort peer2 = peer();
+ resolver.preprocess(readResponseMessage(peer2, iter(new RowUpdateBuilder(cfm, nowInSec, 1L, dk).clustering("1")
+ .add("c2", "v2")
+ .buildUpdate())));
+
+ try(PartitionIterator data = resolver.resolve())
+ {
+ try (RowIterator rows = Iterators.getOnlyElement(data))
+ {
+ Row row = Iterators.getOnlyElement(rows);
+ assertColumns(row, "c1", "c2");
+ assertColumn(cfm, row, "c1", "v1", 0);
+ assertColumn(cfm, row, "c2", "v2", 1);
+ }
+ }
+
+ assertEquals(2, readRepair.sent.size());
+ // each peer needs to repair with each other's column
+ Mutation mutation = readRepair.getForEndpoint(peer1);
+ assertRepairMetadata(mutation);
+ assertRepairContainsColumn(mutation, "1", "c2", "v2", 1);
+
+ mutation = readRepair.getForEndpoint(peer2);
+ assertRepairMetadata(mutation);
+ assertRepairContainsColumn(mutation, "1", "c1", "v1", 0);
+ }
+
+ @Test
+ public void testResolveDisjointMultipleRows() throws UnknownHostException
+ {
+ DataResolver resolver = new DataResolver(ks, command, ConsistencyLevel.ALL, 2, System.nanoTime(), readRepair);
+ InetAddressAndPort peer1 = peer();
+ resolver.preprocess(readResponseMessage(peer1, iter(new RowUpdateBuilder(cfm, nowInSec, 0L, dk).clustering("1")
+ .add("c1", "v1")
+ .buildUpdate())));
+ InetAddressAndPort peer2 = peer();
+ resolver.preprocess(readResponseMessage(peer2, iter(new RowUpdateBuilder(cfm, nowInSec, 1L, dk).clustering("2")
+ .add("c2", "v2")
+ .buildUpdate())));
+
+ try (PartitionIterator data = resolver.resolve())
+ {
+ try (RowIterator rows = data.next())
+ {
+ // We expect the resolved superset to contain both rows
+ Row row = rows.next();
+ assertClustering(cfm, row, "1");
+ assertColumns(row, "c1");
+ assertColumn(cfm, row, "c1", "v1", 0);
+
+ row = rows.next();
+ assertClustering(cfm, row, "2");
+ assertColumns(row, "c2");
+ assertColumn(cfm, row, "c2", "v2", 1);
+
+ assertFalse(rows.hasNext());
+ assertFalse(data.hasNext());
+ }
+ }
+
+ assertEquals(2, readRepair.sent.size());
+ // each peer needs to repair the row from the other
+ Mutation mutation = readRepair.getForEndpoint(peer1);
+ assertRepairMetadata(mutation);
+ assertRepairContainsNoDeletions(mutation);
+ assertRepairContainsColumn(mutation, "2", "c2", "v2", 1);
+
+ mutation = readRepair.getForEndpoint(peer2);
+ assertRepairMetadata(mutation);
+ assertRepairContainsNoDeletions(mutation);
+ assertRepairContainsColumn(mutation, "1", "c1", "v1", 0);
+ }
+
+ @Test
+ public void testResolveDisjointMultipleRowsWithRangeTombstones()
+ {
+ DataResolver resolver = new DataResolver(ks, command, ConsistencyLevel.ALL, 4, System.nanoTime(), readRepair);
+
+ RangeTombstone tombstone1 = tombstone("1", "11", 1, nowInSec);
+ RangeTombstone tombstone2 = tombstone("3", "31", 1, nowInSec);
+ PartitionUpdate update = new RowUpdateBuilder(cfm, nowInSec, 1L, dk).addRangeTombstone(tombstone1)
+ .addRangeTombstone(tombstone2)
+ .buildUpdate();
+
+ InetAddressAndPort peer1 = peer();
+ UnfilteredPartitionIterator iter1 = iter(update);
+ resolver.preprocess(readResponseMessage(peer1, iter1));
+ // not covered by any range tombstone
+ InetAddressAndPort peer2 = peer();
+ UnfilteredPartitionIterator iter2 = iter(new RowUpdateBuilder(cfm, nowInSec, 0L, dk).clustering("0")
+ .add("c1", "v0")
+ .buildUpdate());
+ resolver.preprocess(readResponseMessage(peer2, iter2));
+ // covered by a range tombstone
+ InetAddressAndPort peer3 = peer();
+ UnfilteredPartitionIterator iter3 = iter(new RowUpdateBuilder(cfm, nowInSec, 0L, dk).clustering("10")
+ .add("c2", "v1")
+ .buildUpdate());
+ resolver.preprocess(readResponseMessage(peer3, iter3));
+ // row covered by a range tombstone, but newer
+ InetAddressAndPort peer4 = peer();
+ UnfilteredPartitionIterator iter4 = iter(new RowUpdateBuilder(cfm, nowInSec, 2L, dk).clustering("3")
+ .add("one", "A")
+ .buildUpdate());
+ resolver.preprocess(readResponseMessage(peer4, iter4));
+ try (PartitionIterator data = resolver.resolve())
+ {
+ try (RowIterator rows = data.next())
+ {
+ Row row = rows.next();
+ assertClustering(cfm, row, "0");
+ assertColumns(row, "c1");
+ assertColumn(cfm, row, "c1", "v0", 0);
+
+ row = rows.next();
+ assertClustering(cfm, row, "3");
+ assertColumns(row, "one");
+ assertColumn(cfm, row, "one", "A", 2);
+
+ assertFalse(rows.hasNext());
+ }
+ }
+
+ assertEquals(4, readRepair.sent.size());
+ // peer1 needs the rows from peers 2 and 4
+ Mutation mutation = readRepair.getForEndpoint(peer1);
+ assertRepairMetadata(mutation);
+ assertRepairContainsNoDeletions(mutation);
+ assertRepairContainsColumn(mutation, "0", "c1", "v0", 0);
+ assertRepairContainsColumn(mutation, "3", "one", "A", 2);
+
+ // peer2 needs to get the row from peer4 and the RTs
+ mutation = readRepair.getForEndpoint(peer2);
+ assertRepairMetadata(mutation);
+ assertRepairContainsDeletions(mutation, null, tombstone1, tombstone2);
+ assertRepairContainsColumn(mutation, "3", "one", "A", 2);
+
+ // peer 3 needs both rows and the RTs
+ mutation = readRepair.getForEndpoint(peer3);
+ assertRepairMetadata(mutation);
+ assertRepairContainsDeletions(mutation, null, tombstone1, tombstone2);
+ assertRepairContainsColumn(mutation, "0", "c1", "v0", 0);
+ assertRepairContainsColumn(mutation, "3", "one", "A", 2);
+
+ // peer4 needs the row from peer2 and the RTs
+ mutation = readRepair.getForEndpoint(peer4);
+ assertRepairMetadata(mutation);
+ assertRepairContainsDeletions(mutation, null, tombstone1, tombstone2);
+ assertRepairContainsColumn(mutation, "0", "c1", "v0", 0);
+ }
+
+ @Test
+ public void testResolveWithOneEmpty()
+ {
+ DataResolver resolver = new DataResolver(ks, command, ConsistencyLevel.ALL, 2, System.nanoTime(), readRepair);
+ InetAddressAndPort peer1 = peer();
+ resolver.preprocess(readResponseMessage(peer1, iter(new RowUpdateBuilder(cfm, nowInSec, 1L, dk).clustering("1")
+ .add("c2", "v2")
+ .buildUpdate())));
+ InetAddressAndPort peer2 = peer();
+ resolver.preprocess(readResponseMessage(peer2, EmptyIterators.unfilteredPartition(cfm)));
+
+ try(PartitionIterator data = resolver.resolve())
+ {
+ try (RowIterator rows = Iterators.getOnlyElement(data))
+ {
+ Row row = Iterators.getOnlyElement(rows);
+ assertColumns(row, "c2");
+ assertColumn(cfm, row, "c2", "v2", 1);
+ }
+ }
+
+ assertEquals(1, readRepair.sent.size());
+ // peer 2 needs the row from peer 1
+ Mutation mutation = readRepair.getForEndpoint(peer2);
+ assertRepairMetadata(mutation);
+ assertRepairContainsNoDeletions(mutation);
+ assertRepairContainsColumn(mutation, "1", "c2", "v2", 1);
+ }
+
+ @Test
+ public void testResolveWithBothEmpty()
+ {
+ TestableReadRepair readRepair = new TestableReadRepair(command);
+ DataResolver resolver = new DataResolver(ks, command, ConsistencyLevel.ALL, 2, System.nanoTime(), readRepair);
+ resolver.preprocess(readResponseMessage(peer(), EmptyIterators.unfilteredPartition(cfm)));
+ resolver.preprocess(readResponseMessage(peer(), EmptyIterators.unfilteredPartition(cfm)));
+
+ try(PartitionIterator data = resolver.resolve())
+ {
+ assertFalse(data.hasNext());
+ }
+
+ assertTrue(readRepair.sent.isEmpty());
+ }
+
+ @Test
+ public void testResolveDeleted()
+ {
+ DataResolver resolver = new DataResolver(ks, command, ConsistencyLevel.ALL, 2, System.nanoTime(), readRepair);
+ // one response with columns timestamped before a delete in another response
+ InetAddressAndPort peer1 = peer();
+ resolver.preprocess(readResponseMessage(peer1, iter(new RowUpdateBuilder(cfm, nowInSec, 0L, dk).clustering("1")
+ .add("one", "A")
+ .buildUpdate())));
+ InetAddressAndPort peer2 = peer();
+ resolver.preprocess(readResponseMessage(peer2, fullPartitionDelete(cfm, dk, 1, nowInSec)));
+
+ try (PartitionIterator data = resolver.resolve())
+ {
+ assertFalse(data.hasNext());
+ }
+
+ // peer1 should get the deletion from peer2
+ assertEquals(1, readRepair.sent.size());
+ Mutation mutation = readRepair.getForEndpoint(peer1);
+ assertRepairMetadata(mutation);
+ assertRepairContainsDeletions(mutation, new DeletionTime(1, nowInSec));
+ assertRepairContainsNoColumns(mutation);
+ }
+
+ @Test
+ public void testResolveMultipleDeleted()
+ {
+ DataResolver resolver = new DataResolver(ks, command, ConsistencyLevel.ALL, 4, System.nanoTime(), readRepair);
+ // deletes and columns with interleaved timestamps, returned out of order
+ InetAddressAndPort peer1 = peer();
+ resolver.preprocess(readResponseMessage(peer1, fullPartitionDelete(cfm, dk, 0, nowInSec)));
+ // these columns were created after the previous deletion
+ InetAddressAndPort peer2 = peer();
+ resolver.preprocess(readResponseMessage(peer2, iter(new RowUpdateBuilder(cfm, nowInSec, 1L, dk).clustering("1")
+ .add("one", "A")
+ .add("two", "A")
+ .buildUpdate())));
+ // this column was created after the next delete
+ InetAddressAndPort peer3 = peer();
+ resolver.preprocess(readResponseMessage(peer3, iter(new RowUpdateBuilder(cfm, nowInSec, 3L, dk).clustering("1")
+ .add("two", "B")
+ .buildUpdate())));
+ InetAddressAndPort peer4 = peer();
+ resolver.preprocess(readResponseMessage(peer4, fullPartitionDelete(cfm, dk, 2, nowInSec)));
+
+ try(PartitionIterator data = resolver.resolve())
+ {
+ try (RowIterator rows = Iterators.getOnlyElement(data))
+ {
+ Row row = Iterators.getOnlyElement(rows);
+ assertColumns(row, "two");
+ assertColumn(cfm, row, "two", "B", 3);
+ }
+ }
+
+ // peer 1 needs to get the partition delete from peer 4 and the row from peer 3
+ assertEquals(4, readRepair.sent.size());
+ Mutation mutation = readRepair.getForEndpoint(peer1);
+ assertRepairMetadata(mutation);
+ assertRepairContainsDeletions(mutation, new DeletionTime(2, nowInSec));
+ assertRepairContainsColumn(mutation, "1", "two", "B", 3);
+
+ // peer 2 needs the deletion from peer 4 and the row from peer 3
+ mutation = readRepair.getForEndpoint(peer2);
+ assertRepairMetadata(mutation);
+ assertRepairContainsDeletions(mutation, new DeletionTime(2, nowInSec));
+ assertRepairContainsColumn(mutation, "1", "two", "B", 3);
+
+ // peer 3 needs just the deletion from peer 4
+ mutation = readRepair.getForEndpoint(peer3);
+ assertRepairMetadata(mutation);
+ assertRepairContainsDeletions(mutation, new DeletionTime(2, nowInSec));
+ assertRepairContainsNoColumns(mutation);
+
+ // peer 4 needs just the row from peer 3
+ mutation = readRepair.getForEndpoint(peer4);
+ assertRepairMetadata(mutation);
+ assertRepairContainsNoDeletions(mutation);
+ assertRepairContainsColumn(mutation, "1", "two", "B", 3);
+ }
+
+ @Test
+ public void testResolveRangeTombstonesOnBoundaryRightWins() throws UnknownHostException
+ {
+ resolveRangeTombstonesOnBoundary(1, 2);
+ }
+
+ @Test
+ public void testResolveRangeTombstonesOnBoundaryLeftWins() throws UnknownHostException
+ {
+ resolveRangeTombstonesOnBoundary(2, 1);
+ }
+
+ @Test
+ public void testResolveRangeTombstonesOnBoundarySameTimestamp() throws UnknownHostException
+ {
+ resolveRangeTombstonesOnBoundary(1, 1);
+ }
+
+ /*
+ * We want responses to merge on tombstone boundary. So we'll merge 2 "streams":
+ * 1: [1, 2)(3, 4](5, 6] 2
+ * 2: [2, 3][4, 5) 1
+ * which tests all combinations of open/close boundaries (open/close, close/open, open/open, close/close).
+ *
+ * Note that, because DataResolver returns a "filtered" iterator, it should resolve into an empty iterator.
+ * However, what should be sent to each source depends on the exact timestamps of the tombstones, and we
+ * test a few combinations.
+ */
+ private void resolveRangeTombstonesOnBoundary(long timestamp1, long timestamp2)
+ {
+ DataResolver resolver = new DataResolver(ks, command, ConsistencyLevel.ALL, 2, System.nanoTime(), readRepair);
+ InetAddressAndPort peer1 = peer();
+ InetAddressAndPort peer2 = peer();
+
+ // 1st "stream"
+ RangeTombstone one_two = tombstone("1", true , "2", false, timestamp1, nowInSec);
+ RangeTombstone three_four = tombstone("3", false, "4", true , timestamp1, nowInSec);
+ RangeTombstone five_six = tombstone("5", false, "6", true , timestamp1, nowInSec);
+ UnfilteredPartitionIterator iter1 = iter(new RowUpdateBuilder(cfm, nowInSec, 1L, dk).addRangeTombstone(one_two)
+ .addRangeTombstone(three_four)
+ .addRangeTombstone(five_six)
+ .buildUpdate());
+
+ // 2nd "stream"
+ RangeTombstone two_three = tombstone("2", true, "3", true , timestamp2, nowInSec);
+ RangeTombstone four_five = tombstone("4", true, "5", false, timestamp2, nowInSec);
+ UnfilteredPartitionIterator iter2 = iter(new RowUpdateBuilder(cfm, nowInSec, 1L, dk).addRangeTombstone(two_three)
+ .addRangeTombstone(four_five)
+ .buildUpdate());
+
+ resolver.preprocess(readResponseMessage(peer1, iter1));
+ resolver.preprocess(readResponseMessage(peer2, iter2));
+
+ // No results, we've only reconciled tombstones.
+ try (PartitionIterator data = resolver.resolve())
+ {
+ assertFalse(data.hasNext());
+ }
+
+ assertEquals(2, readRepair.sent.size());
+
+ Mutation msg1 = readRepair.getForEndpoint(peer1);
+ assertRepairMetadata(msg1);
+ assertRepairContainsNoColumns(msg1);
+
+ Mutation msg2 = readRepair.getForEndpoint(peer2);
+ assertRepairMetadata(msg2);
+ assertRepairContainsNoColumns(msg2);
+
+ // The two streams are mostly complementary, so each will roughly receive the ranges of the other stream. One
+ // subtlety is around the value "4", however, as it is included by both streams.
+ // So for a given stream, unless the other stream has a strictly higher timestamp, the value 4 will be excluded
+ // from whatever range it receives as repair, since the stream already covers it.
+
+ // Message to peer1 contains peer2 ranges
+ assertRepairContainsDeletions(msg1, null, two_three, withExclusiveStartIf(four_five, timestamp1 >= timestamp2));
+
+ // Message to peer2 contains peer1 ranges
+ assertRepairContainsDeletions(msg2, null, one_two, withExclusiveEndIf(three_four, timestamp2 >= timestamp1), five_six);
+ }
+
+ /**
+ * Test cases where a boundary of one source is covered by another source's deletion, and the timestamps on one
+ * or both sides of the boundary are equal to the "merged" deletion.
+ * This is a test for CASSANDRA-13237 to make sure we handle this case properly.
+ */
+ @Test
+ public void testRepairRangeTombstoneBoundary() throws UnknownHostException
+ {
+ testRepairRangeTombstoneBoundary(1, 0, 1);
+ readRepair.sent.clear();
+ testRepairRangeTombstoneBoundary(1, 1, 0);
+ readRepair.sent.clear();
+ testRepairRangeTombstoneBoundary(1, 1, 1);
+ }
+
+ /**
+ * Test for CASSANDRA-13237, checking we don't fail (and handle correctly) the case where a RT boundary has the
+ * same deletion on both sides (which is useless, but could be created by legacy code pre-CASSANDRA-13237 and
+ * could thus still be sent).
+ */
+ private void testRepairRangeTombstoneBoundary(int timestamp1, int timestamp2, int timestamp3) throws UnknownHostException
+ {
+ DataResolver resolver = new DataResolver(ks, command, ConsistencyLevel.ALL, 2, System.nanoTime(), readRepair);
+ InetAddressAndPort peer1 = peer();
+ InetAddressAndPort peer2 = peer();
+
+ // 1st "stream"
+ RangeTombstone zero_nine = tombstone("0", true, "9", true, timestamp1, nowInSec);
+ UnfilteredPartitionIterator iter1 = iter(new RowUpdateBuilder(cfm, nowInSec, 1L, dk)
+ .addRangeTombstone(zero_nine)
+ .buildUpdate());
+
+ // 2nd "stream" (built more manually to ensure we get exactly the boundary we want)
+ RangeTombstoneBoundMarker open_zero = marker("0", true, true, timestamp2, nowInSec);
+ RangeTombstoneBoundaryMarker boundary_five = boundary("5", false, timestamp2, nowInSec, timestamp3, nowInSec);
+ RangeTombstoneBoundMarker close_nine = marker("9", false, true, timestamp3, nowInSec);
+ UnfilteredPartitionIterator iter2 = iter(dk, open_zero, boundary_five, close_nine);
+
+ resolver.preprocess(readResponseMessage(peer1, iter1));
+ resolver.preprocess(readResponseMessage(peer2, iter2));
+
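+ // peer2 only needs repair if its deletion timestamp differs from peer1's on at least one side of the boundary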
+ boolean shouldHaveRepair = timestamp1 != timestamp2 || timestamp1 != timestamp3;
+
+ // No results, we've only reconciled tombstones.
+ try (PartitionIterator data = resolver.resolve())
+ {
+ assertFalse(data.hasNext());
+ }
+
+ assertEquals(shouldHaveRepair? 1 : 0, readRepair.sent.size());
+
+ if (!shouldHaveRepair)
+ return;
+
+ Mutation mutation = readRepair.getForEndpoint(peer2);
+ assertRepairMetadata(mutation);
+ assertRepairContainsNoColumns(mutation);
+
+ RangeTombstone expected = timestamp1 != timestamp2
+ // We've repaired the 1st part
+ ? tombstone("0", true, "5", false, timestamp1, nowInSec)
+ // We've repaired the 2nd part
+ : tombstone("5", true, "9", true, timestamp1, nowInSec);
+ assertRepairContainsDeletions(mutation, null, expected);
+ }
+
+ /**
+ * Test for CASSANDRA-13719: tests that having a partition deletion shadow a range tombstone on another source
+ * doesn't trigger an assertion error.
+ */
+ @Test
+ public void testRepairRangeTombstoneWithPartitionDeletion()
+ {
+ DataResolver resolver = new DataResolver(ks, command, ConsistencyLevel.ALL, 2, System.nanoTime(), readRepair);
+ InetAddressAndPort peer1 = peer();
+ InetAddressAndPort peer2 = peer();
+
+ // 1st "stream": just a partition deletion
+ UnfilteredPartitionIterator iter1 = iter(PartitionUpdate.fullPartitionDelete(cfm, dk, 10, nowInSec));
+
+ // 2nd "stream": a range tombstone that is covered by the 1st stream
+ RangeTombstone rt = tombstone("0", true , "10", true, 5, nowInSec);
+ UnfilteredPartitionIterator iter2 = iter(new RowUpdateBuilder(cfm, nowInSec, 1L, dk)
+ .addRangeTombstone(rt)
+ .buildUpdate());
+
+ resolver.preprocess(readResponseMessage(peer1, iter1));
+ resolver.preprocess(readResponseMessage(peer2, iter2));
+
+ // No results, we've only reconciled tombstones.
+ try (PartitionIterator data = resolver.resolve())
+ {
+ assertFalse(data.hasNext());
+ // 2nd stream should get repaired
+ }
+
+ assertEquals(1, readRepair.sent.size());
+
+ Mutation mutation = readRepair.getForEndpoint(peer2);
+ assertRepairMetadata(mutation);
+ assertRepairContainsNoColumns(mutation);
+
+ assertRepairContainsDeletions(mutation, new DeletionTime(10, nowInSec));
+ }
+
+ /**
+ * Additional test for CASSANDRA-13719: tests the case where a partition deletion doesn't shadow a range tombstone.
+ */
+ @Test
+ public void testRepairRangeTombstoneWithPartitionDeletion2()
+ {
+ DataResolver resolver = new DataResolver(ks, command, ConsistencyLevel.ALL, 2, System.nanoTime(), readRepair);
+ InetAddressAndPort peer1 = peer();
+ InetAddressAndPort peer2 = peer();
+
+ // 1st "stream": a partition deletion and a range tombstone
+ RangeTombstone rt1 = tombstone("0", true , "9", true, 11, nowInSec);
+ PartitionUpdate upd1 = new RowUpdateBuilder(cfm, nowInSec, 1L, dk)
+ .addRangeTombstone(rt1)
+ .buildUpdate();
+ ((MutableDeletionInfo)upd1.deletionInfo()).add(new DeletionTime(10, nowInSec));
+ UnfilteredPartitionIterator iter1 = iter(upd1);
+
+ // 2nd "stream": a range tombstone that is covered by the other stream rt
+ RangeTombstone rt2 = tombstone("2", true , "3", true, 11, nowInSec);
+ RangeTombstone rt3 = tombstone("4", true , "5", true, 10, nowInSec);
+ UnfilteredPartitionIterator iter2 = iter(new RowUpdateBuilder(cfm, nowInSec, 1L, dk)
+ .addRangeTombstone(rt2)
+ .addRangeTombstone(rt3)
+ .buildUpdate());
+
+ resolver.preprocess(readResponseMessage(peer1, iter1));
+ resolver.preprocess(readResponseMessage(peer2, iter2));
+
+ // No results, we've only reconciled tombstones.
+ try (PartitionIterator data = resolver.resolve())
+ {
+ assertFalse(data.hasNext());
+ // 2nd stream should get repaired
+ }
+
+ assertEquals(1, readRepair.sent.size());
+
+ Mutation mutation = readRepair.getForEndpoint(peer2);
+ assertRepairMetadata(mutation);
+ assertRepairContainsNoColumns(mutation);
+
+ // 2nd stream should get both the partition deletion, as well as the part of the 1st stream RT that it misses
+ assertRepairContainsDeletions(mutation, new DeletionTime(10, nowInSec),
+ tombstone("0", true, "2", false, 11, nowInSec),
+ tombstone("3", false, "9", true, 11, nowInSec));
+ }
+
+ // Forces the start to be exclusive if the condition holds
+ private static RangeTombstone withExclusiveStartIf(RangeTombstone rt, boolean condition)
+ {
+ if (!condition)
+ return rt;
+
+ Slice slice = rt.deletedSlice();
+ ClusteringBound newStart = ClusteringBound.create(Kind.EXCL_START_BOUND, slice.start().getRawValues());
+ return new RangeTombstone(Slice.make(newStart, slice.end()), rt.deletionTime());
+ }
+
+ // Forces the end to be exclusive if the condition holds
+ private static RangeTombstone withExclusiveEndIf(RangeTombstone rt, boolean condition)
+ {
+ if (!condition)
+ return rt;
+
+ Slice slice = rt.deletedSlice();
+ ClusteringBound newEnd = ClusteringBound.create(Kind.EXCL_END_BOUND, slice.end().getRawValues());
+ return new RangeTombstone(Slice.make(slice.start(), newEnd), rt.deletionTime());
+ }
+
+ private static ByteBuffer bb(int b)
+ {
+ return ByteBufferUtil.bytes(b);
+ }
+
+ private Cell mapCell(int k, int v, long ts)
+ {
+ return BufferCell.live(m, ts, bb(v), CellPath.create(bb(k)));
+ }
+
+ @Test
+ public void testResolveComplexDelete()
+ {
+ ReadCommand cmd = Util.cmd(cfs2, dk).withNowInSeconds(nowInSec).build();
+ TestableReadRepair readRepair = new TestableReadRepair(cmd);
+ DataResolver resolver = new DataResolver(ks, cmd, ConsistencyLevel.ALL, 2, System.nanoTime(), readRepair);
+
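+ // peer1 writes at ts[0], peer2 at ts[1]; peer2's later complex deletion and cell should shadow peer1's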
+ long[] ts = {100, 200};
+
+ Row.Builder builder = BTreeRow.unsortedBuilder(nowInSec);
+ builder.newRow(Clustering.EMPTY);
+ builder.addComplexDeletion(m, new DeletionTime(ts[0] - 1, nowInSec));
+ builder.addCell(mapCell(0, 0, ts[0]));
+
+ InetAddressAndPort peer1 = peer();
+ resolver.preprocess(readResponseMessage(peer1, iter(PartitionUpdate.singleRowUpdate(cfm2, dk, builder.build())), cmd));
+
+ builder.newRow(Clustering.EMPTY);
+ DeletionTime expectedCmplxDelete = new DeletionTime(ts[1] - 1, nowInSec);
+ builder.addComplexDeletion(m, expectedCmplxDelete);
+ Cell expectedCell = mapCell(1, 1, ts[1]);
+ builder.addCell(expectedCell);
+
+ InetAddressAndPort peer2 = peer();
+ resolver.preprocess(readResponseMessage(peer2, iter(PartitionUpdate.singleRowUpdate(cfm2, dk, builder.build())), cmd));
+
+ try(PartitionIterator data = resolver.resolve())
+ {
+ try (RowIterator rows = Iterators.getOnlyElement(data))
+ {
+ Row row = Iterators.getOnlyElement(rows);
+ assertColumns(row, "m");
+ Assert.assertNull(row.getCell(m, CellPath.create(bb(0))));
+ Assert.assertNotNull(row.getCell(m, CellPath.create(bb(1))));
+ }
+ }
+
+
+ Mutation mutation = readRepair.getForEndpoint(peer1);
+ Iterator<Row> rowIter = mutation.getPartitionUpdate(cfm2).iterator();
+ assertTrue(rowIter.hasNext());
+ Row row = rowIter.next();
+ assertFalse(rowIter.hasNext());
+
+ ComplexColumnData cd = row.getComplexColumnData(m);
+
+ assertEquals(Collections.singleton(expectedCell), Sets.newHashSet(cd));
+ assertEquals(expectedCmplxDelete, cd.complexDeletion());
+
+ Assert.assertNull(readRepair.sent.get(peer2));
+ }
+
+ @Test
+ public void testResolveDeletedCollection()
+ {
+ ReadCommand cmd = Util.cmd(cfs2, dk).withNowInSeconds(nowInSec).build();
+ TestableReadRepair readRepair = new TestableReadRepair(cmd);
+ DataResolver resolver = new DataResolver(ks, cmd, ConsistencyLevel.ALL, 2, System.nanoTime(), readRepair);
+
+ long[] ts = {100, 200};
+
+ Row.Builder builder = BTreeRow.unsortedBuilder(nowInSec);
+ builder.newRow(Clustering.EMPTY);
+ builder.addComplexDeletion(m, new DeletionTime(ts[0] - 1, nowInSec));
+ builder.addCell(mapCell(0, 0, ts[0]));
+
+ InetAddressAndPort peer1 = peer();
+ resolver.preprocess(readResponseMessage(peer1, iter(PartitionUpdate.singleRowUpdate(cfm2, dk, builder.build())), cmd));
+
+ builder.newRow(Clustering.EMPTY);
+ DeletionTime expectedCmplxDelete = new DeletionTime(ts[1] - 1, nowInSec);
+ builder.addComplexDeletion(m, expectedCmplxDelete);
+
+ InetAddressAndPort peer2 = peer();
+ resolver.preprocess(readResponseMessage(peer2, iter(PartitionUpdate.singleRowUpdate(cfm2, dk, builder.build())), cmd));
+
+ try(PartitionIterator data = resolver.resolve())
+ {
+ assertFalse(data.hasNext());
+ }
+
+ Mutation mutation = readRepair.getForEndpoint(peer1);
+ Iterator<Row> rowIter = mutation.getPartitionUpdate(cfm2).iterator();
+ assertTrue(rowIter.hasNext());
+ Row row = rowIter.next();
+ assertFalse(rowIter.hasNext());
+
+ ComplexColumnData cd = row.getComplexColumnData(m);
+
+ assertEquals(Collections.emptySet(), Sets.newHashSet(cd));
+ assertEquals(expectedCmplxDelete, cd.complexDeletion());
+
+ Assert.assertNull(readRepair.sent.get(peer2));
+ }
+
+ @Test
+ public void testResolveNewCollection()
+ {
+ ReadCommand cmd = Util.cmd(cfs2, dk).withNowInSeconds(nowInSec).build();
+ TestableReadRepair readRepair = new TestableReadRepair(cmd);
+ DataResolver resolver = new DataResolver(ks, cmd, ConsistencyLevel.ALL, 2, System.nanoTime(), readRepair);
+
+ long[] ts = {100, 200};
+
+ // map column
+ Row.Builder builder = BTreeRow.unsortedBuilder(nowInSec);
+ builder.newRow(Clustering.EMPTY);
+ DeletionTime expectedCmplxDelete = new DeletionTime(ts[0] - 1, nowInSec);
+ builder.addComplexDeletion(m, expectedCmplxDelete);
+ Cell expectedCell = mapCell(0, 0, ts[0]);
+ builder.addCell(expectedCell);
+
+ InetAddressAndPort peer1 = peer();
+ resolver.preprocess(readResponseMessage(peer1, iter(PartitionUpdate.singleRowUpdate(cfm2, dk, builder.build())), cmd));
+
+ // empty map column
+ InetAddressAndPort peer2 = peer();
+ resolver.preprocess(readResponseMessage(peer2, iter(PartitionUpdate.emptyUpdate(cfm2, dk))));
+
+ try(PartitionIterator data = resolver.resolve())
+ {
+ try (RowIterator rows = Iterators.getOnlyElement(data))
+ {
+ Row row = Iterators.getOnlyElement(rows);
+ assertColumns(row, "m");
+ ComplexColumnData cd = row.getComplexColumnData(m);
+ assertEquals(Collections.singleton(expectedCell), Sets.newHashSet(cd));
+ }
+ }
+
+ Assert.assertNull(readRepair.sent.get(peer1));
+
+ Mutation mutation = readRepair.getForEndpoint(peer2);
+ Iterator<Row> rowIter = mutation.getPartitionUpdate(cfm2).iterator();
+ assertTrue(rowIter.hasNext());
+ Row row = rowIter.next();
+ assertFalse(rowIter.hasNext());
+
+ ComplexColumnData cd = row.getComplexColumnData(m);
+
+ assertEquals(Sets.newHashSet(expectedCell), Sets.newHashSet(cd));
+ assertEquals(expectedCmplxDelete, cd.complexDeletion());
+ }
+
+ @Test
+ public void testResolveNewCollectionOverwritingDeleted()
+ {
+ ReadCommand cmd = Util.cmd(cfs2, dk).withNowInSeconds(nowInSec).build();
+ TestableReadRepair readRepair = new TestableReadRepair(cmd);
+ DataResolver resolver = new DataResolver(ks, cmd, ConsistencyLevel.ALL, 2, System.nanoTime(), readRepair);
+
+ long[] ts = {100, 200};
+
+ // cleared map column
+ Row.Builder builder = BTreeRow.unsortedBuilder(nowInSec);
+ builder.newRow(Clustering.EMPTY);
+ builder.addComplexDeletion(m, new DeletionTime(ts[0] - 1, nowInSec));
+
+ InetAddressAndPort peer1 = peer();
+ resolver.preprocess(readResponseMessage(peer1, iter(PartitionUpdate.singleRowUpdate(cfm2, dk, builder.build())), cmd));
+
+ // newer map column that overwrites the deleted one
+ builder.newRow(Clustering.EMPTY);
+ DeletionTime expectedCmplxDelete = new DeletionTime(ts[1] - 1, nowInSec);
+ builder.addComplexDeletion(m, expectedCmplxDelete);
+ Cell expectedCell = mapCell(1, 1, ts[1]);
+ builder.addCell(expectedCell);
+
+ InetAddressAndPort peer2 = peer();
+ resolver.preprocess(readResponseMessage(peer2, iter(PartitionUpdate.singleRowUpdate(cfm2, dk, builder.build())), cmd));
+
+ try(PartitionIterator data = resolver.resolve())
+ {
+ try (RowIterator rows = Iterators.getOnlyElement(data))
+ {
+ Row row = Iterators.getOnlyElement(rows);
+ assertColumns(row, "m");
+ ComplexColumnData cd = row.getComplexColumnData(m);
+ assertEquals(Collections.singleton(expectedCell), Sets.newHashSet(cd));
+ }
+ }
+
+ Row row = Iterators.getOnlyElement(readRepair.getForEndpoint(peer1).getPartitionUpdate(cfm2).iterator());
+
+ ComplexColumnData cd = row.getComplexColumnData(m);
+
+ assertEquals(Collections.singleton(expectedCell), Sets.newHashSet(cd));
+ assertEquals(expectedCmplxDelete, cd.complexDeletion());
+
+ Assert.assertNull(readRepair.sent.get(peer2));
+ }
+
+ private InetAddressAndPort peer()
+ {
+ try
+ {
+ return InetAddressAndPort.getByAddress(new byte[]{ 127, 0, 0, (byte) addressSuffix++ });
+ }
+ catch (UnknownHostException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private void assertRepairContainsDeletions(Mutation mutation,
+ DeletionTime deletionTime,
+ RangeTombstone...rangeTombstones)
+ {
+ PartitionUpdate update = mutation.getPartitionUpdates().iterator().next();
+ DeletionInfo deletionInfo = update.deletionInfo();
+ if (deletionTime != null)
+ assertEquals(deletionTime, deletionInfo.getPartitionDeletion());
+
+ assertEquals(rangeTombstones.length, deletionInfo.rangeCount());
+ Iterator<RangeTombstone> ranges = deletionInfo.rangeIterator(false);
+ int i = 0;
+ while (ranges.hasNext())
+ {
+ RangeTombstone expected = rangeTombstones[i++];
+ RangeTombstone actual = ranges.next();
+ String msg = String.format("Expected %s, but got %s", expected.toString(cfm.comparator), actual.toString(cfm.comparator));
+ assertEquals(msg, expected, actual);
+ }
+ }
+
+ private void assertRepairContainsNoDeletions(Mutation mutation)
+ {
+ PartitionUpdate update = mutation.getPartitionUpdates().iterator().next();
+ assertTrue(update.deletionInfo().isLive());
+ }
+
+ private void assertRepairContainsColumn(Mutation mutation,
+ String clustering,
+ String columnName,
+ String value,
+ long timestamp)
+ {
+ PartitionUpdate update = mutation.getPartitionUpdates().iterator().next();
+ Row row = update.getRow(update.metadata().comparator.make(clustering));
+ assertNotNull(row);
+ assertColumn(cfm, row, columnName, value, timestamp);
+ }
+
+ private void assertRepairContainsNoColumns(Mutation mutation)
+ {
+ PartitionUpdate update = mutation.getPartitionUpdates().iterator().next();
+ assertFalse(update.iterator().hasNext());
+ }
+
+ private void assertRepairMetadata(Mutation mutation)
+ {
+ PartitionUpdate update = mutation.getPartitionUpdates().iterator().next();
+ assertEquals(update.metadata().keyspace, cfm.keyspace);
+ assertEquals(update.metadata().name, cfm.name);
+ }
+
+
+ public MessageIn<ReadResponse> readResponseMessage(InetAddressAndPort from, UnfilteredPartitionIterator partitionIterator)
+ {
+ return readResponseMessage(from, partitionIterator, command);
+ }
+
+ public MessageIn<ReadResponse> readResponseMessage(InetAddressAndPort from, UnfilteredPartitionIterator partitionIterator, ReadCommand cmd)
+ {
+ return MessageIn.create(from,
+ ReadResponse.createRemoteDataResponse(partitionIterator, cmd),
+ Collections.emptyMap(),
+ MessagingService.Verb.REQUEST_RESPONSE,
+ MessagingService.current_version);
+ }
+
+ private RangeTombstone tombstone(Object start, Object end, long markedForDeleteAt, int localDeletionTime)
+ {
+ return tombstone(start, true, end, true, markedForDeleteAt, localDeletionTime);
+ }
+
+ private RangeTombstone tombstone(Object start, boolean inclusiveStart, Object end, boolean inclusiveEnd, long markedForDeleteAt, int localDeletionTime)
+ {
+ ClusteringBound startBound = rtBound(start, true, inclusiveStart);
+ ClusteringBound endBound = rtBound(end, false, inclusiveEnd);
+ return new RangeTombstone(Slice.make(startBound, endBound), new DeletionTime(markedForDeleteAt, localDeletionTime));
+ }
+
+ private ClusteringBound rtBound(Object value, boolean isStart, boolean inclusive)
+ {
+ ClusteringBound.Kind kind = isStart
+ ? (inclusive ? Kind.INCL_START_BOUND : Kind.EXCL_START_BOUND)
+ : (inclusive ? Kind.INCL_END_BOUND : Kind.EXCL_END_BOUND);
+
+ return ClusteringBound.create(kind, cfm.comparator.make(value).getRawValues());
+ }
+
+ private ClusteringBoundary rtBoundary(Object value, boolean inclusiveOnEnd)
+ {
+ ClusteringBound.Kind kind = inclusiveOnEnd
+ ? Kind.INCL_END_EXCL_START_BOUNDARY
+ : Kind.EXCL_END_INCL_START_BOUNDARY;
+ return ClusteringBoundary.create(kind, cfm.comparator.make(value).getRawValues());
+ }
+
+ private RangeTombstoneBoundMarker marker(Object value, boolean isStart, boolean inclusive, long markedForDeleteAt, int localDeletionTime)
+ {
+ return new RangeTombstoneBoundMarker(rtBound(value, isStart, inclusive), new DeletionTime(markedForDeleteAt, localDeletionTime));
+ }
+
+ private RangeTombstoneBoundaryMarker boundary(Object value, boolean inclusiveOnEnd, long markedForDeleteAt1, int localDeletionTime1, long markedForDeleteAt2, int localDeletionTime2)
+ {
+ return new RangeTombstoneBoundaryMarker(rtBoundary(value, inclusiveOnEnd),
+ new DeletionTime(markedForDeleteAt1, localDeletionTime1),
+ new DeletionTime(markedForDeleteAt2, localDeletionTime2));
+ }
+
+ private UnfilteredPartitionIterator fullPartitionDelete(TableMetadata table, DecoratedKey dk, long timestamp, int nowInSec)
+ {
+ return new SingletonUnfilteredPartitionIterator(PartitionUpdate.fullPartitionDelete(table, dk, timestamp, nowInSec).unfilteredIterator());
+ }
+
+ private UnfilteredPartitionIterator iter(PartitionUpdate update)
+ {
+ return new SingletonUnfilteredPartitionIterator(update.unfilteredIterator());
+ }
+
+ private UnfilteredPartitionIterator iter(DecoratedKey key, Unfiltered... unfiltereds)
+ {
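+ // sort the unfiltereds with the table's comparator so they are returned in clustering order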
+ SortedSet<Unfiltered> s = new TreeSet<>(cfm.comparator);
+ Collections.addAll(s, unfiltereds);
+ final Iterator<Unfiltered> iterator = s.iterator();
+
+ UnfilteredRowIterator rowIter = new AbstractUnfilteredRowIterator(cfm,
+ key,
+ DeletionTime.LIVE,
+ cfm.regularAndStaticColumns(),
+ Rows.EMPTY_STATIC_ROW,
+ false,
+ EncodingStats.NO_STATS)
+ {
+ protected Unfiltered computeNext()
+ {
+ return iterator.hasNext() ? iterator.next() : endOfData();
+ }
+ };
+ return new SingletonUnfilteredPartitionIterator(rowIter);
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/39807ba4/test/unit/org/apache/cassandra/service/reads/ReadExecutorTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/reads/ReadExecutorTest.java b/test/unit/org/apache/cassandra/service/reads/ReadExecutorTest.java
new file mode 100644
index 0000000..de7b2e4
--- /dev/null
+++ b/test/unit/org/apache/cassandra/service/reads/ReadExecutorTest.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.reads;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.collect.ImmutableList;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.Util;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.SinglePartitionReadCommand;
+import org.apache.cassandra.exceptions.ReadFailureException;
+import org.apache.cassandra.exceptions.ReadTimeoutException;
+import org.apache.cassandra.exceptions.RequestFailureReason;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.net.MessageOut;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.service.reads.AbstractReadExecutor;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+public class ReadExecutorTest
+{
+ static Keyspace ks;
+ static ColumnFamilyStore cfs;
+ static List<InetAddressAndPort> targets;
+
+ @BeforeClass
+ public static void setUpClass() throws Throwable
+ {
+ SchemaLoader.loadSchema();
+ SchemaLoader.createKeyspace("Foo", KeyspaceParams.simple(3), SchemaLoader.standardCFMD("Foo", "Bar"));
+ ks = Keyspace.open("Foo");
+ cfs = ks.getColumnFamilyStore("Bar");
+ targets = ImmutableList.of(InetAddressAndPort.getByName("127.0.0.255"), InetAddressAndPort.getByName("127.0.0.254"), InetAddressAndPort.getByName("127.0.0.253"));
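+ // zero out the sampled read latency so any speculative-retry threshold is crossed immediately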
+ cfs.sampleLatencyNanos = 0;
+ }
+
+ @Before
+ public void resetCounters() throws Throwable
+ {
+ cfs.metric.speculativeInsufficientReplicas.dec(cfs.metric.speculativeInsufficientReplicas.getCount());
+ cfs.metric.speculativeRetries.dec(cfs.metric.speculativeRetries.getCount());
+ cfs.metric.speculativeFailedRetries.dec(cfs.metric.speculativeFailedRetries.getCount());
+ }
+
+ /**
+ * If speculation would have been beneficial but could not be attempted due to a lack of replicas,
+ * count that it occurred.
+ */
+ @Test
+ public void testUnableToSpeculate() throws Throwable
+ {
+ assertEquals(0, cfs.metric.speculativeInsufficientReplicas.getCount());
+ assertEquals(0, ks.metric.speculativeInsufficientReplicas.getCount());
+ AbstractReadExecutor executor = new AbstractReadExecutor.NeverSpeculatingReadExecutor(ks, cfs, new MockSinglePartitionReadCommand(), ConsistencyLevel.LOCAL_QUORUM, targets, System.nanoTime(), true);
+ executor.maybeTryAdditionalReplicas();
+ try
+ {
+ executor.awaitResponses();
+ fail();
+ }
+ catch (ReadTimeoutException e)
+ {
+ //expected
+ }
+ assertEquals(1, cfs.metric.speculativeInsufficientReplicas.getCount());
+ assertEquals(1, ks.metric.speculativeInsufficientReplicas.getCount());
+
+ //Shouldn't increment
+ executor = new AbstractReadExecutor.NeverSpeculatingReadExecutor(ks, cfs, new MockSinglePartitionReadCommand(), ConsistencyLevel.LOCAL_QUORUM, targets, System.nanoTime(), false);
+ executor.maybeTryAdditionalReplicas();
+ try
+ {
+ executor.awaitResponses();
+ fail();
+ }
+ catch (ReadTimeoutException e)
+ {
+ //expected
+ }
+ assertEquals(1, cfs.metric.speculativeInsufficientReplicas.getCount());
+ assertEquals(1, ks.metric.speculativeInsufficientReplicas.getCount());
+ }
+
+ /**
+ * Test that speculation, when attempted, is counted, and that when it succeeds
+ * no failure is counted.
+ */
+ @Test
+ public void testSpeculateSucceeded() throws Throwable
+ {
+ assertEquals(0, cfs.metric.speculativeRetries.getCount());
+ assertEquals(0, cfs.metric.speculativeFailedRetries.getCount());
+ assertEquals(0, ks.metric.speculativeRetries.getCount());
+ assertEquals(0, ks.metric.speculativeFailedRetries.getCount());
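+ // the year-long timeout keeps this read from timing out; the failures injected below end it instead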
+ AbstractReadExecutor executor = new AbstractReadExecutor.SpeculatingReadExecutor(ks, cfs, new MockSinglePartitionReadCommand(TimeUnit.DAYS.toMillis(365)), ConsistencyLevel.LOCAL_QUORUM, targets, System.nanoTime());
+ executor.maybeTryAdditionalReplicas();
+ new Thread()
+ {
+ @Override
+ public void run()
+ {
+ // Failures end the read promptly but don't require mock data to be supplied
+ executor.handler.onFailure(targets.get(0), RequestFailureReason.READ_TOO_MANY_TOMBSTONES);
+ executor.handler.onFailure(targets.get(1), RequestFailureReason.READ_TOO_MANY_TOMBSTONES);
+ executor.handler.condition.signalAll();
+ }
+ }.start();
+
+ try
+ {
+ executor.awaitResponses();
+ fail();
+ }
+ catch (ReadFailureException e)
+ {
+ //expected
+ }
+ assertEquals(1, cfs.metric.speculativeRetries.getCount());
+ assertEquals(0, cfs.metric.speculativeFailedRetries.getCount());
+ assertEquals(1, ks.metric.speculativeRetries.getCount());
+ assertEquals(0, ks.metric.speculativeFailedRetries.getCount());
+
+ }
+
+ /**
+ * Test that speculation failure statistics are incremented if speculation occurs
+ * and the read still times out.
+ */
+ @Test
+ public void testSpeculateFailed() throws Throwable
+ {
+ assertEquals(0, cfs.metric.speculativeRetries.getCount());
+ assertEquals(0, cfs.metric.speculativeFailedRetries.getCount());
+ assertEquals(0, ks.metric.speculativeRetries.getCount());
+ assertEquals(0, ks.metric.speculativeFailedRetries.getCount());
+ AbstractReadExecutor executor = new AbstractReadExecutor.SpeculatingReadExecutor(ks, cfs, new MockSinglePartitionReadCommand(), ConsistencyLevel.LOCAL_QUORUM, targets, System.nanoTime());
+ executor.maybeTryAdditionalReplicas();
+ try
+ {
+ executor.awaitResponses();
+ fail();
+ }
+ catch (ReadTimeoutException e)
+ {
+ //expected
+ }
+ assertEquals(1, cfs.metric.speculativeRetries.getCount());
+ assertEquals(1, cfs.metric.speculativeFailedRetries.getCount());
+ assertEquals(1, ks.metric.speculativeRetries.getCount());
+ assertEquals(1, ks.metric.speculativeFailedRetries.getCount());
+ }
+
+ public static class MockSinglePartitionReadCommand extends SinglePartitionReadCommand
+ {
+ private final long timeout;
+
+ MockSinglePartitionReadCommand()
+ {
+ this(0);
+ }
+
+ MockSinglePartitionReadCommand(long timeout)
+ {
+ super(false, 0, cfs.metadata(), 0, null, null, null, Util.dk("ry@n_luvs_teh_y@nk33z"), null, null);
+ this.timeout = timeout;
+ }
+
+ @Override
+ public long getTimeout()
+ {
+ return timeout;
+ }
+
+ @Override
+ public MessageOut createMessage()
+ {
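+ // a stub outgoing message: the verb and size are placeholders, since the test replicas never answer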
+ return new MessageOut(MessagingService.Verb.BATCH_REMOVE)
+ {
+ @Override
+ public int serializedSize(int version)
+ {
+ return 0;
+ }
+ };
+ }
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/39807ba4/test/unit/org/apache/cassandra/service/reads/repair/TestableReadRepair.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/reads/repair/TestableReadRepair.java b/test/unit/org/apache/cassandra/service/reads/repair/TestableReadRepair.java
new file mode 100644
index 0000000..d125c9d
--- /dev/null
+++ b/test/unit/org/apache/cassandra/service/reads/repair/TestableReadRepair.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.reads.repair;
+
+import java.net.InetAddress;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Consumer;
+
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.db.ReadCommand;
+import org.apache.cassandra.db.partitions.PartitionIterator;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators;
+import org.apache.cassandra.exceptions.ReadTimeoutException;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.service.reads.DigestResolver;
+import org.apache.cassandra.service.reads.ResponseResolver;
+import org.apache.cassandra.tracing.TraceState;
+
+public class TestableReadRepair implements ReadRepair, RepairListener
+{
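+ // records, per endpoint, the repair mutation that would have been sent, so tests can assert on it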
+ public final Map<InetAddressAndPort, Mutation> sent = new HashMap<>();
+
+ private final ReadCommand command;
+
+ public TestableReadRepair(ReadCommand command)
+ {
+ this.command = command;
+ }
+
+ private class TestablePartitionRepair implements RepairListener.PartitionRepair
+ {
+ @Override
+ public void reportMutation(InetAddressAndPort endpoint, Mutation mutation)
+ {
+ sent.put(endpoint, mutation);
+ }
+
+ @Override
+ public void finish()
+ {
+
+ }
+ }
+
+ @Override
+ public UnfilteredPartitionIterators.MergeListener getMergeListener(InetAddressAndPort[] endpoints)
+ {
+ return new PartitionIteratorMergeListener(endpoints, command, this);
+ }
+
+ @Override
+ public void startForegroundRepair(DigestResolver digestResolver, List<InetAddressAndPort> allEndpoints, List<InetAddressAndPort> contactedEndpoints, Consumer<PartitionIterator> resultConsumer)
+ {
+
+ }
+
+ @Override
+ public void awaitForegroundRepairFinish() throws ReadTimeoutException
+ {
+
+ }
+
+ @Override
+ public void maybeStartBackgroundRepair(ResponseResolver resolver)
+ {
+
+ }
+
+ @Override
+ public void backgroundDigestRepair(TraceState traceState)
+ {
+
+ }
+
+ @Override
+ public PartitionRepair startPartitionRepair()
+ {
+ return new TestablePartitionRepair();
+ }
+
+ @Override
+ public void awaitRepairs(long timeoutMillis)
+ {
+
+ }
+
+ public Mutation getForEndpoint(InetAddressAndPort endpoint)
+ {
+ return sent.get(endpoint);
+ }
+}
[4/5] cassandra git commit: Refactor read executor and response resolver, abstract read repair
Posted by bd...@apache.org.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/39807ba4/src/java/org/apache/cassandra/service/ReadCallback.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ReadCallback.java b/src/java/org/apache/cassandra/service/ReadCallback.java
deleted file mode 100644
index e7f30b4..0000000
--- a/src/java/org/apache/cassandra/service/ReadCallback.java
+++ /dev/null
@@ -1,266 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.service;
-
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
-
-import org.apache.commons.lang3.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.concurrent.Stage;
-import org.apache.cassandra.concurrent.StageManager;
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.partitions.PartitionIterator;
-import org.apache.cassandra.exceptions.RequestFailureReason;
-import org.apache.cassandra.exceptions.ReadFailureException;
-import org.apache.cassandra.exceptions.ReadTimeoutException;
-import org.apache.cassandra.exceptions.UnavailableException;
-import org.apache.cassandra.locator.InetAddressAndPort;
-import org.apache.cassandra.metrics.ReadRepairMetrics;
-import org.apache.cassandra.net.IAsyncCallbackWithFailure;
-import org.apache.cassandra.net.MessageIn;
-import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.db.ConsistencyLevel;
-import org.apache.cassandra.tracing.TraceState;
-import org.apache.cassandra.tracing.Tracing;
-import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.concurrent.SimpleCondition;
-
-public class ReadCallback implements IAsyncCallbackWithFailure<ReadResponse>
-{
- protected static final Logger logger = LoggerFactory.getLogger( ReadCallback.class );
-
- public final ResponseResolver resolver;
- final SimpleCondition condition = new SimpleCondition();
- private final long queryStartNanoTime;
- final int blockfor;
- final List<InetAddressAndPort> endpoints;
- private final ReadCommand command;
- private final ConsistencyLevel consistencyLevel;
- private static final AtomicIntegerFieldUpdater<ReadCallback> recievedUpdater
- = AtomicIntegerFieldUpdater.newUpdater(ReadCallback.class, "received");
- private volatile int received = 0;
- private static final AtomicIntegerFieldUpdater<ReadCallback> failuresUpdater
- = AtomicIntegerFieldUpdater.newUpdater(ReadCallback.class, "failures");
- private volatile int failures = 0;
- private final Map<InetAddressAndPort, RequestFailureReason> failureReasonByEndpoint;
-
- private final Keyspace keyspace; // TODO push this into ConsistencyLevel?
-
- /**
- * Constructor when response count has to be calculated and blocked for.
- */
- public ReadCallback(ResponseResolver resolver, ConsistencyLevel consistencyLevel, ReadCommand command, List<InetAddressAndPort> filteredEndpoints, long queryStartNanoTime)
- {
- this(resolver,
- consistencyLevel,
- consistencyLevel.blockFor(Keyspace.open(command.metadata().keyspace)),
- command,
- Keyspace.open(command.metadata().keyspace),
- filteredEndpoints,
- queryStartNanoTime);
- }
-
- public ReadCallback(ResponseResolver resolver, ConsistencyLevel consistencyLevel, int blockfor, ReadCommand command, Keyspace keyspace, List<InetAddressAndPort> endpoints, long queryStartNanoTime)
- {
- this.command = command;
- this.keyspace = keyspace;
- this.blockfor = blockfor;
- this.consistencyLevel = consistencyLevel;
- this.resolver = resolver;
- this.queryStartNanoTime = queryStartNanoTime;
- this.endpoints = endpoints;
- this.failureReasonByEndpoint = new ConcurrentHashMap<>();
- // we don't support read repair (or rapid read protection) for range scans yet (CASSANDRA-6897)
- assert !(command instanceof PartitionRangeReadCommand) || blockfor >= endpoints.size();
-
- if (logger.isTraceEnabled())
- logger.trace("Blockfor is {}; setting up requests to {}", blockfor, StringUtils.join(this.endpoints, ","));
- }
-
- public boolean await(long timePastStart, TimeUnit unit)
- {
- long time = unit.toNanos(timePastStart) - (System.nanoTime() - queryStartNanoTime);
- try
- {
- return condition.await(time, TimeUnit.NANOSECONDS);
- }
- catch (InterruptedException ex)
- {
- throw new AssertionError(ex);
- }
- }
-
- public void awaitResults() throws ReadFailureException, ReadTimeoutException
- {
- boolean signaled = await(command.getTimeout(), TimeUnit.MILLISECONDS);
- boolean failed = blockfor + failures > endpoints.size();
- if (signaled && !failed)
- return;
-
- if (Tracing.isTracing())
- {
- String gotData = received > 0 ? (resolver.isDataPresent() ? " (including data)" : " (only digests)") : "";
- Tracing.trace("{}; received {} of {} responses{}", new Object[]{ (failed ? "Failed" : "Timed out"), received, blockfor, gotData });
- }
- else if (logger.isDebugEnabled())
- {
- String gotData = received > 0 ? (resolver.isDataPresent() ? " (including data)" : " (only digests)") : "";
- logger.debug("{}; received {} of {} responses{}", new Object[]{ (failed ? "Failed" : "Timed out"), received, blockfor, gotData });
- }
-
- // Same as for writes, see AbstractWriteResponseHandler
- throw failed
- ? new ReadFailureException(consistencyLevel, received, blockfor, resolver.isDataPresent(), failureReasonByEndpoint)
- : new ReadTimeoutException(consistencyLevel, received, blockfor, resolver.isDataPresent());
- }
-
- public PartitionIterator get() throws ReadFailureException, ReadTimeoutException, DigestMismatchException
- {
- awaitResults();
-
- PartitionIterator result = blockfor == 1 ? resolver.getData() : resolver.resolve();
- if (logger.isTraceEnabled())
- logger.trace("Read: {} ms.", TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - queryStartNanoTime));
- return result;
- }
-
- public int blockFor()
- {
- return blockfor;
- }
-
- public void response(MessageIn<ReadResponse> message)
- {
- resolver.preprocess(message);
- int n = waitingFor(message.from)
- ? receivedUpdater.incrementAndGet(this)
- : received;
- if (n >= blockfor && resolver.isDataPresent())
- {
- condition.signalAll();
- // kick off a background digest comparison if this is a result that (may have) arrived after
- // the original resolve that get() kicks off as soon as the condition is signaled
- if (blockfor < endpoints.size() && n == endpoints.size())
- {
- TraceState traceState = Tracing.instance.get();
- if (traceState != null)
- traceState.trace("Initiating read-repair");
- StageManager.getStage(Stage.READ_REPAIR).execute(new AsyncRepairRunner(traceState, queryStartNanoTime));
- }
- }
- }
-
- /**
- * @return true if the message counts towards the blockfor threshold
- */
- private boolean waitingFor(InetAddressAndPort from)
- {
- return consistencyLevel.isDatacenterLocal()
- ? DatabaseDescriptor.getLocalDataCenter().equals(DatabaseDescriptor.getEndpointSnitch().getDatacenter(from))
- : true;
- }
-
- /**
- * @return the current number of received responses
- */
- public int getReceivedCount()
- {
- return received;
- }
-
- public void response(ReadResponse result)
- {
- MessageIn<ReadResponse> message = MessageIn.create(FBUtilities.getBroadcastAddressAndPort(),
- result,
- Collections.emptyMap(),
- MessagingService.Verb.INTERNAL_RESPONSE,
- MessagingService.current_version);
- response(message);
- }
-
- public void assureSufficientLiveNodes() throws UnavailableException
- {
- consistencyLevel.assureSufficientLiveNodes(keyspace, endpoints);
- }
-
- public boolean isLatencyForSnitch()
- {
- return true;
- }
-
- private class AsyncRepairRunner implements Runnable
- {
- private final TraceState traceState;
- private final long queryStartNanoTime;
-
- public AsyncRepairRunner(TraceState traceState, long queryStartNanoTime)
- {
- this.traceState = traceState;
- this.queryStartNanoTime = queryStartNanoTime;
- }
-
- public void run()
- {
- // If the resolver is a DigestResolver, we need to do a full data read if there is a mismatch.
- // Otherwise, resolve will send the repairs directly if needs be (and in that case we should never
- // get a digest mismatch).
- try
- {
- resolver.compareResponses();
- }
- catch (DigestMismatchException e)
- {
- assert resolver instanceof DigestResolver;
-
- if (traceState != null)
- traceState.trace("Digest mismatch: {}", e.toString());
- if (logger.isDebugEnabled())
- logger.debug("Digest mismatch:", e);
-
- ReadRepairMetrics.repairedBackground.mark();
-
- final DataResolver repairResolver = new DataResolver(keyspace, command, consistencyLevel, endpoints.size(), queryStartNanoTime);
- AsyncRepairCallback repairHandler = new AsyncRepairCallback(repairResolver, endpoints.size());
-
- for (InetAddressAndPort endpoint : endpoints)
- MessagingService.instance().sendRR(command.createMessage(), endpoint, repairHandler);
- }
- }
- }
-
- @Override
- public void onFailure(InetAddressAndPort from, RequestFailureReason failureReason)
- {
- int n = waitingFor(from)
- ? failuresUpdater.incrementAndGet(this)
- : failures;
-
- failureReasonByEndpoint.put(from, failureReason);
-
- if (blockfor + n > endpoints.size())
- condition.signalAll();
- }
-}
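
The timeout accounting in await() above is measured from queryStartNanoTime rather than from the moment await() is called, so time already spent on the query counts against the caller's budget. A minimal standalone sketch of that computation; the class name and the lock/condition choice are illustrative, not the internals of ReadCallback:

    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.ReentrantLock;

    // Toy illustration: the wait budget is measured from queryStartNanoTime,
    // so any time spent before await() is called is already used up.
    class AwaitBudget
    {
        private final ReentrantLock lock = new ReentrantLock();
        private final Condition condition = lock.newCondition();
        private final long queryStartNanoTime = System.nanoTime();

        boolean await(long timePastStart, TimeUnit unit) throws InterruptedException
        {
            // remaining = total budget - time already elapsed since the query started
            long remaining = unit.toNanos(timePastStart) - (System.nanoTime() - queryStartNanoTime);
            lock.lock();
            try
            {
                // awaitNanos returns <= 0 if the deadline passed before a signal
                return condition.awaitNanos(remaining) > 0;
            }
            finally
            {
                lock.unlock();
            }
        }
    }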
http://git-wip-us.apache.org/repos/asf/cassandra/blob/39807ba4/src/java/org/apache/cassandra/service/ReadRepairDecision.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ReadRepairDecision.java b/src/java/org/apache/cassandra/service/ReadRepairDecision.java
deleted file mode 100644
index 8d2ced7..0000000
--- a/src/java/org/apache/cassandra/service/ReadRepairDecision.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.service;
-
-public enum ReadRepairDecision
-{
- NONE, GLOBAL, DC_LOCAL;
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/39807ba4/src/java/org/apache/cassandra/service/ResponseResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ResponseResolver.java b/src/java/org/apache/cassandra/service/ResponseResolver.java
deleted file mode 100644
index 81b18b6..0000000
--- a/src/java/org/apache/cassandra/service/ResponseResolver.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.service;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.partitions.*;
-import org.apache.cassandra.net.MessageIn;
-import org.apache.cassandra.utils.concurrent.Accumulator;
-
-public abstract class ResponseResolver
-{
- protected static final Logger logger = LoggerFactory.getLogger(ResponseResolver.class);
-
- protected final Keyspace keyspace;
- protected final ReadCommand command;
- protected final ConsistencyLevel consistency;
-
- // Accumulator gives us thread-safe, non-blocking collection of responses with minimal overhead
- protected final Accumulator<MessageIn<ReadResponse>> responses;
-
- public ResponseResolver(Keyspace keyspace, ReadCommand command, ConsistencyLevel consistency, int maxResponseCount)
- {
- this.keyspace = keyspace;
- this.command = command;
- this.consistency = consistency;
- this.responses = new Accumulator<>(maxResponseCount);
- }
-
- public abstract PartitionIterator getData();
- public abstract PartitionIterator resolve() throws DigestMismatchException;
-
- /**
- * Compares received responses, potentially triggering a digest mismatch (for a digest resolver) and read-repairs
- * (for a data resolver).
- * <p>
- * This is functionally equivalent to calling {@link #resolve()} and consuming the result, but can be slightly more
- * efficient in some case due to the fact that we don't care about the result itself. This is used when doing
- * asynchronous read-repairs.
- *
- * @throws DigestMismatchException if it's a digest resolver and the responses don't match.
- */
- public abstract void compareResponses() throws DigestMismatchException;
-
- public abstract boolean isDataPresent();
-
- public void preprocess(MessageIn<ReadResponse> message)
- {
- responses.add(message);
- }
-
- public Iterable<MessageIn<ReadResponse>> getMessages()
- {
- return responses;
- }
-}
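
The responses field relies on an append-only accumulator for lock-free thread safety. A toy sketch of that idea, assuming a fixed capacity and an atomic slot-claim counter; this is an illustration of the pattern, not Cassandra's actual Accumulator:

    import java.util.Iterator;
    import java.util.concurrent.atomic.AtomicInteger;
    import java.util.concurrent.atomic.AtomicReferenceArray;

    // Toy append-only accumulator: writers claim a slot atomically, readers
    // iterate over the published prefix. Not the real implementation.
    class ToyAccumulator<E> implements Iterable<E>
    {
        private final AtomicReferenceArray<E> values;
        private final AtomicInteger claimed = new AtomicInteger(0);

        ToyAccumulator(int capacity) { this.values = new AtomicReferenceArray<>(capacity); }

        void add(E value)
        {
            int slot = claimed.getAndIncrement();
            if (slot >= values.length())
                throw new IllegalStateException("accumulator is full");
            values.set(slot, value); // volatile write publishes the element
        }

        public Iterator<E> iterator()
        {
            final int size = Math.min(claimed.get(), values.length());
            return new Iterator<E>()
            {
                int i = 0;
                // stops at the first claimed-but-not-yet-published slot;
                // acceptable for a sketch, the real class tracks publication separately
                public boolean hasNext() { return i < size && values.get(i) != null; }
                public E next() { return values.get(i++); }
            };
        }
    }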
http://git-wip-us.apache.org/repos/asf/cassandra/blob/39807ba4/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index e2125d4..7d4e34c 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -44,6 +44,10 @@ import org.apache.cassandra.batchlog.Batch;
import org.apache.cassandra.batchlog.BatchlogManager;
import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.concurrent.StageManager;
+import org.apache.cassandra.service.reads.AbstractReadExecutor;
+import org.apache.cassandra.service.reads.DataResolver;
+import org.apache.cassandra.service.reads.ReadCallback;
+import org.apache.cassandra.service.reads.repair.ReadRepair;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.schema.Schema;
@@ -1751,139 +1755,54 @@ public class StorageProxy implements StorageProxyMBean
{
int cmdCount = commands.size();
- SinglePartitionReadLifecycle[] reads = new SinglePartitionReadLifecycle[cmdCount];
- for (int i = 0; i < cmdCount; i++)
- reads[i] = new SinglePartitionReadLifecycle(commands.get(i), consistencyLevel, queryStartNanoTime);
+ AbstractReadExecutor[] reads = new AbstractReadExecutor[cmdCount];
- for (int i = 0; i < cmdCount; i++)
- reads[i].doInitialQueries();
-
- for (int i = 0; i < cmdCount; i++)
- reads[i].maybeTryAdditionalReplicas();
-
- for (int i = 0; i < cmdCount; i++)
- reads[i].awaitResultsAndRetryOnDigestMismatch();
-
- for (int i = 0; i < cmdCount; i++)
- if (!reads[i].isDone())
- reads[i].maybeAwaitFullDataRead();
-
- List<PartitionIterator> results = new ArrayList<>(cmdCount);
- for (int i = 0; i < cmdCount; i++)
+ for (int i = 0; i < cmdCount; i++)
{
- assert reads[i].isDone();
- results.add(reads[i].getResult());
+ reads[i] = AbstractReadExecutor.getReadExecutor(commands.get(i), consistencyLevel, queryStartNanoTime);
}
- return PartitionIterators.concat(results);
- }
-
- private static class SinglePartitionReadLifecycle
- {
- private final SinglePartitionReadCommand command;
- private final AbstractReadExecutor executor;
- private final ConsistencyLevel consistency;
- private final long queryStartNanoTime;
-
- private PartitionIterator result;
- private ReadCallback repairHandler;
-
- SinglePartitionReadLifecycle(SinglePartitionReadCommand command, ConsistencyLevel consistency, long queryStartNanoTime)
+ for (int i = 0; i < cmdCount; i++)
{
- this.command = command;
- this.executor = AbstractReadExecutor.getReadExecutor(command, consistency, queryStartNanoTime);
- this.consistency = consistency;
- this.queryStartNanoTime = queryStartNanoTime;
+ reads[i].executeAsync();
}
- boolean isDone()
+ for (int i = 0; i < cmdCount; i++)
{
- return result != null;
+ reads[i].maybeTryAdditionalReplicas();
}
- void doInitialQueries()
+ for (int i = 0; i < cmdCount; i++)
{
- executor.executeAsync();
+ reads[i].awaitResponses();
}
- void maybeTryAdditionalReplicas()
+ for (int i = 0; i < cmdCount; i++)
{
- executor.maybeTryAdditionalReplicas();
+ reads[i].maybeRepairAdditionalReplicas();
}
- void awaitResultsAndRetryOnDigestMismatch() throws ReadFailureException, ReadTimeoutException
+ for (int i = 0; i < cmdCount; i++)
{
- try
- {
- result = executor.get();
- }
- catch (DigestMismatchException ex)
- {
- Tracing.trace("Digest mismatch: {}", ex.getMessage());
-
- ReadRepairMetrics.repairedBlocking.mark();
-
- // Do a full data read to resolve the correct response (and repair node that need be)
- Keyspace keyspace = Keyspace.open(command.metadata().keyspace);
- DataResolver resolver = new DataResolver(keyspace, command, ConsistencyLevel.ALL, executor.handler.endpoints.size(), queryStartNanoTime);
- repairHandler = new ReadCallback(resolver,
- ConsistencyLevel.ALL,
- executor.getContactedReplicas().size(),
- command,
- keyspace,
- executor.handler.endpoints,
- queryStartNanoTime);
-
- for (InetAddressAndPort endpoint : executor.getContactedReplicas())
- {
- Tracing.trace("Enqueuing full data read to {}", endpoint);
- MessagingService.instance().sendRRWithFailure(command.createMessage(), endpoint, repairHandler);
- }
- }
+ reads[i].awaitReadRepair();
}
- void maybeAwaitFullDataRead() throws ReadTimeoutException
+ List<PartitionIterator> results = new ArrayList<>(cmdCount);
+ for (int i = 0; i < cmdCount; i++)
{
- // There wasn't a digest mismatch, we're good
- if (repairHandler == null)
- return;
-
- // Otherwise, get the result from the full-data read and check that it's not a short read
- try
- {
- result = repairHandler.get();
- }
- catch (DigestMismatchException e)
- {
- throw new AssertionError(e); // full data requested from each node here, no digests should be sent
- }
- catch (ReadTimeoutException e)
- {
- if (Tracing.isTracing())
- Tracing.trace("Timed out waiting on digest mismatch repair requests");
- else
- logger.trace("Timed out waiting on digest mismatch repair requests");
- // the caught exception here will have CL.ALL from the repair command,
- // not whatever CL the initial command was at (CASSANDRA-7947)
- int blockFor = consistency.blockFor(Keyspace.open(command.metadata().keyspace));
- throw new ReadTimeoutException(consistency, blockFor-1, blockFor, true);
- }
+ results.add(reads[i].getResult());
}
- PartitionIterator getResult()
- {
- assert result != null;
- return result;
- }
+ return PartitionIterators.concat(results);
}
- static class LocalReadRunnable extends DroppableRunnable
+ public static class LocalReadRunnable extends DroppableRunnable
{
private final ReadCommand command;
private final ReadCallback handler;
private final long start = System.nanoTime();
- LocalReadRunnable(ReadCommand command, ReadCallback handler)
+ public LocalReadRunnable(ReadCommand command, ReadCallback handler)
{
super(MessagingService.Verb.READ);
this.command = command;
@@ -2081,11 +2000,13 @@ public class StorageProxy implements StorageProxyMBean
private static class SingleRangeResponse extends AbstractIterator<RowIterator> implements PartitionIterator
{
+ private final DataResolver resolver;
private final ReadCallback handler;
private PartitionIterator result;
- private SingleRangeResponse(ReadCallback handler)
+ private SingleRangeResponse(DataResolver resolver, ReadCallback handler)
{
+ this.resolver = resolver;
this.handler = handler;
}
@@ -2094,14 +2015,8 @@ public class StorageProxy implements StorageProxyMBean
if (result != null)
return;
- try
- {
- result = handler.get();
- }
- catch (DigestMismatchException e)
- {
- throw new AssertionError(e); // no digests in range slices yet
- }
+ handler.awaitResults();
+ result = resolver.resolve();
}
protected RowIterator computeNext()
@@ -2223,12 +2138,13 @@ public class StorageProxy implements StorageProxyMBean
{
PartitionRangeReadCommand rangeCommand = command.forSubRange(toQuery.range, isFirst);
- DataResolver resolver = new DataResolver(keyspace, rangeCommand, consistency, toQuery.filteredEndpoints.size(), queryStartNanoTime);
+ ReadRepair readRepair = ReadRepair.create(command, toQuery.filteredEndpoints, queryStartNanoTime, consistency);
+ DataResolver resolver = new DataResolver(keyspace, rangeCommand, consistency, toQuery.filteredEndpoints.size(), queryStartNanoTime, readRepair);
int blockFor = consistency.blockFor(keyspace);
int minResponses = Math.min(toQuery.filteredEndpoints.size(), blockFor);
List<InetAddressAndPort> minimalEndpoints = toQuery.filteredEndpoints.subList(0, minResponses);
- ReadCallback handler = new ReadCallback(resolver, consistency, rangeCommand, minimalEndpoints, queryStartNanoTime);
+ ReadCallback handler = new ReadCallback(resolver, consistency, rangeCommand, minimalEndpoints, queryStartNanoTime, readRepair);
handler.assureSufficientLiveNodes();
@@ -2245,7 +2161,7 @@ public class StorageProxy implements StorageProxyMBean
}
}
- return new SingleRangeResponse(handler);
+ return new SingleRangeResponse(resolver, handler);
}
private PartitionIterator sendNextRequests()
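
The rewritten fetchRows() above runs each phase of the read lifecycle across all commands before advancing to the next phase, so the network round-trips for different partitions overlap instead of running serially. A condensed sketch of that pipeline shape, with a hypothetical Read interface standing in for AbstractReadExecutor (names are illustrative, not the patch's API):

    import java.util.ArrayList;
    import java.util.List;

    // Hypothetical stand-in for AbstractReadExecutor: each phase is issued for
    // every command before any later phase, so remote work proceeds in parallel.
    interface Read<T>
    {
        void executeAsync();               // send initial data/digest requests
        void maybeTryAdditionalReplicas(); // speculative retry if responses lag
        void awaitResponses();             // block until the CL is satisfied
        void maybeRepairAdditionalReplicas();
        void awaitReadRepair();            // block on any foreground repair
        T getResult();
    }

    final class ReadPipeline
    {
        static <T> List<T> run(List<? extends Read<T>> reads)
        {
            for (Read<T> r : reads) r.executeAsync();
            for (Read<T> r : reads) r.maybeTryAdditionalReplicas();
            for (Read<T> r : reads) r.awaitResponses();
            for (Read<T> r : reads) r.maybeRepairAdditionalReplicas();
            for (Read<T> r : reads) r.awaitReadRepair();
            List<T> results = new ArrayList<>(reads.size());
            for (Read<T> r : reads) results.add(r.getResult());
            return results;
        }
    }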
http://git-wip-us.apache.org/repos/asf/cassandra/blob/39807ba4/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java b/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java
new file mode 100644
index 0000000..5a660ca
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java
@@ -0,0 +1,482 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.service.reads;
+
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterables;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.concurrent.Stage;
+import org.apache.cassandra.concurrent.StageManager;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.ReadCommand;
+import org.apache.cassandra.db.SinglePartitionReadCommand;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.partitions.PartitionIterator;
+import org.apache.cassandra.exceptions.ReadFailureException;
+import org.apache.cassandra.exceptions.ReadTimeoutException;
+import org.apache.cassandra.exceptions.UnavailableException;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.metrics.ReadRepairMetrics;
+import org.apache.cassandra.net.MessageOut;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.service.StorageProxy;
+import org.apache.cassandra.service.reads.repair.ReadRepair;
+import org.apache.cassandra.schema.SpeculativeRetryParam;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.service.StorageProxy.LocalReadRunnable;
+import org.apache.cassandra.tracing.TraceState;
+import org.apache.cassandra.tracing.Tracing;
+
+/**
+ * Sends a read request to the replicas needed to satisfy a given ConsistencyLevel.
+ *
+ * Optionally, may perform additional requests to provide redundancy against replica failure:
+ * AlwaysSpeculatingReadExecutor will always send a request to one extra replica, while
+ * SpeculatingReadExecutor will wait until it looks like the original request is in danger
+ * of timing out before performing extra reads.
+ */
+public abstract class AbstractReadExecutor
+{
+ private static final Logger logger = LoggerFactory.getLogger(AbstractReadExecutor.class);
+
+ protected final ReadCommand command;
+ protected final ConsistencyLevel consistency;
+ protected final List<InetAddressAndPort> targetReplicas;
+ protected final ReadRepair readRepair;
+ protected final DigestResolver digestResolver;
+ protected final ReadCallback handler;
+ protected final TraceState traceState;
+ protected final ColumnFamilyStore cfs;
+ protected final long queryStartNanoTime;
+ protected volatile PartitionIterator result = null;
+
+ AbstractReadExecutor(Keyspace keyspace, ColumnFamilyStore cfs, ReadCommand command, ConsistencyLevel consistency, List<InetAddressAndPort> targetReplicas, long queryStartNanoTime)
+ {
+ this.command = command;
+ this.consistency = consistency;
+ this.targetReplicas = targetReplicas;
+ this.readRepair = ReadRepair.create(command, targetReplicas, queryStartNanoTime, consistency);
+ this.digestResolver = new DigestResolver(keyspace, command, consistency, readRepair, targetReplicas.size());
+ this.handler = new ReadCallback(digestResolver, consistency, command, targetReplicas, queryStartNanoTime, readRepair);
+ this.cfs = cfs;
+ this.traceState = Tracing.instance.get();
+ this.queryStartNanoTime = queryStartNanoTime;
+
+ // Set the digest version (if we request some digests). This is the smallest version amongst all our target replicas, since newer
+ // nodes know how to produce older digests but the reverse is not true.
+ // TODO: we need this when talking with pre-3.0 nodes. So if we preserve the digest format moving forward, we can get rid of this once
+ // we stop being compatible with pre-3.0 nodes.
+ int digestVersion = MessagingService.current_version;
+ for (InetAddressAndPort replica : targetReplicas)
+ digestVersion = Math.min(digestVersion, MessagingService.instance().getVersion(replica));
+ command.setDigestVersion(digestVersion);
+ }
+
+ private DecoratedKey getKey()
+ {
+ if (command instanceof SinglePartitionReadCommand)
+ {
+ return ((SinglePartitionReadCommand) command).partitionKey();
+ }
+ else
+ {
+ return null;
+ }
+ }
+
+ protected void makeDataRequests(Iterable<InetAddressAndPort> endpoints)
+ {
+ makeRequests(command, endpoints);
+ }
+
+ protected void makeDigestRequests(Iterable<InetAddressAndPort> endpoints)
+ {
+ makeRequests(command.copyAsDigestQuery(), endpoints);
+ }
+
+ private void makeRequests(ReadCommand readCommand, Iterable<InetAddressAndPort> endpoints)
+ {
+ boolean hasLocalEndpoint = false;
+
+ for (InetAddressAndPort endpoint : endpoints)
+ {
+ if (StorageProxy.canDoLocalRequest(endpoint))
+ {
+ hasLocalEndpoint = true;
+ continue;
+ }
+
+ if (traceState != null)
+ traceState.trace("reading {} from {}", readCommand.isDigestQuery() ? "digest" : "data", endpoint);
+ logger.trace("reading {} from {}", readCommand.isDigestQuery() ? "digest" : "data", endpoint);
+ MessageOut<ReadCommand> message = readCommand.createMessage();
+ MessagingService.instance().sendRRWithFailure(message, endpoint, handler);
+ }
+
+ // We delay the local (potentially blocking) read till the end to avoid stalling remote requests.
+ if (hasLocalEndpoint)
+ {
+ logger.trace("reading {} locally", readCommand.isDigestQuery() ? "digest" : "data");
+ StageManager.getStage(Stage.READ).maybeExecuteImmediately(new LocalReadRunnable(command, handler));
+ }
+ }
+
+ /**
+ * Perform additional requests if it looks like the original will time out. May block while it waits
+ * to see if the original requests are answered first.
+ */
+ public abstract void maybeTryAdditionalReplicas();
+
+ /**
+ * Get the replicas involved in the [finished] request.
+ *
+ * @return target replicas + the extra replica, *IF* we speculated.
+ */
+ public abstract List<InetAddressAndPort> getContactedReplicas();
+
+ /**
+ * send the initial set of requests
+ */
+ public abstract void executeAsync();
+
+ private static ReadRepairDecision newReadRepairDecision(TableMetadata metadata)
+ {
+ if (metadata.params.readRepairChance > 0d ||
+ metadata.params.dcLocalReadRepairChance > 0d)
+ {
+ double chance = ThreadLocalRandom.current().nextDouble();
+ if (metadata.params.readRepairChance > chance)
+ return ReadRepairDecision.GLOBAL;
+
+ if (metadata.params.dcLocalReadRepairChance > chance)
+ return ReadRepairDecision.DC_LOCAL;
+ }
+
+ return ReadRepairDecision.NONE;
+ }
+
+ /**
+ * @return an executor appropriate for the configured speculative read policy
+ */
+ public static AbstractReadExecutor getReadExecutor(SinglePartitionReadCommand command, ConsistencyLevel consistencyLevel, long queryStartNanoTime) throws UnavailableException
+ {
+ Keyspace keyspace = Keyspace.open(command.metadata().keyspace);
+ List<InetAddressAndPort> allReplicas = StorageProxy.getLiveSortedEndpoints(keyspace, command.partitionKey());
+ // 11980: Excluding EACH_QUORUM reads from potential RR, so that we do not miscount DC responses
+ ReadRepairDecision repairDecision = consistencyLevel == ConsistencyLevel.EACH_QUORUM
+ ? ReadRepairDecision.NONE
+ : newReadRepairDecision(command.metadata());
+ List<InetAddressAndPort> targetReplicas = consistencyLevel.filterForQuery(keyspace, allReplicas, repairDecision);
+
+ // Throw UAE early if we don't have enough replicas.
+ consistencyLevel.assureSufficientLiveNodes(keyspace, targetReplicas);
+
+ if (repairDecision != ReadRepairDecision.NONE)
+ {
+ Tracing.trace("Read-repair {}", repairDecision);
+ ReadRepairMetrics.attempted.mark();
+ }
+
+ ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(command.metadata().id);
+ SpeculativeRetryParam retry = cfs.metadata().params.speculativeRetry;
+
+ // Speculative retry is disabled *OR*
+ // 11980: Disable speculative retry if using EACH_QUORUM in order to prevent miscounting DC responses
+ if (retry.equals(SpeculativeRetryParam.NONE)
+ || consistencyLevel == ConsistencyLevel.EACH_QUORUM)
+ return new NeverSpeculatingReadExecutor(keyspace, cfs, command, consistencyLevel, targetReplicas, queryStartNanoTime, false);
+
+ // There are simply no extra replicas to speculate.
+ // Handle this separately so it can log failed attempts to speculate due to lack of replicas
+ if (consistencyLevel.blockFor(keyspace) == allReplicas.size())
+ return new NeverSpeculatingReadExecutor(keyspace, cfs, command, consistencyLevel, targetReplicas, queryStartNanoTime, true);
+
+ if (targetReplicas.size() == allReplicas.size())
+ {
+ // CL.ALL, RRD.GLOBAL or RRD.DC_LOCAL and a single-DC.
+ // We are going to contact every node anyway, so ask for 2 full data requests instead of 1, for redundancy
+ // (same amount of requests in total, but we turn 1 digest request into a full blown data request).
+ return new AlwaysSpeculatingReadExecutor(keyspace, cfs, command, consistencyLevel, targetReplicas, queryStartNanoTime);
+ }
+
+ // RRD.NONE or RRD.DC_LOCAL w/ multiple DCs.
+ InetAddressAndPort extraReplica = allReplicas.get(targetReplicas.size());
+ // With repair decision DC_LOCAL, allReplicas and targetReplicas may be ordered differently, so
+ // we might have to find a replacement that's not already in targetReplicas.
+ if (repairDecision == ReadRepairDecision.DC_LOCAL && targetReplicas.contains(extraReplica))
+ {
+ for (InetAddressAndPort address : allReplicas)
+ {
+ if (!targetReplicas.contains(address))
+ {
+ extraReplica = address;
+ break;
+ }
+ }
+ }
+ targetReplicas.add(extraReplica);
+
+ if (retry.equals(SpeculativeRetryParam.ALWAYS))
+ return new AlwaysSpeculatingReadExecutor(keyspace, cfs, command, consistencyLevel, targetReplicas, queryStartNanoTime);
+ else // PERCENTILE or CUSTOM.
+ return new SpeculatingReadExecutor(keyspace, cfs, command, consistencyLevel, targetReplicas, queryStartNanoTime);
+ }
+
+ /**
+ * Returns true if speculation should occur; when it should, this blocks until it is time to
+ * send the speculative reads.
+ */
+ boolean shouldSpeculateAndMaybeWait()
+ {
+ // no latency information, or we're overloaded
+ if (cfs.sampleLatencyNanos > TimeUnit.MILLISECONDS.toNanos(command.getTimeout()))
+ return false;
+
+ return !handler.await(cfs.sampleLatencyNanos, TimeUnit.NANOSECONDS);
+ }
+
+ void onReadTimeout() {}
+
+ public static class NeverSpeculatingReadExecutor extends AbstractReadExecutor
+ {
+ /**
+ * Whether to log a failed speculation when one would have been attempted
+ * but could not be, due to a lack of extra replicas.
+ */
+ private final boolean logFailedSpeculation;
+
+ public NeverSpeculatingReadExecutor(Keyspace keyspace, ColumnFamilyStore cfs, ReadCommand command, ConsistencyLevel consistencyLevel, List<InetAddressAndPort> targetReplicas, long queryStartNanoTime, boolean logFailedSpeculation)
+ {
+ super(keyspace, cfs, command, consistencyLevel, targetReplicas, queryStartNanoTime);
+ this.logFailedSpeculation = logFailedSpeculation;
+ }
+
+ public void executeAsync()
+ {
+ makeDataRequests(targetReplicas.subList(0, 1));
+ if (targetReplicas.size() > 1)
+ makeDigestRequests(targetReplicas.subList(1, targetReplicas.size()));
+ }
+
+ public void maybeTryAdditionalReplicas()
+ {
+ if (shouldSpeculateAndMaybeWait() && logFailedSpeculation)
+ {
+ cfs.metric.speculativeInsufficientReplicas.inc();
+ }
+ }
+
+ public List<InetAddressAndPort> getContactedReplicas()
+ {
+ return targetReplicas;
+ }
+ }
+
+ static class SpeculatingReadExecutor extends AbstractReadExecutor
+ {
+ private volatile boolean speculated = false;
+
+ public SpeculatingReadExecutor(Keyspace keyspace,
+ ColumnFamilyStore cfs,
+ ReadCommand command,
+ ConsistencyLevel consistencyLevel,
+ List<InetAddressAndPort> targetReplicas,
+ long queryStartNanoTime)
+ {
+ super(keyspace, cfs, command, consistencyLevel, targetReplicas, queryStartNanoTime);
+ }
+
+ public void executeAsync()
+ {
+ // if CL + RR result in covering all replicas, getReadExecutor forces AlwaysSpeculating. So we know
+ // that the last replica in our list is "extra."
+ List<InetAddressAndPort> initialReplicas = targetReplicas.subList(0, targetReplicas.size() - 1);
+
+ if (handler.blockfor < initialReplicas.size())
+ {
+ // We're hitting additional targets for read repair. Since our "extra" replica is the least-
+ // preferred by the snitch, we do an extra data read to start with against a replica more
+ // likely to reply; better to let RR fail than the entire query.
+ makeDataRequests(initialReplicas.subList(0, 2));
+ if (initialReplicas.size() > 2)
+ makeDigestRequests(initialReplicas.subList(2, initialReplicas.size()));
+ }
+ else
+ {
+ // not doing read repair; all replies are important, so it doesn't matter which nodes we
+ // perform data reads against vs digest.
+ makeDataRequests(initialReplicas.subList(0, 1));
+ if (initialReplicas.size() > 1)
+ makeDigestRequests(initialReplicas.subList(1, initialReplicas.size()));
+ }
+ }
+
+ public void maybeTryAdditionalReplicas()
+ {
+ if (shouldSpeculateAndMaybeWait())
+ {
+ // Handle speculation stats first in case the callback fires immediately
+ speculated = true;
+ cfs.metric.speculativeRetries.inc();
+ // Could be waiting on the data, or on enough digests.
+ ReadCommand retryCommand = command;
+ if (handler.resolver.isDataPresent())
+ retryCommand = command.copyAsDigestQuery();
+
+ InetAddressAndPort extraReplica = Iterables.getLast(targetReplicas);
+ if (traceState != null)
+ traceState.trace("speculating read retry on {}", extraReplica);
+ logger.trace("speculating read retry on {}", extraReplica);
+ MessagingService.instance().sendRRWithFailure(retryCommand.createMessage(), extraReplica, handler);
+ }
+ }
+
+ public List<InetAddressAndPort> getContactedReplicas()
+ {
+ return speculated
+ ? targetReplicas
+ : targetReplicas.subList(0, targetReplicas.size() - 1);
+ }
+
+ @Override
+ void onReadTimeout()
+ {
+ // Shouldn't be possible to get here without first attempting to speculate even if the
+ // timing is bad
+ assert speculated;
+ cfs.metric.speculativeFailedRetries.inc();
+ }
+ }
+
+ private static class AlwaysSpeculatingReadExecutor extends AbstractReadExecutor
+ {
+ public AlwaysSpeculatingReadExecutor(Keyspace keyspace,
+ ColumnFamilyStore cfs,
+ ReadCommand command,
+ ConsistencyLevel consistencyLevel,
+ List<InetAddressAndPort> targetReplicas,
+ long queryStartNanoTime)
+ {
+ super(keyspace, cfs, command, consistencyLevel, targetReplicas, queryStartNanoTime);
+ }
+
+ public void maybeTryAdditionalReplicas()
+ {
+ // no-op
+ }
+
+ public List<InetAddressAndPort> getContactedReplicas()
+ {
+ return targetReplicas;
+ }
+
+ @Override
+ public void executeAsync()
+ {
+ makeDataRequests(targetReplicas.subList(0, targetReplicas.size() > 1 ? 2 : 1));
+ if (targetReplicas.size() > 2)
+ makeDigestRequests(targetReplicas.subList(2, targetReplicas.size()));
+ cfs.metric.speculativeRetries.inc();
+ }
+
+ @Override
+ void onReadTimeout()
+ {
+ cfs.metric.speculativeFailedRetries.inc();
+ }
+ }
+
+ public void setResult(PartitionIterator result)
+ {
+ Preconditions.checkState(this.result == null, "Result can only be set once");
+ this.result = result;
+ }
+
+ /**
+ * Wait for the CL to be satisfied by responses
+ */
+ public void awaitResponses() throws ReadTimeoutException
+ {
+ try
+ {
+ handler.awaitResults();
+ }
+ catch (ReadTimeoutException e)
+ {
+ try
+ {
+ onReadTimeout();
+ }
+ finally
+ {
+ throw e;
+ }
+ }
+
+ // return immediately, or begin a read repair
+ if (digestResolver.responsesMatch())
+ {
+ setResult(digestResolver.getData());
+ }
+ else
+ {
+ Tracing.trace("Digest mismatch: Mismatch for key {}", getKey());
+ readRepair.startForegroundRepair(digestResolver, handler.endpoints, getContactedReplicas(), this::setResult);
+ }
+ }
+
+ public void awaitReadRepair() throws ReadTimeoutException
+ {
+ try
+ {
+ readRepair.awaitForegroundRepairFinish();
+ }
+ catch (ReadTimeoutException e)
+ {
+ if (Tracing.isTracing())
+ Tracing.trace("Timed out waiting on digest mismatch repair requests");
+ else
+ logger.trace("Timed out waiting on digest mismatch repair requests");
+ // the caught exception here will have CL.ALL from the repair command,
+ // not whatever CL the initial command was at (CASSANDRA-7947)
+ int blockFor = consistency.blockFor(Keyspace.open(command.metadata().keyspace));
+ throw new ReadTimeoutException(consistency, blockFor - 1, blockFor, true);
+ }
+ }
+
+ public void maybeRepairAdditionalReplicas()
+ {
+ // TODO: this
+ }
+
+ public PartitionIterator getResult() throws ReadFailureException, ReadTimeoutException
+ {
+ Preconditions.checkState(result != null, "Result must be set first");
+ return result;
+ }
+}
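
shouldSpeculateAndMaybeWait() above only fires an extra read when the original requests are still outstanding after the table's sampled latency has elapsed, and skips speculation entirely when the trigger latency exceeds the query timeout. A toy illustration of that wait-then-speculate shape, using a CountDownLatch in place of the real response handler (all names here are hypothetical):

    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.TimeUnit;

    // Toy wait-then-speculate: wait for the expected latency percentile; if the
    // required responses haven't arrived by then, the caller should fire one
    // extra request. Hypothetical names, not the actual Cassandra API.
    final class ToySpeculator
    {
        private final CountDownLatch responses;  // counted down once per reply
        private final long sampleLatencyNanos;   // e.g. a sampled p99 read latency
        private final long timeoutNanos;

        ToySpeculator(CountDownLatch responses, long sampleLatencyNanos, long timeoutNanos)
        {
            this.responses = responses;
            this.sampleLatencyNanos = sampleLatencyNanos;
            this.timeoutNanos = timeoutNanos;
        }

        boolean shouldSpeculateAndMaybeWait() throws InterruptedException
        {
            // No point speculating if the trigger latency exceeds the timeout:
            // the query would time out before the retry could help.
            if (sampleLatencyNanos > timeoutNanos)
                return false;
            // Speculate only if the original requests are still outstanding.
            return !responses.await(sampleLatencyNanos, TimeUnit.NANOSECONDS);
        }
    }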
http://git-wip-us.apache.org/repos/asf/cassandra/blob/39807ba4/src/java/org/apache/cassandra/service/reads/AsyncRepairCallback.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/reads/AsyncRepairCallback.java b/src/java/org/apache/cassandra/service/reads/AsyncRepairCallback.java
new file mode 100644
index 0000000..b7e7435
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/reads/AsyncRepairCallback.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.service.reads;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.cassandra.concurrent.Stage;
+import org.apache.cassandra.concurrent.StageManager;
+import org.apache.cassandra.db.ReadResponse;
+import org.apache.cassandra.net.IAsyncCallback;
+import org.apache.cassandra.net.MessageIn;
+import org.apache.cassandra.service.reads.DataResolver;
+import org.apache.cassandra.utils.WrappedRunnable;
+
+public class AsyncRepairCallback implements IAsyncCallback<ReadResponse>
+{
+ private final DataResolver repairResolver;
+ private final int blockfor;
+ protected final AtomicInteger received = new AtomicInteger(0);
+
+ public AsyncRepairCallback(DataResolver repairResolver, int blockfor)
+ {
+ this.repairResolver = repairResolver;
+ this.blockfor = blockfor;
+ }
+
+ public void response(MessageIn<ReadResponse> message)
+ {
+ repairResolver.preprocess(message);
+ if (received.incrementAndGet() == blockfor)
+ {
+ StageManager.getStage(Stage.READ_REPAIR).execute(new WrappedRunnable()
+ {
+ protected void runMayThrow()
+ {
+ repairResolver.evaluateAllResponses();
+ }
+ });
+ }
+ }
+
+ public boolean isLatencyForSnitch()
+ {
+ return true;
+ }
+}
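
The received.incrementAndGet() == blockfor comparison above guarantees that exactly one responding thread observes the threshold, so the repair evaluation is scheduled once even when responses race. A minimal demonstration of the pattern:

    import java.util.concurrent.atomic.AtomicInteger;

    // Exactly-once trigger: many threads call onResponse(), but only the thread
    // that performs the blockfor-th increment sees equality and runs the action.
    final class ThresholdTrigger
    {
        private final AtomicInteger received = new AtomicInteger(0);
        private final int blockfor;
        private final Runnable action;

        ThresholdTrigger(int blockfor, Runnable action)
        {
            this.blockfor = blockfor;
            this.action = action;
        }

        void onResponse()
        {
            if (received.incrementAndGet() == blockfor)
                action.run(); // fires exactly once
        }
    }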
http://git-wip-us.apache.org/repos/asf/cassandra/blob/39807ba4/src/java/org/apache/cassandra/service/reads/DataResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/reads/DataResolver.java b/src/java/org/apache/cassandra/service/reads/DataResolver.java
new file mode 100644
index 0000000..11dd083
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/reads/DataResolver.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.service.reads;
+
+import java.net.InetAddress;
+import java.util.*;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.service.reads.repair.ReadRepair;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.filter.*;
+import org.apache.cassandra.db.partitions.*;
+import org.apache.cassandra.db.transform.*;
+import org.apache.cassandra.net.*;
+import org.apache.cassandra.tracing.TraceState;
+
+public class DataResolver extends ResponseResolver
+{
+ private static final boolean DROP_OVERSIZED_READ_REPAIR_MUTATIONS =
+ Boolean.getBoolean("cassandra.drop_oversized_readrepair_mutations");
+
+ @VisibleForTesting
+ final List<AsyncOneResponse> repairResults = Collections.synchronizedList(new ArrayList<>());
+ private final long queryStartNanoTime;
+ private final boolean enforceStrictLiveness;
+
+ public DataResolver(Keyspace keyspace, ReadCommand command, ConsistencyLevel consistency, int maxResponseCount, long queryStartNanoTime, ReadRepair readRepair)
+ {
+ super(keyspace, command, consistency, readRepair, maxResponseCount);
+ this.queryStartNanoTime = queryStartNanoTime;
+ this.enforceStrictLiveness = command.metadata().enforceStrictLiveness();
+ }
+
+ public PartitionIterator getData()
+ {
+ ReadResponse response = responses.iterator().next().payload;
+ return UnfilteredPartitionIterators.filter(response.makeIterator(command), command.nowInSec());
+ }
+
+ public boolean isDataPresent()
+ {
+ return !responses.isEmpty();
+ }
+
+ public PartitionIterator resolve()
+ {
+ // We could get more responses while this method runs, which is ok (we're happy to ignore any response not here
+ // at the beginning of this method), so grab the response count once and use that through the method.
+ int count = responses.size();
+ List<UnfilteredPartitionIterator> iters = new ArrayList<>(count);
+ InetAddressAndPort[] sources = new InetAddressAndPort[count];
+ for (int i = 0; i < count; i++)
+ {
+ MessageIn<ReadResponse> msg = responses.get(i);
+ iters.add(msg.payload.makeIterator(command));
+ sources[i] = msg.from;
+ }
+
+ /*
+ * Even though every response, individually, will honor the limit, it is possible that we will, after the merge,
+ * have more rows than the client requested. To make sure that we still conform to the original limit,
+ * we apply a top-level post-reconciliation counter to the merged partition iterator.
+ *
+ * Short read protection logic (ShortReadRowsProtection.moreContents()) relies on this counter to be applied
+ * to the current partition to work. For this reason we have to apply the counter transformation before
+ * empty partition discard logic kicks in - for it will eagerly consume the iterator.
+ *
+ * That's why the order here is: 1) merge; 2) filter rows; 3) count; 4) discard empty partitions
+ *
+ * See CASSANDRA-13747 for more details.
+ */
+
+ DataLimits.Counter mergedResultCounter =
+ command.limits().newCounter(command.nowInSec(), true, command.selectsFullPartition(), enforceStrictLiveness);
+
+ UnfilteredPartitionIterator merged = mergeWithShortReadProtection(iters, sources, mergedResultCounter);
+ FilteredPartitions filtered = FilteredPartitions.filter(merged, new Filter(command.nowInSec(), command.metadata().enforceStrictLiveness()));
+ PartitionIterator counted = Transformation.apply(filtered, mergedResultCounter);
+ return Transformation.apply(counted, new EmptyPartitionsDiscarder());
+ }
+
+ private UnfilteredPartitionIterator mergeWithShortReadProtection(List<UnfilteredPartitionIterator> results,
+ InetAddressAndPort[] sources,
+ DataLimits.Counter mergedResultCounter)
+ {
+ // If we have only one result, there is no read repair to do and we can't get short reads
+ if (results.size() == 1)
+ return results.get(0);
+
+ /*
+ * So-called short reads stem from nodes returning only a subset of the results they have due to the limit,
+ * with that subset turning out not to be enough post-reconciliation. So if we don't have a limit, don't bother.
+ */
+ if (!command.limits().isUnlimited())
+ for (int i = 0; i < results.size(); i++)
+ {
+ results.set(i, ShortReadProtection.extend(sources[i], results.get(i), command, mergedResultCounter, queryStartNanoTime, enforceStrictLiveness));
+ }
+
+ return UnfilteredPartitionIterators.merge(results, command.nowInSec(), readRepair.getMergeListener(sources));
+ }
+
+ public void evaluateAllResponses()
+ {
+ // We need to fully consume the results to trigger read repairs if appropriate
+ try (PartitionIterator iterator = resolve())
+ {
+ PartitionIterators.consume(iterator);
+ }
+ }
+
+ public void evaluateAllResponses(TraceState traceState)
+ {
+ evaluateAllResponses();
+ }
+}
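
Short reads arise because each replica applies the query limit locally: a row that a replica counted toward its limit can be discarded during reconciliation (for example, because it was deleted on another replica), leaving fewer merged rows than requested. A toy, self-contained illustration of that effect, with integer keys standing in for rows; none of these names come from the patch:

    import java.util.*;

    // Toy short-read illustration: a replica returns at most `limit` keys;
    // reconciliation drops keys another replica has tombstoned. If the replica
    // hit its limit, it may still hold live keys we never saw.
    final class ShortReadToy
    {
        static List<Integer> reconcile(int limit, List<Integer> replicaA, Set<Integer> tombstonesB)
        {
            List<Integer> merged = new ArrayList<>();
            for (Integer key : replicaA)
                if (!tombstonesB.contains(key))
                    merged.add(key);
            // If replicaA returned exactly `limit` keys and some were dropped,
            // it may have more live keys beyond what it sent: a short read.
            boolean possiblyShort = replicaA.size() == limit && merged.size() < limit;
            System.out.println("merged=" + merged + " possiblyShort=" + possiblyShort);
            return merged;
        }

        public static void main(String[] args)
        {
            // replica A returns its first 3 keys; replica B has deleted key 2
            reconcile(3, Arrays.asList(1, 2, 3), new HashSet<>(Arrays.asList(2)));
        }
    }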
http://git-wip-us.apache.org/repos/asf/cassandra/blob/39807ba4/src/java/org/apache/cassandra/service/reads/DigestResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/reads/DigestResolver.java b/src/java/org/apache/cassandra/service/reads/DigestResolver.java
new file mode 100644
index 0000000..828a65e
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/reads/DigestResolver.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.service.reads;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.base.Preconditions;
+
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.partitions.PartitionIterator;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators;
+import org.apache.cassandra.net.MessageIn;
+import org.apache.cassandra.service.reads.repair.ReadRepair;
+import org.apache.cassandra.tracing.TraceState;
+
+public class DigestResolver extends ResponseResolver
+{
+ private volatile ReadResponse dataResponse;
+
+ public DigestResolver(Keyspace keyspace, ReadCommand command, ConsistencyLevel consistency, ReadRepair readRepair, int maxResponseCount)
+ {
+ super(keyspace, command, consistency, readRepair, maxResponseCount);
+ Preconditions.checkArgument(command instanceof SinglePartitionReadCommand,
+ "DigestResolver can only be used with SinglePartitionReadCommand commands");
+ }
+
+ @Override
+ public void preprocess(MessageIn<ReadResponse> message)
+ {
+ super.preprocess(message);
+ if (dataResponse == null && !message.payload.isDigestResponse())
+ dataResponse = message.payload;
+ }
+
+ public PartitionIterator getData()
+ {
+ assert isDataPresent();
+ return UnfilteredPartitionIterators.filter(dataResponse.makeIterator(command), command.nowInSec());
+ }
+
+ public boolean responsesMatch()
+ {
+ long start = System.nanoTime();
+
+ // validate digests against each other; return false immediately on mismatch.
+ ByteBuffer digest = null;
+ for (MessageIn<ReadResponse> message : responses)
+ {
+ ReadResponse response = message.payload;
+
+ ByteBuffer newDigest = response.digest(command);
+ if (digest == null)
+ digest = newDigest;
+ else if (!digest.equals(newDigest))
+ // rely on the fact that only single partition queries use digests
+ return false;
+ }
+
+ if (logger.isTraceEnabled())
+ logger.trace("responsesMatch: {} ms.", TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start));
+
+ return true;
+ }
+
+ public void evaluateAllResponses(TraceState traceState)
+ {
+ if (!responsesMatch())
+ {
+ readRepair.backgroundDigestRepair(traceState);
+ }
+ }
+
+ public boolean isDataPresent()
+ {
+ return dataResponse != null;
+ }
+}
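
responsesMatch() above compares each response's opaque digest against the first one seen and bails out on the first mismatch. A self-contained sketch of the same comparison; the use of MD5 here is purely illustrative and says nothing about the digest format actually used on the wire:

    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;
    import java.security.MessageDigest;
    import java.security.NoSuchAlgorithmException;
    import java.util.Arrays;
    import java.util.List;

    // Compare every response's digest against the first; stop at the first mismatch.
    final class DigestCheck
    {
        static ByteBuffer digest(String payload) throws NoSuchAlgorithmException
        {
            return ByteBuffer.wrap(MessageDigest.getInstance("MD5")
                                                .digest(payload.getBytes(StandardCharsets.UTF_8)));
        }

        static boolean responsesMatch(List<String> payloads) throws NoSuchAlgorithmException
        {
            ByteBuffer first = null;
            for (String payload : payloads)
            {
                ByteBuffer d = digest(payload);
                if (first == null)
                    first = d;
                else if (!first.equals(d))
                    return false; // any mismatch forces a full data read
            }
            return true;
        }

        public static void main(String[] args) throws NoSuchAlgorithmException
        {
            System.out.println(responsesMatch(Arrays.asList("row@10", "row@10"))); // true
            System.out.println(responsesMatch(Arrays.asList("row@10", "row@11"))); // false
        }
    }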
http://git-wip-us.apache.org/repos/asf/cassandra/blob/39807ba4/src/java/org/apache/cassandra/service/reads/ReadCallback.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/reads/ReadCallback.java b/src/java/org/apache/cassandra/service/reads/ReadCallback.java
new file mode 100644
index 0000000..62fdfaa
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/reads/ReadCallback.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.service.reads;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.exceptions.RequestFailureReason;
+import org.apache.cassandra.exceptions.ReadFailureException;
+import org.apache.cassandra.exceptions.ReadTimeoutException;
+import org.apache.cassandra.exceptions.UnavailableException;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.net.IAsyncCallbackWithFailure;
+import org.apache.cassandra.net.MessageIn;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.service.reads.repair.ReadRepair;
+import org.apache.cassandra.tracing.Tracing;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.concurrent.SimpleCondition;
+
+public class ReadCallback implements IAsyncCallbackWithFailure<ReadResponse>
+{
+ protected static final Logger logger = LoggerFactory.getLogger(ReadCallback.class);
+
+ public final ResponseResolver resolver;
+ final SimpleCondition condition = new SimpleCondition();
+ private final long queryStartNanoTime;
+ final int blockfor;
+ final List<InetAddressAndPort> endpoints;
+ private final ReadCommand command;
+ private final ConsistencyLevel consistencyLevel;
+ private static final AtomicIntegerFieldUpdater<ReadCallback> receivedUpdater
+ = AtomicIntegerFieldUpdater.newUpdater(ReadCallback.class, "received");
+ private volatile int received = 0;
+ private static final AtomicIntegerFieldUpdater<ReadCallback> failuresUpdater
+ = AtomicIntegerFieldUpdater.newUpdater(ReadCallback.class, "failures");
+ private volatile int failures = 0;
+ private final Map<InetAddressAndPort, RequestFailureReason> failureReasonByEndpoint;
+
+ private final Keyspace keyspace; // TODO push this into ConsistencyLevel?
+
+ private final ReadRepair readRepair;
+
+ /**
+ * Constructor used when the number of responses to block for must be derived from the consistency level.
+ */
+ public ReadCallback(ResponseResolver resolver, ConsistencyLevel consistencyLevel, ReadCommand command, List<InetAddressAndPort> filteredEndpoints, long queryStartNanoTime, ReadRepair readRepair)
+ {
+ this(resolver,
+ consistencyLevel,
+ consistencyLevel.blockFor(Keyspace.open(command.metadata().keyspace)),
+ command,
+ Keyspace.open(command.metadata().keyspace),
+ filteredEndpoints,
+ queryStartNanoTime,
+ readRepair);
+ }
+
+ public ReadCallback(ResponseResolver resolver, ConsistencyLevel consistencyLevel, int blockfor, ReadCommand command, Keyspace keyspace, List<InetAddressAndPort> endpoints, long queryStartNanoTime, ReadRepair readRepair)
+ {
+ this.command = command;
+ this.keyspace = keyspace;
+ this.blockfor = blockfor;
+ this.consistencyLevel = consistencyLevel;
+ this.resolver = resolver;
+ this.queryStartNanoTime = queryStartNanoTime;
+ this.endpoints = endpoints;
+ this.readRepair = readRepair;
+ this.failureReasonByEndpoint = new ConcurrentHashMap<>();
+ // we don't support read repair (or rapid read protection) for range scans yet (CASSANDRA-6897)
+ assert !(command instanceof PartitionRangeReadCommand) || blockfor >= endpoints.size();
+
+ if (logger.isTraceEnabled())
+ logger.trace("Blockfor is {}; setting up requests to {}", blockfor, StringUtils.join(this.endpoints, ","));
+ }
+
+ public boolean await(long timePastStart, TimeUnit unit)
+ {
+ long time = unit.toNanos(timePastStart) - (System.nanoTime() - queryStartNanoTime);
+ try
+ {
+ return condition.await(time, TimeUnit.NANOSECONDS);
+ }
+ catch (InterruptedException ex)
+ {
+ throw new AssertionError(ex);
+ }
+ }
+
+ public void awaitResults() throws ReadFailureException, ReadTimeoutException
+ {
+ boolean signaled = await(command.getTimeout(), TimeUnit.MILLISECONDS);
+ boolean failed = blockfor + failures > endpoints.size();
+ if (signaled && !failed)
+ return;
+
+ if (Tracing.isTracing())
+ {
+ String gotData = received > 0 ? (resolver.isDataPresent() ? " (including data)" : " (only digests)") : "";
+ Tracing.trace("{}; received {} of {} responses{}", new Object[]{ (failed ? "Failed" : "Timed out"), received, blockfor, gotData });
+ }
+ else if (logger.isDebugEnabled())
+ {
+ String gotData = received > 0 ? (resolver.isDataPresent() ? " (including data)" : " (only digests)") : "";
+ logger.debug("{}; received {} of {} responses{}", new Object[]{ (failed ? "Failed" : "Timed out"), received, blockfor, gotData });
+ }
+
+ // Same as for writes, see AbstractWriteResponseHandler
+ throw failed
+ ? new ReadFailureException(consistencyLevel, received, blockfor, resolver.isDataPresent(), failureReasonByEndpoint)
+ : new ReadTimeoutException(consistencyLevel, received, blockfor, resolver.isDataPresent());
+ }
+
+ public int blockFor()
+ {
+ return blockfor;
+ }
+
+ public void response(MessageIn<ReadResponse> message)
+ {
+ resolver.preprocess(message);
+ int n = waitingFor(message.from)
+ ? receivedUpdater.incrementAndGet(this)
+ : received;
+ if (n >= blockfor && resolver.isDataPresent())
+ {
+ condition.signalAll();
+ // kick off a background digest comparison if this is a result that (may have) arrived after
+ // the original resolve was kicked off as soon as the condition was signaled
+ if (blockfor < endpoints.size() && n == endpoints.size())
+ {
+ readRepair.maybeStartBackgroundRepair(resolver);
+ }
+ }
+ }
+
+ /**
+ * @return true if the message counts towards the blockfor threshold
+ */
+ private boolean waitingFor(InetAddressAndPort from)
+ {
+ return consistencyLevel.isDatacenterLocal()
+ ? DatabaseDescriptor.getLocalDataCenter().equals(DatabaseDescriptor.getEndpointSnitch().getDatacenter(from))
+ : true;
+ }
+
+ /**
+ * @return the current number of received responses
+ */
+ public int getReceivedCount()
+ {
+ return received;
+ }
+
+ public void response(ReadResponse result)
+ {
+ MessageIn<ReadResponse> message = MessageIn.create(FBUtilities.getBroadcastAddressAndPort(),
+ result,
+ Collections.emptyMap(),
+ MessagingService.Verb.INTERNAL_RESPONSE,
+ MessagingService.current_version);
+ response(message);
+ }
+
+ public void assureSufficientLiveNodes() throws UnavailableException
+ {
+ consistencyLevel.assureSufficientLiveNodes(keyspace, endpoints);
+ }
+
+ public boolean isLatencyForSnitch()
+ {
+ return true;
+ }
+
+ @Override
+ public void onFailure(InetAddressAndPort from, RequestFailureReason failureReason)
+ {
+ int n = waitingFor(from)
+ ? failuresUpdater.incrementAndGet(this)
+ : failures;
+
+ failureReasonByEndpoint.put(from, failureReason);
+
+ if (blockfor + n > endpoints.size())
+ condition.signalAll();
+ }
+}
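
The onFailure() accounting above signals waiters as soon as success becomes impossible: once blockfor + failures exceeds the endpoint count, fewer than blockfor replicas can still answer. A tiny arithmetic demonstration of that fail-fast condition:

    // Success is impossible once too few endpoints remain to reach blockfor:
    // remaining = endpoints - failures; fail fast when remaining < blockfor,
    // i.e. when blockfor + failures > endpoints.
    final class FailFastMath
    {
        static boolean failed(int blockfor, int failures, int endpoints)
        {
            return blockfor + failures > endpoints;
        }

        public static void main(String[] args)
        {
            // QUORUM over 3 replicas (blockfor = 2):
            System.out.println(failed(2, 1, 3)); // false: 2 replicas can still answer
            System.out.println(failed(2, 2, 3)); // true: only 1 replica left, need 2
        }
    }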
http://git-wip-us.apache.org/repos/asf/cassandra/blob/39807ba4/src/java/org/apache/cassandra/service/reads/ReadRepairDecision.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/reads/ReadRepairDecision.java b/src/java/org/apache/cassandra/service/reads/ReadRepairDecision.java
new file mode 100644
index 0000000..f434c88
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/reads/ReadRepairDecision.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.service.reads;
+
+public enum ReadRepairDecision
+{
+ NONE, GLOBAL, DC_LOCAL;
+}
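
The enum carries no behaviour itself; as a hypothetical illustration (the helper below is not part of this patch), a caller might branch on it like so:

    // Hypothetical helper, not from this patch: decide whether read repair
    // mutations may target replicas outside the local datacenter.
    static boolean repairsCrossDatacenters(ReadRepairDecision decision)
    {
        switch (decision)
        {
            case GLOBAL:
                return true;   // repair against replicas in all datacenters
            case DC_LOCAL:
            case NONE:
            default:
                return false;  // stay within the local DC, or skip repair entirely
        }
    }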
http://git-wip-us.apache.org/repos/asf/cassandra/blob/39807ba4/src/java/org/apache/cassandra/service/reads/ResponseResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/reads/ResponseResolver.java b/src/java/org/apache/cassandra/service/reads/ResponseResolver.java
new file mode 100644
index 0000000..69ec063
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/reads/ResponseResolver.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.service.reads;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.net.MessageIn;
+import org.apache.cassandra.service.reads.repair.ReadRepair;
+import org.apache.cassandra.tracing.TraceState;
+import org.apache.cassandra.utils.concurrent.Accumulator;
+
+public abstract class ResponseResolver
+{
+ protected static final Logger logger = LoggerFactory.getLogger(ResponseResolver.class);
+
+ protected final Keyspace keyspace;
+ protected final ReadCommand command;
+ protected final ConsistencyLevel consistency;
+ protected final ReadRepair readRepair;
+
+ // Accumulator gives us non-blocking thread-safety with optimal algorithmic constraints
+ protected final Accumulator<MessageIn<ReadResponse>> responses;
+
+ public ResponseResolver(Keyspace keyspace, ReadCommand command, ConsistencyLevel consistency, ReadRepair readRepair, int maxResponseCount)
+ {
+ this.keyspace = keyspace;
+ this.command = command;
+ this.consistency = consistency;
+ this.readRepair = readRepair;
+ this.responses = new Accumulator<>(maxResponseCount);
+ }
+
+ /**
+     * Consume the accumulated responses, starting a read repair if necessary
+ */
+ public abstract void evaluateAllResponses(TraceState traceState);
+
+ public abstract boolean isDataPresent();
+
+ public void preprocess(MessageIn<ReadResponse> message)
+ {
+ responses.add(message);
+ }
+
+ public Accumulator<MessageIn<ReadResponse>> getMessages()
+ {
+ return responses;
+ }
+}
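
To make the abstract contract concrete, a toy subclass might look like the sketch below; it is illustrative only, and the real resolvers in this patch do considerably more than count responses:

    // Toy ResponseResolver subclass (illustrative, not part of the patch): treats
    // any accumulated response as data and performs no real reconciliation.
    class CountingResolver extends ResponseResolver
    {
        CountingResolver(Keyspace keyspace, ReadCommand command, ConsistencyLevel consistency,
                         ReadRepair readRepair, int maxResponseCount)
        {
            super(keyspace, command, consistency, readRepair, maxResponseCount);
        }

        public void evaluateAllResponses(TraceState traceState)
        {
            // a real implementation would merge/compare the accumulated payloads
            // here and hand any mismatches to the ReadRepair strategy
        }

        public boolean isDataPresent()
        {
            return responses.size() > 0;
        }
    }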
http://git-wip-us.apache.org/repos/asf/cassandra/blob/39807ba4/src/java/org/apache/cassandra/service/reads/ShortReadPartitionsProtection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/reads/ShortReadPartitionsProtection.java b/src/java/org/apache/cassandra/service/reads/ShortReadPartitionsProtection.java
new file mode 100644
index 0000000..fb5f48e
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/reads/ShortReadPartitionsProtection.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.reads;
+
+import java.util.Collections;
+
+import org.apache.cassandra.concurrent.Stage;
+import org.apache.cassandra.concurrent.StageManager;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.DataRange;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.PartitionPosition;
+import org.apache.cassandra.db.PartitionRangeReadCommand;
+import org.apache.cassandra.db.ReadCommand;
+import org.apache.cassandra.db.filter.DataLimits;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+import org.apache.cassandra.db.transform.MorePartitions;
+import org.apache.cassandra.db.transform.MoreRows;
+import org.apache.cassandra.db.transform.Transformation;
+import org.apache.cassandra.dht.AbstractBounds;
+import org.apache.cassandra.dht.ExcludingBounds;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.service.reads.repair.NoopReadRepair;
+import org.apache.cassandra.service.StorageProxy;
+import org.apache.cassandra.tracing.Tracing;
+
+public class ShortReadPartitionsProtection extends Transformation<UnfilteredRowIterator> implements MorePartitions<UnfilteredPartitionIterator>
+{
+ private final ReadCommand command;
+ private final InetAddressAndPort source;
+
+ private final DataLimits.Counter singleResultCounter; // unmerged per-source counter
+ private final DataLimits.Counter mergedResultCounter; // merged end-result counter
+
+ private DecoratedKey lastPartitionKey; // key of the last observed partition
+
+ private boolean partitionsFetched; // whether we've seen any new partitions since iteration start or last moreContents() call
+
+ private final long queryStartNanoTime;
+
+ public ShortReadPartitionsProtection(ReadCommand command, InetAddressAndPort source,
+ DataLimits.Counter singleResultCounter,
+ DataLimits.Counter mergedResultCounter,
+ long queryStartNanoTime)
+ {
+ this.command = command;
+ this.source = source;
+ this.singleResultCounter = singleResultCounter;
+ this.mergedResultCounter = mergedResultCounter;
+ this.queryStartNanoTime = queryStartNanoTime;
+ }
+
+ @Override
+ public UnfilteredRowIterator applyToPartition(UnfilteredRowIterator partition)
+ {
+ partitionsFetched = true;
+
+ lastPartitionKey = partition.partitionKey();
+
+ /*
+         * Extend the partition for moreContents(), then apply the protection so that applyToRow() can track lastClustering.
+ *
+ * If we don't apply the transformation *after* extending the partition with MoreRows,
+         * the applyToRow() method of the protection will not be called on the first row of the new extension iterator.
+ */
+ ShortReadRowsProtection protection = new ShortReadRowsProtection(partition.partitionKey(),
+ command, source,
+ this::executeReadCommand,
+ singleResultCounter,
+ mergedResultCounter);
+ return Transformation.apply(MoreRows.extend(partition, protection), protection);
+ }
+
+ /*
+ * We only get here once all the rows and partitions in this iterator have been iterated over, and so
+ * if the node had returned the requested number of rows but we still get here, then some results were
+ * skipped during reconciliation.
+ */
+ public UnfilteredPartitionIterator moreContents()
+ {
+ // never try to request additional partitions from replicas if our reconciled partitions are already filled to the limit
+ assert !mergedResultCounter.isDone();
+
+ // we do not apply short read protection when we have no limits at all
+ assert !command.limits().isUnlimited();
+
+ /*
+ * If this is a single partition read command or an (indexed) partition range read command with
+         * a partition key specified, then we can't and shouldn't try to fetch more partitions.
+ */
+ assert !command.isLimitedToOnePartition();
+
+ /*
+ * If the returned result doesn't have enough rows/partitions to satisfy even the original limit, don't ask for more.
+ *
+         * We can only take this shortcut if there is no per-partition limit set. Otherwise it's possible to hit false
+         * positives due to some rows being unaccounted for in certain scenarios (see CASSANDRA-13911).
+ */
+ if (!singleResultCounter.isDone() && command.limits().perPartitionCount() == DataLimits.NO_LIMIT)
+ return null;
+
+ /*
+ * Either we had an empty iterator as the initial response, or our moreContents() call got us an empty iterator.
+         * There is no point in asking the replica for more rows - it has no more in the requested range.
+ */
+ if (!partitionsFetched)
+ return null;
+ partitionsFetched = false;
+
+ /*
+ * We are going to fetch one partition at a time for thrift and potentially more for CQL.
+         * The row limit will be set either to the per-partition limit - if the command has no total row limit - or to
+         * the total # of rows remaining - if it has one. If we don't grab enough rows in some of the partitions,
+ * then future ShortReadRowsProtection.moreContents() calls will fetch the missing ones.
+ */
+ int toQuery = command.limits().count() != DataLimits.NO_LIMIT
+ ? command.limits().count() - counted(mergedResultCounter)
+ : command.limits().perPartitionCount();
+
+ ColumnFamilyStore.metricsFor(command.metadata().id).shortReadProtectionRequests.mark();
+ Tracing.trace("Requesting {} extra rows from {} for short read protection", toQuery, source);
+
+ PartitionRangeReadCommand cmd = makeFetchAdditionalPartitionReadCommand(toQuery);
+ return executeReadCommand(cmd);
+ }
+
+    // Returns the number of rows counted: for GROUP BY queries counted() would track groups, so we use rowCounted() instead
+ private int counted(DataLimits.Counter counter)
+ {
+ return command.limits().isGroupByLimit()
+ ? counter.rowCounted()
+ : counter.counted();
+ }
+
+ private PartitionRangeReadCommand makeFetchAdditionalPartitionReadCommand(int toQuery)
+ {
+ PartitionRangeReadCommand cmd = (PartitionRangeReadCommand) command;
+
+ DataLimits newLimits = cmd.limits().forShortReadRetry(toQuery);
+
+ AbstractBounds<PartitionPosition> bounds = cmd.dataRange().keyRange();
+ AbstractBounds<PartitionPosition> newBounds = bounds.inclusiveRight()
+ ? new Range<>(lastPartitionKey, bounds.right)
+ : new ExcludingBounds<>(lastPartitionKey, bounds.right);
+ DataRange newDataRange = cmd.dataRange().forSubRange(newBounds);
+
+ return cmd.withUpdatedLimitsAndDataRange(newLimits, newDataRange);
+ }
+
+ private UnfilteredPartitionIterator executeReadCommand(ReadCommand cmd)
+ {
+ Keyspace keyspace = Keyspace.open(command.metadata().keyspace);
+ DataResolver resolver = new DataResolver(keyspace, cmd, ConsistencyLevel.ONE, 1, queryStartNanoTime, NoopReadRepair.instance);
+ ReadCallback handler = new ReadCallback(resolver, ConsistencyLevel.ONE, cmd, Collections.singletonList(source), queryStartNanoTime, NoopReadRepair.instance);
+
+ if (StorageProxy.canDoLocalRequest(source))
+ StageManager.getStage(Stage.READ).maybeExecuteImmediately(new StorageProxy.LocalReadRunnable(cmd, handler));
+ else
+ MessagingService.instance().sendRRWithFailure(cmd.createMessage(), source, handler);
+
+ // We don't call handler.get() because we want to preserve tombstones since we're still in the middle of merging node results.
+ handler.awaitResults();
+ assert resolver.getMessages().size() == 1;
+ return resolver.getMessages().get(0).payload.makeIterator(command);
+ }
+}
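
The toQuery computation in moreContents() reduces to simple arithmetic; restated standalone with hypothetical names: with a total row limit of 100 and 80 rows already counted post-merge, we ask the replica for the 20 still missing, and with no total limit we fall back to the per-partition limit.

    // Standalone restatement (hypothetical names) of the toQuery arithmetic above.
    static int rowsToQuery(boolean hasTotalLimit, int totalLimit, int countedSoFar, int perPartitionLimit)
    {
        return hasTotalLimit
             ? totalLimit - countedSoFar   // e.g. 100 - 80 = 20 rows still missing
             : perPartitionLimit;          // no total limit: use the per-partition one
    }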
http://git-wip-us.apache.org/repos/asf/cassandra/blob/39807ba4/src/java/org/apache/cassandra/service/reads/ShortReadProtection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/reads/ShortReadProtection.java b/src/java/org/apache/cassandra/service/reads/ShortReadProtection.java
new file mode 100644
index 0000000..93c02e0
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/reads/ShortReadProtection.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.reads;
+
+import java.net.InetAddress;
+
+import org.apache.cassandra.db.ReadCommand;
+import org.apache.cassandra.db.filter.DataLimits;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
+import org.apache.cassandra.db.transform.MorePartitions;
+import org.apache.cassandra.db.transform.Transformation;
+import org.apache.cassandra.locator.InetAddressAndPort;
+
+/**
+ * We have a potential short read if the result from a given node contains the requested number of rows
+ * (i.e. it has stopped returning results due to the limit), but some of them haven't
+ * made it into the final post-reconciliation result due to other nodes' row, range, and/or partition tombstones.
+ *
+ * If that is the case, then that node may have more rows that we should fetch, as otherwise we could
+ * ultimately return fewer rows than required. Also, those additional rows may contain tombstones, which
+ * we also need to fetch as they may shadow rows or partitions from other replicas' results that we would
+ * otherwise return incorrectly.
+ */
+public class ShortReadProtection
+{
+ public static UnfilteredPartitionIterator extend(InetAddressAndPort source, UnfilteredPartitionIterator partitions,
+ ReadCommand command, DataLimits.Counter mergedResultCounter,
+ long queryStartNanoTime, boolean enforceStrictLiveness)
+ {
+ DataLimits.Counter singleResultCounter = command.limits().newCounter(command.nowInSec(),
+ false,
+ command.selectsFullPartition(),
+ enforceStrictLiveness).onlyCount();
+
+ ShortReadPartitionsProtection protection = new ShortReadPartitionsProtection(command, source, singleResultCounter, mergedResultCounter, queryStartNanoTime);
+
+ /*
+     * The order of extension and transformations is important here. Extending with more partitions has to happen
+ * first due to the way BaseIterator.hasMoreContents() works: only transformations applied after extension will
+ * be called on the first partition of the extended iterator.
+ *
+ * Additionally, we want singleResultCounter to be applied after SRPP, so that its applyToPartition() method will
+     * be called last, after the extension done by the SRPP.applyToPartition() call. That way we preserve the same order
+     * when it comes to calling the SRPP.moreContents() and applyToRow() callbacks.
+ *
+ * See ShortReadPartitionsProtection.applyToPartition() for more details.
+ */
+
+ // extend with moreContents() only if it's a range read command with no partition key specified
+ if (!command.isLimitedToOnePartition())
+ partitions = MorePartitions.extend(partitions, protection); // register SRPP.moreContents()
+
+ partitions = Transformation.apply(partitions, protection); // register SRPP.applyToPartition()
+ partitions = Transformation.apply(partitions, singleResultCounter); // register the per-source counter
+
+ return partitions;
+ }
+}
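
A sketch of a call site for extend(); the helper and its parameters are assumptions for illustration, not lines from this patch:

    // Hypothetical helper (not from this patch) showing how a coordinator-side
    // caller might wrap one replica's unmerged partitions before merging.
    static UnfilteredPartitionIterator guard(InetAddressAndPort source,
                                             UnfilteredPartitionIterator partitions,
                                             ReadCommand command,
                                             DataLimits.Counter mergedResultCounter,
                                             long queryStartNanoTime,
                                             boolean enforceStrictLiveness)
    {
        return ShortReadProtection.extend(source, partitions, command,
                                          mergedResultCounter, queryStartNanoTime,
                                          enforceStrictLiveness);
    }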
[2/5] cassandra git commit: Refactor read executor and response resolver, abstract read repair
Posted by bd...@apache.org.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/39807ba4/test/unit/org/apache/cassandra/service/DataResolverTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/DataResolverTest.java b/test/unit/org/apache/cassandra/service/DataResolverTest.java
deleted file mode 100644
index f406582..0000000
--- a/test/unit/org/apache/cassandra/service/DataResolverTest.java
+++ /dev/null
@@ -1,1122 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.cassandra.service;
-
-import java.net.UnknownHostException;
-import java.nio.ByteBuffer;
-import java.util.*;
-
-import com.google.common.collect.Iterators;
-import com.google.common.collect.Sets;
-import org.junit.*;
-
-import org.apache.cassandra.SchemaLoader;
-import org.apache.cassandra.Util;
-import org.apache.cassandra.locator.InetAddressAndPort;
-import org.apache.cassandra.schema.TableMetadata;
-import org.apache.cassandra.schema.ColumnMetadata;
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.cql3.ColumnIdentifier;
-import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.marshal.ByteType;
-import org.apache.cassandra.db.marshal.IntegerType;
-import org.apache.cassandra.db.marshal.MapType;
-import org.apache.cassandra.db.rows.*;
-import org.apache.cassandra.db.marshal.AsciiType;
-import org.apache.cassandra.db.marshal.BytesType;
-import org.apache.cassandra.db.partitions.*;
-import org.apache.cassandra.exceptions.ConfigurationException;
-import org.apache.cassandra.net.*;
-import org.apache.cassandra.schema.KeyspaceParams;
-import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.FBUtilities;
-
-import static org.apache.cassandra.Util.assertClustering;
-import static org.apache.cassandra.Util.assertColumn;
-import static org.apache.cassandra.Util.assertColumns;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.apache.cassandra.db.ClusteringBound.Kind;
-
-public class DataResolverTest
-{
- public static final String KEYSPACE1 = "DataResolverTest";
- public static final String CF_STANDARD = "Standard1";
- public static final String CF_COLLECTION = "Collection1";
-
- // counter to generate the last byte of the respondent's address in a ReadResponse message
- private int addressSuffix = 10;
-
- private DecoratedKey dk;
- private Keyspace ks;
- private ColumnFamilyStore cfs;
- private ColumnFamilyStore cfs2;
- private TableMetadata cfm;
- private TableMetadata cfm2;
- private ColumnMetadata m;
- private int nowInSec;
- private ReadCommand command;
- private MessageRecorder messageRecorder;
-
-
- @BeforeClass
- public static void defineSchema() throws ConfigurationException
- {
- DatabaseDescriptor.daemonInitialization();
-
- TableMetadata.Builder builder1 =
- TableMetadata.builder(KEYSPACE1, CF_STANDARD)
- .addPartitionKeyColumn("key", BytesType.instance)
- .addClusteringColumn("col1", AsciiType.instance)
- .addRegularColumn("c1", AsciiType.instance)
- .addRegularColumn("c2", AsciiType.instance)
- .addRegularColumn("one", AsciiType.instance)
- .addRegularColumn("two", AsciiType.instance);
-
- TableMetadata.Builder builder2 =
- TableMetadata.builder(KEYSPACE1, CF_COLLECTION)
- .addPartitionKeyColumn("k", ByteType.instance)
- .addRegularColumn("m", MapType.getInstance(IntegerType.instance, IntegerType.instance, true));
-
- SchemaLoader.prepareServer();
- SchemaLoader.createKeyspace(KEYSPACE1, KeyspaceParams.simple(1), builder1, builder2);
- }
-
- @Before
- public void setup()
- {
- dk = Util.dk("key1");
- ks = Keyspace.open(KEYSPACE1);
- cfs = ks.getColumnFamilyStore(CF_STANDARD);
- cfm = cfs.metadata();
- cfs2 = ks.getColumnFamilyStore(CF_COLLECTION);
- cfm2 = cfs2.metadata();
- m = cfm2.getColumn(new ColumnIdentifier("m", false));
-
- nowInSec = FBUtilities.nowInSeconds();
- command = Util.cmd(cfs, dk).withNowInSeconds(nowInSec).build();
- }
-
- @Before
- public void injectMessageSink()
- {
- // install an IMessageSink to capture all messages
- // so we can inspect them during tests
- messageRecorder = new MessageRecorder();
- MessagingService.instance().addMessageSink(messageRecorder);
- }
-
- @After
- public void removeMessageSink()
- {
- // should be unnecessary, but good housekeeping
- MessagingService.instance().clearMessageSinks();
- }
-
- /**
- * Checks that the provided data resolver has the expected number of repair futures created.
- * This method also "releases" those futures by faking replica responses to those repairs, which is necessary or
- * every test would time out when closing the result of resolver.resolve(), since it waits on those futures.
- */
- private void assertRepairFuture(DataResolver resolver, int expectedRepairs)
- {
- assertEquals(expectedRepairs, resolver.repairResults.size());
-
- // Signal all futures. We pass a completely fake response message, but it doesn't matter as we just want
- // AsyncOneResponse to signal success, and it only cares about a non-null MessageIn (it collects the payload).
- for (AsyncOneResponse<?> future : resolver.repairResults)
- future.response(MessageIn.create(null, null, null, null, -1));
- }
-
- @Test
- public void testResolveNewerSingleRow() throws UnknownHostException
- {
- DataResolver resolver = new DataResolver(ks, command, ConsistencyLevel.ALL, 2, System.nanoTime());
- InetAddressAndPort peer1 = peer();
- resolver.preprocess(readResponseMessage(peer1, iter(new RowUpdateBuilder(cfm, nowInSec, 0L, dk).clustering("1")
- .add("c1", "v1")
- .buildUpdate())));
- InetAddressAndPort peer2 = peer();
- resolver.preprocess(readResponseMessage(peer2, iter(new RowUpdateBuilder(cfm, nowInSec, 1L, dk).clustering("1")
- .add("c1", "v2")
- .buildUpdate())));
-
- try(PartitionIterator data = resolver.resolve())
- {
- try (RowIterator rows = Iterators.getOnlyElement(data))
- {
- Row row = Iterators.getOnlyElement(rows);
- assertColumns(row, "c1");
- assertColumn(cfm, row, "c1", "v2", 1);
- }
- assertRepairFuture(resolver, 1);
- }
-
- assertEquals(1, messageRecorder.sent.size());
- // peer 1 just needs to repair with the row from peer 2
- MessageOut msg = getSentMessage(peer1);
- assertRepairMetadata(msg);
- assertRepairContainsNoDeletions(msg);
- assertRepairContainsColumn(msg, "1", "c1", "v2", 1);
- }
-
- @Test
- public void testResolveDisjointSingleRow()
- {
- DataResolver resolver = new DataResolver(ks, command, ConsistencyLevel.ALL, 2, System.nanoTime());
- InetAddressAndPort peer1 = peer();
- resolver.preprocess(readResponseMessage(peer1, iter(new RowUpdateBuilder(cfm, nowInSec, 0L, dk).clustering("1")
- .add("c1", "v1")
- .buildUpdate())));
-
- InetAddressAndPort peer2 = peer();
- resolver.preprocess(readResponseMessage(peer2, iter(new RowUpdateBuilder(cfm, nowInSec, 1L, dk).clustering("1")
- .add("c2", "v2")
- .buildUpdate())));
-
- try(PartitionIterator data = resolver.resolve())
- {
- try (RowIterator rows = Iterators.getOnlyElement(data))
- {
- Row row = Iterators.getOnlyElement(rows);
- assertColumns(row, "c1", "c2");
- assertColumn(cfm, row, "c1", "v1", 0);
- assertColumn(cfm, row, "c2", "v2", 1);
- }
- assertRepairFuture(resolver, 2);
- }
-
- assertEquals(2, messageRecorder.sent.size());
- // each peer needs to repair with each other's column
- MessageOut msg = getSentMessage(peer1);
- assertRepairMetadata(msg);
- assertRepairContainsColumn(msg, "1", "c2", "v2", 1);
-
- msg = getSentMessage(peer2);
- assertRepairMetadata(msg);
- assertRepairContainsColumn(msg, "1", "c1", "v1", 0);
- }
-
- @Test
- public void testResolveDisjointMultipleRows() throws UnknownHostException
- {
-
- DataResolver resolver = new DataResolver(ks, command, ConsistencyLevel.ALL, 2, System.nanoTime());
- InetAddressAndPort peer1 = peer();
- resolver.preprocess(readResponseMessage(peer1, iter(new RowUpdateBuilder(cfm, nowInSec, 0L, dk).clustering("1")
- .add("c1", "v1")
- .buildUpdate())));
- InetAddressAndPort peer2 = peer();
- resolver.preprocess(readResponseMessage(peer2, iter(new RowUpdateBuilder(cfm, nowInSec, 1L, dk).clustering("2")
- .add("c2", "v2")
- .buildUpdate())));
-
- try (PartitionIterator data = resolver.resolve())
- {
- try (RowIterator rows = data.next())
- {
- // We expect the resolved superset to contain both rows
- Row row = rows.next();
- assertClustering(cfm, row, "1");
- assertColumns(row, "c1");
- assertColumn(cfm, row, "c1", "v1", 0);
-
- row = rows.next();
- assertClustering(cfm, row, "2");
- assertColumns(row, "c2");
- assertColumn(cfm, row, "c2", "v2", 1);
-
- assertFalse(rows.hasNext());
- assertFalse(data.hasNext());
- }
- assertRepairFuture(resolver, 2);
- }
-
- assertEquals(2, messageRecorder.sent.size());
- // each peer needs to repair the row from the other
- MessageOut msg = getSentMessage(peer1);
- assertRepairMetadata(msg);
- assertRepairContainsNoDeletions(msg);
- assertRepairContainsColumn(msg, "2", "c2", "v2", 1);
-
- msg = getSentMessage(peer2);
- assertRepairMetadata(msg);
- assertRepairContainsNoDeletions(msg);
- assertRepairContainsColumn(msg, "1", "c1", "v1", 0);
- }
-
- @Test
- public void testResolveDisjointMultipleRowsWithRangeTombstones()
- {
- DataResolver resolver = new DataResolver(ks, command, ConsistencyLevel.ALL, 4, System.nanoTime());
-
- RangeTombstone tombstone1 = tombstone("1", "11", 1, nowInSec);
- RangeTombstone tombstone2 = tombstone("3", "31", 1, nowInSec);
- PartitionUpdate update = new RowUpdateBuilder(cfm, nowInSec, 1L, dk).addRangeTombstone(tombstone1)
- .addRangeTombstone(tombstone2)
- .buildUpdate();
-
- InetAddressAndPort peer1 = peer();
- UnfilteredPartitionIterator iter1 = iter(new RowUpdateBuilder(cfm, nowInSec, 1L, dk).addRangeTombstone(tombstone1)
- .addRangeTombstone(tombstone2)
- .buildUpdate());
- resolver.preprocess(readResponseMessage(peer1, iter1));
- // not covered by any range tombstone
- InetAddressAndPort peer2 = peer();
- UnfilteredPartitionIterator iter2 = iter(new RowUpdateBuilder(cfm, nowInSec, 0L, dk).clustering("0")
- .add("c1", "v0")
- .buildUpdate());
- resolver.preprocess(readResponseMessage(peer2, iter2));
- // covered by a range tombstone
- InetAddressAndPort peer3 = peer();
- UnfilteredPartitionIterator iter3 = iter(new RowUpdateBuilder(cfm, nowInSec, 0L, dk).clustering("10")
- .add("c2", "v1")
- .buildUpdate());
- resolver.preprocess(readResponseMessage(peer3, iter3));
- // range covered by rt, but newer
- InetAddressAndPort peer4 = peer();
- UnfilteredPartitionIterator iter4 = iter(new RowUpdateBuilder(cfm, nowInSec, 2L, dk).clustering("3")
- .add("one", "A")
- .buildUpdate());
- resolver.preprocess(readResponseMessage(peer4, iter4));
- try (PartitionIterator data = resolver.resolve())
- {
- try (RowIterator rows = data.next())
- {
- Row row = rows.next();
- assertClustering(cfm, row, "0");
- assertColumns(row, "c1");
- assertColumn(cfm, row, "c1", "v0", 0);
-
- row = rows.next();
- assertClustering(cfm, row, "3");
- assertColumns(row, "one");
- assertColumn(cfm, row, "one", "A", 2);
-
- assertFalse(rows.hasNext());
- }
- assertRepairFuture(resolver, 4);
- }
-
- assertEquals(4, messageRecorder.sent.size());
- // peer1 needs the rows from peers 2 and 4
- MessageOut msg = getSentMessage(peer1);
- assertRepairMetadata(msg);
- assertRepairContainsNoDeletions(msg);
- assertRepairContainsColumn(msg, "0", "c1", "v0", 0);
- assertRepairContainsColumn(msg, "3", "one", "A", 2);
-
- // peer2 needs to get the row from peer4 and the RTs
- msg = getSentMessage(peer2);
- assertRepairMetadata(msg);
- assertRepairContainsDeletions(msg, null, tombstone1, tombstone2);
- assertRepairContainsColumn(msg, "3", "one", "A", 2);
-
- // peer 3 needs both rows and the RTs
- msg = getSentMessage(peer3);
- assertRepairMetadata(msg);
- assertRepairContainsDeletions(msg, null, tombstone1, tombstone2);
- assertRepairContainsColumn(msg, "0", "c1", "v0", 0);
- assertRepairContainsColumn(msg, "3", "one", "A", 2);
-
- // peer4 needs the row from peer2 and the RTs
- msg = getSentMessage(peer4);
- assertRepairMetadata(msg);
- assertRepairContainsDeletions(msg, null, tombstone1, tombstone2);
- assertRepairContainsColumn(msg, "0", "c1", "v0", 0);
- }
-
- @Test
- public void testResolveWithOneEmpty()
- {
- DataResolver resolver = new DataResolver(ks, command, ConsistencyLevel.ALL, 2, System.nanoTime());
- InetAddressAndPort peer1 = peer();
- resolver.preprocess(readResponseMessage(peer1, iter(new RowUpdateBuilder(cfm, nowInSec, 1L, dk).clustering("1")
- .add("c2", "v2")
- .buildUpdate())));
- InetAddressAndPort peer2 = peer();
- resolver.preprocess(readResponseMessage(peer2, EmptyIterators.unfilteredPartition(cfm)));
-
- try(PartitionIterator data = resolver.resolve())
- {
- try (RowIterator rows = Iterators.getOnlyElement(data))
- {
- Row row = Iterators.getOnlyElement(rows);
- assertColumns(row, "c2");
- assertColumn(cfm, row, "c2", "v2", 1);
- }
- assertRepairFuture(resolver, 1);
- }
-
- assertEquals(1, messageRecorder.sent.size());
- // peer 2 needs the row from peer 1
- MessageOut msg = getSentMessage(peer2);
- assertRepairMetadata(msg);
- assertRepairContainsNoDeletions(msg);
- assertRepairContainsColumn(msg, "1", "c2", "v2", 1);
- }
-
- @Test
- public void testResolveWithBothEmpty()
- {
- DataResolver resolver = new DataResolver(ks, command, ConsistencyLevel.ALL, 2, System.nanoTime());
- resolver.preprocess(readResponseMessage(peer(), EmptyIterators.unfilteredPartition(cfm)));
- resolver.preprocess(readResponseMessage(peer(), EmptyIterators.unfilteredPartition(cfm)));
-
- try(PartitionIterator data = resolver.resolve())
- {
- assertFalse(data.hasNext());
- assertRepairFuture(resolver, 0);
- }
-
- assertTrue(messageRecorder.sent.isEmpty());
- }
-
- @Test
- public void testResolveDeleted()
- {
- DataResolver resolver = new DataResolver(ks, command, ConsistencyLevel.ALL, 2, System.nanoTime());
- // one response with columns timestamped before a delete in another response
- InetAddressAndPort peer1 = peer();
- resolver.preprocess(readResponseMessage(peer1, iter(new RowUpdateBuilder(cfm, nowInSec, 0L, dk).clustering("1")
- .add("one", "A")
- .buildUpdate())));
- InetAddressAndPort peer2 = peer();
- resolver.preprocess(readResponseMessage(peer2, fullPartitionDelete(cfm, dk, 1, nowInSec)));
-
- try (PartitionIterator data = resolver.resolve())
- {
- assertFalse(data.hasNext());
- assertRepairFuture(resolver, 1);
- }
-
- // peer1 should get the deletion from peer2
- assertEquals(1, messageRecorder.sent.size());
- MessageOut msg = getSentMessage(peer1);
- assertRepairMetadata(msg);
- assertRepairContainsDeletions(msg, new DeletionTime(1, nowInSec));
- assertRepairContainsNoColumns(msg);
- }
-
- @Test
- public void testResolveMultipleDeleted()
- {
- DataResolver resolver = new DataResolver(ks, command, ConsistencyLevel.ALL, 4, System.nanoTime());
- // deletes and columns with interleaved timestamps, with an out-of-order return sequence
- InetAddressAndPort peer1 = peer();
- resolver.preprocess(readResponseMessage(peer1, fullPartitionDelete(cfm, dk, 0, nowInSec)));
- // these columns created after the previous deletion
- InetAddressAndPort peer2 = peer();
- resolver.preprocess(readResponseMessage(peer2, iter(new RowUpdateBuilder(cfm, nowInSec, 1L, dk).clustering("1")
- .add("one", "A")
- .add("two", "A")
- .buildUpdate())));
- //this column created after the next delete
- InetAddressAndPort peer3 = peer();
- resolver.preprocess(readResponseMessage(peer3, iter(new RowUpdateBuilder(cfm, nowInSec, 3L, dk).clustering("1")
- .add("two", "B")
- .buildUpdate())));
- InetAddressAndPort peer4 = peer();
- resolver.preprocess(readResponseMessage(peer4, fullPartitionDelete(cfm, dk, 2, nowInSec)));
-
- try(PartitionIterator data = resolver.resolve())
- {
- try (RowIterator rows = Iterators.getOnlyElement(data))
- {
- Row row = Iterators.getOnlyElement(rows);
- assertColumns(row, "two");
- assertColumn(cfm, row, "two", "B", 3);
- }
- assertRepairFuture(resolver, 4);
- }
-
- // peer 1 needs to get the partition delete from peer 4 and the row from peer 3
- assertEquals(4, messageRecorder.sent.size());
- MessageOut msg = getSentMessage(peer1);
- assertRepairMetadata(msg);
- assertRepairContainsDeletions(msg, new DeletionTime(2, nowInSec));
- assertRepairContainsColumn(msg, "1", "two", "B", 3);
-
- // peer 2 needs the deletion from peer 4 and the row from peer 3
- msg = getSentMessage(peer2);
- assertRepairMetadata(msg);
- assertRepairContainsDeletions(msg, new DeletionTime(2, nowInSec));
- assertRepairContainsColumn(msg, "1", "two", "B", 3);
-
- // peer 3 needs just the deletion from peer 4
- msg = getSentMessage(peer3);
- assertRepairMetadata(msg);
- assertRepairContainsDeletions(msg, new DeletionTime(2, nowInSec));
- assertRepairContainsNoColumns(msg);
-
- // peer 4 needs just the row from peer 3
- msg = getSentMessage(peer4);
- assertRepairMetadata(msg);
- assertRepairContainsNoDeletions(msg);
- assertRepairContainsColumn(msg, "1", "two", "B", 3);
- }
-
- @Test
- public void testResolveRangeTombstonesOnBoundaryRightWins() throws UnknownHostException
- {
- resolveRangeTombstonesOnBoundary(1, 2);
- }
-
- @Test
- public void testResolveRangeTombstonesOnBoundaryLeftWins() throws UnknownHostException
- {
- resolveRangeTombstonesOnBoundary(2, 1);
- }
-
- @Test
- public void testResolveRangeTombstonesOnBoundarySameTimestamp() throws UnknownHostException
- {
- resolveRangeTombstonesOnBoundary(1, 1);
- }
-
- /*
- * We want responses to merge on tombstone boundary. So we'll merge 2 "streams":
- * 1: [1, 2)(3, 4](5, 6] 2
- * 2: [2, 3][4, 5) 1
- * which tests all combinations of open/close boundaries (open/close, close/open, open/open, close/close).
- *
- * Note that, because DataResolver returns a "filtered" iterator, it should resolve into an empty iterator.
- * However, what should be sent to each source depends on the exact timestamps of each tombstone, and we
- * test a few combinations.
- */
- private void resolveRangeTombstonesOnBoundary(long timestamp1, long timestamp2)
- {
- DataResolver resolver = new DataResolver(ks, command, ConsistencyLevel.ALL, 2, System.nanoTime());
- InetAddressAndPort peer1 = peer();
- InetAddressAndPort peer2 = peer();
-
- // 1st "stream"
- RangeTombstone one_two = tombstone("1", true , "2", false, timestamp1, nowInSec);
- RangeTombstone three_four = tombstone("3", false, "4", true , timestamp1, nowInSec);
- RangeTombstone five_six = tombstone("5", false, "6", true , timestamp1, nowInSec);
- UnfilteredPartitionIterator iter1 = iter(new RowUpdateBuilder(cfm, nowInSec, 1L, dk).addRangeTombstone(one_two)
- .addRangeTombstone(three_four)
- .addRangeTombstone(five_six)
- .buildUpdate());
-
- // 2nd "stream"
- RangeTombstone two_three = tombstone("2", true, "3", true , timestamp2, nowInSec);
- RangeTombstone four_five = tombstone("4", true, "5", false, timestamp2, nowInSec);
- UnfilteredPartitionIterator iter2 = iter(new RowUpdateBuilder(cfm, nowInSec, 1L, dk).addRangeTombstone(two_three)
- .addRangeTombstone(four_five)
- .buildUpdate());
-
- resolver.preprocess(readResponseMessage(peer1, iter1));
- resolver.preprocess(readResponseMessage(peer2, iter2));
-
- // No results, we've only reconciled tombstones.
- try (PartitionIterator data = resolver.resolve())
- {
- assertFalse(data.hasNext());
- assertRepairFuture(resolver, 2);
- }
-
- assertEquals(2, messageRecorder.sent.size());
-
- MessageOut msg1 = getSentMessage(peer1);
- assertRepairMetadata(msg1);
- assertRepairContainsNoColumns(msg1);
-
- MessageOut msg2 = getSentMessage(peer2);
- assertRepairMetadata(msg2);
- assertRepairContainsNoColumns(msg2);
-
- // Both streams are mostly complementary, so they will roughly get the ranges of the other stream. One subtlety is
- // around the value "4" however, as it's included by both streams.
- // So for a given stream, unless the other stream has a strictly higher timestamp, the value 4 will be excluded
- // from whatever range it receives as repair since the stream already covers it.
-
- // Message to peer1 contains peer2 ranges
- assertRepairContainsDeletions(msg1, null, two_three, withExclusiveStartIf(four_five, timestamp1 >= timestamp2));
-
- // Message to peer2 contains peer1 ranges
- assertRepairContainsDeletions(msg2, null, one_two, withExclusiveEndIf(three_four, timestamp2 >= timestamp1), five_six);
- }
-
- /**
- * Test cases where a boundary of a source is covered by another source's deletion and the timestamps on one or
- * both sides of the boundary are equal to the "merged" deletion.
- * This is a test for CASSANDRA-13237 to make sure we handle this case properly.
- */
- @Test
- public void testRepairRangeTombstoneBoundary() throws UnknownHostException
- {
- testRepairRangeTombstoneBoundary(1, 0, 1);
- messageRecorder.sent.clear();
- testRepairRangeTombstoneBoundary(1, 1, 0);
- messageRecorder.sent.clear();
- testRepairRangeTombstoneBoundary(1, 1, 1);
- }
-
- /**
- * Test for CASSANDRA-13237, checking we don't fail (and handle correctly) the case where a RT boundary has the
- * same deletion on both sides (which is useless but could be created by legacy code pre-CASSANDRA-13237 and could
- * thus still be sent).
- */
- private void testRepairRangeTombstoneBoundary(int timestamp1, int timestamp2, int timestamp3) throws UnknownHostException
- {
- DataResolver resolver = new DataResolver(ks, command, ConsistencyLevel.ALL, 2, System.nanoTime());
- InetAddressAndPort peer1 = peer();
- InetAddressAndPort peer2 = peer();
-
- // 1st "stream"
- RangeTombstone one_nine = tombstone("0", true , "9", true, timestamp1, nowInSec);
- UnfilteredPartitionIterator iter1 = iter(new RowUpdateBuilder(cfm, nowInSec, 1L, dk)
- .addRangeTombstone(one_nine)
- .buildUpdate());
-
- // 2nd "stream" (build more manually to ensure we have the boundary we want)
- RangeTombstoneBoundMarker open_one = marker("0", true, true, timestamp2, nowInSec);
- RangeTombstoneBoundaryMarker boundary_five = boundary("5", false, timestamp2, nowInSec, timestamp3, nowInSec);
- RangeTombstoneBoundMarker close_nine = marker("9", false, true, timestamp3, nowInSec);
- UnfilteredPartitionIterator iter2 = iter(dk, open_one, boundary_five, close_nine);
-
- resolver.preprocess(readResponseMessage(peer1, iter1));
- resolver.preprocess(readResponseMessage(peer2, iter2));
-
- boolean shouldHaveRepair = timestamp1 != timestamp2 || timestamp1 != timestamp3;
-
- // No results, we've only reconciled tombstones.
- try (PartitionIterator data = resolver.resolve())
- {
- assertFalse(data.hasNext());
- assertRepairFuture(resolver, shouldHaveRepair ? 1 : 0);
- }
-
- assertEquals(shouldHaveRepair? 1 : 0, messageRecorder.sent.size());
-
- if (!shouldHaveRepair)
- return;
-
- MessageOut msg = getSentMessage(peer2);
- assertRepairMetadata(msg);
- assertRepairContainsNoColumns(msg);
-
- RangeTombstone expected = timestamp1 != timestamp2
- // We've repaired the 1st part
- ? tombstone("0", true, "5", false, timestamp1, nowInSec)
- // We've repaired the 2nd part
- : tombstone("5", true, "9", true, timestamp1, nowInSec);
- assertRepairContainsDeletions(msg, null, expected);
- }
-
- /**
- * Test for CASSANDRA-13719: tests that having a partition deletion shadow a range tombstone on another source
- * doesn't trigger an assertion error.
- */
- @Test
- public void testRepairRangeTombstoneWithPartitionDeletion()
- {
- DataResolver resolver = new DataResolver(ks, command, ConsistencyLevel.ALL, 2, System.nanoTime());
- InetAddressAndPort peer1 = peer();
- InetAddressAndPort peer2 = peer();
-
- // 1st "stream": just a partition deletion
- UnfilteredPartitionIterator iter1 = iter(PartitionUpdate.fullPartitionDelete(cfm, dk, 10, nowInSec));
-
- // 2nd "stream": a range tombstone that is covered by the 1st stream
- RangeTombstone rt = tombstone("0", true , "10", true, 5, nowInSec);
- UnfilteredPartitionIterator iter2 = iter(new RowUpdateBuilder(cfm, nowInSec, 1L, dk)
- .addRangeTombstone(rt)
- .buildUpdate());
-
- resolver.preprocess(readResponseMessage(peer1, iter1));
- resolver.preprocess(readResponseMessage(peer2, iter2));
-
- // No results, we've only reconciled tombstones.
- try (PartitionIterator data = resolver.resolve())
- {
- assertFalse(data.hasNext());
- // 2nd stream should get repaired
- assertRepairFuture(resolver, 1);
- }
-
- assertEquals(1, messageRecorder.sent.size());
-
- MessageOut msg = getSentMessage(peer2);
- assertRepairMetadata(msg);
- assertRepairContainsNoColumns(msg);
-
- assertRepairContainsDeletions(msg, new DeletionTime(10, nowInSec));
- }
-
- /**
- * Additional test for CASSANDRA-13719: tests the case where a partition deletion doesn't shadow a range tombstone.
- */
- @Test
- public void testRepairRangeTombstoneWithPartitionDeletion2()
- {
- DataResolver resolver = new DataResolver(ks, command, ConsistencyLevel.ALL, 2, System.nanoTime());
- InetAddressAndPort peer1 = peer();
- InetAddressAndPort peer2 = peer();
-
- // 1st "stream": a partition deletion and a range tombstone
- RangeTombstone rt1 = tombstone("0", true , "9", true, 11, nowInSec);
- PartitionUpdate upd1 = new RowUpdateBuilder(cfm, nowInSec, 1L, dk)
- .addRangeTombstone(rt1)
- .buildUpdate();
- ((MutableDeletionInfo)upd1.deletionInfo()).add(new DeletionTime(10, nowInSec));
- UnfilteredPartitionIterator iter1 = iter(upd1);
-
- // 2nd "stream": a range tombstone that is covered by the other stream rt
- RangeTombstone rt2 = tombstone("2", true , "3", true, 11, nowInSec);
- RangeTombstone rt3 = tombstone("4", true , "5", true, 10, nowInSec);
- UnfilteredPartitionIterator iter2 = iter(new RowUpdateBuilder(cfm, nowInSec, 1L, dk)
- .addRangeTombstone(rt2)
- .addRangeTombstone(rt3)
- .buildUpdate());
-
- resolver.preprocess(readResponseMessage(peer1, iter1));
- resolver.preprocess(readResponseMessage(peer2, iter2));
-
- // No results, we've only reconciled tombstones.
- try (PartitionIterator data = resolver.resolve())
- {
- assertFalse(data.hasNext());
- // 2nd stream should get repaired
- assertRepairFuture(resolver, 1);
- }
-
- assertEquals(1, messageRecorder.sent.size());
-
- MessageOut msg = getSentMessage(peer2);
- assertRepairMetadata(msg);
- assertRepairContainsNoColumns(msg);
-
- // 2nd stream should get both the partition deletion and the part of the 1st stream RT that it misses
- assertRepairContainsDeletions(msg, new DeletionTime(10, nowInSec),
- tombstone("0", true, "2", false, 11, nowInSec),
- tombstone("3", false, "9", true, 11, nowInSec));
- }
-
- // Forces the start to be exclusive if the condition holds
- private static RangeTombstone withExclusiveStartIf(RangeTombstone rt, boolean condition)
- {
- if (!condition)
- return rt;
-
- Slice slice = rt.deletedSlice();
- ClusteringBound newStart = ClusteringBound.create(Kind.EXCL_START_BOUND, slice.start().getRawValues());
- return new RangeTombstone(Slice.make(newStart, slice.end()), rt.deletionTime());
- }
-
- // Forces the end to be exclusive if the condition holds
- private static RangeTombstone withExclusiveEndIf(RangeTombstone rt, boolean condition)
- {
- if (!condition)
- return rt;
-
- Slice slice = rt.deletedSlice();
- ClusteringBound newEnd = ClusteringBound.create(Kind.EXCL_END_BOUND, slice.end().getRawValues());
- return new RangeTombstone(Slice.make(slice.start(), newEnd), rt.deletionTime());
- }
-
- private static ByteBuffer bb(int b)
- {
- return ByteBufferUtil.bytes(b);
- }
-
- private Cell mapCell(int k, int v, long ts)
- {
- return BufferCell.live(m, ts, bb(v), CellPath.create(bb(k)));
- }
-
- @Test
- public void testResolveComplexDelete()
- {
- ReadCommand cmd = Util.cmd(cfs2, dk).withNowInSeconds(nowInSec).build();
- DataResolver resolver = new DataResolver(ks, cmd, ConsistencyLevel.ALL, 2, System.nanoTime());
-
- long[] ts = {100, 200};
-
- Row.Builder builder = BTreeRow.unsortedBuilder(nowInSec);
- builder.newRow(Clustering.EMPTY);
- builder.addComplexDeletion(m, new DeletionTime(ts[0] - 1, nowInSec));
- builder.addCell(mapCell(0, 0, ts[0]));
-
- InetAddressAndPort peer1 = peer();
- resolver.preprocess(readResponseMessage(peer1, iter(PartitionUpdate.singleRowUpdate(cfm2, dk, builder.build())), cmd));
-
- builder.newRow(Clustering.EMPTY);
- DeletionTime expectedCmplxDelete = new DeletionTime(ts[1] - 1, nowInSec);
- builder.addComplexDeletion(m, expectedCmplxDelete);
- Cell expectedCell = mapCell(1, 1, ts[1]);
- builder.addCell(expectedCell);
-
- InetAddressAndPort peer2 = peer();
- resolver.preprocess(readResponseMessage(peer2, iter(PartitionUpdate.singleRowUpdate(cfm2, dk, builder.build())), cmd));
-
- try(PartitionIterator data = resolver.resolve())
- {
- try (RowIterator rows = Iterators.getOnlyElement(data))
- {
- Row row = Iterators.getOnlyElement(rows);
- assertColumns(row, "m");
- Assert.assertNull(row.getCell(m, CellPath.create(bb(0))));
- Assert.assertNotNull(row.getCell(m, CellPath.create(bb(1))));
- }
- assertRepairFuture(resolver, 1);
- }
-
- MessageOut<Mutation> msg;
- msg = getSentMessage(peer1);
- Iterator<Row> rowIter = msg.payload.getPartitionUpdate(cfm2).iterator();
- assertTrue(rowIter.hasNext());
- Row row = rowIter.next();
- assertFalse(rowIter.hasNext());
-
- ComplexColumnData cd = row.getComplexColumnData(m);
-
- assertEquals(Collections.singleton(expectedCell), Sets.newHashSet(cd));
- assertEquals(expectedCmplxDelete, cd.complexDeletion());
-
- Assert.assertNull(messageRecorder.sent.get(peer2));
- }
-
- @Test
- public void testResolveDeletedCollection()
- {
-
- ReadCommand cmd = Util.cmd(cfs2, dk).withNowInSeconds(nowInSec).build();
- DataResolver resolver = new DataResolver(ks, cmd, ConsistencyLevel.ALL, 2, System.nanoTime());
-
- long[] ts = {100, 200};
-
- Row.Builder builder = BTreeRow.unsortedBuilder(nowInSec);
- builder.newRow(Clustering.EMPTY);
- builder.addComplexDeletion(m, new DeletionTime(ts[0] - 1, nowInSec));
- builder.addCell(mapCell(0, 0, ts[0]));
-
- InetAddressAndPort peer1 = peer();
- resolver.preprocess(readResponseMessage(peer1, iter(PartitionUpdate.singleRowUpdate(cfm2, dk, builder.build())), cmd));
-
- builder.newRow(Clustering.EMPTY);
- DeletionTime expectedCmplxDelete = new DeletionTime(ts[1] - 1, nowInSec);
- builder.addComplexDeletion(m, expectedCmplxDelete);
-
- InetAddressAndPort peer2 = peer();
- resolver.preprocess(readResponseMessage(peer2, iter(PartitionUpdate.singleRowUpdate(cfm2, dk, builder.build())), cmd));
-
- try(PartitionIterator data = resolver.resolve())
- {
- assertFalse(data.hasNext());
- assertRepairFuture(resolver, 1);
- }
-
- MessageOut<Mutation> msg;
- msg = getSentMessage(peer1);
- Iterator<Row> rowIter = msg.payload.getPartitionUpdate(cfm2).iterator();
- assertTrue(rowIter.hasNext());
- Row row = rowIter.next();
- assertFalse(rowIter.hasNext());
-
- ComplexColumnData cd = row.getComplexColumnData(m);
-
- assertEquals(Collections.emptySet(), Sets.newHashSet(cd));
- assertEquals(expectedCmplxDelete, cd.complexDeletion());
-
- Assert.assertNull(messageRecorder.sent.get(peer2));
- }
-
- @Test
- public void testResolveNewCollection()
- {
- ReadCommand cmd = Util.cmd(cfs2, dk).withNowInSeconds(nowInSec).build();
- DataResolver resolver = new DataResolver(ks, cmd, ConsistencyLevel.ALL, 2, System.nanoTime());
-
- long[] ts = {100, 200};
-
- // map column
- Row.Builder builder = BTreeRow.unsortedBuilder(nowInSec);
- builder.newRow(Clustering.EMPTY);
- DeletionTime expectedCmplxDelete = new DeletionTime(ts[0] - 1, nowInSec);
- builder.addComplexDeletion(m, expectedCmplxDelete);
- Cell expectedCell = mapCell(0, 0, ts[0]);
- builder.addCell(expectedCell);
-
- // empty map column
- InetAddressAndPort peer1 = peer();
- resolver.preprocess(readResponseMessage(peer1, iter(PartitionUpdate.singleRowUpdate(cfm2, dk, builder.build())), cmd));
-
- InetAddressAndPort peer2 = peer();
- resolver.preprocess(readResponseMessage(peer2, iter(PartitionUpdate.emptyUpdate(cfm2, dk))));
-
- try(PartitionIterator data = resolver.resolve())
- {
- try (RowIterator rows = Iterators.getOnlyElement(data))
- {
- Row row = Iterators.getOnlyElement(rows);
- assertColumns(row, "m");
- ComplexColumnData cd = row.getComplexColumnData(m);
- assertEquals(Collections.singleton(expectedCell), Sets.newHashSet(cd));
- }
- assertRepairFuture(resolver, 1);
- }
-
- Assert.assertNull(messageRecorder.sent.get(peer1));
-
- MessageOut<Mutation> msg;
- msg = getSentMessage(peer2);
- Iterator<Row> rowIter = msg.payload.getPartitionUpdate(cfm2).iterator();
- assertTrue(rowIter.hasNext());
- Row row = rowIter.next();
- assertFalse(rowIter.hasNext());
-
- ComplexColumnData cd = row.getComplexColumnData(m);
-
- assertEquals(Sets.newHashSet(expectedCell), Sets.newHashSet(cd));
- assertEquals(expectedCmplxDelete, cd.complexDeletion());
- }
-
- @Test
- public void testResolveNewCollectionOverwritingDeleted()
- {
- ReadCommand cmd = Util.cmd(cfs2, dk).withNowInSeconds(nowInSec).build();
- DataResolver resolver = new DataResolver(ks, cmd, ConsistencyLevel.ALL, 2, System.nanoTime());
-
- long[] ts = {100, 200};
-
- // cleared map column
- Row.Builder builder = BTreeRow.unsortedBuilder(nowInSec);
- builder.newRow(Clustering.EMPTY);
- builder.addComplexDeletion(m, new DeletionTime(ts[0] - 1, nowInSec));
-
- InetAddressAndPort peer1 = peer();
- resolver.preprocess(readResponseMessage(peer1, iter(PartitionUpdate.singleRowUpdate(cfm2, dk, builder.build())), cmd));
-
- // newer, overwritten map column
- builder.newRow(Clustering.EMPTY);
- DeletionTime expectedCmplxDelete = new DeletionTime(ts[1] - 1, nowInSec);
- builder.addComplexDeletion(m, expectedCmplxDelete);
- Cell expectedCell = mapCell(1, 1, ts[1]);
- builder.addCell(expectedCell);
-
- InetAddressAndPort peer2 = peer();
- resolver.preprocess(readResponseMessage(peer2, iter(PartitionUpdate.singleRowUpdate(cfm2, dk, builder.build())), cmd));
-
- try(PartitionIterator data = resolver.resolve())
- {
- try (RowIterator rows = Iterators.getOnlyElement(data))
- {
- Row row = Iterators.getOnlyElement(rows);
- assertColumns(row, "m");
- ComplexColumnData cd = row.getComplexColumnData(m);
- assertEquals(Collections.singleton(expectedCell), Sets.newHashSet(cd));
- }
- assertRepairFuture(resolver, 1);
- }
-
- MessageOut<Mutation> msg;
- msg = getSentMessage(peer1);
- Row row = Iterators.getOnlyElement(msg.payload.getPartitionUpdate(cfm2).iterator());
-
- ComplexColumnData cd = row.getComplexColumnData(m);
-
- assertEquals(Collections.singleton(expectedCell), Sets.newHashSet(cd));
- assertEquals(expectedCmplxDelete, cd.complexDeletion());
-
- Assert.assertNull(messageRecorder.sent.get(peer2));
- }
-
- private InetAddressAndPort peer()
- {
- try
- {
- return InetAddressAndPort.getByAddress(new byte[]{ 127, 0, 0, (byte) addressSuffix++ });
- }
- catch (UnknownHostException e)
- {
- throw new RuntimeException(e);
- }
- }
-
- private MessageOut<Mutation> getSentMessage(InetAddressAndPort target)
- {
- MessageOut<Mutation> message = messageRecorder.sent.get(target);
- assertNotNull(String.format("No repair message was sent to %s", target), message);
- return message;
- }
-
- private void assertRepairContainsDeletions(MessageOut<Mutation> message,
- DeletionTime deletionTime,
- RangeTombstone...rangeTombstones)
- {
- PartitionUpdate update = ((Mutation)message.payload).getPartitionUpdates().iterator().next();
- DeletionInfo deletionInfo = update.deletionInfo();
- if (deletionTime != null)
- assertEquals(deletionTime, deletionInfo.getPartitionDeletion());
-
- assertEquals(rangeTombstones.length, deletionInfo.rangeCount());
- Iterator<RangeTombstone> ranges = deletionInfo.rangeIterator(false);
- int i = 0;
- while (ranges.hasNext())
- {
- RangeTombstone expected = rangeTombstones[i++];
- RangeTombstone actual = ranges.next();
- String msg = String.format("Expected %s, but got %s", expected.toString(cfm.comparator), actual.toString(cfm.comparator));
- assertEquals(msg, expected, actual);
- }
- }
-
- private void assertRepairContainsNoDeletions(MessageOut<Mutation> message)
- {
- PartitionUpdate update = ((Mutation)message.payload).getPartitionUpdates().iterator().next();
- assertTrue(update.deletionInfo().isLive());
- }
-
- private void assertRepairContainsColumn(MessageOut<Mutation> message,
- String clustering,
- String columnName,
- String value,
- long timestamp)
- {
- PartitionUpdate update = ((Mutation)message.payload).getPartitionUpdates().iterator().next();
- Row row = update.getRow(update.metadata().comparator.make(clustering));
- assertNotNull(row);
- assertColumn(cfm, row, columnName, value, timestamp);
- }
-
- private void assertRepairContainsNoColumns(MessageOut<Mutation> message)
- {
- PartitionUpdate update = ((Mutation)message.payload).getPartitionUpdates().iterator().next();
- assertFalse(update.iterator().hasNext());
- }
-
- private void assertRepairMetadata(MessageOut<Mutation> message)
- {
- assertEquals(MessagingService.Verb.READ_REPAIR, message.verb);
- PartitionUpdate update = ((Mutation)message.payload).getPartitionUpdates().iterator().next();
- assertEquals(update.metadata().keyspace, cfm.keyspace);
- assertEquals(update.metadata().name, cfm.name);
- }
-
-
- public MessageIn<ReadResponse> readResponseMessage(InetAddressAndPort from, UnfilteredPartitionIterator partitionIterator)
- {
- return readResponseMessage(from, partitionIterator, command);
- }
-
- public MessageIn<ReadResponse> readResponseMessage(InetAddressAndPort from, UnfilteredPartitionIterator partitionIterator, ReadCommand cmd)
- {
- return MessageIn.create(from,
- ReadResponse.createRemoteDataResponse(partitionIterator, cmd),
- Collections.EMPTY_MAP,
- MessagingService.Verb.REQUEST_RESPONSE,
- MessagingService.current_version);
- }
-
- private RangeTombstone tombstone(Object start, Object end, long markedForDeleteAt, int localDeletionTime)
- {
- return tombstone(start, true, end, true, markedForDeleteAt, localDeletionTime);
- }
-
- private RangeTombstone tombstone(Object start, boolean inclusiveStart, Object end, boolean inclusiveEnd, long markedForDeleteAt, int localDeletionTime)
- {
- ClusteringBound startBound = rtBound(start, true, inclusiveStart);
- ClusteringBound endBound = rtBound(end, false, inclusiveEnd);
- return new RangeTombstone(Slice.make(startBound, endBound), new DeletionTime(markedForDeleteAt, localDeletionTime));
- }
-
- private ClusteringBound rtBound(Object value, boolean isStart, boolean inclusive)
- {
- ClusteringBound.Kind kind = isStart
- ? (inclusive ? Kind.INCL_START_BOUND : Kind.EXCL_START_BOUND)
- : (inclusive ? Kind.INCL_END_BOUND : Kind.EXCL_END_BOUND);
-
- return ClusteringBound.create(kind, cfm.comparator.make(value).getRawValues());
- }
-
- private ClusteringBoundary rtBoundary(Object value, boolean inclusiveOnEnd)
- {
- ClusteringBound.Kind kind = inclusiveOnEnd
- ? Kind.INCL_END_EXCL_START_BOUNDARY
- : Kind.EXCL_END_INCL_START_BOUNDARY;
- return ClusteringBoundary.create(kind, cfm.comparator.make(value).getRawValues());
- }
-
- private RangeTombstoneBoundMarker marker(Object value, boolean isStart, boolean inclusive, long markedForDeleteAt, int localDeletionTime)
- {
- return new RangeTombstoneBoundMarker(rtBound(value, isStart, inclusive), new DeletionTime(markedForDeleteAt, localDeletionTime));
- }
-
- private RangeTombstoneBoundaryMarker boundary(Object value, boolean inclusiveOnEnd, long markedForDeleteAt1, int localDeletionTime1, long markedForDeleteAt2, int localDeletionTime2)
- {
- return new RangeTombstoneBoundaryMarker(rtBoundary(value, inclusiveOnEnd),
- new DeletionTime(markedForDeleteAt1, localDeletionTime1),
- new DeletionTime(markedForDeleteAt2, localDeletionTime2));
- }
-
- private UnfilteredPartitionIterator fullPartitionDelete(TableMetadata table, DecoratedKey dk, long timestamp, int nowInSec)
- {
- return new SingletonUnfilteredPartitionIterator(PartitionUpdate.fullPartitionDelete(table, dk, timestamp, nowInSec).unfilteredIterator());
- }
-
- private static class MessageRecorder implements IMessageSink
- {
- Map<InetAddressAndPort, MessageOut> sent = new HashMap<>();
- public boolean allowOutgoingMessage(MessageOut message, int id, InetAddressAndPort to)
- {
- sent.put(to, message);
- return false;
- }
-
- public boolean allowIncomingMessage(MessageIn message, int id)
- {
- return false;
- }
- }
-
- private UnfilteredPartitionIterator iter(PartitionUpdate update)
- {
- return new SingletonUnfilteredPartitionIterator(update.unfilteredIterator());
- }
-
- private UnfilteredPartitionIterator iter(DecoratedKey key, Unfiltered... unfiltereds)
- {
- SortedSet<Unfiltered> s = new TreeSet<>(cfm.comparator);
- Collections.addAll(s, unfiltereds);
- final Iterator<Unfiltered> iterator = s.iterator();
-
- UnfilteredRowIterator rowIter = new AbstractUnfilteredRowIterator(cfm,
- key,
- DeletionTime.LIVE,
- cfm.regularAndStaticColumns(),
- Rows.EMPTY_STATIC_ROW,
- false,
- EncodingStats.NO_STATS)
- {
- protected Unfiltered computeNext()
- {
- return iterator.hasNext() ? iterator.next() : endOfData();
- }
- };
- return new SingletonUnfilteredPartitionIterator(rowIter);
- }
-}
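
The tombstone() and rtBound() helpers above translate inclusive/exclusive flags into
ClusteringBound kinds so the tests can spell out range deletions precisely. As a rough
illustration of how they compose (the clustering values, timestamps and local deletion
times below are made up for the sketch):

    // Hypothetical usage of the helpers above; values are illustrative only.
    // A [0, 9) range deletion: inclusive start, exclusive end.
    RangeTombstone halfOpen = tombstone("0", true, "9", false, 1_000_000L, 100);
    // The shorter overload is inclusive on both ends: [0, 9].
    RangeTombstone closed = tombstone("0", "9", 1_000_000L, 100);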
http://git-wip-us.apache.org/repos/asf/cassandra/blob/39807ba4/test/unit/org/apache/cassandra/service/ReadExecutorTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/ReadExecutorTest.java b/test/unit/org/apache/cassandra/service/ReadExecutorTest.java
deleted file mode 100644
index f21e241..0000000
--- a/test/unit/org/apache/cassandra/service/ReadExecutorTest.java
+++ /dev/null
@@ -1,215 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.service;
-
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-import com.google.common.collect.ImmutableList;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import org.apache.cassandra.SchemaLoader;
-import org.apache.cassandra.Util;
-import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.ConsistencyLevel;
-import org.apache.cassandra.db.Keyspace;
-import org.apache.cassandra.db.SinglePartitionReadCommand;
-import org.apache.cassandra.exceptions.ReadFailureException;
-import org.apache.cassandra.exceptions.ReadTimeoutException;
-import org.apache.cassandra.exceptions.RequestFailureReason;
-import org.apache.cassandra.locator.InetAddressAndPort;
-import org.apache.cassandra.net.MessageOut;
-import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.schema.KeyspaceParams;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
-public class ReadExecutorTest
-{
- static Keyspace ks;
- static ColumnFamilyStore cfs;
- static List<InetAddressAndPort> targets;
-
- @BeforeClass
- public static void setUpClass() throws Throwable
- {
- SchemaLoader.loadSchema();
- SchemaLoader.createKeyspace("Foo", KeyspaceParams.simple(3), SchemaLoader.standardCFMD("Foo", "Bar"));
- ks = Keyspace.open("Foo");
- cfs = ks.getColumnFamilyStore("Bar");
- targets = ImmutableList.of(InetAddressAndPort.getByName("127.0.0.255"), InetAddressAndPort.getByName("127.0.0.254"), InetAddressAndPort.getByName("127.0.0.253"));
- cfs.sampleLatencyNanos = 0;
- }
-
- @Before
- public void resetCounters() throws Throwable
- {
- cfs.metric.speculativeInsufficientReplicas.dec(cfs.metric.speculativeInsufficientReplicas.getCount());
- cfs.metric.speculativeRetries.dec(cfs.metric.speculativeRetries.getCount());
- cfs.metric.speculativeFailedRetries.dec(cfs.metric.speculativeFailedRetries.getCount());
- }
-
- /**
- * If speculation would have been beneficial but could not be attempted due to a lack of
- * replicas, count that it occurred
- */
- @Test
- public void testUnableToSpeculate() throws Throwable
- {
- assertEquals(0, cfs.metric.speculativeInsufficientReplicas.getCount());
- assertEquals(0, ks.metric.speculativeInsufficientReplicas.getCount());
- AbstractReadExecutor executor = new AbstractReadExecutor.NeverSpeculatingReadExecutor(ks, cfs, new MockSinglePartitionReadCommand(), ConsistencyLevel.LOCAL_QUORUM, targets, System.nanoTime(), true);
- executor.maybeTryAdditionalReplicas();
- try
- {
- executor.get();
- fail();
- }
- catch (ReadTimeoutException e)
- {
- //expected
- }
- assertEquals(1, cfs.metric.speculativeInsufficientReplicas.getCount());
- assertEquals(1, ks.metric.speculativeInsufficientReplicas.getCount());
-
- //Shouldn't increment
- executor = new AbstractReadExecutor.NeverSpeculatingReadExecutor(ks, cfs, new MockSinglePartitionReadCommand(), ConsistencyLevel.LOCAL_QUORUM, targets, System.nanoTime(), false);
- executor.maybeTryAdditionalReplicas();
- try
- {
- executor.get();
- fail();
- }
- catch (ReadTimeoutException e)
- {
- //expected
- }
- assertEquals(1, cfs.metric.speculativeInsufficientReplicas.getCount());
- assertEquals(1, ks.metric.speculativeInsufficientReplicas.getCount());
- }
-
- /**
- * Test that speculation, when attempted, is counted, and that when it succeeds
- * no failure is counted.
- */
- @Test
- public void testSpeculateSucceeded() throws Throwable
- {
- assertEquals(0, cfs.metric.speculativeRetries.getCount());
- assertEquals(0, cfs.metric.speculativeFailedRetries.getCount());
- assertEquals(0, ks.metric.speculativeRetries.getCount());
- assertEquals(0, ks.metric.speculativeFailedRetries.getCount());
- AbstractReadExecutor executor = new AbstractReadExecutor.SpeculatingReadExecutor(ks, cfs, new MockSinglePartitionReadCommand(TimeUnit.DAYS.toMillis(365)), ConsistencyLevel.LOCAL_QUORUM, targets, System.nanoTime());
- executor.maybeTryAdditionalReplicas();
- new Thread()
- {
- @Override
- public void run()
- {
- //Failures end the read promptly but don't require mock data to be supplied
- executor.handler.onFailure(targets.get(0), RequestFailureReason.READ_TOO_MANY_TOMBSTONES);
- executor.handler.onFailure(targets.get(1), RequestFailureReason.READ_TOO_MANY_TOMBSTONES);
- executor.handler.condition.signalAll();
- }
- }.start();
-
- try
- {
- executor.get();
- fail();
- }
- catch (ReadFailureException e)
- {
- //expected
- }
- assertEquals(1, cfs.metric.speculativeRetries.getCount());
- assertEquals(0, cfs.metric.speculativeFailedRetries.getCount());
- assertEquals(1, ks.metric.speculativeRetries.getCount());
- assertEquals(0, ks.metric.speculativeFailedRetries.getCount());
- }
-
- /**
- * Test that speculation failure statistics are incremented if speculation occurs
- * and the read still times out.
- */
- @Test
- public void testSpeculateFailed() throws Throwable
- {
- assertEquals(0, cfs.metric.speculativeRetries.getCount());
- assertEquals(0, cfs.metric.speculativeFailedRetries.getCount());
- assertEquals(0, ks.metric.speculativeRetries.getCount());
- assertEquals(0, ks.metric.speculativeFailedRetries.getCount());
- AbstractReadExecutor executor = new AbstractReadExecutor.SpeculatingReadExecutor(ks, cfs, new MockSinglePartitionReadCommand(), ConsistencyLevel.LOCAL_QUORUM, targets, System.nanoTime());
- executor.maybeTryAdditionalReplicas();
- try
- {
- executor.get();
- fail();
- }
- catch (ReadTimeoutException e)
- {
- //expected
- }
- assertEquals(1, cfs.metric.speculativeRetries.getCount());
- assertEquals(1, cfs.metric.speculativeFailedRetries.getCount());
- assertEquals(1, ks.metric.speculativeRetries.getCount());
- assertEquals(1, ks.metric.speculativeFailedRetries.getCount());
- }
-
- public static class MockSinglePartitionReadCommand extends SinglePartitionReadCommand
- {
- private final long timeout;
-
- MockSinglePartitionReadCommand()
- {
- this(0);
- }
-
- MockSinglePartitionReadCommand(long timeout)
- {
- super(false, 0, cfs.metadata(), 0, null, null, null, Util.dk("ry@n_luvs_teh_y@nk33z"), null, null);
- this.timeout = timeout;
- }
-
- @Override
- public long getTimeout()
- {
- return timeout;
- }
-
- @Override
- public MessageOut createMessage()
- {
- return new MessageOut(MessagingService.Verb.BATCH_REMOVE)
- {
- @Override
- public int serializedSize(int version)
- {
- return 0;
- }
- };
- }
-
- }
-
-}
[5/5] cassandra git commit: Refactor read executor and response resolver, abstract read repair
Posted by bd...@apache.org.
Refactor read executor and response resolver, abstract read repair
Patch by Blake Eggleston; Reviewed by Marcus Eriksson for CASSANDRA-14058
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/39807ba4
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/39807ba4
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/39807ba4
Branch: refs/heads/trunk
Commit: 39807ba48ed2e02223014fbf47dce21d4124b380
Parents: 4a50f44
Author: Blake Eggleston <bd...@gmail.com>
Authored: Tue Nov 14 17:38:10 2017 -0800
Committer: Blake Eggleston <bd...@gmail.com>
Committed: Thu Mar 1 17:52:54 2018 -0800
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../apache/cassandra/db/ConsistencyLevel.java | 2 +-
.../UnfilteredPartitionIterators.java | 10 +
.../db/rows/UnfilteredRowIterators.java | 11 +
.../apache/cassandra/net/AsyncOneResponse.java | 71 +-
.../cassandra/service/AbstractReadExecutor.java | 412 -------
.../cassandra/service/AsyncRepairCallback.java | 60 -
.../apache/cassandra/service/DataResolver.java | 838 -------------
.../service/DigestMismatchException.java | 35 -
.../cassandra/service/DigestResolver.java | 103 --
.../apache/cassandra/service/ReadCallback.java | 266 -----
.../cassandra/service/ReadRepairDecision.java | 23 -
.../cassandra/service/ResponseResolver.java | 73 --
.../apache/cassandra/service/StorageProxy.java | 148 +--
.../service/reads/AbstractReadExecutor.java | 482 ++++++++
.../service/reads/AsyncRepairCallback.java | 61 +
.../cassandra/service/reads/DataResolver.java | 133 +++
.../cassandra/service/reads/DigestResolver.java | 93 ++
.../cassandra/service/reads/ReadCallback.java | 213 ++++
.../service/reads/ReadRepairDecision.java | 23 +
.../service/reads/ResponseResolver.java | 66 ++
.../reads/ShortReadPartitionsProtection.java | 187 +++
.../service/reads/ShortReadProtection.java | 74 ++
.../service/reads/ShortReadRowsProtection.java | 197 +++
.../reads/repair/BlockingReadRepair.java | 234 ++++
.../service/reads/repair/NoopReadRepair.java | 63 +
.../repair/PartitionIteratorMergeListener.java | 92 ++
.../service/reads/repair/ReadRepair.java | 72 ++
.../service/reads/repair/RepairListener.java | 34 +
.../reads/repair/RowIteratorMergeListener.java | 336 ++++++
.../cassandra/service/DataResolverTest.java | 1122 ------------------
.../cassandra/service/ReadExecutorTest.java | 215 ----
.../service/reads/DataResolverTest.java | 1057 +++++++++++++++++
.../service/reads/ReadExecutorTest.java | 216 ++++
.../reads/repair/TestableReadRepair.java | 109 ++
35 files changed, 3826 insertions(+), 3306 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/39807ba4/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 9e7a599..a6c0323 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
4.0
+ * Refactor read executor and response resolver, abstract read repair (CASSANDRA-14058)
* Add optional startup delay to wait until peers are ready (CASSANDRA-13993)
* Add a few options to nodetool verify (CASSANDRA-14201)
* CVE-2017-5929 Security vulnerability and redefine default log rotation policy (CASSANDRA-14183)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/39807ba4/src/java/org/apache/cassandra/db/ConsistencyLevel.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ConsistencyLevel.java b/src/java/org/apache/cassandra/db/ConsistencyLevel.java
index f93e737..19f2e10 100644
--- a/src/java/org/apache/cassandra/db/ConsistencyLevel.java
+++ b/src/java/org/apache/cassandra/db/ConsistencyLevel.java
@@ -30,7 +30,7 @@ import org.slf4j.LoggerFactory;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.service.ReadRepairDecision;
+import org.apache.cassandra.service.reads.ReadRepairDecision;
import org.apache.cassandra.exceptions.InvalidRequestException;
import org.apache.cassandra.exceptions.UnavailableException;
import org.apache.cassandra.locator.AbstractReplicationStrategy;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/39807ba4/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java b/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
index dff6dae..edb7833 100644
--- a/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
+++ b/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
@@ -49,6 +49,16 @@ public abstract class UnfilteredPartitionIterators
{
public UnfilteredRowIterators.MergeListener getRowMergeListener(DecoratedKey partitionKey, List<UnfilteredRowIterator> versions);
public void close();
+
+ public static MergeListener NOOP = new MergeListener()
+ {
+ public UnfilteredRowIterators.MergeListener getRowMergeListener(DecoratedKey partitionKey, List<UnfilteredRowIterator> versions)
+ {
+ return UnfilteredRowIterators.MergeListener.NOOP;
+ }
+
+ public void close() {}
+ };
}
@SuppressWarnings("resource") // The created resources are returned right away
http://git-wip-us.apache.org/repos/asf/cassandra/blob/39807ba4/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
index e1c6685..0244531 100644
--- a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
+++ b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
@@ -95,6 +95,17 @@ public abstract class UnfilteredRowIterators
public void onMergedRangeTombstoneMarkers(RangeTombstoneMarker merged, RangeTombstoneMarker[] versions);
public void close();
+
+ public static MergeListener NOOP = new MergeListener()
+ {
+ public void onMergedPartitionLevelDeletion(DeletionTime mergedDeletion, DeletionTime[] versions) {}
+
+ public void onMergedRows(Row merged, Row[] versions) {}
+
+ public void onMergedRangeTombstoneMarkers(RangeTombstoneMarker merged, RangeTombstoneMarker[] versions) {}
+
+ public void close() {}
+ };
}
/**
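
Both NOOP listeners above exist so that callers which don't need to observe the merge
(the new NoopReadRepair path, for instance) can pass a listener unconditionally instead
of special-casing null. A minimal sketch, assuming the merge overload that accepts a
listener, with `iterators` and `command` in scope:

    // Merge several response iterators without recording any differences.
    UnfilteredPartitionIterator merged =
        UnfilteredPartitionIterators.merge(iterators,
                                           command.nowInSec(),
                                           UnfilteredPartitionIterators.MergeListener.NOOP);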
http://git-wip-us.apache.org/repos/asf/cassandra/blob/39807ba4/src/java/org/apache/cassandra/net/AsyncOneResponse.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/AsyncOneResponse.java b/src/java/org/apache/cassandra/net/AsyncOneResponse.java
index b7ef227..4e004d2 100644
--- a/src/java/org/apache/cassandra/net/AsyncOneResponse.java
+++ b/src/java/org/apache/cassandra/net/AsyncOneResponse.java
@@ -17,67 +17,54 @@
*/
package org.apache.cassandra.net;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.AbstractFuture;
+
/**
* A callback specialized for returning a value from a single target; that is, this is for messages
* that we only send to one recipient.
*/
-public class AsyncOneResponse<T> implements IAsyncCallback<T>
+public class AsyncOneResponse<T> extends AbstractFuture<T> implements IAsyncCallback<T>
{
- private T result;
- private boolean done;
private final long start = System.nanoTime();
- public T get(long timeout, TimeUnit tu) throws TimeoutException
+ public void response(MessageIn<T> response)
{
- timeout = tu.toNanos(timeout);
- boolean interrupted = false;
- try
+ set(response.payload);
+ }
+
+ public boolean isLatencyForSnitch()
+ {
+ return false;
+ }
+
+ @Override
+ public T get(long timeout, TimeUnit unit) throws TimeoutException
+ {
+ long adjustedTimeout = unit.toNanos(timeout) - (System.nanoTime() - start);
+ if (adjustedTimeout <= 0)
{
- synchronized (this)
- {
- while (!done)
- {
- try
- {
- long overallTimeout = timeout - (System.nanoTime() - start);
- if (overallTimeout <= 0)
- {
- throw new TimeoutException("Operation timed out.");
- }
- TimeUnit.NANOSECONDS.timedWait(this, overallTimeout);
- }
- catch (InterruptedException e)
- {
- interrupted = true;
- }
- }
- }
+ throw new TimeoutException("Operation timed out.");
}
- finally
+ try
{
- if (interrupted)
- {
- Thread.currentThread().interrupt();
- }
+ return super.get(adjustedTimeout, TimeUnit.NANOSECONDS);
}
- return result;
- }
-
- public synchronized void response(MessageIn<T> response)
- {
- if (!done)
+ catch (InterruptedException | ExecutionException e)
{
- result = response.payload;
- done = true;
- this.notifyAll();
+ throw new AssertionError(e);
}
}
- public boolean isLatencyForSnitch()
+ @VisibleForTesting
+ public static <T> AsyncOneResponse<T> immediate(T value)
{
- return false;
+ AsyncOneResponse<T> response = new AsyncOneResponse<>();
+ response.set(value);
+ return response;
}
}
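
Since AsyncOneResponse now extends Guava's AbstractFuture, callers get standard future
semantics while keeping the old behavior that the timeout passed to get() is measured
from the creation of the callback rather than from the call itself. A usage sketch (the
message sending/registration code is assumed, not shown in this patch):

    // Wait up to the write RPC timeout for the single expected reply.
    AsyncOneResponse<ReadResponse> reply = new AsyncOneResponse<>();
    // ... register `reply` as the callback for an outbound message here ...
    try
    {
        ReadResponse payload = reply.get(DatabaseDescriptor.getWriteRpcTimeout(), TimeUnit.MILLISECONDS);
    }
    catch (TimeoutException e)
    {
        // no response arrived within the window, counting time already elapsed
    }

    // Tests can bypass the network entirely with an already-completed response:
    AsyncOneResponse<String> done = AsyncOneResponse.immediate("ack");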
http://git-wip-us.apache.org/repos/asf/cassandra/blob/39807ba4/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/AbstractReadExecutor.java b/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
deleted file mode 100644
index e06131e..0000000
--- a/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
+++ /dev/null
@@ -1,412 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.service;
-
-import java.util.Collection;
-import java.util.List;
-import java.util.concurrent.ThreadLocalRandom;
-import java.util.concurrent.TimeUnit;
-
-import com.google.common.collect.Iterables;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.concurrent.Stage;
-import org.apache.cassandra.concurrent.StageManager;
-import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.ConsistencyLevel;
-import org.apache.cassandra.db.ReadCommand;
-import org.apache.cassandra.db.SinglePartitionReadCommand;
-import org.apache.cassandra.db.Keyspace;
-import org.apache.cassandra.db.partitions.PartitionIterator;
-import org.apache.cassandra.exceptions.ReadFailureException;
-import org.apache.cassandra.exceptions.ReadTimeoutException;
-import org.apache.cassandra.exceptions.UnavailableException;
-import org.apache.cassandra.locator.InetAddressAndPort;
-import org.apache.cassandra.metrics.ReadRepairMetrics;
-import org.apache.cassandra.net.MessageOut;
-import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.schema.SpeculativeRetryParam;
-import org.apache.cassandra.schema.TableMetadata;
-import org.apache.cassandra.service.StorageProxy.LocalReadRunnable;
-import org.apache.cassandra.tracing.TraceState;
-import org.apache.cassandra.tracing.Tracing;
-
-/**
- * Sends a read request to the replicas needed to satisfy a given ConsistencyLevel.
- *
- * Optionally, may perform additional requests to provide redundancy against replica failure:
- * AlwaysSpeculatingReadExecutor will always send a request to one extra replica, while
- * SpeculatingReadExecutor will wait until it looks like the original request is in danger
- * of timing out before performing extra reads.
- */
-public abstract class AbstractReadExecutor
-{
- private static final Logger logger = LoggerFactory.getLogger(AbstractReadExecutor.class);
-
- protected final ReadCommand command;
- protected final List<InetAddressAndPort> targetReplicas;
- protected final ReadCallback handler;
- protected final TraceState traceState;
- protected final ColumnFamilyStore cfs;
-
- AbstractReadExecutor(Keyspace keyspace, ColumnFamilyStore cfs, ReadCommand command, ConsistencyLevel consistencyLevel, List<InetAddressAndPort> targetReplicas, long queryStartNanoTime)
- {
- this.command = command;
- this.targetReplicas = targetReplicas;
- this.handler = new ReadCallback(new DigestResolver(keyspace, command, consistencyLevel, targetReplicas.size()), consistencyLevel, command, targetReplicas, queryStartNanoTime);
- this.cfs = cfs;
- this.traceState = Tracing.instance.get();
-
- // Set the digest version (if we request some digests). This is the smallest version amongst all our target replicas since new nodes
- // know how to produce older digests but the reverse is not true.
- // TODO: we need this when talking with pre-3.0 nodes. So if we preserve the digest format moving forward, we can get rid of this once
- // we stop being compatible with pre-3.0 nodes.
- int digestVersion = MessagingService.current_version;
- for (InetAddressAndPort replica : targetReplicas)
- digestVersion = Math.min(digestVersion, MessagingService.instance().getVersion(replica));
- command.setDigestVersion(digestVersion);
- }
-
- protected void makeDataRequests(Iterable<InetAddressAndPort> endpoints)
- {
- makeRequests(command, endpoints);
- }
-
- protected void makeDigestRequests(Iterable<InetAddressAndPort> endpoints)
- {
- makeRequests(command.copyAsDigestQuery(), endpoints);
- }
-
- private void makeRequests(ReadCommand readCommand, Iterable<InetAddressAndPort> endpoints)
- {
- boolean hasLocalEndpoint = false;
-
- for (InetAddressAndPort endpoint : endpoints)
- {
- if (StorageProxy.canDoLocalRequest(endpoint))
- {
- hasLocalEndpoint = true;
- continue;
- }
-
- if (traceState != null)
- traceState.trace("reading {} from {}", readCommand.isDigestQuery() ? "digest" : "data", endpoint);
- logger.trace("reading {} from {}", readCommand.isDigestQuery() ? "digest" : "data", endpoint);
- MessageOut<ReadCommand> message = readCommand.createMessage();
- MessagingService.instance().sendRRWithFailure(message, endpoint, handler);
- }
-
- // We delay the local (potentially blocking) read till the end to avoid stalling remote requests.
- if (hasLocalEndpoint)
- {
- logger.trace("reading {} locally", readCommand.isDigestQuery() ? "digest" : "data");
- StageManager.getStage(Stage.READ).maybeExecuteImmediately(new LocalReadRunnable(command, handler));
- }
- }
-
- /**
- * Perform additional requests if it looks like the original will time out. May block while it waits
- * to see if the original requests are answered first.
- */
- public abstract void maybeTryAdditionalReplicas();
-
- /**
- * Get the replicas involved in the [finished] request.
- *
- * @return target replicas + the extra replica, *IF* we speculated.
- */
- public abstract Collection<InetAddressAndPort> getContactedReplicas();
-
- /**
- * send the initial set of requests
- */
- public abstract void executeAsync();
-
- /**
- * wait for an answer. Blocks until success or timeout, so it is the caller's
- * responsibility to call maybeTryAdditionalReplicas first.
- */
- public PartitionIterator get() throws ReadFailureException, ReadTimeoutException, DigestMismatchException
- {
- try
- {
- return handler.get();
- }
- catch (ReadTimeoutException e)
- {
- try
- {
- onReadTimeout();
- }
- finally
- {
- throw e;
- }
- }
- }
-
- private static ReadRepairDecision newReadRepairDecision(TableMetadata metadata)
- {
- if (metadata.params.readRepairChance > 0d ||
- metadata.params.dcLocalReadRepairChance > 0)
- {
- double chance = ThreadLocalRandom.current().nextDouble();
- if (metadata.params.readRepairChance > chance)
- return ReadRepairDecision.GLOBAL;
-
- if (metadata.params.dcLocalReadRepairChance > chance)
- return ReadRepairDecision.DC_LOCAL;
- }
-
- return ReadRepairDecision.NONE;
- }
-
- /**
- * @return an executor appropriate for the configured speculative read policy
- */
- public static AbstractReadExecutor getReadExecutor(SinglePartitionReadCommand command, ConsistencyLevel consistencyLevel, long queryStartNanoTime) throws UnavailableException
- {
- Keyspace keyspace = Keyspace.open(command.metadata().keyspace);
- List<InetAddressAndPort> allReplicas = StorageProxy.getLiveSortedEndpoints(keyspace, command.partitionKey());
- // 11980: Excluding EACH_QUORUM reads from potential RR, so that we do not miscount DC responses
- ReadRepairDecision repairDecision = consistencyLevel == ConsistencyLevel.EACH_QUORUM
- ? ReadRepairDecision.NONE
- : newReadRepairDecision(command.metadata());
- List<InetAddressAndPort> targetReplicas = consistencyLevel.filterForQuery(keyspace, allReplicas, repairDecision);
-
- // Throw UAE early if we don't have enough replicas.
- consistencyLevel.assureSufficientLiveNodes(keyspace, targetReplicas);
-
- if (repairDecision != ReadRepairDecision.NONE)
- {
- Tracing.trace("Read-repair {}", repairDecision);
- ReadRepairMetrics.attempted.mark();
- }
-
- ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(command.metadata().id);
- SpeculativeRetryParam retry = cfs.metadata().params.speculativeRetry;
-
- // Speculative retry is disabled *OR*
- // 11980: Disable speculative retry if using EACH_QUORUM in order to prevent miscounting DC responses
- if (retry.equals(SpeculativeRetryParam.NONE)
- || consistencyLevel == ConsistencyLevel.EACH_QUORUM)
- return new NeverSpeculatingReadExecutor(keyspace, cfs, command, consistencyLevel, targetReplicas, queryStartNanoTime, false);
-
- // There are simply no extra replicas to speculate.
- // Handle this separately so it can log failed attempts to speculate due to lack of replicas
- if (consistencyLevel.blockFor(keyspace) == allReplicas.size())
- return new NeverSpeculatingReadExecutor(keyspace, cfs, command, consistencyLevel, targetReplicas, queryStartNanoTime, true);
-
- if (targetReplicas.size() == allReplicas.size())
- {
- // CL.ALL, RRD.GLOBAL or RRD.DC_LOCAL and a single-DC.
- // We are going to contact every node anyway, so ask for 2 full data requests instead of 1, for redundancy
- // (same amount of requests in total, but we turn 1 digest request into a full blown data request).
- return new AlwaysSpeculatingReadExecutor(keyspace, cfs, command, consistencyLevel, targetReplicas, queryStartNanoTime);
- }
-
- // RRD.NONE or RRD.DC_LOCAL w/ multiple DCs.
- InetAddressAndPort extraReplica = allReplicas.get(targetReplicas.size());
- // With repair decision DC_LOCAL all replicas/target replicas may be in different order, so
- // we might have to find a replacement that's not already in targetReplicas.
- if (repairDecision == ReadRepairDecision.DC_LOCAL && targetReplicas.contains(extraReplica))
- {
- for (InetAddressAndPort address : allReplicas)
- {
- if (!targetReplicas.contains(address))
- {
- extraReplica = address;
- break;
- }
- }
- }
- targetReplicas.add(extraReplica);
-
- if (retry.equals(SpeculativeRetryParam.ALWAYS))
- return new AlwaysSpeculatingReadExecutor(keyspace, cfs, command, consistencyLevel, targetReplicas, queryStartNanoTime);
- else // PERCENTILE or CUSTOM.
- return new SpeculatingReadExecutor(keyspace, cfs, command, consistencyLevel, targetReplicas, queryStartNanoTime);
- }
-
- /**
- * Returns true if speculation should occur; if so, blocks until it is time to
- * send the speculative reads
- */
- boolean shouldSpeculateAndMaybeWait()
- {
- // no latency information, or we're overloaded
- if (cfs.sampleLatencyNanos > TimeUnit.MILLISECONDS.toNanos(command.getTimeout()))
- return false;
-
- return !handler.await(cfs.sampleLatencyNanos, TimeUnit.NANOSECONDS);
- }
-
- void onReadTimeout() {}
-
- public static class NeverSpeculatingReadExecutor extends AbstractReadExecutor
- {
- /**
- * If we never speculate due to a lack of replicas, log it as a failure
- * when speculation should have happened but couldn't
- */
- private final boolean logFailedSpeculation;
-
- public NeverSpeculatingReadExecutor(Keyspace keyspace, ColumnFamilyStore cfs, ReadCommand command, ConsistencyLevel consistencyLevel, List<InetAddressAndPort> targetReplicas, long queryStartNanoTime, boolean logFailedSpeculation)
- {
- super(keyspace, cfs, command, consistencyLevel, targetReplicas, queryStartNanoTime);
- this.logFailedSpeculation = logFailedSpeculation;
- }
-
- public void executeAsync()
- {
- makeDataRequests(targetReplicas.subList(0, 1));
- if (targetReplicas.size() > 1)
- makeDigestRequests(targetReplicas.subList(1, targetReplicas.size()));
- }
-
- public void maybeTryAdditionalReplicas()
- {
- if (shouldSpeculateAndMaybeWait() && logFailedSpeculation)
- {
- cfs.metric.speculativeInsufficientReplicas.inc();
- }
- }
-
- public Collection<InetAddressAndPort> getContactedReplicas()
- {
- return targetReplicas;
- }
- }
-
- static class SpeculatingReadExecutor extends AbstractReadExecutor
- {
- private volatile boolean speculated = false;
-
- public SpeculatingReadExecutor(Keyspace keyspace,
- ColumnFamilyStore cfs,
- ReadCommand command,
- ConsistencyLevel consistencyLevel,
- List<InetAddressAndPort> targetReplicas,
- long queryStartNanoTime)
- {
- super(keyspace, cfs, command, consistencyLevel, targetReplicas, queryStartNanoTime);
- }
-
- public void executeAsync()
- {
- // if CL + RR result in covering all replicas, getReadExecutor forces AlwaysSpeculating. So we know
- // that the last replica in our list is "extra."
- List<InetAddressAndPort> initialReplicas = targetReplicas.subList(0, targetReplicas.size() - 1);
-
- if (handler.blockfor < initialReplicas.size())
- {
- // We're hitting additional targets for read repair. Since our "extra" replica is the least-
- // preferred by the snitch, we do an extra data read to start with against a replica more
- // likely to reply; better to let RR fail than the entire query.
- makeDataRequests(initialReplicas.subList(0, 2));
- if (initialReplicas.size() > 2)
- makeDigestRequests(initialReplicas.subList(2, initialReplicas.size()));
- }
- else
- {
- // not doing read repair; all replies are important, so it doesn't matter which nodes we
- // perform data reads against vs digest.
- makeDataRequests(initialReplicas.subList(0, 1));
- if (initialReplicas.size() > 1)
- makeDigestRequests(initialReplicas.subList(1, initialReplicas.size()));
- }
- }
-
- public void maybeTryAdditionalReplicas()
- {
- if (shouldSpeculateAndMaybeWait())
- {
- //Handle speculation stats first in case the callback fires immediately
- speculated = true;
- cfs.metric.speculativeRetries.inc();
- // Could be waiting on the data, or on enough digests.
- ReadCommand retryCommand = command;
- if (handler.resolver.isDataPresent())
- retryCommand = command.copyAsDigestQuery();
-
- InetAddressAndPort extraReplica = Iterables.getLast(targetReplicas);
- if (traceState != null)
- traceState.trace("speculating read retry on {}", extraReplica);
- logger.trace("speculating read retry on {}", extraReplica);
- MessagingService.instance().sendRRWithFailure(retryCommand.createMessage(), extraReplica, handler);
- }
- }
-
- public Collection<InetAddressAndPort> getContactedReplicas()
- {
- return speculated
- ? targetReplicas
- : targetReplicas.subList(0, targetReplicas.size() - 1);
- }
-
- @Override
- void onReadTimeout()
- {
- //Shouldn't be possible to get here without first attempting to speculate even if the
- //timing is bad
- assert speculated;
- cfs.metric.speculativeFailedRetries.inc();
- }
- }
-
- private static class AlwaysSpeculatingReadExecutor extends AbstractReadExecutor
- {
- public AlwaysSpeculatingReadExecutor(Keyspace keyspace,
- ColumnFamilyStore cfs,
- ReadCommand command,
- ConsistencyLevel consistencyLevel,
- List<InetAddressAndPort> targetReplicas,
- long queryStartNanoTime)
- {
- super(keyspace, cfs, command, consistencyLevel, targetReplicas, queryStartNanoTime);
- }
-
- public void maybeTryAdditionalReplicas()
- {
- // no-op
- }
-
- public Collection<InetAddressAndPort> getContactedReplicas()
- {
- return targetReplicas;
- }
-
- @Override
- public void executeAsync()
- {
- makeDataRequests(targetReplicas.subList(0, targetReplicas.size() > 1 ? 2 : 1));
- if (targetReplicas.size() > 2)
- makeDigestRequests(targetReplicas.subList(2, targetReplicas.size()));
- cfs.metric.speculativeRetries.inc();
- }
-
- @Override
- void onReadTimeout()
- {
- cfs.metric.speculativeFailedRetries.inc();
- }
- }
-}
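
For reference while reviewing the move: the executor chosen by getReadExecutor() above
follows a small set of rules. A condensed, hypothetical restatement of those rules (the
parameter names are invented for the illustration; this is not the shipped code):

    enum Executor { NEVER_SPECULATING, SPECULATING, ALWAYS_SPECULATING }

    static Executor choose(boolean retryDisabled, boolean eachQuorum,
                           int blockFor, int allReplicas, int targetReplicas,
                           boolean retryAlways)
    {
        if (retryDisabled || eachQuorum)
            return Executor.NEVER_SPECULATING;  // speculation off, or EACH_QUORUM would miscount DC responses
        if (blockFor == allReplicas)
            return Executor.NEVER_SPECULATING;  // no spare replica to speculate against; logs failed speculation
        if (targetReplicas == allReplicas)
            return Executor.ALWAYS_SPECULATING; // contacting everyone anyway: 2nd data request up front
        return retryAlways ? Executor.ALWAYS_SPECULATING  // retry == ALWAYS
                           : Executor.SPECULATING;        // PERCENTILE or CUSTOM
    }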
http://git-wip-us.apache.org/repos/asf/cassandra/blob/39807ba4/src/java/org/apache/cassandra/service/AsyncRepairCallback.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/AsyncRepairCallback.java b/src/java/org/apache/cassandra/service/AsyncRepairCallback.java
deleted file mode 100644
index d613f3d..0000000
--- a/src/java/org/apache/cassandra/service/AsyncRepairCallback.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.service;
-
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.cassandra.concurrent.Stage;
-import org.apache.cassandra.concurrent.StageManager;
-import org.apache.cassandra.db.ReadResponse;
-import org.apache.cassandra.net.IAsyncCallback;
-import org.apache.cassandra.net.MessageIn;
-import org.apache.cassandra.utils.WrappedRunnable;
-
-public class AsyncRepairCallback implements IAsyncCallback<ReadResponse>
-{
- private final DataResolver repairResolver;
- private final int blockfor;
- protected final AtomicInteger received = new AtomicInteger(0);
-
- public AsyncRepairCallback(DataResolver repairResolver, int blockfor)
- {
- this.repairResolver = repairResolver;
- this.blockfor = blockfor;
- }
-
- public void response(MessageIn<ReadResponse> message)
- {
- repairResolver.preprocess(message);
- if (received.incrementAndGet() == blockfor)
- {
- StageManager.getStage(Stage.READ_REPAIR).execute(new WrappedRunnable()
- {
- protected void runMayThrow()
- {
- repairResolver.compareResponses();
- }
- });
- }
- }
-
- public boolean isLatencyForSnitch()
- {
- return true;
- }
-}
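
AsyncRepairCallback's job above is essentially a counting pattern: buffer each response
into the resolver, and once the blockFor-th one arrives, hand the comparison off to the
READ_REPAIR stage. The pattern in isolation, as a plain-Java sketch (not the Cassandra
callback interfaces):

    import java.util.concurrent.atomic.AtomicInteger;

    // Runs `onComplete` exactly once, when the blockFor-th response arrives.
    final class CountingCallback<T>
    {
        private final int blockFor;
        private final Runnable onComplete;
        private final AtomicInteger received = new AtomicInteger(0);

        CountingCallback(int blockFor, Runnable onComplete)
        {
            this.blockFor = blockFor;
            this.onComplete = onComplete;
        }

        void response(T payload)
        {
            // the real callback preprocesses `payload` into the resolver here
            if (received.incrementAndGet() == blockFor)
                onComplete.run(); // later responses only bump the counter
        }
    }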
http://git-wip-us.apache.org/repos/asf/cassandra/blob/39807ba4/src/java/org/apache/cassandra/service/DataResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/DataResolver.java b/src/java/org/apache/cassandra/service/DataResolver.java
deleted file mode 100644
index 82db754..0000000
--- a/src/java/org/apache/cassandra/service/DataResolver.java
+++ /dev/null
@@ -1,838 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.service;
-
-import java.util.*;
-import java.util.concurrent.TimeoutException;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Joiner;
-import com.google.common.collect.Iterables;
-
-import org.apache.cassandra.concurrent.Stage;
-import org.apache.cassandra.concurrent.StageManager;
-import org.apache.cassandra.schema.ColumnMetadata;
-import org.apache.cassandra.schema.TableMetadata;
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.filter.*;
-import org.apache.cassandra.db.filter.DataLimits.Counter;
-import org.apache.cassandra.db.partitions.*;
-import org.apache.cassandra.db.rows.*;
-import org.apache.cassandra.db.transform.*;
-import org.apache.cassandra.dht.AbstractBounds;
-import org.apache.cassandra.dht.ExcludingBounds;
-import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.exceptions.ReadTimeoutException;
-import org.apache.cassandra.locator.InetAddressAndPort;
-import org.apache.cassandra.net.*;
-import org.apache.cassandra.tracing.Tracing;
-import org.apache.cassandra.utils.FBUtilities;
-
-public class DataResolver extends ResponseResolver
-{
- private static final boolean DROP_OVERSIZED_READ_REPAIR_MUTATIONS =
- Boolean.getBoolean("cassandra.drop_oversized_readrepair_mutations");
-
- @VisibleForTesting
- final List<AsyncOneResponse> repairResults = Collections.synchronizedList(new ArrayList<>());
- private final long queryStartNanoTime;
- private final boolean enforceStrictLiveness;
-
- DataResolver(Keyspace keyspace, ReadCommand command, ConsistencyLevel consistency, int maxResponseCount, long queryStartNanoTime)
- {
- super(keyspace, command, consistency, maxResponseCount);
- this.queryStartNanoTime = queryStartNanoTime;
- this.enforceStrictLiveness = command.metadata().enforceStrictLiveness();
- }
-
- public PartitionIterator getData()
- {
- ReadResponse response = responses.iterator().next().payload;
- return UnfilteredPartitionIterators.filter(response.makeIterator(command), command.nowInSec());
- }
-
- public boolean isDataPresent()
- {
- return !responses.isEmpty();
- }
-
- public void compareResponses()
- {
- // We need to fully consume the results to trigger read repairs if appropriate
- try (PartitionIterator iterator = resolve())
- {
- PartitionIterators.consume(iterator);
- }
- }
-
- public PartitionIterator resolve()
- {
- // We could get more responses while this method runs, which is ok (we're happy to ignore any response not here
- // at the beginning of this method), so grab the response count once and use that through the method.
- int count = responses.size();
- List<UnfilteredPartitionIterator> iters = new ArrayList<>(count);
- InetAddressAndPort[] sources = new InetAddressAndPort[count];
- for (int i = 0; i < count; i++)
- {
- MessageIn<ReadResponse> msg = responses.get(i);
- iters.add(msg.payload.makeIterator(command));
- sources[i] = msg.from;
- }
-
- /*
- * Even though every response, individually, will honor the limit, it is possible that we will, after the merge,
- * have more rows than the client requested. To make sure that we still conform to the original limit,
- * we apply a top-level post-reconciliation counter to the merged partition iterator.
- *
- * Short read protection logic (ShortReadRowsProtection.moreContents()) relies on this counter being applied
- * to the current partition in order to work. For this reason we have to apply the counter transformation before
- * empty partition discard logic kicks in - for it will eagerly consume the iterator.
- *
- * That's why the order here is: 1) merge; 2) filter rows; 3) count; 4) discard empty partitions
- *
- * See CASSANDRA-13747 for more details.
- */
-
- DataLimits.Counter mergedResultCounter =
- command.limits().newCounter(command.nowInSec(), true, command.selectsFullPartition(), enforceStrictLiveness);
-
- UnfilteredPartitionIterator merged = mergeWithShortReadProtection(iters, sources, mergedResultCounter);
- FilteredPartitions filtered =
- FilteredPartitions.filter(merged, new Filter(command.nowInSec(), command.metadata().enforceStrictLiveness()));
- PartitionIterator counted = Transformation.apply(filtered, mergedResultCounter);
- return Transformation.apply(counted, new EmptyPartitionsDiscarder());
- }
-
- private UnfilteredPartitionIterator mergeWithShortReadProtection(List<UnfilteredPartitionIterator> results,
- InetAddressAndPort[] sources,
- DataLimits.Counter mergedResultCounter)
- {
- // If we have only one result, there is no read repair to do and we can't get short reads
- if (results.size() == 1)
- return results.get(0);
-
- /*
- * So-called short reads stem from nodes returning only a subset of the results they have due to the limit,
- * but that subset not being enough post-reconciliation. So if we don't have a limit, don't bother.
- */
- if (!command.limits().isUnlimited())
- for (int i = 0; i < results.size(); i++)
- results.set(i, extendWithShortReadProtection(results.get(i), sources[i], mergedResultCounter));
-
- return UnfilteredPartitionIterators.merge(results, command.nowInSec(), new RepairMergeListener(sources));
- }
-
- private class RepairMergeListener implements UnfilteredPartitionIterators.MergeListener
- {
- private final InetAddressAndPort[] sources;
-
- private RepairMergeListener(InetAddressAndPort[] sources)
- {
- this.sources = sources;
- }
-
- public UnfilteredRowIterators.MergeListener getRowMergeListener(DecoratedKey partitionKey, List<UnfilteredRowIterator> versions)
- {
- return new MergeListener(partitionKey, columns(versions), isReversed(versions));
- }
-
- private RegularAndStaticColumns columns(List<UnfilteredRowIterator> versions)
- {
- Columns statics = Columns.NONE;
- Columns regulars = Columns.NONE;
- for (UnfilteredRowIterator iter : versions)
- {
- if (iter == null)
- continue;
-
- RegularAndStaticColumns cols = iter.columns();
- statics = statics.mergeTo(cols.statics);
- regulars = regulars.mergeTo(cols.regulars);
- }
- return new RegularAndStaticColumns(statics, regulars);
- }
-
- private boolean isReversed(List<UnfilteredRowIterator> versions)
- {
- for (UnfilteredRowIterator iter : versions)
- {
- if (iter == null)
- continue;
-
- // Everything will be in the same order
- return iter.isReverseOrder();
- }
-
- assert false : "Expected at least one iterator";
- return false;
- }
-
- public void close()
- {
- try
- {
- FBUtilities.waitOnFutures(repairResults, DatabaseDescriptor.getWriteRpcTimeout());
- }
- catch (TimeoutException ex)
- {
- // We got all responses, but timed out while repairing
- int blockFor = consistency.blockFor(keyspace);
- if (Tracing.isTracing())
- Tracing.trace("Timed out while read-repairing after receiving all {} data and digest responses", blockFor);
- else
- logger.debug("Timeout while read-repairing after receiving all {} data and digest responses", blockFor);
-
- throw new ReadTimeoutException(consistency, blockFor-1, blockFor, true);
- }
- }
-
- private class MergeListener implements UnfilteredRowIterators.MergeListener
- {
- private final DecoratedKey partitionKey;
- private final RegularAndStaticColumns columns;
- private final boolean isReversed;
- private final PartitionUpdate.Builder[] repairs = new PartitionUpdate.Builder[sources.length];
-
- private final Row.Builder[] currentRows = new Row.Builder[sources.length];
- private final RowDiffListener diffListener;
-
- // The partition level deletion for the merge row.
- private DeletionTime partitionLevelDeletion;
- // When merged has a currently open marker, its time. null otherwise.
- private DeletionTime mergedDeletionTime;
- // For each source, the time of the current deletion as known by the source.
- private final DeletionTime[] sourceDeletionTime = new DeletionTime[sources.length];
- // For each source, record if there is an open range to send as repair, and from where.
- private final ClusteringBound[] markerToRepair = new ClusteringBound[sources.length];
-
- private MergeListener(DecoratedKey partitionKey, RegularAndStaticColumns columns, boolean isReversed)
- {
- this.partitionKey = partitionKey;
- this.columns = columns;
- this.isReversed = isReversed;
-
- this.diffListener = new RowDiffListener()
- {
- public void onPrimaryKeyLivenessInfo(int i, Clustering clustering, LivenessInfo merged, LivenessInfo original)
- {
- if (merged != null && !merged.equals(original))
- currentRow(i, clustering).addPrimaryKeyLivenessInfo(merged);
- }
-
- public void onDeletion(int i, Clustering clustering, Row.Deletion merged, Row.Deletion original)
- {
- if (merged != null && !merged.equals(original))
- currentRow(i, clustering).addRowDeletion(merged);
- }
-
- public void onComplexDeletion(int i, Clustering clustering, ColumnMetadata column, DeletionTime merged, DeletionTime original)
- {
- if (merged != null && !merged.equals(original))
- currentRow(i, clustering).addComplexDeletion(column, merged);
- }
-
- public void onCell(int i, Clustering clustering, Cell merged, Cell original)
- {
- if (merged != null && !merged.equals(original) && isQueried(merged))
- currentRow(i, clustering).addCell(merged);
- }
-
- private boolean isQueried(Cell cell)
- {
- // When we read, we may have some cells that have been fetched but are not selected by the user. Those cells may
- // have empty values as an optimization (see CASSANDRA-10655) and hence they should not be included in the read-repair.
- // This is fine since those columns are not actually requested by the user and are only present for the sake of CQL
- // semantics (making sure we can always distinguish between a row that doesn't exist and one that does exist but has
- // no value for the column requested by the user), so it won't be unexpected by the user that those columns are
- // not repaired.
- ColumnMetadata column = cell.column();
- ColumnFilter filter = command.columnFilter();
- return column.isComplex() ? filter.fetchedCellIsQueried(column, cell.path()) : filter.fetchedColumnIsQueried(column);
- }
- };
- }
-
- private PartitionUpdate.Builder update(int i)
- {
- if (repairs[i] == null)
- repairs[i] = new PartitionUpdate.Builder(command.metadata(), partitionKey, columns, 1);
- return repairs[i];
- }
-
- /**
- * The partition level deletion with which source {@code i} is currently repaired, or
- * {@code DeletionTime.LIVE} if the source is not repaired on the partition level deletion (meaning it was
- * up to date on it). The output of this method is only valid after the call to
- * {@link #onMergedPartitionLevelDeletion}.
- */
- private DeletionTime partitionLevelRepairDeletion(int i)
- {
- return repairs[i] == null ? DeletionTime.LIVE : repairs[i].partitionLevelDeletion();
- }
-
- private Row.Builder currentRow(int i, Clustering clustering)
- {
- if (currentRows[i] == null)
- {
- currentRows[i] = BTreeRow.sortedBuilder();
- currentRows[i].newRow(clustering);
- }
- return currentRows[i];
- }
-
- public void onMergedPartitionLevelDeletion(DeletionTime mergedDeletion, DeletionTime[] versions)
- {
- this.partitionLevelDeletion = mergedDeletion;
- for (int i = 0; i < versions.length; i++)
- {
- if (mergedDeletion.supersedes(versions[i]))
- update(i).addPartitionDeletion(mergedDeletion);
- }
- }
-
- public void onMergedRows(Row merged, Row[] versions)
- {
- // If a row was shadowed post merge, it must be by a partition level or range tombstone, and we handle
- // those cases directly in their respective methods (in other words, it would be inefficient to send a row
- // deletion as a repair when we know we've already sent a partition level or range tombstone that covers it).
- if (merged.isEmpty())
- return;
-
- Rows.diff(diffListener, merged, versions);
- for (int i = 0; i < currentRows.length; i++)
- {
- if (currentRows[i] != null)
- update(i).add(currentRows[i].build());
- }
- Arrays.fill(currentRows, null);
- }
-
- private DeletionTime currentDeletion()
- {
- return mergedDeletionTime == null ? partitionLevelDeletion : mergedDeletionTime;
- }
-
- public void onMergedRangeTombstoneMarkers(RangeTombstoneMarker merged, RangeTombstoneMarker[] versions)
- {
- try
- {
- // The code for merging range tombstones is a tad complex and we had the assertions there triggered
- // unexpectedly on a few occasions (CASSANDRA-13237, CASSANDRA-13719). It's hard to get insight
- // when that happens without more context than what the assertion errors give us, hence the
- // catch here that gathers as much context as is reasonable.
- internalOnMergedRangeTombstoneMarkers(merged, versions);
- }
- catch (AssertionError e)
- {
- // The following can be pretty verbose, but it's really only triggered if a bug happens, so we'd
- // rather get more info to debug than not.
- TableMetadata table = command.metadata();
- String details = String.format("Error merging RTs on %s: merged=%s, versions=%s, sources={%s}, responses:%n %s",
- table,
- merged == null ? "null" : merged.toString(table),
- '[' + Joiner.on(", ").join(Iterables.transform(Arrays.asList(versions), rt -> rt == null ? "null" : rt.toString(table))) + ']',
- Arrays.toString(sources),
- makeResponsesDebugString());
- throw new AssertionError(details, e);
- }
- }
-
- private String makeResponsesDebugString()
- {
- return Joiner.on(",\n")
- .join(Iterables.transform(getMessages(), m -> m.from + " => " + m.payload.toDebugString(command, partitionKey)));
- }
-
- private void internalOnMergedRangeTombstoneMarkers(RangeTombstoneMarker merged, RangeTombstoneMarker[] versions)
- {
- // The current deletion as of dealing with this marker.
- DeletionTime currentDeletion = currentDeletion();
-
- for (int i = 0; i < versions.length; i++)
- {
- RangeTombstoneMarker marker = versions[i];
-
- // Update what the source now thinks is the current deletion
- if (marker != null)
- sourceDeletionTime[i] = marker.isOpen(isReversed) ? marker.openDeletionTime(isReversed) : null;
-
- // If merged == null, some of the sources are opening or closing a marker
- if (merged == null)
- {
- // but if it's not this source, move to the next one
- if (marker == null)
- continue;
-
- // We have a close and/or open marker for a source, with nothing corresponding in merged.
- // Because merged is a superset, this implies that we have a current deletion (be it due to an
- // early opening in merged or a partition level deletion) and that this deletion will still be
- // active after that point. Further, whatever deletion was or is opened by this marker on the
- // source, that deletion cannot supersede the current one.
- //
- // But while the marker deletion (before and/or after this point) cannot supersede the current
- // deletion, we want to know if it's equal to it (both before and after), because in that case
- // the source is up to date and we don't want to include a repair.
- //
- // So in practice we have 2 possible cases:
- // 1) the source was up-to-date on deletion up to that point: then it won't be from that point
- // on unless it's a boundary and the newly opened deletion time is also equal to the current
- // deletion (note that this implies the boundary has the same closing and opening deletion
- // time, which should generally not happen, but can occur due to legacy reading code not avoiding
- // this for a while, see CASSANDRA-13237).
- // 2) the source wasn't up-to-date on deletion up to that point and it may now be (if it isn't
- // we just have nothing to do for that marker).
- assert !currentDeletion.isLive() : currentDeletion.toString();
-
- // Is the source up to date on deletion? It's up to date if it has neither an open RT repair
- // nor an "active" partition level deletion (where "active" means that it's greater or equal
- // to the current deletion: if the source has a repaired partition deletion lower than the
- // current deletion, this means the current deletion is due to a previously open range tombstone,
- // and if the source isn't currently repaired for that RT, then it means it's up to date on it).
- DeletionTime partitionRepairDeletion = partitionLevelRepairDeletion(i);
- if (markerToRepair[i] == null && currentDeletion.supersedes(partitionRepairDeletion))
- {
- // Since there is an ongoing merged deletion, the only way we don't have an open repair for
- // this source is that it had a range open with the same deletion as current and it's
- // closing it.
- assert marker.isClose(isReversed) && currentDeletion.equals(marker.closeDeletionTime(isReversed))
- : String.format("currentDeletion=%s, marker=%s", currentDeletion, marker.toString(command.metadata()));
-
- // and so unless it's a boundary whose opening deletion time is still equal to the current
- // deletion (see comment above for why this can actually happen), we have to repair the source
- // from that point on.
- if (!(marker.isOpen(isReversed) && currentDeletion.equals(marker.openDeletionTime(isReversed))))
- markerToRepair[i] = marker.closeBound(isReversed).invert();
- }
- // In case 2) above, we only have something to do if the source is up-to-date after that point
- // (which, since the source isn't up-to-date before that point, means we're opening a new deletion
- // that is equal to the current one).
- else if (marker.isOpen(isReversed) && currentDeletion.equals(marker.openDeletionTime(isReversed)))
- {
- closeOpenMarker(i, marker.openBound(isReversed).invert());
- }
- }
- else
- {
- // We have a change of current deletion in merged (potentially to/from no deletion at all).
-
- if (merged.isClose(isReversed))
- {
- // We're closing the merged range. If we've recorded that this should be repaired for the
- // source, close and add said range to the repair to send.
- if (markerToRepair[i] != null)
- closeOpenMarker(i, merged.closeBound(isReversed));
- }
-
- if (merged.isOpen(isReversed))
- {
- // If we're opening a new merged range (or just switching deletion), then unless the source
- // is up to date on that deletion (note that we've updated what the source deletion is
- // above), we'll have to send the range to the source.
- DeletionTime newDeletion = merged.openDeletionTime(isReversed);
- DeletionTime sourceDeletion = sourceDeletionTime[i];
- if (!newDeletion.equals(sourceDeletion))
- markerToRepair[i] = merged.openBound(isReversed);
- }
- }
- }
-
- if (merged != null)
- mergedDeletionTime = merged.isOpen(isReversed) ? merged.openDeletionTime(isReversed) : null;
- }
-
- private void closeOpenMarker(int i, ClusteringBound close)
- {
- ClusteringBound open = markerToRepair[i];
- update(i).add(new RangeTombstone(Slice.make(isReversed ? close : open, isReversed ? open : close), currentDeletion()));
- markerToRepair[i] = null;
- }
-
- public void close()
- {
- for (int i = 0; i < repairs.length; i++)
- if (null != repairs[i])
- sendRepairMutation(repairs[i].build(), sources[i]);
- }
-
- private void sendRepairMutation(PartitionUpdate partition, InetAddressAndPort destination)
- {
- Mutation mutation = new Mutation(partition);
- int messagingVersion = MessagingService.instance().getVersion(destination);
-
- int mutationSize = (int) Mutation.serializer.serializedSize(mutation, messagingVersion);
- int maxMutationSize = DatabaseDescriptor.getMaxMutationSize();
-
- if (mutationSize <= maxMutationSize)
- {
- Tracing.trace("Sending read-repair-mutation to {}", destination);
- // use a separate verb here to avoid writing hints on timeouts
- MessageOut<Mutation> message = mutation.createMessage(MessagingService.Verb.READ_REPAIR);
- repairResults.add(MessagingService.instance().sendRR(message, destination));
- ColumnFamilyStore.metricsFor(command.metadata().id).readRepairRequests.mark();
- }
- else if (DROP_OVERSIZED_READ_REPAIR_MUTATIONS)
- {
- logger.debug("Encountered an oversized ({}/{}) read repair mutation for table {}, key {}, node {}",
- mutationSize,
- maxMutationSize,
- command.metadata(),
- command.metadata().partitionKeyType.getString(partitionKey.getKey()),
- destination);
- }
- else
- {
- logger.warn("Encountered an oversized ({}/{}) read repair mutation for table {}, key {}, node {}",
- mutationSize,
- maxMutationSize,
- command.metadata(),
- command.metadata().partitionKeyType.getString(partitionKey.getKey()),
- destination);
-
- int blockFor = consistency.blockFor(keyspace);
- Tracing.trace("Timed out while read-repairing after receiving all {} data and digest responses", blockFor);
- throw new ReadTimeoutException(consistency, blockFor - 1, blockFor, true);
- }
- }
- }
- }
-
- private UnfilteredPartitionIterator extendWithShortReadProtection(UnfilteredPartitionIterator partitions,
- InetAddressAndPort source,
- DataLimits.Counter mergedResultCounter)
- {
- DataLimits.Counter singleResultCounter =
- command.limits().newCounter(command.nowInSec(), false, command.selectsFullPartition(), enforceStrictLiveness).onlyCount();
-
- ShortReadPartitionsProtection protection =
- new ShortReadPartitionsProtection(source, singleResultCounter, mergedResultCounter, queryStartNanoTime);
-
- /*
- * The order of extension and transformations is important here. Extending with more partitions has to happen
- * first due to the way BaseIterator.hasMoreContents() works: only transformations applied after extension will
- * be called on the first partition of the extended iterator.
- *
- * Additionally, we want singleResultCounter to be applied after SRPP, so that its applyToPartition() method will
- * be called last, after the extension done by SRPP.applyToPartition() call. That way we preserve the same order
- * when it comes to calling SRPP.moreContents() and applyToRow() callbacks.
- *
- * See ShortReadPartitionsProtection.applyToPartition() for more details.
- */
-
- // extend with moreContents() only if it's a range read command with no partition key specified
- if (!command.isLimitedToOnePartition())
- partitions = MorePartitions.extend(partitions, protection); // register SRPP.moreContents()
-
- partitions = Transformation.apply(partitions, protection); // register SRPP.applyToPartition()
- partitions = Transformation.apply(partitions, singleResultCounter); // register the per-source counter
-
- return partitions;
- }
-
- /*
- * We have a potential short read if the result from a given node contains the requested number of rows
- * (i.e. it has stopped returning results due to the limit), but some of them haven't
- * made it into the final post-reconciliation result due to other nodes' row, range, and/or partition tombstones.
- *
- * If that is the case, then that node may have more rows that we should fetch, as otherwise we could
- * ultimately return fewer rows than required. Also, those additional rows may contain tombstones
- * which we also need to fetch, as they may shadow rows or partitions from other replicas' results that we would
- * otherwise return incorrectly.
- */
- private class ShortReadPartitionsProtection extends Transformation<UnfilteredRowIterator> implements MorePartitions<UnfilteredPartitionIterator>
- {
- private final InetAddressAndPort source;
-
- private final DataLimits.Counter singleResultCounter; // unmerged per-source counter
- private final DataLimits.Counter mergedResultCounter; // merged end-result counter
-
- private DecoratedKey lastPartitionKey; // key of the last observed partition
-
- private boolean partitionsFetched; // whether we've seen any new partitions since iteration start or last moreContents() call
-
- private final long queryStartNanoTime;
-
- private ShortReadPartitionsProtection(InetAddressAndPort source,
- DataLimits.Counter singleResultCounter,
- DataLimits.Counter mergedResultCounter,
- long queryStartNanoTime)
- {
- this.source = source;
- this.singleResultCounter = singleResultCounter;
- this.mergedResultCounter = mergedResultCounter;
- this.queryStartNanoTime = queryStartNanoTime;
- }
-
- @Override
- public UnfilteredRowIterator applyToPartition(UnfilteredRowIterator partition)
- {
- partitionsFetched = true;
-
- lastPartitionKey = partition.partitionKey();
-
- /*
- * Extend with moreContents() first, then apply the protection so that lastClustering is tracked via applyToRow().
- *
- * If we don't apply the transformation *after* extending the partition with MoreRows,
- * the applyToRow() method of the protection will not be called on the first row of the new extension iterator.
- */
- ShortReadRowsProtection protection = new ShortReadRowsProtection(partition.metadata(), partition.partitionKey());
- return Transformation.apply(MoreRows.extend(partition, protection), protection);
- }
-
- /*
- * We only get here once all the rows and partitions in this iterator have been iterated over, and so
- * if the node had returned the requested number of rows but we still get here, then some results were
- * skipped during reconciliation.
- */
- public UnfilteredPartitionIterator moreContents()
- {
- // never try to request additional partitions from replicas if our reconciled partitions are already filled to the limit
- assert !mergedResultCounter.isDone();
-
- // we do not apply short read protection when we have no limits at all
- assert !command.limits().isUnlimited();
-
- /*
- * If this is a single partition read command or an (indexed) partition range read command with
- * a partition key specified, then we can't and shouldn't try to fetch more partitions.
- */
- assert !command.isLimitedToOnePartition();
-
- /*
- * If the returned result doesn't have enough rows/partitions to satisfy even the original limit, don't ask for more.
- *
- * Can only take the shortcut if there is no per-partition limit set. Otherwise it's possible to hit false
- * positives due to some rows being unaccounted for in certain scenarios (see CASSANDRA-13911).
- */
- if (!singleResultCounter.isDone() && command.limits().perPartitionCount() == DataLimits.NO_LIMIT)
- return null;
-
- /*
- * Either we had an empty iterator as the initial response, or our moreContents() call got us an empty iterator.
- * There is no point in asking the replica for more rows - it has no more in the requested range.
- */
- if (!partitionsFetched)
- return null;
- partitionsFetched = false;
-
- /*
- * We are going to fetch one partition at a time for Thrift and potentially more for CQL.
- * The row limit will either be set to the per partition limit - if the command has no total row limit set, or
- * the total # of rows remaining - if it has some. If we don't grab enough rows in some of the partitions,
- * then future ShortReadRowsProtection.moreContents() calls will fetch the missing ones.
- */
- int toQuery = command.limits().count() != DataLimits.NO_LIMIT
- ? command.limits().count() - counted(mergedResultCounter)
- : command.limits().perPartitionCount();
-
- ColumnFamilyStore.metricsFor(command.metadata().id).shortReadProtectionRequests.mark();
- Tracing.trace("Requesting {} extra rows from {} for short read protection", toQuery, source);
-
- PartitionRangeReadCommand cmd = makeFetchAdditionalPartitionReadCommand(toQuery);
- return executeReadCommand(cmd);
- }
-
- // Counts the number of rows for regular queries and the number of groups for GROUP BY queries
- private int counted(Counter counter)
- {
- return command.limits().isGroupByLimit()
- ? counter.rowCounted()
- : counter.counted();
- }
-
- private PartitionRangeReadCommand makeFetchAdditionalPartitionReadCommand(int toQuery)
- {
- PartitionRangeReadCommand cmd = (PartitionRangeReadCommand) command;
-
- DataLimits newLimits = cmd.limits().forShortReadRetry(toQuery);
-
- AbstractBounds<PartitionPosition> bounds = cmd.dataRange().keyRange();
- AbstractBounds<PartitionPosition> newBounds = bounds.inclusiveRight()
- ? new Range<>(lastPartitionKey, bounds.right)
- : new ExcludingBounds<>(lastPartitionKey, bounds.right);
- DataRange newDataRange = cmd.dataRange().forSubRange(newBounds);
-
- return cmd.withUpdatedLimitsAndDataRange(newLimits, newDataRange);
- }
-
- private class ShortReadRowsProtection extends Transformation implements MoreRows<UnfilteredRowIterator>
- {
- private final TableMetadata metadata;
- private final DecoratedKey partitionKey;
-
- private Clustering lastClustering; // clustering of the last observed row
-
- private int lastCounted = 0; // last seen recorded # before attempting to fetch more rows
- private int lastFetched = 0; // # rows returned by last attempt to get more (or by the original read command)
- private int lastQueried = 0; // # extra rows requested from the replica last time
-
- private ShortReadRowsProtection(TableMetadata metadata, DecoratedKey partitionKey)
- {
- this.metadata = metadata;
- this.partitionKey = partitionKey;
- }
-
- @Override
- public Row applyToRow(Row row)
- {
- lastClustering = row.clustering();
- return row;
- }
-
- /*
- * We only get here once all the rows in this iterator have been iterated over, and so if the node
- * had returned the requested number of rows but we still get here, then some results were skipped
- * during reconciliation.
- */
- public UnfilteredRowIterator moreContents()
- {
- // never try to request additional rows from replicas if our reconciled partition is already filled to the limit
- assert !mergedResultCounter.isDoneForPartition();
-
- // we do not apply short read protection when we have no limits at all
- assert !command.limits().isUnlimited();
-
- /*
- * If the returned partition doesn't have enough rows to satisfy even the original limit, don't ask for more.
- *
- * Can only take the shortcut if there is no per-partition limit set. Otherwise it's possible to hit false
- * positives due to some rows being unaccounted for in certain scenarios (see CASSANDRA-13911).
- */
- if (!singleResultCounter.isDoneForPartition() && command.limits().perPartitionCount() == DataLimits.NO_LIMIT)
- return null;
-
- /*
- * If the replica has no live rows in the partition, don't try to fetch more.
- *
- * Note that the previous branch [if (!singleResultCounter.isDoneForPartition()) return null] doesn't
- * always cover this scenario:
- * isDoneForPartition() is defined as [isDone() || rowInCurrentPartition >= perPartitionLimit],
- * and will return true if isDone() returns true, even if there are 0 rows counted in the current partition.
- *
- * This can happen with a range read if after 1+ rounds of short read protection requests we managed to fetch
- * enough extra rows for other partitions to satisfy the singleResultCounter's total row limit, but only
- * have tombstones in the current partition.
- *
- * One other way we can hit this condition is when the partition only has a live static row and no regular
- * rows. In that scenario the counter will remain at 0 until the partition is closed - which happens after
- * the moreContents() call.
- */
- if (countedInCurrentPartition(singleResultCounter) == 0)
- return null;
-
- /*
- * This is a table with no clustering columns, so it has at most one row per partition - with EMPTY clustering.
- * We already have the row, so there is no point in asking for more from the partition.
- */
- if (Clustering.EMPTY == lastClustering)
- return null;
-
- lastFetched = countedInCurrentPartition(singleResultCounter) - lastCounted;
- lastCounted = countedInCurrentPartition(singleResultCounter);
-
- // getting back fewer rows than we asked for means the partition on the replica has been fully consumed
- if (lastQueried > 0 && lastFetched < lastQueried)
- return null;
-
- /*
- * At this point we know that:
- * 1. the replica returned [repeatedly?] as many rows as we asked for and potentially has more
- * rows in the partition
- * 2. at least one of those returned rows was shadowed by a tombstone returned from another
- * replica
- * 3. we haven't satisfied the client's limits yet, and should attempt to query for more rows to
- * avoid a short read
- *
- * In the ideal scenario, we would get exactly min(a, b) or fewer rows from the next request, where a and b
- * are defined as follows:
- * [a] limits.count() - mergedResultCounter.counted()
- * [b] limits.perPartitionCount() - mergedResultCounter.countedInCurrentPartition()
- *
- * It would be naive to query for exactly that many rows, as it's possible and not unlikely
- * that some of the returned rows would also be shadowed by tombstones from other hosts.
- *
- * Note: we don't know, nor do we care, how many rows from the replica made it into the reconciled result;
- * we can only tell how many in total we queried for, and that [0, mrc.countedInCurrentPartition()) made it.
- *
- * In general, our goal should be to minimise the number of extra requests - *not* to minimise the number
- * of rows fetched: there is a high transactional cost for every individual request, but a relatively low
- * marginal cost for each extra row requested.
- *
- * As such it's better to overfetch than to underfetch extra rows from a host; but at the same
- * time we want to respect paging limits and not blow up spectacularly.
- *
- * Note: it's ok to retrieve more rows than necessary since singleResultCounter is not stopping and only
- * counts.
- *
- * With that in mind, we'll just request the minimum of (count(), perPartitionCount()) limits.
- *
- * See CASSANDRA-13794 for more details.
- */
- lastQueried = Math.min(command.limits().count(), command.limits().perPartitionCount());
-
- ColumnFamilyStore.metricsFor(metadata.id).shortReadProtectionRequests.mark();
- Tracing.trace("Requesting {} extra rows from {} for short read protection", lastQueried, source);
-
- SinglePartitionReadCommand cmd = makeFetchAdditionalRowsReadCommand(lastQueried);
- return UnfilteredPartitionIterators.getOnlyElement(executeReadCommand(cmd), cmd);
- }
-
- // Counts the number of rows for regular queries and the number of groups for GROUP BY queries
- private int countedInCurrentPartition(Counter counter)
- {
- return command.limits().isGroupByLimit()
- ? counter.rowCountedInCurrentPartition()
- : counter.countedInCurrentPartition();
- }
-
- private SinglePartitionReadCommand makeFetchAdditionalRowsReadCommand(int toQuery)
- {
- ClusteringIndexFilter filter = command.clusteringIndexFilter(partitionKey);
- if (null != lastClustering)
- filter = filter.forPaging(metadata.comparator, lastClustering, false);
-
- return SinglePartitionReadCommand.create(command.metadata(),
- command.nowInSec(),
- command.columnFilter(),
- command.rowFilter(),
- command.limits().forShortReadRetry(toQuery),
- partitionKey,
- filter,
- command.indexMetadata());
- }
- }
-
- private UnfilteredPartitionIterator executeReadCommand(ReadCommand cmd)
- {
- DataResolver resolver = new DataResolver(keyspace, cmd, ConsistencyLevel.ONE, 1, queryStartNanoTime);
- ReadCallback handler = new ReadCallback(resolver, ConsistencyLevel.ONE, cmd, Collections.singletonList(source), queryStartNanoTime);
-
- if (StorageProxy.canDoLocalRequest(source))
- StageManager.getStage(Stage.READ).maybeExecuteImmediately(new StorageProxy.LocalReadRunnable(cmd, handler));
- else
- MessagingService.instance().sendRRWithFailure(cmd.createMessage(), source, handler);
-
- // We don't call handler.get() because we want to preserve tombstones since we're still in the middle of merging node results.
- handler.awaitResults();
- assert resolver.responses.size() == 1;
- return resolver.responses.get(0).payload.makeIterator(command);
- }
- }
-}
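
The short read scenario that the protection above (and its new home under service/reads below) guards against: a replica returns exactly `limit` rows, reconciliation drops some of them because another replica holds tombstones for those rows, and the merged result comes up short. A minimal standalone sketch of that interaction, not part of the patch, with plain integers standing in for clusterings:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;
    import java.util.Set;
    import java.util.TreeSet;

    public class ShortReadSketch
    {
        public static void main(String[] args)
        {
            int limit = 3;
            // Replica A's live rows, in clustering order; it answers at most `limit` rows per request.
            List<Integer> replicaA = Arrays.asList(1, 2, 3, 4, 5, 6);
            // Replica B has tombstones for rows 1 and 2 that never reached A.
            Set<Integer> tombstonesB = new TreeSet<>(Arrays.asList(1, 2));

            List<Integer> merged = new ArrayList<>();
            int pagedUpTo = 0; // position of the last row fetched from A, i.e. the paging point

            // Keep asking A for another page until the merged result is full or A is exhausted -
            // the same loop that moreContents() effectively implements.
            while (merged.size() < limit && pagedUpTo < replicaA.size())
            {
                List<Integer> page = replicaA.subList(pagedUpTo, Math.min(pagedUpTo + limit, replicaA.size()));
                pagedUpTo += page.size();
                for (int row : page)
                    if (!tombstonesB.contains(row) && merged.size() < limit)
                        merged.add(row); // reconciliation: B's tombstones shadow A's rows
                System.out.println("fetched " + page + " -> merged so far " + merged);
            }
            System.out.println("final result: " + merged); // [3, 4, 5]
        }
    }

Without the second fetch the query would return just [3], even though three matching live rows exist cluster-wide.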
http://git-wip-us.apache.org/repos/asf/cassandra/blob/39807ba4/src/java/org/apache/cassandra/service/DigestMismatchException.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/DigestMismatchException.java b/src/java/org/apache/cassandra/service/DigestMismatchException.java
deleted file mode 100644
index 18d5939..0000000
--- a/src/java/org/apache/cassandra/service/DigestMismatchException.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.service;
-
-import java.nio.ByteBuffer;
-
-import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.utils.ByteBufferUtil;
-
-@SuppressWarnings("serial")
-public class DigestMismatchException extends Exception
-{
- public DigestMismatchException(DecoratedKey key, ByteBuffer digest1, ByteBuffer digest2)
- {
- super(String.format("Mismatch for key %s (%s vs %s)",
- key.toString(),
- ByteBufferUtil.bytesToHex(digest1),
- ByteBufferUtil.bytesToHex(digest2)));
- }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/39807ba4/src/java/org/apache/cassandra/service/DigestResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/DigestResolver.java b/src/java/org/apache/cassandra/service/DigestResolver.java
deleted file mode 100644
index 6a528e9..0000000
--- a/src/java/org/apache/cassandra/service/DigestResolver.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.service;
-
-import java.nio.ByteBuffer;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.partitions.PartitionIterator;
-import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators;
-import org.apache.cassandra.net.MessageIn;
-
-public class DigestResolver extends ResponseResolver
-{
- private volatile ReadResponse dataResponse;
-
- public DigestResolver(Keyspace keyspace, ReadCommand command, ConsistencyLevel consistency, int maxResponseCount)
- {
- super(keyspace, command, consistency, maxResponseCount);
- }
-
- @Override
- public void preprocess(MessageIn<ReadResponse> message)
- {
- super.preprocess(message);
- if (dataResponse == null && !message.payload.isDigestResponse())
- dataResponse = message.payload;
- }
-
- /**
- * Special case of resolve() so that CL.ONE reads never throw DigestMismatchException in the foreground
- */
- public PartitionIterator getData()
- {
- assert isDataPresent();
- return UnfilteredPartitionIterators.filter(dataResponse.makeIterator(command), command.nowInSec());
- }
-
- /*
- * This method handles two different scenarios:
- *
- * a) we're handling the initial read of data from the closest replica + digests
- * from the rest. In this case we check the digests against each other,
- * throw an exception if there is a mismatch, otherwise return the data row.
- *
- * b) we're checking additional digests that arrived after the minimum needed to satisfy
- * the requested ConsistencyLevel, i.e. an asynchronous read repair check
- */
- public PartitionIterator resolve() throws DigestMismatchException
- {
- if (responses.size() == 1)
- return getData();
-
- if (logger.isTraceEnabled())
- logger.trace("resolving {} responses", responses.size());
-
- compareResponses();
-
- return UnfilteredPartitionIterators.filter(dataResponse.makeIterator(command), command.nowInSec());
- }
-
- public void compareResponses() throws DigestMismatchException
- {
- long start = System.nanoTime();
-
- // validate digests against each other; throw immediately on mismatch.
- ByteBuffer digest = null;
- for (MessageIn<ReadResponse> message : responses)
- {
- ReadResponse response = message.payload;
-
- ByteBuffer newDigest = response.digest(command);
- if (digest == null)
- digest = newDigest;
- else if (!digest.equals(newDigest))
- // rely on the fact that only single partition queries use digests
- throw new DigestMismatchException(((SinglePartitionReadCommand)command).partitionKey(), digest, newDigest);
- }
-
- if (logger.isTraceEnabled())
- logger.trace("resolve: {} ms.", TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start));
- }
-
- public boolean isDataPresent()
- {
- return dataResponse != null;
- }
-}
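
The compareResponses() loop above reduces to: remember the first digest seen and fail fast on the first response whose digest differs. A self-contained sketch of that shape, not part of the patch; hashing a string payload with MD5 is an assumption standing in for ReadResponse.digest(command):

    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;
    import java.security.MessageDigest;
    import java.util.Arrays;
    import java.util.List;

    public class DigestCheckSketch
    {
        static class DigestMismatch extends Exception
        {
            DigestMismatch(ByteBuffer a, ByteBuffer b)
            {
                super("Mismatch: " + toHex(a) + " vs " + toHex(b));
            }
        }

        // Stand-in for ReadResponse.digest(command): hash the replica's payload.
        static ByteBuffer digest(String payload) throws Exception
        {
            MessageDigest md5 = MessageDigest.getInstance("MD5");
            return ByteBuffer.wrap(md5.digest(payload.getBytes(StandardCharsets.UTF_8)));
        }

        // Mirrors compareResponses(): keep the first digest, throw on the first one that differs.
        static void compare(List<String> payloads) throws Exception
        {
            ByteBuffer first = null;
            for (String payload : payloads)
            {
                ByteBuffer d = digest(payload);
                if (first == null)
                    first = d;
                else if (!first.equals(d))
                    throw new DigestMismatch(first, d);
            }
        }

        static String toHex(ByteBuffer b)
        {
            StringBuilder sb = new StringBuilder();
            for (int i = b.position(); i < b.limit(); i++)
                sb.append(String.format("%02x", b.get(i)));
            return sb.toString();
        }

        public static void main(String[] args) throws Exception
        {
            compare(Arrays.asList("row@10", "row@10")); // matching digests: no exception
            try
            {
                compare(Arrays.asList("row@10", "row@11")); // stale replica: mismatch
            }
            catch (DigestMismatch e)
            {
                System.out.println(e.getMessage());
            }
        }
    }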
http://git-wip-us.apache.org/repos/asf/cassandra/blob/39807ba4/src/java/org/apache/cassandra/service/reads/ShortReadRowsProtection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/reads/ShortReadRowsProtection.java b/src/java/org/apache/cassandra/service/reads/ShortReadRowsProtection.java
new file mode 100644
index 0000000..6b1da0b
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/reads/ShortReadRowsProtection.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.reads;
+
+import java.util.function.Function;
+
+import org.apache.cassandra.db.Clustering;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.ReadCommand;
+import org.apache.cassandra.db.SinglePartitionReadCommand;
+import org.apache.cassandra.db.filter.ClusteringIndexFilter;
+import org.apache.cassandra.db.filter.DataLimits;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators;
+import org.apache.cassandra.db.rows.Row;
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+import org.apache.cassandra.db.transform.MoreRows;
+import org.apache.cassandra.db.transform.Transformation;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.tracing.Tracing;
+
+class ShortReadRowsProtection extends Transformation implements MoreRows<UnfilteredRowIterator>
+{
+ private final ReadCommand command;
+ private final InetAddressAndPort source;
+ private final DataLimits.Counter singleResultCounter; // unmerged per-source counter
+ private final DataLimits.Counter mergedResultCounter; // merged end-result counter
+ private final Function<ReadCommand, UnfilteredPartitionIterator> commandExecutor;
+ private final TableMetadata metadata;
+ private final DecoratedKey partitionKey;
+
+ private Clustering lastClustering; // clustering of the last observed row
+
+ private int lastCounted = 0; // last seen recorded # before attempting to fetch more rows
+ private int lastFetched = 0; // # rows returned by last attempt to get more (or by the original read command)
+ private int lastQueried = 0; // # extra rows requested from the replica last time
+
+ ShortReadRowsProtection(DecoratedKey partitionKey, ReadCommand command, InetAddressAndPort source,
+ Function<ReadCommand, UnfilteredPartitionIterator> commandExecutor,
+ DataLimits.Counter singleResultCounter, DataLimits.Counter mergedResultCounter)
+ {
+ this.command = command;
+ this.source = source;
+ this.commandExecutor = commandExecutor;
+ this.singleResultCounter = singleResultCounter;
+ this.mergedResultCounter = mergedResultCounter;
+ this.metadata = command.metadata();
+ this.partitionKey = partitionKey;
+ }
+
+ @Override
+ public Row applyToRow(Row row)
+ {
+ lastClustering = row.clustering();
+ return row;
+ }
+
+ /*
+ * We only get here once all the rows in this iterator have been iterated over, and so if the node
+ * had returned the requested number of rows but we still get here, then some results were skipped
+ * during reconciliation.
+ */
+ public UnfilteredRowIterator moreContents()
+ {
+ // never try to request additional rows from replicas if our reconciled partition is already filled to the limit
+ assert !mergedResultCounter.isDoneForPartition();
+
+ // we do not apply short read protection when we have no limits at all
+ assert !command.limits().isUnlimited();
+
+ /*
+ * If the returned partition doesn't have enough rows to satisfy even the original limit, don't ask for more.
+ *
+ * Can only take the shortcut if there is no per-partition limit set. Otherwise it's possible to hit false
+ * positives due to some rows being unaccounted for in certain scenarios (see CASSANDRA-13911).
+ */
+ if (!singleResultCounter.isDoneForPartition() && command.limits().perPartitionCount() == DataLimits.NO_LIMIT)
+ return null;
+
+ /*
+ * If the replica has no live rows in the partition, don't try to fetch more.
+ *
+ * Note that the previous branch [if (!singleResultCounter.isDoneForPartition()) return null] doesn't
+ * always cover this scenario:
+ * isDoneForPartition() is defined as [isDone() || rowInCurrentPartition >= perPartitionLimit],
+ * and will return true if isDone() returns true, even if there are 0 rows counted in the current partition.
+ *
+ * This can happen with a range read if after 1+ rounds of short read protection requests we managed to fetch
+ * enough extra rows for other partitions to satisfy the singleResultCounter's total row limit, but only
+ * have tombstones in the current partition.
+ *
+ * One other way we can hit this condition is when the partition only has a live static row and no regular
+ * rows. In that scenario the counter will remain at 0 until the partition is closed - which happens after
+ * the moreContents() call.
+ */
+ if (countedInCurrentPartition(singleResultCounter) == 0)
+ return null;
+
+ /*
+ * This is a table with no clustering columns, so it has at most one row per partition - with EMPTY clustering.
+ * We already have the row, so there is no point in asking for more from the partition.
+ */
+ if (Clustering.EMPTY == lastClustering)
+ return null;
+
+ lastFetched = countedInCurrentPartition(singleResultCounter) - lastCounted;
+ lastCounted = countedInCurrentPartition(singleResultCounter);
+
+ // getting back fewer rows than we asked for means the partition on the replica has been fully consumed
+ if (lastQueried > 0 && lastFetched < lastQueried)
+ return null;
+
+ /*
+ * At this point we know that:
+ * 1. the replica returned [repeatedly?] as many rows as we asked for and potentially has more
+ * rows in the partition
+ * 2. at least one of those returned rows was shadowed by a tombstone returned from another
+ * replica
+ * 3. we haven't satisfied the client's limits yet, and should attempt to query for more rows to
+ * avoid a short read
+ *
+ * In the ideal scenario, we would get exactly min(a, b) or fewer rows from the next request, where a and b
+ * are defined as follows:
+ * [a] limits.count() - mergedResultCounter.counted()
+ * [b] limits.perPartitionCount() - mergedResultCounter.countedInCurrentPartition()
+ *
+ * It would be naive to query for exactly that many rows, as it's possible and not unlikely
+ * that some of the returned rows would also be shadowed by tombstones from other hosts.
+ *
+ * Note: we don't know, nor do we care, how many rows from the replica made it into the reconciled result;
+ * we can only tell how many in total we queried for, and that [0, mrc.countedInCurrentPartition()) made it.
+ *
+ * In general, our goal should be to minimise the number of extra requests - *not* to minimise the number
+ * of rows fetched: there is a high transactional cost for every individual request, but a relatively low
+ * marginal cost for each extra row requested.
+ *
+ * As such it's better to overfetch than to underfetch extra rows from a host; but at the same
+ * time we want to respect paging limits and not blow up spectacularly.
+ *
+ * Note: it's ok to retrieve more rows than necessary since singleResultCounter is not stopping and only
+ * counts.
+ *
+ * With that in mind, we'll just request the minimum of (count(), perPartitionCount()) limits.
+ *
+ * See CASSANDRA-13794 for more details.
+ */
+ lastQueried = Math.min(command.limits().count(), command.limits().perPartitionCount());
+
+ ColumnFamilyStore.metricsFor(metadata.id).shortReadProtectionRequests.mark();
+ Tracing.trace("Requesting {} extra rows from {} for short read protection", lastQueried, source);
+
+ SinglePartitionReadCommand cmd = makeFetchAdditionalRowsReadCommand(lastQueried);
+ return UnfilteredPartitionIterators.getOnlyElement(commandExecutor.apply(cmd), cmd);
+ }
+
+ // Counts the number of rows for regular queries and the number of groups for GROUP BY queries
+ private int countedInCurrentPartition(DataLimits.Counter counter)
+ {
+ return command.limits().isGroupByLimit()
+ ? counter.rowCountedInCurrentPartition()
+ : counter.countedInCurrentPartition();
+ }
+
+ private SinglePartitionReadCommand makeFetchAdditionalRowsReadCommand(int toQuery)
+ {
+ ClusteringIndexFilter filter = command.clusteringIndexFilter(partitionKey);
+ if (null != lastClustering)
+ filter = filter.forPaging(metadata.comparator, lastClustering, false);
+
+ return SinglePartitionReadCommand.create(command.metadata(),
+ command.nowInSec(),
+ command.columnFilter(),
+ command.rowFilter(),
+ command.limits().forShortReadRetry(toQuery),
+ partitionKey,
+ filter,
+ command.indexMetadata());
+ }
+}
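
The stop conditions in moreContents() above boil down to a fetch loop that deliberately overfetches - min(count(), perPartitionCount()) extra rows per round - and stops once a round returns fewer rows than requested. A standalone sketch of just that loop, not part of the patch (hypothetical names; the merged-counter early returns are elided):

    import java.util.Arrays;
    import java.util.List;

    public class RowFetchLoopSketch
    {
        public static void main(String[] args)
        {
            int count = 10;            // total row limit of the query
            int perPartitionCount = 4; // per-partition row limit
            List<Integer> replicaRows = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9);

            int consumed = Math.min(perPartitionCount, replicaRows.size()); // the initial read
            while (true)
            {
                // Deliberate overfetch: requests are expensive, extra rows are cheap (CASSANDRA-13794).
                int lastQueried = Math.min(count, perPartitionCount);
                int fetched = Math.min(lastQueried, replicaRows.size() - consumed);
                consumed += fetched;
                System.out.println("asked for " + lastQueried + " extra rows, got " + fetched);

                // Getting back fewer rows than requested means the partition is fully consumed.
                if (fetched < lastQueried)
                    break;
            }
            System.out.println("stopped after consuming " + consumed + " rows");
        }
    }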
http://git-wip-us.apache.org/repos/asf/cassandra/blob/39807ba4/src/java/org/apache/cassandra/service/reads/repair/BlockingReadRepair.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/reads/repair/BlockingReadRepair.java b/src/java/org/apache/cassandra/service/reads/repair/BlockingReadRepair.java
new file mode 100644
index 0000000..07b6e2c
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/reads/repair/BlockingReadRepair.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.reads.repair;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.function.Consumer;
+
+import javax.annotation.Nullable;
+
+import com.google.common.util.concurrent.AbstractFuture;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.concurrent.Stage;
+import org.apache.cassandra.concurrent.StageManager;
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.db.ReadCommand;
+import org.apache.cassandra.db.partitions.PartitionIterator;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators;
+import org.apache.cassandra.exceptions.ReadTimeoutException;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.metrics.ReadRepairMetrics;
+import org.apache.cassandra.net.AsyncOneResponse;
+import org.apache.cassandra.net.MessageOut;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.service.reads.AsyncRepairCallback;
+import org.apache.cassandra.service.reads.DataResolver;
+import org.apache.cassandra.service.reads.DigestResolver;
+import org.apache.cassandra.service.reads.ReadCallback;
+import org.apache.cassandra.service.reads.ResponseResolver;
+import org.apache.cassandra.tracing.TraceState;
+import org.apache.cassandra.tracing.Tracing;
+import org.apache.cassandra.utils.concurrent.Accumulator;
+
+/**
+ * 'Classic' read repair. Doesn't allow the client read to return until
+ * updates have been written to nodes needing correction.
+ */
+public class BlockingReadRepair implements ReadRepair, RepairListener
+{
+ private static final Logger logger = LoggerFactory.getLogger(BlockingReadRepair.class);
+
+ private final ReadCommand command;
+ private final List<InetAddressAndPort> endpoints;
+ private final long queryStartNanoTime;
+ private final ConsistencyLevel consistency;
+
+ private final Queue<BlockingPartitionRepair> repairs = new ConcurrentLinkedQueue<>();
+
+ private volatile DigestRepair digestRepair = null;
+
+ private static class DigestRepair
+ {
+ private final DataResolver dataResolver;
+ private final ReadCallback readCallback;
+ private final Consumer<PartitionIterator> resultConsumer;
+
+ public DigestRepair(DataResolver dataResolver, ReadCallback readCallback, Consumer<PartitionIterator> resultConsumer)
+ {
+ this.dataResolver = dataResolver;
+ this.readCallback = readCallback;
+ this.resultConsumer = resultConsumer;
+ }
+ }
+
+ public BlockingReadRepair(ReadCommand command,
+ List<InetAddressAndPort> endpoints,
+ long queryStartNanoTime,
+ ConsistencyLevel consistency)
+ {
+ this.command = command;
+ this.endpoints = endpoints;
+ this.queryStartNanoTime = queryStartNanoTime;
+ this.consistency = consistency;
+ }
+
+ public UnfilteredPartitionIterators.MergeListener getMergeListener(InetAddressAndPort[] endpoints)
+ {
+ return new PartitionIteratorMergeListener(endpoints, command, this);
+ }
+
+ public static class BlockingPartitionRepair extends AbstractFuture<Object> implements RepairListener.PartitionRepair
+ {
+
+ final List<AsyncOneResponse<?>> responses;
+
+ public BlockingPartitionRepair(int expectedResponses)
+ {
+ this.responses = new ArrayList<>(expectedResponses);
+ }
+
+ protected AsyncOneResponse sendMutation(InetAddressAndPort endpoint, Mutation mutation)
+ {
+ // use a separate verb here because we don't want these to get the white-glove hint-
+ // on-timeout behavior that a "real" mutation gets
+ Tracing.trace("Sending read-repair-mutation to {}", endpoint);
+ MessageOut<Mutation> msg = mutation.createMessage(MessagingService.Verb.READ_REPAIR);
+ return MessagingService.instance().sendRR(msg, endpoint);
+ }
+
+ public void reportMutation(InetAddressAndPort endpoint, Mutation mutation)
+ {
+ responses.add(sendMutation(endpoint, mutation));
+ }
+
+ public void finish()
+ {
+ Futures.addCallback(Futures.allAsList(responses), new FutureCallback<List<Object>>()
+ {
+ public void onSuccess(@Nullable List<Object> result)
+ {
+ set(result);
+ }
+
+ public void onFailure(Throwable t)
+ {
+ setException(t);
+ }
+ });
+ }
+ }
+
+ public void awaitRepairs(long timeout)
+ {
+ try
+ {
+ Futures.allAsList(repairs).get(timeout, TimeUnit.MILLISECONDS);
+ }
+ catch (TimeoutException ex)
+ {
+ // We got all responses, but timed out while repairing
+ Keyspace keyspace = Keyspace.open(command.metadata().keyspace);
+ int blockFor = consistency.blockFor(keyspace);
+ if (Tracing.isTracing())
+ Tracing.trace("Timed out while read-repairing after receiving all {} data and digest responses", blockFor);
+ else
+ logger.debug("Timeout while read-repairing after receiving all {} data and digest responses", blockFor);
+
+ throw new ReadTimeoutException(consistency, blockFor - 1, blockFor, true);
+ }
+ catch (InterruptedException | ExecutionException e)
+ {
+ throw new RuntimeException(e);
+ }
+
+ }
+
+ public PartitionRepair startPartitionRepair()
+ {
+ BlockingPartitionRepair repair = new BlockingPartitionRepair(endpoints.size());
+ repairs.add(repair);
+ return repair;
+ }
+
+ public void startForegroundRepair(DigestResolver digestResolver, List<InetAddressAndPort> allEndpoints, List<InetAddressAndPort> contactedEndpoints, Consumer<PartitionIterator> resultConsumer)
+ {
+ ReadRepairMetrics.repairedBlocking.mark();
+
+ // Do a full data read to resolve the correct response (and repair any nodes that need it)
+ Keyspace keyspace = Keyspace.open(command.metadata().keyspace);
+ DataResolver resolver = new DataResolver(keyspace, command, ConsistencyLevel.ALL, allEndpoints.size(), queryStartNanoTime, this);
+ ReadCallback readCallback = new ReadCallback(resolver, ConsistencyLevel.ALL, contactedEndpoints.size(), command,
+ keyspace, allEndpoints, queryStartNanoTime, this);
+
+ digestRepair = new DigestRepair(resolver, readCallback, resultConsumer);
+
+ for (InetAddressAndPort endpoint : contactedEndpoints)
+ {
+ Tracing.trace("Enqueuing full data read to {}", endpoint);
+ MessagingService.instance().sendRRWithFailure(command.createMessage(), endpoint, readCallback);
+ }
+ }
+
+ public void awaitForegroundRepairFinish() throws ReadTimeoutException
+ {
+ if (digestRepair != null)
+ {
+ digestRepair.readCallback.awaitResults();
+ digestRepair.resultConsumer.accept(digestRepair.dataResolver.resolve());
+ }
+ }
+
+ public void maybeStartBackgroundRepair(ResponseResolver resolver)
+ {
+ TraceState traceState = Tracing.instance.get();
+ if (traceState != null)
+ traceState.trace("Initiating read-repair");
+ StageManager.getStage(Stage.READ_REPAIR).execute(() -> resolver.evaluateAllResponses(traceState));
+ }
+
+ public void backgroundDigestRepair(TraceState traceState)
+ {
+ if (traceState != null)
+ traceState.trace("Digest mismatch");
+ if (logger.isDebugEnabled())
+ logger.debug("Digest mismatch");
+
+ ReadRepairMetrics.repairedBackground.mark();
+
+ Keyspace keyspace = Keyspace.open(command.metadata().keyspace);
+ final DataResolver repairResolver = new DataResolver(keyspace, command, consistency, endpoints.size(), queryStartNanoTime, this);
+ AsyncRepairCallback repairHandler = new AsyncRepairCallback(repairResolver, endpoints.size());
+
+ for (InetAddressAndPort endpoint : endpoints)
+ MessagingService.instance().sendRR(command.createMessage(), endpoint, repairHandler);
+ }
+}
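
awaitRepairs() above is the blocking half of the 'classic' strategy: collect one ack future per repair mutation, wait on all of them with a timeout, and convert a timeout into a read-level failure instead of hinting. A rough standalone sketch of the same shape, not part of the patch, with CompletableFuture standing in for the Guava/AsyncOneResponse machinery used above:

    import java.util.Arrays;
    import java.util.List;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;

    public class AwaitRepairsSketch
    {
        // Stand-in for the ack future of one read-repair mutation sent to a replica.
        static CompletableFuture<Void> sendRepair(String endpoint, long ackDelayMillis)
        {
            CompletableFuture<Void> ack = new CompletableFuture<>();
            CompletableFuture.delayedExecutor(ackDelayMillis, TimeUnit.MILLISECONDS)
                             .execute(() -> ack.complete(null)); // replica acknowledges later
            return ack;
        }

        static void awaitRepairs(List<CompletableFuture<Void>> repairs, long timeoutMillis) throws Exception
        {
            try
            {
                CompletableFuture.allOf(repairs.toArray(new CompletableFuture<?>[0]))
                                 .get(timeoutMillis, TimeUnit.MILLISECONDS);
            }
            catch (TimeoutException e)
            {
                // The real code throws ReadTimeoutException here: all data/digest
                // responses arrived, but the repair acks did not come back in time.
                throw new RuntimeException("timed out while read-repairing", e);
            }
        }

        public static void main(String[] args) throws Exception
        {
            List<CompletableFuture<Void>> repairs =
                    Arrays.asList(sendRepair("10.0.0.1", 10), sendRepair("10.0.0.2", 20));
            awaitRepairs(repairs, 1000); // generous timeout: completes normally
            System.out.println("all repair mutations acknowledged");
        }
    }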
http://git-wip-us.apache.org/repos/asf/cassandra/blob/39807ba4/src/java/org/apache/cassandra/service/reads/repair/NoopReadRepair.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/reads/repair/NoopReadRepair.java b/src/java/org/apache/cassandra/service/reads/repair/NoopReadRepair.java
new file mode 100644
index 0000000..ff65dbb
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/reads/repair/NoopReadRepair.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.reads.repair;
+
+import java.net.InetAddress;
+import java.util.List;
+import java.util.function.Consumer;
+
+import org.apache.cassandra.db.partitions.PartitionIterator;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators;
+import org.apache.cassandra.exceptions.ReadTimeoutException;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.service.reads.DigestResolver;
+import org.apache.cassandra.service.reads.ResponseResolver;
+import org.apache.cassandra.tracing.TraceState;
+
+public class NoopReadRepair implements ReadRepair
+{
+ public static final NoopReadRepair instance = new NoopReadRepair();
+
+ private NoopReadRepair() {}
+
+ public UnfilteredPartitionIterators.MergeListener getMergeListener(InetAddressAndPort[] endpoints)
+ {
+ return UnfilteredPartitionIterators.MergeListener.NOOP;
+ }
+
+ public void startForegroundRepair(DigestResolver digestResolver, List<InetAddressAndPort> allEndpoints, List<InetAddressAndPort> contactedEndpoints, Consumer<PartitionIterator> resultConsumer)
+ {
+ resultConsumer.accept(digestResolver.getData());
+ }
+
+ public void awaitForegroundRepairFinish() throws ReadTimeoutException
+ {
+
+ }
+
+ public void maybeStartBackgroundRepair(ResponseResolver resolver)
+ {
+
+ }
+
+ public void backgroundDigestRepair(TraceState traceState)
+ {
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/39807ba4/src/java/org/apache/cassandra/service/reads/repair/PartitionIteratorMergeListener.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/reads/repair/PartitionIteratorMergeListener.java b/src/java/org/apache/cassandra/service/reads/repair/PartitionIteratorMergeListener.java
new file mode 100644
index 0000000..3ad57cf
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/reads/repair/PartitionIteratorMergeListener.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.reads.repair;
+
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.Columns;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.ReadCommand;
+import org.apache.cassandra.db.RegularAndStaticColumns;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators;
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+import org.apache.cassandra.db.rows.UnfilteredRowIterators;
+import org.apache.cassandra.locator.InetAddressAndPort;
+
+public class PartitionIteratorMergeListener implements UnfilteredPartitionIterators.MergeListener
+{
+ private static final Logger logger = LoggerFactory.getLogger(PartitionIteratorMergeListener.class);
+
+ private final InetAddressAndPort[] sources;
+ private final ReadCommand command;
+ private final RepairListener repairListener;
+
+ public PartitionIteratorMergeListener(InetAddressAndPort[] sources, ReadCommand command, RepairListener repairListener)
+ {
+ this.sources = sources;
+ this.command = command;
+ this.repairListener = repairListener;
+ }
+
+ public UnfilteredRowIterators.MergeListener getRowMergeListener(DecoratedKey partitionKey, List<UnfilteredRowIterator> versions)
+ {
+ return new RowIteratorMergeListener(partitionKey, columns(versions), isReversed(versions), sources, command, repairListener);
+ }
+
+ private RegularAndStaticColumns columns(List<UnfilteredRowIterator> versions)
+ {
+ Columns statics = Columns.NONE;
+ Columns regulars = Columns.NONE;
+ for (UnfilteredRowIterator iter : versions)
+ {
+ if (iter == null)
+ continue;
+
+ RegularAndStaticColumns cols = iter.columns();
+ statics = statics.mergeTo(cols.statics);
+ regulars = regulars.mergeTo(cols.regulars);
+ }
+ return new RegularAndStaticColumns(statics, regulars);
+ }
+
+ private boolean isReversed(List<UnfilteredRowIterator> versions)
+ {
+ for (UnfilteredRowIterator iter : versions)
+ {
+ if (iter == null)
+ continue;
+
+ // Everything will be in the same order
+ return iter.isReverseOrder();
+ }
+
+ assert false : "Expected at least one iterator";
+ return false;
+ }
+
+ public void close()
+ {
+ repairListener.awaitRepairs(DatabaseDescriptor.getWriteRpcTimeout());
+ }
+}
+
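
columns(versions) above is a plain union across the per-source iterators, skipping sources that returned nothing. Sketched standalone below, not part of the patch; Cols is a hypothetical stand-in for RegularAndStaticColumns and string names stand in for column definitions:

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;
    import java.util.Set;
    import java.util.TreeSet;

    public class ColumnMergeSketch
    {
        // Hypothetical stand-in for RegularAndStaticColumns.
        static class Cols
        {
            final Set<String> statics = new TreeSet<>();
            final Set<String> regulars = new TreeSet<>();

            Cols(List<String> s, List<String> r)
            {
                statics.addAll(s);
                regulars.addAll(r);
            }
        }

        // Mirrors the shape of columns(versions): union column sets across all
        // per-source iterators, skipping sources that returned nothing (null).
        static Cols merge(List<Cols> versions)
        {
            Cols merged = new Cols(Collections.emptyList(), Collections.emptyList());
            for (Cols c : versions)
            {
                if (c == null)
                    continue;
                merged.statics.addAll(c.statics);
                merged.regulars.addAll(c.regulars);
            }
            return merged;
        }

        public static void main(String[] args)
        {
            Cols a = new Cols(Arrays.asList("s1"), Arrays.asList("v1", "v2"));
            Cols b = new Cols(Collections.emptyList(), Arrays.asList("v2", "v3"));
            Cols merged = merge(Arrays.asList(a, null, b)); // middle source had no data
            System.out.println("statics=" + merged.statics + " regulars=" + merged.regulars);
        }
    }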
http://git-wip-us.apache.org/repos/asf/cassandra/blob/39807ba4/src/java/org/apache/cassandra/service/reads/repair/ReadRepair.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/reads/repair/ReadRepair.java b/src/java/org/apache/cassandra/service/reads/repair/ReadRepair.java
new file mode 100644
index 0000000..bdd730c
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/reads/repair/ReadRepair.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.reads.repair;
+
+import java.util.List;
+import java.util.function.Consumer;
+
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.ReadCommand;
+import org.apache.cassandra.db.partitions.PartitionIterator;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators;
+import org.apache.cassandra.exceptions.ReadTimeoutException;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.service.reads.DigestResolver;
+import org.apache.cassandra.service.reads.ResponseResolver;
+import org.apache.cassandra.tracing.TraceState;
+
+public interface ReadRepair
+{
+ /**
+ * Used by DataResolver to generate corrections as the partition iterator is consumed
+ */
+ UnfilteredPartitionIterators.MergeListener getMergeListener(InetAddressAndPort[] endpoints);
+
+ /**
+ * Called when the digests from the initial read don't match. Reads may block on the
+ * repair started by this method.
+ */
+ public void startForegroundRepair(DigestResolver digestResolver,
+ List<InetAddressAndPort> allEndpoints,
+ List<InetAddressAndPort> contactedEndpoints,
+ Consumer<PartitionIterator> resultConsumer);
+
+ /**
+ * Wait for any operations started by {@link ReadRepair#startForegroundRepair} to complete
+ * @throws ReadTimeoutException
+ */
+ public void awaitForegroundRepairFinish() throws ReadTimeoutException;
+
+ /**
+ * Called when responses from all replicas have been received. Read will not block on this.
+ * @param resolver
+ */
+ public void maybeStartBackgroundRepair(ResponseResolver resolver);
+
+ /**
+ * If {@link ReadRepair#maybeStartBackgroundRepair} was called with a {@link DigestResolver}, this will
+ * be called to perform a repair if there was a digest mismatch
+ */
+ public void backgroundDigestRepair(TraceState traceState);
+
+ static ReadRepair create(ReadCommand command, List<InetAddressAndPort> endpoints, long queryStartNanoTime, ConsistencyLevel consistency)
+ {
+ return new BlockingReadRepair(command, endpoints, queryStartNanoTime, consistency);
+ }
+}
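
The static create() method above is the seam that makes read repair pluggable: callers depend only on the interface, while BlockingReadRepair, NoopReadRepair, or a test double is chosen in one place. A minimal standalone sketch of that factory/strategy shape - hypothetical names, not this patch's classes:

    public class ReadRepairStrategySketch
    {
        interface Repair
        {
            void repair(String partitionKey);
        }

        // Stand-in for the blocking strategy: send corrections and wait for acks.
        static class Blocking implements Repair
        {
            public void repair(String partitionKey)
            {
                System.out.println("repairing " + partitionKey + " and blocking on acks");
            }
        }

        // Stand-in for a no-op strategy: paths that must never trigger repair writes.
        static class Noop implements Repair
        {
            public void repair(String partitionKey)
            {
                // intentionally does nothing
            }
        }

        // Mirrors the shape of ReadRepair.create(): one place picks the concrete
        // strategy, callers depend only on the interface.
        static Repair create(boolean repairEnabled)
        {
            return repairEnabled ? new Blocking() : new Noop();
        }

        public static void main(String[] args)
        {
            create(true).repair("key1");
            create(false).repair("key2");
        }
    }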
http://git-wip-us.apache.org/repos/asf/cassandra/blob/39807ba4/src/java/org/apache/cassandra/service/reads/repair/RepairListener.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/reads/repair/RepairListener.java b/src/java/org/apache/cassandra/service/reads/repair/RepairListener.java
new file mode 100644
index 0000000..174c0e7
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/reads/repair/RepairListener.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.reads.repair;
+
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.locator.InetAddressAndPort;
+
+public interface RepairListener
+{
+ interface PartitionRepair
+ {
+ void reportMutation(InetAddressAndPort endpoint, Mutation mutation);
+ void finish();
+ }
+
+ PartitionRepair startPartitionRepair();
+ void awaitRepairs(long timeoutMillis);
+}
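
The protocol above is: one startPartitionRepair() per repaired partition, reportMutation() per out-of-date replica, finish() to flush, then awaitRepairs() to block on acknowledgements. A standalone sketch of a recording implementation - the kind of test double this interface enables; all names below are hypothetical, not the patch's classes:

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class RecordingRepairListenerSketch
    {
        interface PartitionRepair
        {
            void reportMutation(String endpoint, String mutation);
            void finish();
        }

        interface RepairListener
        {
            PartitionRepair startPartitionRepair();
            void awaitRepairs(long timeoutMillis);
        }

        // Records repair mutations per endpoint instead of sending them.
        static class Recording implements RepairListener
        {
            final Map<String, List<String>> sent = new HashMap<>();

            public PartitionRepair startPartitionRepair()
            {
                return new PartitionRepair()
                {
                    public void reportMutation(String endpoint, String mutation)
                    {
                        sent.computeIfAbsent(endpoint, e -> new ArrayList<>()).add(mutation);
                    }

                    public void finish()
                    {
                        // nothing in flight when recording
                    }
                };
            }

            public void awaitRepairs(long timeoutMillis)
            {
                // nothing to wait on when recording
            }
        }

        public static void main(String[] args)
        {
            Recording listener = new Recording();
            PartitionRepair repair = listener.startPartitionRepair();
            repair.reportMutation("10.0.0.2", "correction for key1");
            repair.finish();
            listener.awaitRepairs(2000);
            System.out.println(listener.sent); // {10.0.0.2=[correction for key1]}
        }
    }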
http://git-wip-us.apache.org/repos/asf/cassandra/blob/39807ba4/src/java/org/apache/cassandra/service/reads/repair/RowIteratorMergeListener.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/reads/repair/RowIteratorMergeListener.java b/src/java/org/apache/cassandra/service/reads/repair/RowIteratorMergeListener.java
new file mode 100644
index 0000000..63bd3ce
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/reads/repair/RowIteratorMergeListener.java
@@ -0,0 +1,336 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.reads.repair;
+
+import java.util.Arrays;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.Iterables;
+
+import org.apache.cassandra.db.Clustering;
+import org.apache.cassandra.db.ClusteringBound;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.DeletionTime;
+import org.apache.cassandra.db.LivenessInfo;
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.db.RangeTombstone;
+import org.apache.cassandra.db.ReadCommand;
+import org.apache.cassandra.db.RegularAndStaticColumns;
+import org.apache.cassandra.db.Slice;
+import org.apache.cassandra.db.filter.ColumnFilter;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.db.rows.BTreeRow;
+import org.apache.cassandra.db.rows.Cell;
+import org.apache.cassandra.db.rows.RangeTombstoneMarker;
+import org.apache.cassandra.db.rows.Row;
+import org.apache.cassandra.db.rows.RowDiffListener;
+import org.apache.cassandra.db.rows.Rows;
+import org.apache.cassandra.db.rows.UnfilteredRowIterators;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.schema.ColumnMetadata;
+import org.apache.cassandra.schema.TableMetadata;
+
+public class RowIteratorMergeListener implements UnfilteredRowIterators.MergeListener
+{
+ private final DecoratedKey partitionKey;
+ private final RegularAndStaticColumns columns;
+ private final boolean isReversed;
+ private final InetAddressAndPort[] sources;
+ private final ReadCommand command;
+
+ private final PartitionUpdate.Builder[] repairs;
+
+ private final Row.Builder[] currentRows;
+ private final RowDiffListener diffListener;
+
+ // The partition level deletion for the merged partition.
+ private DeletionTime partitionLevelDeletion;
+ // When the merged iterator has a currently open marker, its deletion time; null otherwise.
+ private DeletionTime mergedDeletionTime;
+ // For each source, the time of the current deletion as known by the source.
+ private final DeletionTime[] sourceDeletionTime;
+ // For each source, record if there is an open range to send as repair, and from where.
+ private final ClusteringBound[] markerToRepair;
+
+ private final RepairListener repairListener;
+
+ public RowIteratorMergeListener(DecoratedKey partitionKey, RegularAndStaticColumns columns, boolean isReversed, InetAddressAndPort[] sources, ReadCommand command, RepairListener repairListener)
+ {
+ this.partitionKey = partitionKey;
+ this.columns = columns;
+ this.isReversed = isReversed;
+ this.sources = sources;
+ repairs = new PartitionUpdate.Builder[sources.length];
+ currentRows = new Row.Builder[sources.length];
+ sourceDeletionTime = new DeletionTime[sources.length];
+ markerToRepair = new ClusteringBound[sources.length];
+ this.command = command;
+ this.repairListener = repairListener;
+
+ this.diffListener = new RowDiffListener()
+ {
+ public void onPrimaryKeyLivenessInfo(int i, Clustering clustering, LivenessInfo merged, LivenessInfo original)
+ {
+ if (merged != null && !merged.equals(original))
+ currentRow(i, clustering).addPrimaryKeyLivenessInfo(merged);
+ }
+
+ public void onDeletion(int i, Clustering clustering, Row.Deletion merged, Row.Deletion original)
+ {
+ if (merged != null && !merged.equals(original))
+ currentRow(i, clustering).addRowDeletion(merged);
+ }
+
+ public void onComplexDeletion(int i, Clustering clustering, ColumnMetadata column, DeletionTime merged, DeletionTime original)
+ {
+ if (merged != null && !merged.equals(original))
+ currentRow(i, clustering).addComplexDeletion(column, merged);
+ }
+
+ public void onCell(int i, Clustering clustering, Cell merged, Cell original)
+ {
+ if (merged != null && !merged.equals(original) && isQueried(merged))
+ currentRow(i, clustering).addCell(merged);
+ }
+
+ private boolean isQueried(Cell cell)
+ {
+ // When we read, we may have some cells that have been fetched but are not selected by the user. Those cells may
+ // have empty values as an optimization (see CASSANDRA-10655) and hence they should not be included in the read repair.
+ // This is fine since those columns are not actually requested by the user and are only present for the sake of CQL
+ // semantics (making sure we can always distinguish between a row that doesn't exist and one that does exist but has
+ // no value for the column requested by the user), so it won't be unexpected by the user that those columns are
+ // not repaired.
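+ // For example (hypothetical schema and query, not from this patch): with "SELECT a FROM t", the
+ // filter may still fetch column b so that a live-but-empty row stays distinguishable from a missing
+ // one; a cell of b is then fetched but not queried, and we skip it here rather than repair it.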
+ ColumnMetadata column = cell.column();
+ ColumnFilter filter = RowIteratorMergeListener.this.command.columnFilter();
+ return column.isComplex() ? filter.fetchedCellIsQueried(column, cell.path()) : filter.fetchedColumnIsQueried(column);
+ }
+ };
+ }
+
+ private PartitionUpdate.Builder update(int i)
+ {
+ if (repairs[i] == null)
+ repairs[i] = new PartitionUpdate.Builder(command.metadata(), partitionKey, columns, 1);
+ return repairs[i];
+ }
+
+ /**
+ * The partition level deletion with which source {@code i} is currently repaired, or
+ * {@code DeletionTime.LIVE} if the source is not repaired on the partition level deletion (meaning it was
+ * up to date on it). The output of this method is only valid after the call to
+ * {@link #onMergedPartitionLevelDeletion}.
+ */
+ private DeletionTime partitionLevelRepairDeletion(int i)
+ {
+ return repairs[i] == null ? DeletionTime.LIVE : repairs[i].partitionLevelDeletion();
+ }
+
+ private Row.Builder currentRow(int i, Clustering clustering)
+ {
+ if (currentRows[i] == null)
+ {
+ currentRows[i] = BTreeRow.sortedBuilder();
+ currentRows[i].newRow(clustering);
+ }
+ return currentRows[i];
+ }
+
+ public void onMergedPartitionLevelDeletion(DeletionTime mergedDeletion, DeletionTime[] versions)
+ {
+ this.partitionLevelDeletion = mergedDeletion;
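+ // For instance (made-up timestamps): if the merged partition deletion is at timestamp 10 while
+ // source i only knows a deletion at timestamp 5 (or none at all), the merged deletion supersedes
+ // it and source i is repaired with the newer partition deletion.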
+ for (int i = 0; i < versions.length; i++)
+ {
+ if (mergedDeletion.supersedes(versions[i]))
+ update(i).addPartitionDeletion(mergedDeletion);
+ }
+ }
+
+ public void onMergedRows(Row merged, Row[] versions)
+ {
+ // If a row was shadowed post merge, it must be by a partition level or range tombstone, and we handle
+ // those cases directly in their respective methods (in other words, it would be inefficient to send a row
+ // deletion as repair when we know we've already sent a partition level or range tombstone that covers it).
+ if (merged.isEmpty())
+ return;
+
+ Rows.diff(diffListener, merged, versions);
+ for (int i = 0; i < currentRows.length; i++)
+ {
+ if (currentRows[i] != null)
+ update(i).add(currentRows[i].build());
+ }
+ Arrays.fill(currentRows, null);
+ }
+
+ private DeletionTime currentDeletion()
+ {
+ return mergedDeletionTime == null ? partitionLevelDeletion : mergedDeletionTime;
+ }
+
+ public void onMergedRangeTombstoneMarkers(RangeTombstoneMarker merged, RangeTombstoneMarker[] versions)
+ {
+ try
+ {
+ // The code for merging range tombstones is a tad complex, and the assertions in it have been triggered
+ // unexpectedly on a few occasions (CASSANDRA-13237, CASSANDRA-13719). It's hard to get insights
+ // when that happens without more context than what the assertion errors give us, hence the
+ // catch here, which basically gathers as much context as is reasonable.
+ internalOnMergedRangeTombstoneMarkers(merged, versions);
+ }
+ catch (AssertionError e)
+ {
+ // The following can be pretty verbose, but it's really only triggered if a bug happens, so we'd
+ // rather get more info to debug than not.
+ TableMetadata table = command.metadata();
+ String details = String.format("Error merging RTs on %s: merged=%s, versions=%s, sources={%s}",
+ table,
+ merged == null ? "null" : merged.toString(table),
+ '[' + Joiner.on(", ").join(Iterables.transform(Arrays.asList(versions), rt -> rt == null ? "null" : rt.toString(table))) + ']',
+ Arrays.toString(sources));
+ throw new AssertionError(details, e);
+ }
+ }
+
+ private void internalOnMergedRangeTombstoneMarkers(RangeTombstoneMarker merged, RangeTombstoneMarker[] versions)
+ {
+ // The current deletion as of when we start dealing with this marker.
+ DeletionTime currentDeletion = currentDeletion();
+
+ for (int i = 0; i < versions.length; i++)
+ {
+ RangeTombstoneMarker marker = versions[i];
+
+ // Update what the source now thinks is the current deletion
+ if (marker != null)
+ sourceDeletionTime[i] = marker.isOpen(isReversed) ? marker.openDeletionTime(isReversed) : null;
+
+ // If merged == null, some of the sources are opening or closing a marker
+ if (merged == null)
+ {
+ // but if it's not this source, move to the next one
+ if (marker == null)
+ continue;
+
+ // We have a close and/or open marker for a source, with nothing corresponding in merged.
+ // Because merged is a superset, this implies that we have a current deletion (be it due to an
+ // early opening in merged or a partition level deletion) and that this deletion will still be
+ // active after that point. Further, whatever deletion was open or is opened by this marker on the
+ // source, that deletion cannot supersede the current one.
+ //
+ // But while the marker deletion (before and/or after this point) cannot supersede the current
+ // deletion, we want to know if it's equal to it (both before and after), because in that case
+ // the source is up to date and we don't want to include a repair.
+ //
+ // So in practice we have 2 possible cases:
+ // 1) the source was up-to-date on deletion up to that point: then it won't be from that point
+ // on unless it's a boundary and the new opened deletion time is also equal to the current
+ // deletion (note that this implies the boundary has the same closing and opening deletion
+ // time, which should generally not happen, but can due to legacy reading code not avoiding
+ // this for a while, see CASSANDRA-13237).
+ // 2) the source wasn't up-to-date on deletion up to that point and it may now be (if it isn't,
+ // we just have nothing to do for that marker).
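+ // A made-up illustration of case 1: the merged stream has an open deletion at timestamp 10, and
+ // this source closes its own timestamp-10 range here without reopening an equal one; from this
+ // point on the source no longer covers the deletion, so we start a repair range for it below.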
+ assert !currentDeletion.isLive() : currentDeletion.toString();
+
+ // Is the source up to date on deletion? It's up to date if it doesn't have an open RT repair
+ // nor an "active" partition level deletion (where "active" means that it's greater than or equal
+ // to the current deletion: if the source has a repaired partition deletion lower than the
+ // current deletion, this means the current deletion is due to a previously open range tombstone,
+ // and if the source isn't currently repaired for that RT, then it means it's up to date on it).
+ DeletionTime partitionRepairDeletion = partitionLevelRepairDeletion(i);
+ if (markerToRepair[i] == null && currentDeletion.supersedes(partitionRepairDeletion))
+ {
+ // Since there is an ongoing merged deletion, the only way we don't have an open repair for
+ // this source is that it had a range open with the same deletion as the current one and it's
+ // closing it.
+ assert marker.isClose(isReversed) && currentDeletion.equals(marker.closeDeletionTime(isReversed))
+ : String.format("currentDeletion=%s, marker=%s", currentDeletion, marker.toString(command.metadata()));
+
+ // and so unless it's a boundary whose opening deletion time is still equal to the current
+ // deletion (see comment above for why this can actually happen), we have to repair the source
+ // from that point on.
+ if (!(marker.isOpen(isReversed) && currentDeletion.equals(marker.openDeletionTime(isReversed))))
+ markerToRepair[i] = marker.closeBound(isReversed).invert();
+ }
+ // In case 2) above, we only have something to do if the source is up-to-date after that point
+ // (which, since the source isn't up-to-date before that point, means we're opening a new deletion
+ // that is equal to the current one).
+ else if (marker.isOpen(isReversed) && currentDeletion.equals(marker.openDeletionTime(isReversed)))
+ {
+ closeOpenMarker(i, marker.openBound(isReversed).invert());
+ }
+ }
+ else
+ {
+ // We have a change of current deletion in merged (potentially to/from no deletion at all).
+
+ if (merged.isClose(isReversed))
+ {
+ // We're closing the merged range. If we've recorded that this should be repaired for the
+ // source, close said range and add it to the repair to send.
+ if (markerToRepair[i] != null)
+ closeOpenMarker(i, merged.closeBound(isReversed));
+
+ }
+
+ if (merged.isOpen(isReversed))
+ {
+ // If we're opening a new merged range (or just switching deletion), then unless the source
+ // is up to date on that deletion (note that we've updated what the source deletion is
+ // above), we'll have to send the range to the source.
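+ // (For example, if merged opens a deletion at timestamp 12 while the source's currently open
+ // deletion, if any, is at timestamp 8, we start recording a repair range for the source here.)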
+ DeletionTime newDeletion = merged.openDeletionTime(isReversed);
+ DeletionTime sourceDeletion = sourceDeletionTime[i];
+ if (!newDeletion.equals(sourceDeletion))
+ markerToRepair[i] = merged.openBound(isReversed);
+ }
+ }
+ }
+
+ if (merged != null)
+ mergedDeletionTime = merged.isOpen(isReversed) ? merged.openDeletionTime(isReversed) : null;
+ }
+
+ private void closeOpenMarker(int i, ClusteringBound close)
+ {
+ ClusteringBound open = markerToRepair[i];
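+ // Slice bounds must be given in clustering order, so for reverse queries the bound recorded as
+ // "open" is actually the slice's end and the closing bound its start; hence the swap below.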
+ update(i).add(new RangeTombstone(Slice.make(isReversed ? close : open, isReversed ? open : close), currentDeletion()));
+ markerToRepair[i] = null;
+ }
+
+ public void close()
+ {
+ RepairListener.PartitionRepair repair = null;
+ for (int i = 0; i < repairs.length; i++)
+ {
+ if (repairs[i] == null)
+ continue;
+
+ if (repair == null)
+ {
+ repair = repairListener.startPartitionRepair();
+ }
+ repair.reportMutation(sources[i], new Mutation(repairs[i].build()));
+ }
+
+ if (repair != null)
+ {
+ repair.finish();
+ }
+ }
+}
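As a rough usage sketch (illustrative only, not part of this patch): close() above asks the
RepairListener for a PartitionRepair, reports one mutation per out-of-date source, then finishes it.
Assuming PartitionRepair exposes exactly the reportMutation/finish calls used above, a minimal
test-style listener could collect the proposed repairs like this:

    // Hypothetical collector (imports: java.util.* plus the types referenced in the diff above).
    class CollectingRepairListener implements RepairListener
    {
        final Map<InetAddressAndPort, List<Mutation>> proposed = new HashMap<>();

        public PartitionRepair startPartitionRepair()
        {
            return new PartitionRepair()
            {
                public void reportMutation(InetAddressAndPort endpoint, Mutation mutation)
                {
                    // record what would have been sent to this endpoint
                    proposed.computeIfAbsent(endpoint, e -> new ArrayList<>()).add(mutation);
                }

                public void finish()
                {
                    // a real implementation would send the recorded mutations here
                }
            };
        }

        public void awaitRepairs(long timeoutMillis)
        {
            // nothing asynchronous to wait for in this sketch
        }
    }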