You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by be...@apache.org on 2018/09/14 09:14:51 UTC
[1/4] cassandra git commit: ReplicaPlan/Layout refactor
follow-up/completion
Repository: cassandra
Updated Branches:
refs/heads/trunk 05dbb3e0a -> 047bcd7ad
http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/test/unit/org/apache/cassandra/service/WriteResponseHandlerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/WriteResponseHandlerTest.java b/test/unit/org/apache/cassandra/service/WriteResponseHandlerTest.java
index cf1e06a..a3f13c2 100644
--- a/test/unit/org/apache/cassandra/service/WriteResponseHandlerTest.java
+++ b/test/unit/org/apache/cassandra/service/WriteResponseHandlerTest.java
@@ -27,7 +27,7 @@ import java.util.concurrent.TimeUnit;
import com.google.common.base.Predicates;
import org.apache.cassandra.dht.Murmur3Partitioner;
import org.apache.cassandra.locator.EndpointsForToken;
-import org.apache.cassandra.locator.ReplicaLayout;
+import org.apache.cassandra.locator.ReplicaPlans;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -236,7 +236,7 @@ public class WriteResponseHandlerTest
private static AbstractWriteResponseHandler createWriteResponseHandler(ConsistencyLevel cl, ConsistencyLevel ideal, long queryStartTime)
{
- return ks.getReplicationStrategy().getWriteResponseHandler(ReplicaLayout.forWriteWithDownNodes(ks, cl, targets.token(), targets, pending),
+ return ks.getReplicationStrategy().getWriteResponseHandler(ReplicaPlans.forWrite(ks, cl, targets, pending, Predicates.alwaysTrue(), ReplicaPlans.writeAll),
null, WriteType.SIMPLE, queryStartTime, ideal);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/test/unit/org/apache/cassandra/service/WriteResponseHandlerTransientTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/WriteResponseHandlerTransientTest.java b/test/unit/org/apache/cassandra/service/WriteResponseHandlerTransientTest.java
index c19e65e..b6c95dd 100644
--- a/test/unit/org/apache/cassandra/service/WriteResponseHandlerTransientTest.java
+++ b/test/unit/org/apache/cassandra/service/WriteResponseHandlerTransientTest.java
@@ -24,7 +24,6 @@ import java.util.Set;
import java.util.UUID;
import java.util.function.Predicate;
-import com.google.common.base.Predicates;
import com.google.common.collect.Sets;
import org.apache.cassandra.dht.Murmur3Partitioner;
@@ -32,10 +31,10 @@ import org.apache.cassandra.dht.Token;
import org.apache.cassandra.locator.EndpointsForToken;
import org.apache.cassandra.locator.ReplicaLayout;
import org.apache.cassandra.locator.EndpointsForRange;
-import org.apache.cassandra.locator.ReplicaUtils;
+import org.apache.cassandra.locator.ReplicaPlan;
+import org.apache.cassandra.locator.ReplicaPlans;
import org.junit.Assert;
import org.junit.BeforeClass;
-import org.junit.Ignore;
import org.junit.Test;
import org.apache.cassandra.SchemaLoader;
@@ -54,7 +53,6 @@ import org.apache.cassandra.utils.ByteBufferUtil;
import static org.apache.cassandra.locator.Replica.fullReplica;
import static org.apache.cassandra.locator.Replica.transientReplica;
-import static org.apache.cassandra.locator.ReplicaUtils.FULL_RANGE;
import static org.apache.cassandra.locator.ReplicaUtils.full;
import static org.apache.cassandra.locator.ReplicaUtils.trans;
@@ -147,32 +145,35 @@ public class WriteResponseHandlerTransientTest
dummy = DatabaseDescriptor.getPartitioner().getToken(ByteBufferUtil.bytes(0));
}
- @Ignore("Throws unavailable for quorum as written")
@Test
public void checkPendingReplicasAreNotFiltered()
{
- EndpointsForToken natural = EndpointsForToken.of(dummy.getToken(), full(EP1), full(EP2), trans(EP3));
- EndpointsForToken pending = EndpointsForToken.of(dummy.getToken(), full(EP4), full(EP5), trans(EP6));
- ReplicaLayout.ForToken replicaLayout = ReplicaLayout.forWrite(ks, ConsistencyLevel.QUORUM, dummy.getToken(), 2, natural, pending, Predicates.alwaysTrue());
+ EndpointsForToken natural = EndpointsForToken.of(dummy.getToken(), full(EP1), full(EP2), trans(EP3), full(EP5));
+ EndpointsForToken pending = EndpointsForToken.of(dummy.getToken(), full(EP4), trans(EP6));
+ ReplicaLayout.ForTokenWrite layout = new ReplicaLayout.ForTokenWrite(natural, pending);
+ ReplicaPlan.ForTokenWrite replicaPlan = ReplicaPlans.forWrite(ks, ConsistencyLevel.QUORUM, layout, layout, ReplicaPlans.writeAll);
- Assert.assertEquals(EndpointsForRange.of(full(EP4), full(EP5), trans(EP6)), replicaLayout.pending());
+ Assert.assertEquals(EndpointsForRange.of(full(EP4), trans(EP6)), replicaPlan.pending());
}
- private static ReplicaLayout.ForToken expected(EndpointsForToken all, EndpointsForToken selected)
+ private static ReplicaPlan.ForTokenWrite expected(EndpointsForToken natural, EndpointsForToken selected)
{
- return new ReplicaLayout.ForToken(ks, ConsistencyLevel.QUORUM, dummy.getToken(), all, EndpointsForToken.empty(dummy.getToken()), selected);
+ return new ReplicaPlan.ForTokenWrite(ks, ConsistencyLevel.QUORUM, EndpointsForToken.empty(dummy.getToken()), natural, natural, selected);
}
- private static ReplicaLayout.ForToken getSpeculationContext(EndpointsForToken replicas, int blockFor, Predicate<InetAddressAndPort> livePredicate)
+ private static ReplicaPlan.ForTokenWrite getSpeculationContext(EndpointsForToken natural, Predicate<InetAddressAndPort> livePredicate)
{
- return ReplicaLayout.forWrite(ks, ConsistencyLevel.QUORUM, dummy.getToken(), blockFor, replicas, EndpointsForToken.empty(dummy.getToken()), livePredicate);
+ ReplicaLayout.ForTokenWrite liveAndDown = new ReplicaLayout.ForTokenWrite(natural, EndpointsForToken.empty(dummy.getToken()));
+ ReplicaLayout.ForTokenWrite live = new ReplicaLayout.ForTokenWrite(natural.filter(r -> livePredicate.test(r.endpoint())), EndpointsForToken.empty(dummy.getToken()));
+ return ReplicaPlans.forWrite(ks, ConsistencyLevel.QUORUM, liveAndDown, live, ReplicaPlans.writeNormal);
}
- private static void assertSpeculationReplicas(ReplicaLayout.ForToken expected, EndpointsForToken replicas, int blockFor, Predicate<InetAddressAndPort> livePredicate)
+ private static void assertSpeculationReplicas(ReplicaPlan.ForTokenWrite expected, EndpointsForToken replicas, Predicate<InetAddressAndPort> livePredicate)
{
- ReplicaLayout.ForToken actual = getSpeculationContext(replicas, blockFor, livePredicate);
- Assert.assertEquals(expected.natural(), actual.natural());
- Assert.assertEquals(expected.selected(), actual.selected());
+ ReplicaPlan.ForTokenWrite actual = getSpeculationContext(replicas, livePredicate);
+ Assert.assertEquals(expected.pending(), actual.pending());
+ Assert.assertEquals(expected.live(), actual.live());
+ Assert.assertEquals(expected.contacts(), actual.contacts());
}
private static Predicate<InetAddressAndPort> dead(InetAddressAndPort... endpoints)
@@ -186,39 +187,35 @@ public class WriteResponseHandlerTransientTest
return EndpointsForToken.of(dummy.getToken(), rr);
}
- @Ignore("Throws unavailable for quorum as written")
@Test
public void checkSpeculationContext()
{
- EndpointsForToken all = replicas(full(EP1), full(EP2), trans(EP3));
+ EndpointsForToken all = replicas(full(EP1), full(EP2), trans(EP3), full(EP4), full(EP5), trans(EP6));
// in happy path, transient replica should be classified as a backup
- assertSpeculationReplicas(expected(all,
- replicas(full(EP1), full(EP2))),
- replicas(full(EP1), full(EP2), trans(EP3)),
- 2, dead());
-
- // if one of the full replicas is dead, they should all be in the initial contacts
- assertSpeculationReplicas(expected(all,
- replicas(full(EP1), trans(EP3))),
- replicas(full(EP1), full(EP2), trans(EP3)),
- 2, dead(EP2));
-
- // block only for 1 full replica, use transient as backups
- assertSpeculationReplicas(expected(all,
- replicas(full(EP1))),
- replicas(full(EP1), full(EP2), trans(EP3)),
- 1, dead(EP2));
+ assertSpeculationReplicas(expected(all, replicas(full(EP1), full(EP2), full(EP4), full(EP5))),
+ all,
+ dead());
+
+ // full replicas must always be in the contact list, and will occur first
+ assertSpeculationReplicas(expected(replicas(full(EP1), trans(EP3), full(EP4), trans(EP6)), replicas(full(EP1), full(EP2), full(EP4), full(EP5), trans(EP3), trans(EP6))),
+ all,
+ dead(EP2, EP5));
+
+ // only one transient used as backup
+ assertSpeculationReplicas(expected(replicas(full(EP1), trans(EP3), full(EP4), full(EP5), trans(EP6)), replicas(full(EP1), full(EP2), full(EP4), full(EP5), trans(EP3))),
+ all,
+ dead(EP2));
}
@Test (expected = UnavailableException.class)
public void noFullReplicas()
{
- getSpeculationContext(replicas(full(EP1), trans(EP2), trans(EP3)), 2, dead(EP1));
+ getSpeculationContext(replicas(full(EP1), trans(EP2), trans(EP3)), dead(EP1));
}
@Test (expected = UnavailableException.class)
public void notEnoughTransientReplicas()
{
- getSpeculationContext(replicas(full(EP1), trans(EP2), trans(EP3)), 2, dead(EP2, EP3));
+ getSpeculationContext(replicas(full(EP1), trans(EP2), trans(EP3)), dead(EP2, EP3));
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/test/unit/org/apache/cassandra/service/reads/DataResolverTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/reads/DataResolverTest.java b/test/unit/org/apache/cassandra/service/reads/DataResolverTest.java
index 968ef16..c49bf3a 100644
--- a/test/unit/org/apache/cassandra/service/reads/DataResolverTest.java
+++ b/test/unit/org/apache/cassandra/service/reads/DataResolverTest.java
@@ -24,6 +24,7 @@ import java.util.Iterator;
import com.google.common.collect.Iterators;
import com.google.common.collect.Sets;
+import org.apache.cassandra.locator.ReplicaPlan;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -87,7 +88,7 @@ public class DataResolverTest extends AbstractReadResponseTest
{
command = Util.cmd(cfs, dk).withNowInSeconds(nowInSec).build();
command.trackRepairedStatus();
- readRepair = new TestableReadRepair(command, ConsistencyLevel.QUORUM);
+ readRepair = new TestableReadRepair(command);
}
private static EndpointsForRange makeReplicas(int num)
@@ -338,7 +339,7 @@ public class DataResolverTest extends AbstractReadResponseTest
public void testResolveWithBothEmpty()
{
EndpointsForRange replicas = makeReplicas(2);
- TestableReadRepair readRepair = new TestableReadRepair(command, ConsistencyLevel.QUORUM);
+ TestableReadRepair readRepair = new TestableReadRepair(command);
DataResolver resolver = new DataResolver(command, plan(replicas, ConsistencyLevel.ALL), readRepair, System.nanoTime());
resolver.preprocess(response(command, replicas.get(0).endpoint(), EmptyIterators.unfilteredPartition(cfm)));
resolver.preprocess(response(command, replicas.get(1).endpoint(), EmptyIterators.unfilteredPartition(cfm)));
@@ -715,7 +716,7 @@ public class DataResolverTest extends AbstractReadResponseTest
{
EndpointsForRange replicas = makeReplicas(2);
ReadCommand cmd = Util.cmd(cfs2, dk).withNowInSeconds(nowInSec).build();
- TestableReadRepair readRepair = new TestableReadRepair(cmd, ConsistencyLevel.QUORUM);
+ TestableReadRepair readRepair = new TestableReadRepair(cmd);
DataResolver resolver = new DataResolver(cmd, plan(replicas, ConsistencyLevel.ALL), readRepair, System.nanoTime());
long[] ts = {100, 200};
@@ -767,7 +768,7 @@ public class DataResolverTest extends AbstractReadResponseTest
{
EndpointsForRange replicas = makeReplicas(2);
ReadCommand cmd = Util.cmd(cfs2, dk).withNowInSeconds(nowInSec).build();
- TestableReadRepair readRepair = new TestableReadRepair(cmd, ConsistencyLevel.QUORUM);
+ TestableReadRepair readRepair = new TestableReadRepair(cmd);
DataResolver resolver = new DataResolver(cmd, plan(replicas, ConsistencyLevel.ALL), readRepair, System.nanoTime());
long[] ts = {100, 200};
@@ -811,7 +812,7 @@ public class DataResolverTest extends AbstractReadResponseTest
{
EndpointsForRange replicas = makeReplicas(2);
ReadCommand cmd = Util.cmd(cfs2, dk).withNowInSeconds(nowInSec).build();
- TestableReadRepair readRepair = new TestableReadRepair(cmd, ConsistencyLevel.QUORUM);
+ TestableReadRepair readRepair = new TestableReadRepair(cmd);
DataResolver resolver = new DataResolver(cmd, plan(replicas, ConsistencyLevel.ALL), readRepair, System.nanoTime());
long[] ts = {100, 200};
@@ -861,7 +862,7 @@ public class DataResolverTest extends AbstractReadResponseTest
{
EndpointsForRange replicas = makeReplicas(2);
ReadCommand cmd = Util.cmd(cfs2, dk).withNowInSeconds(nowInSec).build();
- TestableReadRepair readRepair = new TestableReadRepair(cmd, ConsistencyLevel.QUORUM);
+ TestableReadRepair readRepair = new TestableReadRepair(cmd);
DataResolver resolver = new DataResolver(cmd, plan(replicas, ConsistencyLevel.ALL), readRepair, System.nanoTime());
long[] ts = {100, 200};
@@ -1224,7 +1225,7 @@ public class DataResolverTest extends AbstractReadResponseTest
}
private DataResolver resolverWithVerifier(final ReadCommand command,
- final ReplicaLayout.ForRange plan,
+ final ReplicaPlan.SharedForRangeRead plan,
final ReadRepair readRepair,
final long queryStartNanoTime,
final RepairedDataVerifier verifier)
@@ -1232,7 +1233,7 @@ public class DataResolverTest extends AbstractReadResponseTest
class TestableDataResolver extends DataResolver
{
- public TestableDataResolver(ReadCommand command, ReplicaLayout.ForRange plan, ReadRepair readRepair, long queryStartNanoTime)
+ public TestableDataResolver(ReadCommand command, ReplicaPlan.SharedForRangeRead plan, ReadRepair readRepair, long queryStartNanoTime)
{
super(command, plan, readRepair, queryStartNanoTime);
}
@@ -1298,9 +1299,9 @@ public class DataResolverTest extends AbstractReadResponseTest
assertEquals(update.metadata().name, cfm.name);
}
- private ReplicaLayout.ForRange plan(EndpointsForRange replicas, ConsistencyLevel consistencyLevel)
+ private ReplicaPlan.SharedForRangeRead plan(EndpointsForRange replicas, ConsistencyLevel consistencyLevel)
{
- return new ReplicaLayout.ForRange(ks, consistencyLevel, ReplicaUtils.FULL_BOUNDS, replicas, replicas);
+ return ReplicaPlan.shared(new ReplicaPlan.ForRangeRead(ks, consistencyLevel, ReplicaUtils.FULL_BOUNDS, replicas, replicas));
}
private static void resolveAndConsume(DataResolver resolver)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/test/unit/org/apache/cassandra/service/reads/DataResolverTransientTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/reads/DataResolverTransientTest.java b/test/unit/org/apache/cassandra/service/reads/DataResolverTransientTest.java
new file mode 100644
index 0000000..456cec4
--- /dev/null
+++ b/test/unit/org/apache/cassandra/service/reads/DataResolverTransientTest.java
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.reads;
+
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.primitives.Ints;
+
+import org.apache.cassandra.Util;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.locator.ReplicaPlan;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.cassandra.db.Clustering;
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.DeletionTime;
+import org.apache.cassandra.db.EmptyIterators;
+import org.apache.cassandra.db.RangeTombstone;
+import org.apache.cassandra.db.SimpleBuilders;
+import org.apache.cassandra.db.SinglePartitionReadCommand;
+import org.apache.cassandra.db.Slice;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.db.rows.BTreeRow;
+import org.apache.cassandra.db.rows.Row;
+import org.apache.cassandra.locator.EndpointsForToken;
+import org.apache.cassandra.locator.ReplicaLayout;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.service.reads.repair.TestableReadRepair;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+import static org.apache.cassandra.db.ConsistencyLevel.QUORUM;
+import static org.apache.cassandra.locator.Replica.fullReplica;
+import static org.apache.cassandra.locator.Replica.transientReplica;
+import static org.apache.cassandra.locator.ReplicaUtils.full;
+import static org.apache.cassandra.locator.ReplicaUtils.trans;
+
+/**
+ * Tests DataResolver's handling of transient replicas
+ */
+public class DataResolverTransientTest extends AbstractReadResponseTest
+{
+ private static DecoratedKey key;
+
+ @Before
+ public void setUp()
+ {
+ key = Util.dk("key1");
+ }
+
+ private static PartitionUpdate.Builder update(TableMetadata metadata, String key, Row... rows)
+ {
+ PartitionUpdate.Builder builder = new PartitionUpdate.Builder(metadata, dk(key), metadata.regularAndStaticColumns(), rows.length, false);
+ for (Row row: rows)
+ {
+ builder.add(row);
+ }
+ return builder;
+ }
+
+ private static PartitionUpdate.Builder update(Row... rows)
+ {
+ return update(cfm, "key1", rows);
+ }
+
+ private static Row.SimpleBuilder rowBuilder(int clustering)
+ {
+ return new SimpleBuilders.RowBuilder(cfm, Integer.toString(clustering));
+ }
+
+ private static Row row(long timestamp, int clustering, int value)
+ {
+ return rowBuilder(clustering).timestamp(timestamp).add("c1", Integer.toString(value)).build();
+ }
+
+ private static DeletionTime deletion(long timeMillis)
+ {
+ TimeUnit MILLIS = TimeUnit.MILLISECONDS;
+ return new DeletionTime(MILLIS.toMicros(timeMillis), Ints.checkedCast(MILLIS.toSeconds(timeMillis)));
+ }
+
+ /**
+ * Tests that the given update doesn't cause data resolver to attempt to repair a transient replica
+ */
+ private void assertNoTransientRepairs(PartitionUpdate update)
+ {
+ SinglePartitionReadCommand command = SinglePartitionReadCommand.fullPartitionRead(update.metadata(), nowInSec, key);
+ EndpointsForToken targetReplicas = EndpointsForToken.of(key.getToken(), full(EP1), full(EP2), trans(EP3));
+ TestableReadRepair repair = new TestableReadRepair(command);
+ DataResolver resolver = new DataResolver(command, plan(targetReplicas, ConsistencyLevel.QUORUM), repair, 0);
+
+ Assert.assertFalse(resolver.isDataPresent());
+ resolver.preprocess(response(command, EP1, iter(update), false));
+ resolver.preprocess(response(command, EP2, iter(update), false));
+ resolver.preprocess(response(command, EP3, EmptyIterators.unfilteredPartition(update.metadata()), false));
+
+ Assert.assertFalse(repair.dataWasConsumed());
+ assertPartitionsEqual(filter(iter(update)), resolver.resolve());
+ Assert.assertTrue(repair.dataWasConsumed());
+ Assert.assertTrue(repair.sent.toString(), repair.sent.isEmpty());
+ }
+
+ @Test
+ public void emptyRowRepair()
+ {
+ assertNoTransientRepairs(update(row(1000, 4, 4), row(1000, 5, 5)).build());
+ }
+
+ @Test
+ public void emptyPartitionDeletionRepairs()
+ {
+ PartitionUpdate.Builder builder = update();
+ builder.addPartitionDeletion(deletion(1999));
+ assertNoTransientRepairs(builder.build());
+ }
+
+ /**
+ * Partition level deletion responses shouldn't send data to a transient replica
+ */
+ @Test
+ public void emptyRowDeletionRepairs()
+ {
+ PartitionUpdate.Builder builder = update();
+ builder.add(rowBuilder(1).timestamp(1999).delete().build());
+ assertNoTransientRepairs(builder.build());
+ }
+
+ @Test
+ public void emptyComplexDeletionRepair()
+ {
+
+ long[] ts = {1000, 2000};
+
+ Row.Builder builder = BTreeRow.unsortedBuilder();
+ builder.newRow(Clustering.EMPTY);
+ builder.addComplexDeletion(m, new DeletionTime(ts[0] - 1, nowInSec));
+ assertNoTransientRepairs(update(cfm2, "key", builder.build()).build());
+
+ }
+
+ @Test
+ public void emptyRangeTombstoneRepairs()
+ {
+ Slice slice = Slice.make(Clustering.make(ByteBufferUtil.bytes("a")), Clustering.make(ByteBufferUtil.bytes("b")));
+ PartitionUpdate.Builder builder = update();
+ builder.add(new RangeTombstone(slice, deletion(2000)));
+ assertNoTransientRepairs(builder.build());
+ }
+
+ /**
+ * If the full replicas need to repair each other, repairs shouldn't be sent to transient replicas
+ */
+ @Test
+ public void fullRepairsIgnoreTransientReplicas()
+ {
+ SinglePartitionReadCommand command = SinglePartitionReadCommand.fullPartitionRead(cfm, nowInSec, dk(5));
+ EndpointsForToken targetReplicas = EndpointsForToken.of(key.getToken(), full(EP1), full(EP2), trans(EP3));
+ TestableReadRepair repair = new TestableReadRepair(command);
+ DataResolver resolver = new DataResolver(command, plan(targetReplicas, QUORUM), repair, 0);
+
+ Assert.assertFalse(resolver.isDataPresent());
+ resolver.preprocess(response(command, EP1, iter(update(row(1000, 5, 5)).build()), false));
+ resolver.preprocess(response(command, EP2, iter(update(row(2000, 4, 4)).build()), false));
+ resolver.preprocess(response(command, EP3, EmptyIterators.unfilteredPartition(cfm), false));
+
+ Assert.assertFalse(repair.dataWasConsumed());
+
+ consume(resolver.resolve());
+
+ Assert.assertTrue(repair.dataWasConsumed());
+
+ Assert.assertTrue(repair.sent.containsKey(EP1));
+ Assert.assertTrue(repair.sent.containsKey(EP2));
+ Assert.assertFalse(repair.sent.containsKey(EP3));
+ }
+
+ /**
+ * If the transient replica has new data, the full replicas should be repaired, the transient one should not
+ */
+ @Test
+ public void transientMismatchesRepairFullReplicas()
+ {
+ SinglePartitionReadCommand command = SinglePartitionReadCommand.fullPartitionRead(cfm, nowInSec, dk(5));
+ EndpointsForToken targetReplicas = EndpointsForToken.of(key.getToken(), full(EP1), full(EP2), trans(EP3));
+ TestableReadRepair<?, ?> repair = new TestableReadRepair(command);
+ DataResolver resolver = new DataResolver(command, plan(targetReplicas, QUORUM), repair, 0);
+
+ Assert.assertFalse(resolver.isDataPresent());
+ PartitionUpdate transData = update(row(1000, 5, 5)).build();
+ resolver.preprocess(response(command, EP1, EmptyIterators.unfilteredPartition(cfm), false));
+ resolver.preprocess(response(command, EP2, EmptyIterators.unfilteredPartition(cfm), false));
+ resolver.preprocess(response(command, EP3, iter(transData), false));
+
+ Assert.assertFalse(repair.dataWasConsumed());
+
+ assertPartitionsEqual(filter(iter(transData)), resolver.resolve());
+
+ Assert.assertTrue(repair.dataWasConsumed());
+
+ assertPartitionsEqual(filter(iter(transData)), filter(iter(repair.sent.get(EP1).getPartitionUpdate(cfm))));
+ assertPartitionsEqual(filter(iter(transData)), filter(iter(repair.sent.get(EP2).getPartitionUpdate(cfm))));
+ Assert.assertFalse(repair.sent.containsKey(EP3));
+
+ }
+
+ private ReplicaPlan.SharedForTokenRead plan(EndpointsForToken replicas, ConsistencyLevel consistencyLevel)
+ {
+ return ReplicaPlan.shared(new ReplicaPlan.ForTokenRead(ks, consistencyLevel, replicas, replicas));
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/test/unit/org/apache/cassandra/service/reads/DigestResolverTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/reads/DigestResolverTest.java b/test/unit/org/apache/cassandra/service/reads/DigestResolverTest.java
index 8454d6a..99101f1 100644
--- a/test/unit/org/apache/cassandra/service/reads/DigestResolverTest.java
+++ b/test/unit/org/apache/cassandra/service/reads/DigestResolverTest.java
@@ -18,6 +18,7 @@
package org.apache.cassandra.service.reads;
+import org.apache.cassandra.locator.ReplicaPlan;
import org.junit.Assert;
import org.junit.Test;
@@ -158,8 +159,8 @@ public class DigestResolverTest extends AbstractReadResponseTest
resolver.getData());
}
- private ReplicaLayout.ForToken plan(ConsistencyLevel consistencyLevel, EndpointsForToken replicas)
+ private ReplicaPlan.SharedForTokenRead plan(ConsistencyLevel consistencyLevel, EndpointsForToken replicas)
{
- return new ReplicaLayout.ForToken(ks, consistencyLevel, replicas.token(), replicas, null, replicas);
+ return ReplicaPlan.shared(new ReplicaPlan.ForTokenRead(ks, consistencyLevel, replicas, replicas));
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/test/unit/org/apache/cassandra/service/reads/ReadExecutorTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/reads/ReadExecutorTest.java b/test/unit/org/apache/cassandra/service/reads/ReadExecutorTest.java
index 3b102f2..34be5ee 100644
--- a/test/unit/org/apache/cassandra/service/reads/ReadExecutorTest.java
+++ b/test/unit/org/apache/cassandra/service/reads/ReadExecutorTest.java
@@ -24,6 +24,7 @@ import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.dht.Murmur3Partitioner;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.locator.EndpointsForRange;
+import org.apache.cassandra.locator.ReplicaPlan;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -224,13 +225,13 @@ public class ReadExecutorTest
}
- private ReplicaLayout.ForToken plan(EndpointsForToken targets, ConsistencyLevel consistencyLevel)
+ private ReplicaPlan.ForTokenRead plan(EndpointsForToken targets, ConsistencyLevel consistencyLevel)
{
return plan(consistencyLevel, targets, targets);
}
- private ReplicaLayout.ForToken plan(ConsistencyLevel consistencyLevel, EndpointsForToken natural, EndpointsForToken selected)
+ private ReplicaPlan.ForTokenRead plan(ConsistencyLevel consistencyLevel, EndpointsForToken natural, EndpointsForToken selected)
{
- return new ReplicaLayout.ForToken(ks, consistencyLevel, natural.token(), natural, null, selected);
+ return new ReplicaPlan.ForTokenRead(ks, consistencyLevel, natural, selected);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/test/unit/org/apache/cassandra/service/reads/repair/AbstractReadRepairTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/reads/repair/AbstractReadRepairTest.java b/test/unit/org/apache/cassandra/service/reads/repair/AbstractReadRepairTest.java
index 7e6ee29..5115581 100644
--- a/test/unit/org/apache/cassandra/service/reads/repair/AbstractReadRepairTest.java
+++ b/test/unit/org/apache/cassandra/service/reads/repair/AbstractReadRepairTest.java
@@ -10,6 +10,7 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
import com.google.common.primitives.Ints;
+import org.apache.cassandra.locator.ReplicaPlan;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
@@ -70,7 +71,7 @@ public abstract class AbstractReadRepairTest
static Replica replica2;
static Replica replica3;
static EndpointsForRange replicas;
- static ReplicaLayout<?, ?> replicaLayout;
+ static ReplicaPlan.ForRead<?> replicaPlan;
static long now = TimeUnit.NANOSECONDS.toMicros(System.nanoTime());
static DecoratedKey key;
@@ -217,7 +218,7 @@ public abstract class AbstractReadRepairTest
replica3 = fullReplica(target3, FULL_RANGE);
replicas = EndpointsForRange.of(replica1, replica2, replica3);
- replicaLayout = replicaLayout(ConsistencyLevel.QUORUM, replicas);
+ replicaPlan = replicaPlan(ConsistencyLevel.QUORUM, replicas);
// default test values
key = dk(5);
@@ -245,21 +246,30 @@ public abstract class AbstractReadRepairTest
cfs.transientWriteLatencyNanos = 0;
}
- static ReplicaLayout.ForRange replicaLayout(EndpointsForRange replicas, EndpointsForRange targets)
+ static ReplicaPlan.ForRangeRead replicaPlan(ConsistencyLevel consistencyLevel, EndpointsForRange replicas)
{
- return new ReplicaLayout.ForRange(ks, ConsistencyLevel.QUORUM, ReplicaUtils.FULL_BOUNDS, replicas, targets);
+ return replicaPlan(ks, consistencyLevel, replicas, replicas);
}
- static ReplicaLayout.ForRange replicaLayout(ConsistencyLevel consistencyLevel, EndpointsForRange replicas)
+ static ReplicaPlan.ForRangeRead replicaPlan(EndpointsForRange replicas, EndpointsForRange targets)
{
- return new ReplicaLayout.ForRange(ks, consistencyLevel, ReplicaUtils.FULL_BOUNDS, replicas, replicas);
+ return replicaPlan(ks, ConsistencyLevel.QUORUM, replicas, targets);
+ }
+ static ReplicaPlan.ForRangeRead replicaPlan(Keyspace keyspace, ConsistencyLevel consistencyLevel, EndpointsForRange replicas)
+ {
+ return replicaPlan(keyspace, consistencyLevel, replicas, replicas);
+ }
+ static ReplicaPlan.ForRangeRead replicaPlan(Keyspace keyspace, ConsistencyLevel consistencyLevel, EndpointsForRange replicas, EndpointsForRange targets)
+ {
+ return new ReplicaPlan.ForRangeRead(keyspace, consistencyLevel,
+ ReplicaUtils.FULL_BOUNDS, replicas, targets);
}
- public abstract InstrumentedReadRepair createInstrumentedReadRepair(ReadCommand command, ReplicaLayout<?, ?> replicaLayout, long queryStartNanoTime);
+ public abstract InstrumentedReadRepair createInstrumentedReadRepair(ReadCommand command, ReplicaPlan.Shared<?, ?> replicaPlan, long queryStartNanoTime);
- public InstrumentedReadRepair createInstrumentedReadRepair(ReplicaLayout<?, ?> replicaLayout)
+ public InstrumentedReadRepair createInstrumentedReadRepair(ReplicaPlan.Shared<?, ?> replicaPlan)
{
- return createInstrumentedReadRepair(command, replicaLayout, System.nanoTime());
+ return createInstrumentedReadRepair(command, replicaPlan, System.nanoTime());
}
@@ -270,7 +280,7 @@ public abstract class AbstractReadRepairTest
@Test
public void readSpeculationCycle()
{
- InstrumentedReadRepair repair = createInstrumentedReadRepair(replicaLayout(replicas, EndpointsForRange.of(replica1, replica2)));
+ InstrumentedReadRepair repair = createInstrumentedReadRepair(ReplicaPlan.shared(replicaPlan(replicas, EndpointsForRange.of(replica1, replica2))));
ResultConsumer consumer = new ResultConsumer();
Assert.assertEquals(epSet(), repair.getReadRecipients());
@@ -289,7 +299,7 @@ public abstract class AbstractReadRepairTest
@Test
public void noSpeculationRequired()
{
- InstrumentedReadRepair repair = createInstrumentedReadRepair(replicaLayout(replicas, EndpointsForRange.of(replica1, replica2)));
+ InstrumentedReadRepair repair = createInstrumentedReadRepair(ReplicaPlan.shared(replicaPlan(replicas, EndpointsForRange.of(replica1, replica2))));
ResultConsumer consumer = new ResultConsumer();
Assert.assertEquals(epSet(), repair.getReadRecipients());
http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/test/unit/org/apache/cassandra/service/reads/repair/BlockingReadRepairTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/reads/repair/BlockingReadRepairTest.java b/test/unit/org/apache/cassandra/service/reads/repair/BlockingReadRepairTest.java
index a4b7615..6bb1b7a 100644
--- a/test/unit/org/apache/cassandra/service/reads/repair/BlockingReadRepairTest.java
+++ b/test/unit/org/apache/cassandra/service/reads/repair/BlockingReadRepairTest.java
@@ -25,6 +25,7 @@ import java.util.Set;
import java.util.concurrent.TimeUnit;
import com.google.common.collect.Lists;
+import org.apache.cassandra.locator.ReplicaPlan;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -44,11 +45,12 @@ import org.apache.cassandra.service.reads.ReadCallback;
public class BlockingReadRepairTest extends AbstractReadRepairTest
{
- private static class InstrumentedReadRepairHandler<E extends Endpoints<E>, L extends ReplicaLayout<E, L>> extends BlockingPartitionRepair<E, L>
+ private static class InstrumentedReadRepairHandler<E extends Endpoints<E>, P extends ReplicaPlan.ForRead<E>>
+ extends BlockingPartitionRepair<E, P>
{
- public InstrumentedReadRepairHandler(Map<Replica, Mutation> repairs, int maxBlockFor, L replicaLayout)
+ public InstrumentedReadRepairHandler(Map<Replica, Mutation> repairs, int maxBlockFor, P replicaPlan)
{
- super(Util.dk("not a real usable value"), repairs, maxBlockFor, replicaLayout);
+ super(Util.dk("not a real usable value"), repairs, maxBlockFor, replicaPlan);
}
Map<InetAddressAndPort, Mutation> mutationsSent = new HashMap<>();
@@ -71,22 +73,24 @@ public class BlockingReadRepairTest extends AbstractReadRepairTest
configureClass(ReadRepairStrategy.BLOCKING);
}
- private static InstrumentedReadRepairHandler createRepairHandler(Map<Replica, Mutation> repairs, int maxBlockFor, ReplicaLayout<?, ?> replicaLayout)
+ private static <E extends Endpoints<E>, P extends ReplicaPlan.ForRead<E>>
+ InstrumentedReadRepairHandler<E, P> createRepairHandler(Map<Replica, Mutation> repairs, int maxBlockFor, P replicaPlan)
{
- return new InstrumentedReadRepairHandler(repairs, maxBlockFor, replicaLayout);
+ return new InstrumentedReadRepairHandler<>(repairs, maxBlockFor, replicaPlan);
}
private static InstrumentedReadRepairHandler createRepairHandler(Map<Replica, Mutation> repairs, int maxBlockFor)
{
EndpointsForRange replicas = EndpointsForRange.copyOf(Lists.newArrayList(repairs.keySet()));
- return createRepairHandler(repairs, maxBlockFor, replicaLayout(replicas, replicas));
+ return createRepairHandler(repairs, maxBlockFor, replicaPlan(replicas, replicas));
}
- private static class InstrumentedBlockingReadRepair<E extends Endpoints<E>, L extends ReplicaLayout<E, L>> extends BlockingReadRepair<E, L> implements InstrumentedReadRepair<E, L>
+ private static class InstrumentedBlockingReadRepair<E extends Endpoints<E>, P extends ReplicaPlan.ForRead<E>>
+ extends BlockingReadRepair<E, P> implements InstrumentedReadRepair<E, P>
{
- public InstrumentedBlockingReadRepair(ReadCommand command, L replicaLayout, long queryStartNanoTime)
+ public InstrumentedBlockingReadRepair(ReadCommand command, ReplicaPlan.Shared<E, P> replicaPlan, long queryStartNanoTime)
{
- super(command, replicaLayout, queryStartNanoTime);
+ super(command, replicaPlan, queryStartNanoTime);
}
Set<InetAddressAndPort> readCommandRecipients = new HashSet<>();
@@ -114,9 +118,9 @@ public class BlockingReadRepairTest extends AbstractReadRepairTest
}
@Override
- public InstrumentedReadRepair createInstrumentedReadRepair(ReadCommand command, ReplicaLayout<?, ?> replicaLayout, long queryStartNanoTime)
+ public InstrumentedReadRepair createInstrumentedReadRepair(ReadCommand command, ReplicaPlan.Shared<?, ?> replicaPlan, long queryStartNanoTime)
{
- return new InstrumentedBlockingReadRepair(command, replicaLayout, queryStartNanoTime);
+ return new InstrumentedBlockingReadRepair(command, replicaPlan, queryStartNanoTime);
}
@Test
@@ -142,8 +146,8 @@ public class BlockingReadRepairTest extends AbstractReadRepairTest
repairs.put(replica1, repair1);
repairs.put(replica2, repair2);
- ReplicaLayout.ForRange replicaLayout = replicaLayout(replicas, EndpointsForRange.copyOf(Lists.newArrayList(repairs.keySet())));
- InstrumentedReadRepairHandler<?, ?> handler = createRepairHandler(repairs, 2, replicaLayout);
+ ReplicaPlan.ForRangeRead replicaPlan = replicaPlan(replicas, EndpointsForRange.copyOf(Lists.newArrayList(repairs.keySet())));
+ InstrumentedReadRepairHandler<?, ?> handler = createRepairHandler(repairs, 2, replicaPlan);
Assert.assertTrue(handler.mutationsSent.isEmpty());
@@ -221,7 +225,7 @@ public class BlockingReadRepairTest extends AbstractReadRepairTest
repairs.put(replica1, repair1);
// check that the correct initial mutations are sent out
- InstrumentedReadRepairHandler handler = createRepairHandler(repairs, 2, replicaLayout(replicas, EndpointsForRange.of(replica1, replica2)));
+ InstrumentedReadRepairHandler handler = createRepairHandler(repairs, 2, replicaPlan(replicas, EndpointsForRange.of(replica1, replica2)));
handler.sendInitialRepairs();
Assert.assertEquals(1, handler.mutationsSent.size());
Assert.assertTrue(handler.mutationsSent.containsKey(target1));
@@ -269,8 +273,8 @@ public class BlockingReadRepairTest extends AbstractReadRepairTest
repairs.put(remote1, mutation(cell1));
EndpointsForRange participants = EndpointsForRange.of(replica1, replica2, remote1, remote2);
- ReplicaLayout.ForRange replicaLayout = new ReplicaLayout.ForRange(ks, ConsistencyLevel.LOCAL_QUORUM, ReplicaUtils.FULL_BOUNDS, participants, participants);
- InstrumentedReadRepairHandler handler = createRepairHandler(repairs, 2, replicaLayout);
+ ReplicaPlan.ForRangeRead replicaPlan = replicaPlan(ks, ConsistencyLevel.LOCAL_QUORUM, participants);
+ InstrumentedReadRepairHandler handler = createRepairHandler(repairs, 2, replicaPlan);
handler.sendInitialRepairs();
Assert.assertEquals(2, handler.mutationsSent.size());
Assert.assertTrue(handler.mutationsSent.containsKey(replica1.endpoint()));
http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/test/unit/org/apache/cassandra/service/reads/repair/DiagEventsBlockingReadRepairTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/reads/repair/DiagEventsBlockingReadRepairTest.java b/test/unit/org/apache/cassandra/service/reads/repair/DiagEventsBlockingReadRepairTest.java
index a5efe27..c64a73b 100644
--- a/test/unit/org/apache/cassandra/service/reads/repair/DiagEventsBlockingReadRepairTest.java
+++ b/test/unit/org/apache/cassandra/service/reads/repair/DiagEventsBlockingReadRepairTest.java
@@ -27,6 +27,7 @@ import java.util.Set;
import java.util.concurrent.TimeUnit;
import com.google.common.collect.Lists;
+import org.apache.cassandra.locator.ReplicaPlan;
import org.junit.After;
import org.junit.Assert;
import org.junit.BeforeClass;
@@ -80,8 +81,8 @@ public class DiagEventsBlockingReadRepairTest extends AbstractReadRepairTest
repairs.put(replica2, repair2);
- ReplicaLayout.ForRange replicaLayout = replicaLayout(replicas, EndpointsForRange.copyOf(Lists.newArrayList(repairs.keySet())));
- DiagnosticPartitionReadRepairHandler handler = createRepairHandler(repairs, 2, replicaLayout);
+ ReplicaPlan.ForRangeRead replicaPlan = replicaPlan(replicas, EndpointsForRange.copyOf(Lists.newArrayList(repairs.keySet())));
+ DiagnosticPartitionReadRepairHandler handler = createRepairHandler(repairs, 2, replicaPlan);
Assert.assertTrue(handler.updatesByEp.isEmpty());
@@ -106,20 +107,20 @@ public class DiagEventsBlockingReadRepairTest extends AbstractReadRepairTest
Assert.assertTrue(handler.awaitRepairs(0, TimeUnit.NANOSECONDS));
}
- public InstrumentedReadRepair createInstrumentedReadRepair(ReadCommand command, ReplicaLayout<?, ?> replicaLayout, long queryStartNanoTime)
+ public InstrumentedReadRepair createInstrumentedReadRepair(ReadCommand command, ReplicaPlan.Shared<?,?> replicaPlan, long queryStartNanoTime)
{
- return new DiagnosticBlockingRepairHandler(command, replicaLayout, queryStartNanoTime);
+ return new DiagnosticBlockingRepairHandler(command, replicaPlan, queryStartNanoTime);
}
- private static DiagnosticPartitionReadRepairHandler createRepairHandler(Map<Replica, Mutation> repairs, int maxBlockFor, ReplicaLayout<?, ?> replicaLayout)
+ private static DiagnosticPartitionReadRepairHandler createRepairHandler(Map<Replica, Mutation> repairs, int maxBlockFor, ReplicaPlan.ForRead<?> replicaPlan)
{
- return new DiagnosticPartitionReadRepairHandler(key, repairs, maxBlockFor, replicaLayout);
+ return new DiagnosticPartitionReadRepairHandler<>(key, repairs, maxBlockFor, replicaPlan);
}
private static DiagnosticPartitionReadRepairHandler createRepairHandler(Map<Replica, Mutation> repairs, int maxBlockFor)
{
EndpointsForRange replicas = EndpointsForRange.copyOf(Lists.newArrayList(repairs.keySet()));
- return createRepairHandler(repairs, maxBlockFor, replicaLayout(replicas, replicas));
+ return createRepairHandler(repairs, maxBlockFor, replicaPlan(replicas, replicas));
}
private static class DiagnosticBlockingRepairHandler extends BlockingReadRepair implements InstrumentedReadRepair
@@ -127,9 +128,9 @@ public class DiagEventsBlockingReadRepairTest extends AbstractReadRepairTest
private Set<InetAddressAndPort> recipients = Collections.emptySet();
private ReadCallback readCallback = null;
- DiagnosticBlockingRepairHandler(ReadCommand command, ReplicaLayout<?, ?> replicaLayout, long queryStartNanoTime)
+ DiagnosticBlockingRepairHandler(ReadCommand command, ReplicaPlan.Shared<?,?> replicaPlan, long queryStartNanoTime)
{
- super(command, replicaLayout, queryStartNanoTime);
+ super(command, replicaPlan, queryStartNanoTime);
DiagnosticEventService.instance().subscribe(ReadRepairEvent.class, this::onRepairEvent);
}
@@ -163,13 +164,14 @@ public class DiagEventsBlockingReadRepairTest extends AbstractReadRepairTest
}
}
- private static class DiagnosticPartitionReadRepairHandler<E extends Endpoints<E>, L extends ReplicaLayout<E, L>> extends BlockingPartitionRepair<E, L>
+ private static class DiagnosticPartitionReadRepairHandler<E extends Endpoints<E>, P extends ReplicaPlan.ForRead<E>>
+ extends BlockingPartitionRepair<E, P>
{
private final Map<InetAddressAndPort, String> updatesByEp = new HashMap<>();
- DiagnosticPartitionReadRepairHandler(DecoratedKey key, Map<Replica, Mutation> repairs, int maxBlockFor, L replicaLayout)
+ DiagnosticPartitionReadRepairHandler(DecoratedKey key, Map<Replica, Mutation> repairs, int maxBlockFor, P replicaPlan)
{
- super(key, repairs, maxBlockFor, replicaLayout);
+ super(key, repairs, maxBlockFor, replicaPlan);
DiagnosticEventService.instance().subscribe(PartitionRepairEvent.class, this::onRepairEvent);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/test/unit/org/apache/cassandra/service/reads/repair/InstrumentedReadRepair.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/reads/repair/InstrumentedReadRepair.java b/test/unit/org/apache/cassandra/service/reads/repair/InstrumentedReadRepair.java
index f3d2866..81ab07e 100644
--- a/test/unit/org/apache/cassandra/service/reads/repair/InstrumentedReadRepair.java
+++ b/test/unit/org/apache/cassandra/service/reads/repair/InstrumentedReadRepair.java
@@ -23,9 +23,11 @@ import java.util.Set;
import org.apache.cassandra.locator.Endpoints;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.locator.ReplicaLayout;
+import org.apache.cassandra.locator.ReplicaPlan;
import org.apache.cassandra.service.reads.ReadCallback;
-public interface InstrumentedReadRepair<E extends Endpoints<E>, L extends ReplicaLayout<E, L>> extends ReadRepair<E, L>
+public interface InstrumentedReadRepair<E extends Endpoints<E>, P extends ReplicaPlan.ForRead<E>>
+ extends ReadRepair<E, P>
{
Set<InetAddressAndPort> getReadRecipients();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/test/unit/org/apache/cassandra/service/reads/repair/ReadOnlyReadRepairTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/reads/repair/ReadOnlyReadRepairTest.java b/test/unit/org/apache/cassandra/service/reads/repair/ReadOnlyReadRepairTest.java
index bee5ddd..cf12265 100644
--- a/test/unit/org/apache/cassandra/service/reads/repair/ReadOnlyReadRepairTest.java
+++ b/test/unit/org/apache/cassandra/service/reads/repair/ReadOnlyReadRepairTest.java
@@ -22,6 +22,7 @@ import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
+import org.apache.cassandra.locator.ReplicaPlan;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -36,11 +37,12 @@ import org.apache.cassandra.service.reads.ReadCallback;
public class ReadOnlyReadRepairTest extends AbstractReadRepairTest
{
- private static class InstrumentedReadOnlyReadRepair<E extends Endpoints<E>, L extends ReplicaLayout<E, L>> extends ReadOnlyReadRepair implements InstrumentedReadRepair
+ private static class InstrumentedReadOnlyReadRepair<E extends Endpoints<E>, P extends ReplicaPlan.ForRead<E>>
+ extends ReadOnlyReadRepair implements InstrumentedReadRepair
{
- public InstrumentedReadOnlyReadRepair(ReadCommand command, L replicaLayout, long queryStartNanoTime)
+ public InstrumentedReadOnlyReadRepair(ReadCommand command, ReplicaPlan.Shared<E, P> replicaPlan, long queryStartNanoTime)
{
- super(command, replicaLayout, queryStartNanoTime);
+ super(command, replicaPlan, queryStartNanoTime);
}
Set<InetAddressAndPort> readCommandRecipients = new HashSet<>();
@@ -74,24 +76,24 @@ public class ReadOnlyReadRepairTest extends AbstractReadRepairTest
}
@Override
- public InstrumentedReadRepair createInstrumentedReadRepair(ReadCommand command, ReplicaLayout<?, ?> replicaLayout, long queryStartNanoTime)
+ public InstrumentedReadRepair createInstrumentedReadRepair(ReadCommand command, ReplicaPlan.Shared<?, ?> replicaPlan, long queryStartNanoTime)
{
- return new InstrumentedReadOnlyReadRepair(command, replicaLayout, queryStartNanoTime);
+ return new InstrumentedReadOnlyReadRepair(command, replicaPlan, queryStartNanoTime);
}
@Test
public void getMergeListener()
{
- ReplicaLayout<?, ?> replicaLayout = replicaLayout(replicas, replicas);
- InstrumentedReadRepair repair = createInstrumentedReadRepair(replicaLayout);
- Assert.assertSame(UnfilteredPartitionIterators.MergeListener.NOOP, repair.getMergeListener(replicaLayout));
+ ReplicaPlan.SharedForRangeRead replicaPlan = ReplicaPlan.shared(replicaPlan(replicas, replicas));
+ InstrumentedReadRepair repair = createInstrumentedReadRepair(replicaPlan);
+ Assert.assertSame(UnfilteredPartitionIterators.MergeListener.NOOP, repair.getMergeListener(replicaPlan.get()));
}
@Test(expected = UnsupportedOperationException.class)
public void repairPartitionFailure()
{
- ReplicaLayout<?, ?> replicaLayout = replicaLayout(replicas, replicas);
- InstrumentedReadRepair repair = createInstrumentedReadRepair(replicaLayout);
- repair.repairPartition(null, Collections.emptyMap(), replicaLayout);
+ ReplicaPlan.SharedForRangeRead replicaPlan = ReplicaPlan.shared(replicaPlan(replicas, replicas));
+ InstrumentedReadRepair repair = createInstrumentedReadRepair(replicaPlan);
+ repair.repairPartition(null, Collections.emptyMap(), replicaPlan.get());
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/test/unit/org/apache/cassandra/service/reads/repair/ReadRepairTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/reads/repair/ReadRepairTest.java b/test/unit/org/apache/cassandra/service/reads/repair/ReadRepairTest.java
index e4ba25d..b678b4d 100644
--- a/test/unit/org/apache/cassandra/service/reads/repair/ReadRepairTest.java
+++ b/test/unit/org/apache/cassandra/service/reads/repair/ReadRepairTest.java
@@ -27,6 +27,7 @@ import com.google.common.collect.Iterables;
import org.apache.cassandra.Util;
import org.apache.cassandra.locator.Endpoints;
import org.apache.cassandra.locator.EndpointsForRange;
+import org.apache.cassandra.locator.ReplicaPlan;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -70,11 +71,12 @@ public class ReadRepairTest
static Replica target3;
static EndpointsForRange targets;
- private static class InstrumentedReadRepairHandler<E extends Endpoints<E>, L extends ReplicaLayout<E, L>> extends BlockingPartitionRepair<E, L>
+ private static class InstrumentedReadRepairHandler<E extends Endpoints<E>, P extends ReplicaPlan.ForRead<E>>
+ extends BlockingPartitionRepair<E, P>
{
- public InstrumentedReadRepairHandler(Map<Replica, Mutation> repairs, int maxBlockFor, L replicaLayout)
+ public InstrumentedReadRepairHandler(Map<Replica, Mutation> repairs, int maxBlockFor, P replicaPlan)
{
- super(Util.dk("not a valid key"), repairs, maxBlockFor, replicaLayout);
+ super(Util.dk("not a valid key"), repairs, maxBlockFor, replicaPlan);
}
Map<InetAddressAndPort, Mutation> mutationsSent = new HashMap<>();
@@ -166,8 +168,8 @@ public class ReadRepairTest
private static InstrumentedReadRepairHandler createRepairHandler(Map<Replica, Mutation> repairs, int maxBlockFor, EndpointsForRange all, EndpointsForRange targets)
{
- ReplicaLayout.ForRange replicaLayout = new ReplicaLayout.ForRange(ks, ConsistencyLevel.LOCAL_QUORUM, ReplicaUtils.FULL_BOUNDS, all, targets);
- return new InstrumentedReadRepairHandler(repairs, maxBlockFor, replicaLayout);
+ ReplicaPlan.ForRangeRead replicaPlan = AbstractReadRepairTest.replicaPlan(ks, ConsistencyLevel.LOCAL_QUORUM, all, targets);
+ return new InstrumentedReadRepairHandler<>(repairs, maxBlockFor, replicaPlan);
}
@Test
http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/test/unit/org/apache/cassandra/service/reads/repair/TestableReadRepair.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/reads/repair/TestableReadRepair.java b/test/unit/org/apache/cassandra/service/reads/repair/TestableReadRepair.java
index 2a2dec2..53964cb 100644
--- a/test/unit/org/apache/cassandra/service/reads/repair/TestableReadRepair.java
+++ b/test/unit/org/apache/cassandra/service/reads/repair/TestableReadRepair.java
@@ -36,28 +36,28 @@ import org.apache.cassandra.locator.Endpoints;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.locator.Replica;
import org.apache.cassandra.locator.ReplicaLayout;
+import org.apache.cassandra.locator.ReplicaPlan;
import org.apache.cassandra.service.reads.DigestResolver;
-public class TestableReadRepair<E extends Endpoints<E>, L extends ReplicaLayout<E, L>> implements ReadRepair<E, L>
+public class TestableReadRepair<E extends Endpoints<E>, P extends ReplicaPlan.ForRead<E>>
+ implements ReadRepair<E, P>
{
public final Map<InetAddressAndPort, Mutation> sent = new HashMap<>();
private final ReadCommand command;
- private final ConsistencyLevel consistency;
private boolean partitionListenerClosed = false;
private boolean rowListenerClosed = true;
- public TestableReadRepair(ReadCommand command, ConsistencyLevel consistency)
+ public TestableReadRepair(ReadCommand command)
{
this.command = command;
- this.consistency = consistency;
}
@Override
- public UnfilteredPartitionIterators.MergeListener getMergeListener(L endpoints)
+ public UnfilteredPartitionIterators.MergeListener getMergeListener(P endpoints)
{
- return new PartitionIteratorMergeListener(endpoints, command, consistency, this) {
+ return new PartitionIteratorMergeListener<E>(endpoints, command, this) {
@Override
public void close()
{
@@ -70,7 +70,7 @@ public class TestableReadRepair<E extends Endpoints<E>, L extends ReplicaLayout<
{
assert rowListenerClosed;
rowListenerClosed = false;
- return new RowIteratorMergeListener(partitionKey, columns(versions), isReversed(versions), endpoints, command, consistency, TestableReadRepair.this) {
+ return new RowIteratorMergeListener<E>(partitionKey, columns(versions), isReversed(versions), endpoints, command, TestableReadRepair.this) {
@Override
public void close()
{
@@ -83,7 +83,7 @@ public class TestableReadRepair<E extends Endpoints<E>, L extends ReplicaLayout<
}
@Override
- public void startRepair(DigestResolver<E, L> digestResolver, Consumer<PartitionIterator> resultConsumer)
+ public void startRepair(DigestResolver<E, P> digestResolver, Consumer<PartitionIterator> resultConsumer)
{
}
@@ -113,7 +113,7 @@ public class TestableReadRepair<E extends Endpoints<E>, L extends ReplicaLayout<
}
@Override
- public void repairPartition(DecoratedKey partitionKey, Map<Replica, Mutation> mutations, L replicaLayout)
+ public void repairPartition(DecoratedKey partitionKey, Map<Replica, Mutation> mutations, P replicaPlan)
{
for (Map.Entry<Replica, Mutation> entry: mutations.entrySet())
sent.put(entry.getKey().endpoint(), entry.getValue());
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org
[3/4] cassandra git commit: ReplicaPlan/Layout refactor
follow-up/completion
Posted by be...@apache.org.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/locator/ReplicaPlans.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/ReplicaPlans.java b/src/java/org/apache/cassandra/locator/ReplicaPlans.java
new file mode 100644
index 0000000..25f42c3
--- /dev/null
+++ b/src/java/org/apache/cassandra/locator/ReplicaPlans.java
@@ -0,0 +1,295 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.locator;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.PartitionPosition;
+import org.apache.cassandra.dht.AbstractBounds;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.exceptions.UnavailableException;
+import org.apache.cassandra.gms.FailureDetector;
+import org.apache.cassandra.service.reads.AlwaysSpeculativeRetryPolicy;
+import org.apache.cassandra.service.reads.SpeculativeRetryPolicy;
+import org.apache.cassandra.utils.FBUtilities;
+
+import java.util.Collection;
+import java.util.function.Predicate;
+
+import static com.google.common.collect.Iterables.any;
+import static com.google.common.collect.Iterables.filter;
+import static com.google.common.collect.Iterables.limit;
+
+public class ReplicaPlans
+{
+
+ /**
+ * Construct a ReplicaPlan for writing to exactly one node, with CL.ONE. This node is *assumed* to be alive.
+ */
+ public static ReplicaPlan.ForTokenWrite forSingleReplicaWrite(Keyspace keyspace, Token token, Replica replica)
+ {
+ EndpointsForToken one = EndpointsForToken.of(token, replica);
+ EndpointsForToken empty = EndpointsForToken.empty(token);
+ return new ReplicaPlan.ForTokenWrite(keyspace, ConsistencyLevel.ONE, empty, one, one, one);
+ }
+
+ /**
+ * A forwarding counter write is always sent to a single owning coordinator for the range, by the original coordinator
+ * (if it is not itself an owner)
+ */
+ public static ReplicaPlan.ForTokenWrite forForwardingCounterWrite(Keyspace keyspace, Token token, Replica replica)
+ {
+ return forSingleReplicaWrite(keyspace, token, replica);
+ }
+
+ /**
+ * Requires that the provided endpoints are alive. Converts them to their relevant system replicas.
+ * Note that the liveAndDown collection and live are equal to the provided endpoints.
+ *
+ * The semantics are a bit weird, in that CL=ONE iff we have one node provided, and otherwise is equal to TWO.
+ * How these CL were chosen, and why we drop the CL if only one live node is available, are both unclear.
+ */
+ public static ReplicaPlan.ForTokenWrite forBatchlogWrite(Keyspace keyspace, Collection<InetAddressAndPort> endpoints) throws UnavailableException
+ {
+ // This is the one case where we write not for a single range or token, but multiple mutations to many tokens
+ Token token = DatabaseDescriptor.getPartitioner().getMinimumToken();
+
+ ReplicaLayout.ForTokenWrite liveAndDown = ReplicaLayout.forTokenWrite(
+ SystemReplicas.getSystemReplicas(endpoints).forToken(token),
+ EndpointsForToken.empty(token)
+ );
+ ConsistencyLevel consistencyLevel = liveAndDown.all().size() == 1 ? ConsistencyLevel.ONE : ConsistencyLevel.TWO;
+
+ // assume that we have already been given live endpoints, and skip applying the failure detector
+ return forWrite(keyspace, consistencyLevel, liveAndDown, liveAndDown, writeAll);
+ }
+
+ public static ReplicaPlan.ForTokenWrite forWrite(Keyspace keyspace, ConsistencyLevel consistencyLevel, Token token, Selector selector) throws UnavailableException
+ {
+ return forWrite(keyspace, consistencyLevel, ReplicaLayout.forTokenWriteLiveAndDown(keyspace, token), selector);
+ }
+
+ @VisibleForTesting
+ public static ReplicaPlan.ForTokenWrite forWrite(Keyspace keyspace, ConsistencyLevel consistencyLevel, EndpointsForToken natural, EndpointsForToken pending, Predicate<Replica> isAlive, Selector selector) throws UnavailableException
+ {
+ return forWrite(keyspace, consistencyLevel, ReplicaLayout.forTokenWrite(natural, pending), isAlive, selector);
+ }
+
+ public static ReplicaPlan.ForTokenWrite forWrite(Keyspace keyspace, ConsistencyLevel consistencyLevel, ReplicaLayout.ForTokenWrite liveAndDown, Selector selector) throws UnavailableException
+ {
+ return forWrite(keyspace, consistencyLevel, liveAndDown, FailureDetector.isReplicaAlive, selector);
+ }
+
+ @VisibleForTesting
+ public static ReplicaPlan.ForTokenWrite forWrite(Keyspace keyspace, ConsistencyLevel consistencyLevel, ReplicaLayout.ForTokenWrite liveAndDown, Predicate<Replica> isAlive, Selector selector) throws UnavailableException
+ {
+ ReplicaLayout.ForTokenWrite live = liveAndDown.filter(isAlive);
+ return forWrite(keyspace, consistencyLevel, liveAndDown, live, selector);
+ }
+
+ public static ReplicaPlan.ForTokenWrite forWrite(Keyspace keyspace, ConsistencyLevel consistencyLevel, ReplicaLayout.ForTokenWrite liveAndDown, ReplicaLayout.ForTokenWrite live, Selector selector) throws UnavailableException
+ {
+ EndpointsForToken contacts = selector.select(keyspace, consistencyLevel, liveAndDown, live);
+ ReplicaPlan.ForTokenWrite result = new ReplicaPlan.ForTokenWrite(keyspace, consistencyLevel, liveAndDown.pending(), liveAndDown.all(), live.all(), contacts);
+ result.assureSufficientReplicas();
+ return result;
+ }
+
+ public interface Selector
+ {
+ <E extends Endpoints<E>, L extends ReplicaLayout.ForWrite<E>>
+ E select(Keyspace keyspace, ConsistencyLevel consistencyLevel, L liveAndDown, L live);
+ }
+
+ /**
+ * Select all nodes, transient or otherwise, as targets for the operation.
+ *
+ * This may no longer be useful until we finish implementing transient replication support, however
+ * it can be of value to stipulate that a location writes to all nodes without regard to transient status.
+ */
+ public static final Selector writeAll = new Selector()
+ {
+ @Override
+ public <E extends Endpoints<E>, L extends ReplicaLayout.ForWrite<E>>
+ E select(Keyspace keyspace, ConsistencyLevel consistencyLevel, L liveAndDown, L live)
+ {
+ return liveAndDown.all();
+ }
+ };
+
+ /**
+ * Select all full nodes, live or down, as write targets. If there are insufficient nodes to complete the write,
+ * but there are live transient nodes, select a sufficient number of these to reach our consistency level.
+ *
+ * Pending nodes are always contacted, whether or not they are full. When a transient replica is undergoing
+ * a pending move to a new node, if we write (transiently) to it, this write would not be replicated to the
+ * pending transient node, and so when completing the move, the write could effectively have not reached the
+ * promised consistency level.
+ */
+ public static final Selector writeNormal = new Selector()
+ {
+ @Override
+ public <E extends Endpoints<E>, L extends ReplicaLayout.ForWrite<E>>
+ E select(Keyspace keyspace, ConsistencyLevel consistencyLevel, L liveAndDown, L live)
+ {
+ if (!any(liveAndDown.all(), Replica::isTransient))
+ return liveAndDown.all();
+
+ assert consistencyLevel != ConsistencyLevel.EACH_QUORUM;
+
+ ReplicaCollection.Mutable<E> contacts = liveAndDown.all().newMutable(liveAndDown.all().size());
+ contacts.addAll(filter(liveAndDown.natural(), Replica::isFull));
+ contacts.addAll(liveAndDown.pending());
+
+ // TODO: this doesn't correctly handle LOCAL_QUORUM (or EACH_QUORUM at all)
+ int liveCount = contacts.count(live.all()::contains);
+ int requiredTransientCount = consistencyLevel.blockForWrite(keyspace, liveAndDown.pending()) - liveCount;
+ if (requiredTransientCount > 0)
+ contacts.addAll(limit(filter(live.natural(), Replica::isTransient), requiredTransientCount));
+ return contacts.asSnapshot();
+ }
+ };
+
+ /**
+ * Construct the plan for a paxos round - NOT the write or read consistency level for either the write or comparison,
+ * but for the paxos linearisation agreement. For LOCAL_SERIAL, all replicas are first restricted to the local DC.
+ * All live replicas are selected as contacts; a majority (participants / 2 + 1) of the live and down replicas is required.
+ * @throws UnavailableException if fewer live replicas than that majority exist, or if more than one replica is pending
+ */
+ public static ReplicaPlan.ForPaxosWrite forPaxos(Keyspace keyspace, DecoratedKey key, ConsistencyLevel consistencyForPaxos) throws UnavailableException
+ {
+ Token tk = key.getToken();
+ ReplicaLayout.ForTokenWrite liveAndDown = ReplicaLayout.forTokenWriteLiveAndDown(keyspace, tk);
+
+ Replicas.temporaryAssertFull(liveAndDown.all()); // TODO CASSANDRA-14547
+
+ if (consistencyForPaxos == ConsistencyLevel.LOCAL_SERIAL)
+ {
+ // TODO: we should cleanup our semantics here, as we're filtering ALL nodes to localDC which is unexpected for ReplicaPlan
+ // Restrict natural and pending to nodes in the local DC only
+ IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
+ String localDc = snitch.getDatacenter(FBUtilities.getBroadcastAddressAndPort());
+ Predicate<Replica> isLocalDc = replica -> localDc.equals(snitch.getDatacenter(replica));
+
+ liveAndDown = liveAndDown.filter(isLocalDc);
+ }
+
+ ReplicaLayout.ForTokenWrite live = liveAndDown.filter(FailureDetector.isReplicaAlive);
+
+ // TODO: this should use assureSufficientReplicas
+ int participants = liveAndDown.all().size();
+ int requiredParticipants = participants / 2 + 1; // See CASSANDRA-8346, CASSANDRA-833
+
+ EndpointsForToken contacts = live.all();
+ if (contacts.size() < requiredParticipants)
+ throw UnavailableException.create(consistencyForPaxos, requiredParticipants, contacts.size());
+
+ // We cannot allow CAS operations with 2 or more pending endpoints, see #8346.
+ // Note that we fake an impossible number of required nodes in the unavailable exception
+ // to nail home the point that it's an impossible operation no matter how many nodes are live.
+ if (liveAndDown.pending().size() > 1)
+ throw new UnavailableException(String.format("Cannot perform LWT operation as there is more than one (%d) pending range movement", liveAndDown.pending().size()),
+ consistencyForPaxos,
+ participants + 1,
+ contacts.size());
+
+ return new ReplicaPlan.ForPaxosWrite(keyspace, consistencyForPaxos, liveAndDown.pending(), liveAndDown.all(), live.all(), contacts, requiredParticipants);
+ }
+
+ /**
+ * Construct a plan for reading from a single node - this permits no speculation or read-repair
+ */
+ public static ReplicaPlan.ForTokenRead forSingleReplicaRead(Keyspace keyspace, Token token, Replica replica)
+ {
+ EndpointsForToken one = EndpointsForToken.of(token, replica);
+ return new ReplicaPlan.ForTokenRead(keyspace, ConsistencyLevel.ONE, one, one);
+ }
+
+ /**
+ * Construct a plan for reading the provided range from a single node - this permits no speculation or read-repair
+ */
+ public static ReplicaPlan.ForRangeRead forSingleReplicaRead(Keyspace keyspace, AbstractBounds<PartitionPosition> range, Replica replica)
+ {
+ // TODO: this is unsafe, as one.range() may be inconsistent with our supplied range; should refactor Range/AbstractBounds to single class
+ EndpointsForRange one = EndpointsForRange.of(replica);
+ return new ReplicaPlan.ForRangeRead(keyspace, ConsistencyLevel.ONE, range, one, one);
+ }
+
+ /**
+ * Construct a plan for reading the provided token at the provided consistency level. This translates to a collection of
+ * - candidates who are: alive, replicate the token, and are sorted by their snitch scores
+ * - contacts who are: the first blockFor + (retry == ALWAYS ? 1 : 0) candidates
+ *
+ * The candidate collection can be used for speculation, although at present it would break
+ * LOCAL_QUORUM and EACH_QUORUM to do so without further filtering
+ *
+ * @param keyspace the keyspace the read is performed against
+ * @param token the token being read
+ * @param consistencyLevel the consistency level used to select the contacts and to validate the plan
+ * @param retry the speculative retry policy; only whether it equals AlwaysSpeculativeRetryPolicy.INSTANCE is consulted here
+ * @return the validated plan; assureSufficientReplicas() is invoked on it before it is returned, which is
+ *         expected to throw an UnavailableException if there are too few live replicas (see inline comment)
+ */
+ public static ReplicaPlan.ForTokenRead forRead(Keyspace keyspace, Token token, ConsistencyLevel consistencyLevel, SpeculativeRetryPolicy retry)
+ {
+ ReplicaLayout.ForTokenRead candidates = ReplicaLayout.forTokenReadLiveSorted(keyspace, token);
+ EndpointsForToken contacts = consistencyLevel.filterForQuery(keyspace, candidates.natural(),
+ retry.equals(AlwaysSpeculativeRetryPolicy.INSTANCE));
+
+ ReplicaPlan.ForTokenRead result = new ReplicaPlan.ForTokenRead(keyspace, consistencyLevel, candidates.natural(), contacts);
+ result.assureSufficientReplicas(); // Throw UAE early if we don't have enough replicas.
+ return result;
+ }
+
+ /**
+ * Construct a plan for reading the provided range at the provided consistency level. This translates to a collection of
+ * - candidates who are: alive, replicate the range, and are sorted by their snitch scores
+ * - contacts who are: the first blockFor candidates
+ *
+ * There is no speculation for range read queries at present, so we never 'always speculate' here, and a failed response fails the query.
+ *
+ * @param keyspace the keyspace the read is performed against
+ * @param consistencyLevel the consistency level used to select the contacts and to validate the plan
+ * @param range the bounds being read
+ * @return the validated plan; assureSufficientReplicas() is invoked on it before it is returned
+ *         (presumably throwing an UnavailableException when too few replicas are live, as for token reads)
+ */
+ public static ReplicaPlan.ForRangeRead forRangeRead(Keyspace keyspace, ConsistencyLevel consistencyLevel, AbstractBounds<PartitionPosition> range)
+ {
+ ReplicaLayout.ForRangeRead candidates = ReplicaLayout.forRangeReadLiveSorted(keyspace, range);
+ EndpointsForRange contacts = consistencyLevel.filterForQuery(keyspace, candidates.natural());
+
+ ReplicaPlan.ForRangeRead result = new ReplicaPlan.ForRangeRead(keyspace, consistencyLevel, range, candidates.natural(), contacts);
+ result.assureSufficientReplicas();
+ return result;
+ }
+
+ /**
+ * Take two range read plans for adjacent ranges, and check if it is OK (and worthwhile) to combine them into a single plan.
+ *
+ * The merged plan spans left's range with its right bound replaced by right's right bound; its candidates are
+ * left's candidates restricted to the endpoints that also appear in right's candidates.
+ *
+ * @param keyspace the keyspace the read is performed against
+ * @param consistencyLevel the consistency level the merged plan must be able to satisfy
+ * @param left the plan for the left-hand range
+ * @param right the plan for the range immediately to the right (adjacency is not verified here)
+ * @return the merged plan, or null if the shared candidates are insufficient for the consistency level
+ *         or the snitch does not consider the merge worthwhile
+ */
+ public static ReplicaPlan.ForRangeRead maybeMerge(Keyspace keyspace, ConsistencyLevel consistencyLevel, ReplicaPlan.ForRangeRead left, ReplicaPlan.ForRangeRead right)
+ {
+ // TODO: should we be asserting that the ranges are adjacent?
+ AbstractBounds<PartitionPosition> newRange = left.range().withNewRight(right.range().right);
+ EndpointsForRange mergedCandidates = left.candidates().keep(right.candidates().endpoints());
+
+ // Check if there are enough shared endpoints for the merge to be possible.
+ if (!consistencyLevel.isSufficientLiveReplicasForRead(keyspace, mergedCandidates))
+ return null;
+
+ EndpointsForRange contacts = consistencyLevel.filterForQuery(keyspace, mergedCandidates);
+
+ // Estimate whether merging will be a win or not
+ if (!DatabaseDescriptor.getEndpointSnitch().isWorthMergingForRangeQuery(contacts, left.contacts(), right.contacts()))
+ return null;
+
+ // If we get here, merge this range and the next one
+ return new ReplicaPlan.ForRangeRead(keyspace, consistencyLevel, newRange, mergedCandidates, contacts);
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/locator/SystemReplicas.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/SystemReplicas.java b/src/java/org/apache/cassandra/locator/SystemReplicas.java
index 13a9d74..0d1fc8d 100644
--- a/src/java/org/apache/cassandra/locator/SystemReplicas.java
+++ b/src/java/org/apache/cassandra/locator/SystemReplicas.java
@@ -18,12 +18,11 @@
package org.apache.cassandra.locator;
-import java.util.ArrayList;
import java.util.Collection;
-import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import com.google.common.collect.Collections2;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
@@ -50,13 +49,8 @@ public class SystemReplicas
return systemReplicas.computeIfAbsent(endpoint, SystemReplicas::createSystemReplica);
}
- public static Collection<Replica> getSystemReplicas(Collection<InetAddressAndPort> endpoints)
+ public static EndpointsForRange getSystemReplicas(Collection<InetAddressAndPort> endpoints)
{
- List<Replica> replicas = new ArrayList<>(endpoints.size());
- for (InetAddressAndPort endpoint: endpoints)
- {
- replicas.add(getSystemReplica(endpoint));
- }
- return replicas;
+ return EndpointsForRange.copyOf(Collections2.transform(endpoints, SystemReplicas::getSystemReplica));
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/locator/TokenMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/TokenMetadata.java b/src/java/org/apache/cassandra/locator/TokenMetadata.java
index 4ab34db..ad40d7b 100644
--- a/src/java/org/apache/cassandra/locator/TokenMetadata.java
+++ b/src/java/org/apache/cassandra/locator/TokenMetadata.java
@@ -1224,15 +1224,11 @@ public class TokenMetadata
/**
* @deprecated retained for benefit of old tests
*/
+ @Deprecated
public EndpointsForToken getWriteEndpoints(Token token, String keyspaceName, EndpointsForToken natural)
{
EndpointsForToken pending = pendingEndpointsForToken(token, keyspaceName);
- if (Endpoints.haveConflicts(natural, pending))
- {
- natural = Endpoints.resolveConflictsInNatural(natural, pending);
- pending = Endpoints.resolveConflictsInPending(natural, pending);
- }
- return Endpoints.concat(natural, pending);
+ return ReplicaLayout.forTokenWrite(natural, pending).all();
}
/** @return an endpoint to token multimap representation of tokenToEndpointMap (a copy) */
http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/net/IAsyncCallback.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/IAsyncCallback.java b/src/java/org/apache/cassandra/net/IAsyncCallback.java
index 253b412..ceaf072 100644
--- a/src/java/org/apache/cassandra/net/IAsyncCallback.java
+++ b/src/java/org/apache/cassandra/net/IAsyncCallback.java
@@ -17,12 +17,6 @@
*/
package org.apache.cassandra.net;
-import com.google.common.base.Predicate;
-
-import org.apache.cassandra.gms.FailureDetector;
-import org.apache.cassandra.locator.InetAddressAndPort;
-import org.apache.cassandra.locator.Replica;
-
/**
* implementors of IAsyncCallback need to make sure that any public methods
* are threadsafe with respect to response() being called from the message
@@ -31,10 +25,6 @@ import org.apache.cassandra.locator.Replica;
*/
public interface IAsyncCallback<T>
{
- final Predicate<InetAddressAndPort> isAlive = FailureDetector.instance::isAlive;
-
- final Predicate<Replica> isReplicaAlive = replica -> isAlive.apply(replica.endpoint());
-
/**
* @param msg response received.
*/
http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java b/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
index e817cc8..7f51ae7 100644
--- a/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
+++ b/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
@@ -26,8 +26,9 @@ import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.stream.Collectors;
import org.apache.cassandra.db.ConsistencyLevel;
-import org.apache.cassandra.locator.ReplicaLayout;
+import org.apache.cassandra.locator.EndpointsForToken;
+import org.apache.cassandra.locator.ReplicaPlan;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -36,7 +37,6 @@ import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.IMutation;
import org.apache.cassandra.db.WriteType;
import org.apache.cassandra.exceptions.RequestFailureReason;
-import org.apache.cassandra.exceptions.UnavailableException;
import org.apache.cassandra.exceptions.WriteFailureException;
import org.apache.cassandra.exceptions.WriteTimeoutException;
import org.apache.cassandra.locator.InetAddressAndPort;
@@ -53,7 +53,7 @@ public abstract class AbstractWriteResponseHandler<T> implements IAsyncCallbackW
//Count down until all responses and expirations have occurred before deciding whether the ideal CL was reached.
private AtomicInteger responsesAndExpirations;
private final SimpleCondition condition = new SimpleCondition();
- protected final ReplicaLayout.ForToken replicaLayout;
+ protected final ReplicaPlan.ForTokenWrite replicaPlan;
protected final Runnable callback;
protected final WriteType writeType;
@@ -76,12 +76,12 @@ public abstract class AbstractWriteResponseHandler<T> implements IAsyncCallbackW
* @param callback A callback to be called when the write is successful.
* @param queryStartNanoTime
*/
- protected AbstractWriteResponseHandler(ReplicaLayout.ForToken replicaLayout,
+ protected AbstractWriteResponseHandler(ReplicaPlan.ForTokenWrite replicaPlan,
Runnable callback,
WriteType writeType,
long queryStartNanoTime)
{
- this.replicaLayout = replicaLayout;
+ this.replicaPlan = replicaPlan;
this.callback = callback;
this.writeType = writeType;
this.failureReasonByEndpoint = new ConcurrentHashMap<>();
@@ -104,19 +104,19 @@ public abstract class AbstractWriteResponseHandler<T> implements IAsyncCallbackW
if (!success)
{
- int blockedFor = totalBlockFor();
+ int blockedFor = blockFor();
int acks = ackCount();
// It's pretty unlikely, but we can race between exiting await above and here, so
// that we could now have enough acks. In that case, we "lie" on the acks count to
// avoid sending confusing info to the user (see CASSANDRA-6491).
if (acks >= blockedFor)
acks = blockedFor - 1;
- throw new WriteTimeoutException(writeType, replicaLayout.consistencyLevel(), acks, blockedFor);
+ throw new WriteTimeoutException(writeType, replicaPlan.consistencyLevel(), acks, blockedFor);
}
- if (totalBlockFor() + failures > totalEndpoints())
+ if (blockFor() + failures > candidateReplicaCount())
{
- throw new WriteFailureException(replicaLayout.consistencyLevel(), ackCount(), totalBlockFor(), writeType, failureReasonByEndpoint);
+ throw new WriteFailureException(replicaPlan.consistencyLevel(), ackCount(), blockFor(), writeType, failureReasonByEndpoint);
}
}
@@ -135,7 +135,7 @@ public abstract class AbstractWriteResponseHandler<T> implements IAsyncCallbackW
public void setIdealCLResponseHandler(AbstractWriteResponseHandler handler)
{
this.idealCLDelegate = handler;
- idealCLDelegate.responsesAndExpirations = new AtomicInteger(replicaLayout.selected().size());
+ idealCLDelegate.responsesAndExpirations = new AtomicInteger(replicaPlan.contacts().size());
}
/**
@@ -189,28 +189,30 @@ public abstract class AbstractWriteResponseHandler<T> implements IAsyncCallbackW
/**
* @return the minimum number of endpoints that must reply.
*/
- protected int totalBlockFor()
+ protected int blockFor()
{
// During bootstrap, we have to include the pending endpoints or we may fail the consistency level
// guarantees (see #833)
- return replicaLayout.consistencyLevel().blockForWrite(replicaLayout.keyspace(), replicaLayout.pending());
+ return replicaPlan.blockFor();
}
/**
+ * TODO: this method is brittle for its purpose of deciding when we should fail a query;
+ * this needs to be CL aware, and of which nodes are live/down
* @return the total number of endpoints the request can be sent to.
*/
- protected int totalEndpoints()
+ protected int candidateReplicaCount()
{
- return replicaLayout.all().size();
+ return replicaPlan.liveAndDown().size();
}
public ConsistencyLevel consistencyLevel()
{
- return replicaLayout.consistencyLevel();
+ return replicaPlan.consistencyLevel();
}
/**
- * @return true if the message counts towards the totalBlockFor() threshold
+ * @return true if the message counts towards the blockFor() threshold
*/
protected boolean waitingFor(InetAddressAndPort from)
{
@@ -227,11 +229,6 @@ public abstract class AbstractWriteResponseHandler<T> implements IAsyncCallbackW
*/
public abstract void response(MessageIn<T> msg);
- public void assureSufficientLiveNodes() throws UnavailableException
- {
- replicaLayout.consistencyLevel().assureSufficientLiveNodesForWrite(replicaLayout.keyspace(), replicaLayout.all().filter(isReplicaAlive), replicaLayout.pending());
- }
-
protected void signal()
{
condition.signalAll();
@@ -250,7 +247,7 @@ public abstract class AbstractWriteResponseHandler<T> implements IAsyncCallbackW
failureReasonByEndpoint.put(from, failureReason);
- if (totalBlockFor() + n > totalEndpoints())
+ if (blockFor() + n > candidateReplicaCount())
signal();
}
@@ -278,11 +275,11 @@ public abstract class AbstractWriteResponseHandler<T> implements IAsyncCallbackW
//The condition being signaled is a valid proxy for the CL being achieved
if (!condition.isSignaled())
{
- replicaLayout.keyspace().metric.writeFailedIdealCL.inc();
+ replicaPlan.keyspace().metric.writeFailedIdealCL.inc();
}
else
{
- replicaLayout.keyspace().metric.idealCLWriteLatency.addNano(System.nanoTime() - queryStartNanoTime);
+ replicaPlan.keyspace().metric.idealCLWriteLatency.addNano(System.nanoTime() - queryStartNanoTime);
}
}
}
@@ -292,7 +289,8 @@ public abstract class AbstractWriteResponseHandler<T> implements IAsyncCallbackW
*/
public void maybeTryAdditionalReplicas(IMutation mutation, StorageProxy.WritePerformer writePerformer, String localDC)
{
- if (replicaLayout.all().size() == replicaLayout.selected().size())
+ EndpointsForToken uncontacted = replicaPlan.liveUncontacted();
+ if (uncontacted.isEmpty())
return;
long timeout = Long.MAX_VALUE;
@@ -313,7 +311,7 @@ public abstract class AbstractWriteResponseHandler<T> implements IAsyncCallbackW
for (ColumnFamilyStore cf : cfs)
cf.metric.speculativeWrites.inc();
- writePerformer.apply(mutation, replicaLayout.forNaturalUncontacted(),
+ writePerformer.apply(mutation, replicaPlan.withContact(uncontacted),
(AbstractWriteResponseHandler<IMutation>) this,
localDC);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/service/ActiveRepairService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java b/src/java/org/apache/cassandra/service/ActiveRepairService.java
index 8ffca6a..b32f67e 100644
--- a/src/java/org/apache/cassandra/service/ActiveRepairService.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java
@@ -331,7 +331,7 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
TokenMetadata.Topology topology = ss.getTokenMetadata().cloneOnlyTokenMap().getTopology();
Multimap<String, InetAddressAndPort> dcEndpointsMap = topology.getDatacenterEndpoints();
Iterable<InetAddressAndPort> dcEndpoints = concat(transform(dataCenters, dcEndpointsMap::get));
- return neighbors.keep(dcEndpoints);
+ return neighbors.select(dcEndpoints, true);
}
else if (hosts != null && !hosts.isEmpty())
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/service/BatchlogResponseHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/BatchlogResponseHandler.java b/src/java/org/apache/cassandra/service/BatchlogResponseHandler.java
index ee74df5..63fbc72 100644
--- a/src/java/org/apache/cassandra/service/BatchlogResponseHandler.java
+++ b/src/java/org/apache/cassandra/service/BatchlogResponseHandler.java
@@ -36,7 +36,7 @@ public class BatchlogResponseHandler<T> extends AbstractWriteResponseHandler<T>
public BatchlogResponseHandler(AbstractWriteResponseHandler<T> wrapped, int requiredBeforeFinish, BatchlogCleanup cleanup, long queryStartNanoTime)
{
- super(wrapped.replicaLayout, wrapped.callback, wrapped.writeType, queryStartNanoTime);
+ super(wrapped.replicaPlan, wrapped.callback, wrapped.writeType, queryStartNanoTime);
this.wrapped = wrapped;
this.requiredBeforeFinish = requiredBeforeFinish;
this.cleanup = cleanup;
@@ -64,24 +64,19 @@ public class BatchlogResponseHandler<T> extends AbstractWriteResponseHandler<T>
wrapped.onFailure(from, failureReason);
}
- public void assureSufficientLiveNodes()
- {
- wrapped.assureSufficientLiveNodes();
- }
-
public void get() throws WriteTimeoutException, WriteFailureException
{
wrapped.get();
}
- protected int totalBlockFor()
+ protected int blockFor()
{
- return wrapped.totalBlockFor();
+ return wrapped.blockFor();
}
- protected int totalEndpoints()
+ protected int candidateReplicaCount()
{
- return wrapped.totalEndpoints();
+ return wrapped.candidateReplicaCount();
}
protected boolean waitingFor(InetAddressAndPort from)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java b/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java
index d4cdcc6..4c892ff 100644
--- a/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java
+++ b/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java
@@ -22,10 +22,10 @@ import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.locator.ReplicaLayout;
import org.apache.cassandra.locator.IEndpointSnitch;
import org.apache.cassandra.locator.NetworkTopologyStrategy;
import org.apache.cassandra.locator.Replica;
+import org.apache.cassandra.locator.ReplicaPlan;
import org.apache.cassandra.net.MessageIn;
import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.db.WriteType;
@@ -40,16 +40,16 @@ public class DatacenterSyncWriteResponseHandler<T> extends AbstractWriteResponse
private final Map<String, AtomicInteger> responses = new HashMap<String, AtomicInteger>();
private final AtomicInteger acks = new AtomicInteger(0);
- public DatacenterSyncWriteResponseHandler(ReplicaLayout.ForToken replicaLayout,
+ public DatacenterSyncWriteResponseHandler(ReplicaPlan.ForTokenWrite replicaPlan,
Runnable callback,
WriteType writeType,
long queryStartNanoTime)
{
// Response is being managed by the map so make it 1 for the superclass.
- super(replicaLayout, callback, writeType, queryStartNanoTime);
- assert replicaLayout.consistencyLevel() == ConsistencyLevel.EACH_QUORUM;
+ super(replicaPlan, callback, writeType, queryStartNanoTime);
+ assert replicaPlan.consistencyLevel() == ConsistencyLevel.EACH_QUORUM;
- NetworkTopologyStrategy strategy = (NetworkTopologyStrategy) replicaLayout.keyspace().getReplicationStrategy();
+ NetworkTopologyStrategy strategy = (NetworkTopologyStrategy) replicaPlan.keyspace().getReplicationStrategy();
for (String dc : strategy.getDatacenters())
{
@@ -59,7 +59,7 @@ public class DatacenterSyncWriteResponseHandler<T> extends AbstractWriteResponse
// During bootstrap, we have to include the pending endpoints or we may fail the consistency level
// guarantees (see #833)
- for (Replica pending : replicaLayout.pending())
+ for (Replica pending : replicaPlan.pending())
{
responses.get(snitch.getDatacenter(pending)).incrementAndGet();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java b/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java
index b458a71..a3ef76f 100644
--- a/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java
+++ b/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java
@@ -19,7 +19,7 @@ package org.apache.cassandra.service;
import org.apache.cassandra.db.WriteType;
import org.apache.cassandra.locator.InetAddressAndPort;
-import org.apache.cassandra.locator.ReplicaLayout;
+import org.apache.cassandra.locator.ReplicaPlan;
import org.apache.cassandra.net.MessageIn;
/**
@@ -27,13 +27,13 @@ import org.apache.cassandra.net.MessageIn;
*/
public class DatacenterWriteResponseHandler<T> extends WriteResponseHandler<T>
{
- public DatacenterWriteResponseHandler(ReplicaLayout.ForToken replicaLayout,
+ public DatacenterWriteResponseHandler(ReplicaPlan.ForTokenWrite replicaPlan,
Runnable callback,
WriteType writeType,
long queryStartNanoTime)
{
- super(replicaLayout, callback, writeType, queryStartNanoTime);
- assert replicaLayout.consistencyLevel().isDatacenterLocal();
+ super(replicaPlan, callback, writeType, queryStartNanoTime);
+ assert replicaPlan.consistencyLevel().isDatacenterLocal();
}
@Override
@@ -54,6 +54,6 @@ public class DatacenterWriteResponseHandler<T> extends WriteResponseHandler<T>
@Override
protected boolean waitingFor(InetAddressAndPort from)
{
- return replicaLayout.consistencyLevel().isLocal(from);
+ return replicaPlan.consistencyLevel().isLocal(from);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 5eb43cf..fc49330 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -60,7 +60,6 @@ import org.apache.cassandra.db.rows.RowIterator;
import org.apache.cassandra.db.view.ViewUtils;
import org.apache.cassandra.dht.*;
import org.apache.cassandra.exceptions.*;
-import org.apache.cassandra.gms.FailureDetector;
import org.apache.cassandra.gms.Gossiper;
import org.apache.cassandra.hints.Hint;
import org.apache.cassandra.hints.HintsService;
@@ -135,7 +134,7 @@ public class StorageProxy implements StorageProxyMBean
standardWritePerformer = (mutation, targets, responseHandler, localDataCenter) ->
{
assert mutation instanceof Mutation;
- sendToHintedReplicas((Mutation) mutation, targets.selected(), responseHandler, localDataCenter, Stage.MUTATION);
+ sendToHintedReplicas((Mutation) mutation, targets, responseHandler, localDataCenter, Stage.MUTATION);
};
/*
@@ -146,17 +145,17 @@ public class StorageProxy implements StorageProxyMBean
*/
counterWritePerformer = (mutation, targets, responseHandler, localDataCenter) ->
{
- EndpointsForToken selected = targets.selected().withoutSelf();
+ EndpointsForToken selected = targets.contacts().withoutSelf();
Replicas.temporaryAssertFull(selected); // TODO CASSANDRA-14548
- counterWriteTask(mutation, selected, responseHandler, localDataCenter).run();
+ counterWriteTask(mutation, targets.withContact(selected), responseHandler, localDataCenter).run();
};
counterWriteOnCoordinatorPerformer = (mutation, targets, responseHandler, localDataCenter) ->
{
- EndpointsForToken selected = targets.selected().withoutSelf();
+ EndpointsForToken selected = targets.contacts().withoutSelf();
Replicas.temporaryAssertFull(selected); // TODO CASSANDRA-14548
StageManager.getStage(Stage.COUNTER_MUTATION)
- .execute(counterWriteTask(mutation, selected, responseHandler, localDataCenter));
+ .execute(counterWriteTask(mutation, targets.withContact(selected), responseHandler, localDataCenter));
};
for(ConsistencyLevel level : ConsistencyLevel.values())
@@ -232,9 +231,9 @@ public class StorageProxy implements StorageProxyMBean
while (System.nanoTime() - queryStartNanoTime < timeout)
{
// for simplicity, we'll do a single liveness check at the start of each attempt
- ReplicaLayout.ForPaxos replicaLayout = ReplicaLayout.forPaxos(Keyspace.open(keyspaceName), key, consistencyForPaxos);
+ ReplicaPlan.ForPaxosWrite replicaPlan = ReplicaPlans.forPaxos(Keyspace.open(keyspaceName), key, consistencyForPaxos);
- final PaxosBallotAndContention pair = beginAndRepairPaxos(queryStartNanoTime, key, metadata, replicaLayout, consistencyForPaxos, consistencyForCommit, true, state);
+ final PaxosBallotAndContention pair = beginAndRepairPaxos(queryStartNanoTime, key, metadata, replicaPlan, consistencyForPaxos, consistencyForCommit, true, state);
final UUID ballot = pair.ballot;
contentions += pair.contentions;
@@ -276,7 +275,7 @@ public class StorageProxy implements StorageProxyMBean
Commit proposal = Commit.newProposal(ballot, updates);
Tracing.trace("CAS precondition is met; proposing client-requested updates for {}", ballot);
- if (proposePaxos(proposal, replicaLayout, true, queryStartNanoTime))
+ if (proposePaxos(proposal, replicaPlan, true, queryStartNanoTime))
{
commitPaxos(proposal, consistencyForCommit, true, queryStartNanoTime);
Tracing.trace("CAS successful");
@@ -334,7 +333,7 @@ public class StorageProxy implements StorageProxyMBean
private static PaxosBallotAndContention beginAndRepairPaxos(long queryStartNanoTime,
DecoratedKey key,
TableMetadata metadata,
- ReplicaLayout.ForPaxos replicaLayout,
+ ReplicaPlan.ForPaxosWrite paxosPlan,
ConsistencyLevel consistencyForPaxos,
ConsistencyLevel consistencyForCommit,
final boolean isWrite,
@@ -360,7 +359,7 @@ public class StorageProxy implements StorageProxyMBean
// prepare
Tracing.trace("Preparing {}", ballot);
Commit toPrepare = Commit.newPrepare(key, metadata, ballot);
- summary = preparePaxos(toPrepare, replicaLayout, queryStartNanoTime);
+ summary = preparePaxos(toPrepare, paxosPlan, queryStartNanoTime);
if (!summary.promised)
{
Tracing.trace("Some replicas have already promised a higher ballot than ours; aborting");
@@ -383,7 +382,7 @@ public class StorageProxy implements StorageProxyMBean
else
casReadMetrics.unfinishedCommit.inc();
Commit refreshedInProgress = Commit.newProposal(ballot, inProgress.update);
- if (proposePaxos(refreshedInProgress, replicaLayout, false, queryStartNanoTime))
+ if (proposePaxos(refreshedInProgress, paxosPlan, false, queryStartNanoTime))
{
try
{
@@ -440,12 +439,12 @@ public class StorageProxy implements StorageProxyMBean
MessagingService.instance().sendOneWay(message, target);
}
- private static PrepareCallback preparePaxos(Commit toPrepare, ReplicaLayout.ForPaxos replicaLayout, long queryStartNanoTime)
+ private static PrepareCallback preparePaxos(Commit toPrepare, ReplicaPlan.ForPaxosWrite replicaPlan, long queryStartNanoTime)
throws WriteTimeoutException
{
- PrepareCallback callback = new PrepareCallback(toPrepare.update.partitionKey(), toPrepare.update.metadata(), replicaLayout.getRequiredParticipants(), replicaLayout.consistencyLevel(), queryStartNanoTime);
+ PrepareCallback callback = new PrepareCallback(toPrepare.update.partitionKey(), toPrepare.update.metadata(), replicaPlan.requiredParticipants(), replicaPlan.consistencyLevel(), queryStartNanoTime);
MessageOut<Commit> message = new MessageOut<Commit>(MessagingService.Verb.PAXOS_PREPARE, toPrepare, Commit.serializer);
- for (Replica replica: replicaLayout.selected())
+ for (Replica replica: replicaPlan.contacts())
{
if (replica.isLocal())
{
@@ -478,12 +477,12 @@ public class StorageProxy implements StorageProxyMBean
return callback;
}
- private static boolean proposePaxos(Commit proposal, ReplicaLayout.ForPaxos replicaLayout, boolean timeoutIfPartial, long queryStartNanoTime)
+ private static boolean proposePaxos(Commit proposal, ReplicaPlan.ForPaxosWrite replicaPlan, boolean timeoutIfPartial, long queryStartNanoTime)
throws WriteTimeoutException
{
- ProposeCallback callback = new ProposeCallback(replicaLayout.selected().size(), replicaLayout.getRequiredParticipants(), !timeoutIfPartial, replicaLayout.consistencyLevel(), queryStartNanoTime);
+ ProposeCallback callback = new ProposeCallback(replicaPlan.contacts().size(), replicaPlan.requiredParticipants(), !timeoutIfPartial, replicaPlan.consistencyLevel(), queryStartNanoTime);
MessageOut<Commit> message = new MessageOut<Commit>(MessagingService.Verb.PAXOS_PROPOSE, proposal, Commit.serializer);
- for (Replica replica : replicaLayout.selected())
+ for (Replica replica : replicaPlan.contacts())
{
if (replica.isLocal())
{
@@ -518,7 +517,7 @@ public class StorageProxy implements StorageProxyMBean
return true;
if (timeoutIfPartial && !callback.isFullyRefused())
- throw new WriteTimeoutException(WriteType.CAS, replicaLayout.consistencyLevel(), callback.getAcceptCount(), replicaLayout.getRequiredParticipants());
+ throw new WriteTimeoutException(WriteType.CAS, replicaPlan.consistencyLevel(), callback.getAcceptCount(), replicaPlan.requiredParticipants());
return false;
}
@@ -531,21 +530,22 @@ public class StorageProxy implements StorageProxyMBean
Token tk = proposal.update.partitionKey().getToken();
AbstractWriteResponseHandler<Commit> responseHandler = null;
- ReplicaLayout.ForToken replicaLayout = ReplicaLayout.forWrite(keyspace, consistencyLevel, tk, FailureDetector.instance::isAlive);
+ // NOTE: this ReplicaPlan is a lie, this usage of ReplicaPlan could do with being clarified - the contacts() collection is essentially (I think) never used
+ ReplicaPlan.ForTokenWrite replicaPlan = ReplicaPlans.forWrite(keyspace, consistencyLevel, tk, ReplicaPlans.writeAll);
if (shouldBlock)
{
AbstractReplicationStrategy rs = keyspace.getReplicationStrategy();
- responseHandler = rs.getWriteResponseHandler(replicaLayout, null, WriteType.SIMPLE, queryStartNanoTime);
+ responseHandler = rs.getWriteResponseHandler(replicaPlan, null, WriteType.SIMPLE, queryStartNanoTime);
responseHandler.setSupportsBackPressure(false);
}
- MessageOut<Commit> message = new MessageOut<Commit>(MessagingService.Verb.PAXOS_COMMIT, proposal, Commit.serializer);
- for (Replica replica : replicaLayout.all())
+ MessageOut<Commit> message = new MessageOut<>(MessagingService.Verb.PAXOS_COMMIT, proposal, Commit.serializer);
+ for (Replica replica : replicaPlan.liveAndDown())
{
InetAddressAndPort destination = replica.endpoint();
checkHintOverload(replica);
- if (FailureDetector.instance.isAlive(destination))
+ if (replicaPlan.isAlive(replica))
{
if (shouldBlock)
{
@@ -616,10 +616,10 @@ public class StorageProxy implements StorageProxyMBean
* the data across to some other replica.
*
* @param mutations the mutations to be applied across the replicas
- * @param consistency_level the consistency level for the operation
+ * @param consistencyLevel the consistency level for the operation
* @param queryStartNanoTime the value of System.nanoTime() when the query started to be processed
*/
- public static void mutate(List<? extends IMutation> mutations, ConsistencyLevel consistency_level, long queryStartNanoTime)
+ public static void mutate(List<? extends IMutation> mutations, ConsistencyLevel consistencyLevel, long queryStartNanoTime)
throws UnavailableException, OverloadedException, WriteTimeoutException, WriteFailureException
{
Tracing.trace("Determining replicas for mutation");
@@ -637,7 +637,7 @@ public class StorageProxy implements StorageProxyMBean
if (mutation instanceof CounterMutation)
responseHandlers.add(mutateCounter((CounterMutation)mutation, localDataCenter, queryStartNanoTime));
else
- responseHandlers.add(performWrite(mutation, consistency_level, localDataCenter, standardWritePerformer, null, plainWriteType, queryStartNanoTime));
+ responseHandlers.add(performWrite(mutation, consistencyLevel, localDataCenter, standardWritePerformer, null, plainWriteType, queryStartNanoTime));
}
// upgrade to full quorum any failed cheap quorums
@@ -653,7 +653,7 @@ public class StorageProxy implements StorageProxyMBean
}
catch (WriteTimeoutException|WriteFailureException ex)
{
- if (consistency_level == ConsistencyLevel.ANY)
+ if (consistencyLevel == ConsistencyLevel.ANY)
{
hintMutations(mutations);
}
@@ -662,7 +662,7 @@ public class StorageProxy implements StorageProxyMBean
if (ex instanceof WriteFailureException)
{
writeMetrics.failures.mark();
- writeMetricsMap.get(consistency_level).failures.mark();
+ writeMetricsMap.get(consistencyLevel).failures.mark();
WriteFailureException fe = (WriteFailureException)ex;
Tracing.trace("Write failure; received {} of {} required replies, failed {} requests",
fe.received, fe.blockFor, fe.failureReasonByEndpoint.size());
@@ -670,7 +670,7 @@ public class StorageProxy implements StorageProxyMBean
else
{
writeMetrics.timeouts.mark();
- writeMetricsMap.get(consistency_level).timeouts.mark();
+ writeMetricsMap.get(consistencyLevel).timeouts.mark();
WriteTimeoutException te = (WriteTimeoutException)ex;
Tracing.trace("Write timeout; received {} of {} required replies", te.received, te.blockFor);
}
@@ -680,14 +680,14 @@ public class StorageProxy implements StorageProxyMBean
catch (UnavailableException e)
{
writeMetrics.unavailables.mark();
- writeMetricsMap.get(consistency_level).unavailables.mark();
+ writeMetricsMap.get(consistencyLevel).unavailables.mark();
Tracing.trace("Unavailable");
throw e;
}
catch (OverloadedException e)
{
writeMetrics.unavailables.mark();
- writeMetricsMap.get(consistency_level).unavailables.mark();
+ writeMetricsMap.get(consistencyLevel).unavailables.mark();
Tracing.trace("Overloaded");
throw e;
}
@@ -695,7 +695,7 @@ public class StorageProxy implements StorageProxyMBean
{
long latency = System.nanoTime() - startTime;
writeMetrics.addNano(latency);
- writeMetricsMap.get(consistency_level).addNano(latency);
+ writeMetricsMap.get(consistencyLevel).addNano(latency);
updateCoordinatorWriteLatencyTableMetric(mutations, latency);
}
}
@@ -725,7 +725,8 @@ public class StorageProxy implements StorageProxyMBean
// local writes can timeout, but cannot be dropped (see LocalMutationRunnable and CASSANDRA-6510),
// so there is no need to hint or retry.
- EndpointsForToken replicasToHint = StorageService.instance.getNaturalAndPendingReplicasForToken(keyspaceName, token)
+ EndpointsForToken replicasToHint = ReplicaLayout.forTokenWriteLiveAndDown(Keyspace.open(keyspaceName), token)
+ .all()
.filter(StorageProxy::shouldHint);
submitHint(mutation, replicasToHint, null);
@@ -737,8 +738,8 @@ public class StorageProxy implements StorageProxyMBean
Token token = mutation.key().getToken();
InetAddressAndPort local = FBUtilities.getBroadcastAddressAndPort();
- return StorageService.instance.getNaturalReplicasForToken(keyspaceName, token).endpoints().contains(local)
- || StorageService.instance.getTokenMetadata().pendingEndpointsForToken(token, keyspaceName).endpoints().contains(local);
+ return ReplicaLayout.forTokenWriteLiveAndDown(Keyspace.open(keyspaceName), token)
+ .all().endpoints().contains(local);
}
/**
@@ -934,7 +935,6 @@ public class StorageProxy implements StorageProxyMBean
cleanup,
queryStartNanoTime);
// exit early if we can't fulfill the CL at this time.
- wrapper.handler.assureSufficientLiveNodes();
wrappers.add(wrapper);
}
@@ -1002,14 +1002,14 @@ public class StorageProxy implements StorageProxyMBean
throws WriteTimeoutException, WriteFailureException
{
Keyspace systemKeypsace = Keyspace.open(SchemaConstants.SYSTEM_KEYSPACE_NAME);
- ReplicaLayout.ForToken replicaLayout = ReplicaLayout.forBatchlogWrite(systemKeypsace, endpoints);
- WriteResponseHandler<?> handler = new WriteResponseHandler(replicaLayout,
+ ReplicaPlan.ForTokenWrite replicaPlan = ReplicaPlans.forBatchlogWrite(systemKeypsace, endpoints);
+ WriteResponseHandler<?> handler = new WriteResponseHandler(replicaPlan,
WriteType.BATCH_LOG,
queryStartNanoTime);
Batch batch = Batch.createLocal(uuid, FBUtilities.timestampMicros(), mutations);
MessageOut<Batch> message = new MessageOut<>(MessagingService.Verb.BATCH_STORE, batch, Batch.serializer);
- for (Replica replica : replicaLayout.all())
+ for (Replica replica : replicaPlan.liveAndDown())
{
logger.trace("Sending batchlog store request {} to {} for {} mutations", batch.id, replica, batch.size());
@@ -1040,12 +1040,12 @@ public class StorageProxy implements StorageProxyMBean
{
for (WriteResponseHandlerWrapper wrapper : wrappers)
{
- Replicas.temporaryAssertFull(wrapper.handler.replicaLayout.all()); // TODO: CASSANDRA-14549
- ReplicaLayout.ForToken replicas = wrapper.handler.replicaLayout.withSelected(wrapper.handler.replicaLayout.all());
+ Replicas.temporaryAssertFull(wrapper.handler.replicaPlan.liveAndDown()); // TODO: CASSANDRA-14549
+ ReplicaPlan.ForTokenWrite replicas = wrapper.handler.replicaPlan.withContact(wrapper.handler.replicaPlan.liveAndDown());
try
{
- sendToHintedReplicas(wrapper.mutation, replicas.selected(), wrapper.handler, localDataCenter, stage);
+ sendToHintedReplicas(wrapper.mutation, replicas, wrapper.handler, localDataCenter, stage);
}
catch (OverloadedException | WriteTimeoutException e)
{
@@ -1059,11 +1059,11 @@ public class StorageProxy implements StorageProxyMBean
{
for (WriteResponseHandlerWrapper wrapper : wrappers)
{
- Replicas.temporaryAssertFull(wrapper.handler.replicaLayout.all()); // TODO: CASSANDRA-14549
- sendToHintedReplicas(wrapper.mutation, wrapper.handler.replicaLayout.all(), wrapper.handler, localDataCenter, stage);
+ EndpointsForToken sendTo = wrapper.handler.replicaPlan.liveAndDown();
+ Replicas.temporaryAssertFull(sendTo); // TODO: CASSANDRA-14549
+ sendToHintedReplicas(wrapper.mutation, wrapper.handler.replicaPlan.withContact(sendTo), wrapper.handler, localDataCenter, stage);
}
-
for (WriteResponseHandlerWrapper wrapper : wrappers)
wrapper.handler.get();
}
@@ -1096,13 +1096,10 @@ public class StorageProxy implements StorageProxyMBean
Token tk = mutation.key().getToken();
- ReplicaLayout.ForToken replicaLayout = ReplicaLayout.forWriteWithDownNodes(keyspace, consistencyLevel, tk);
- AbstractWriteResponseHandler<IMutation> responseHandler = rs.getWriteResponseHandler(replicaLayout, callback, writeType, queryStartNanoTime);
+ ReplicaPlan.ForTokenWrite replicaPlan = ReplicaPlans.forWrite(keyspace, consistencyLevel, tk, ReplicaPlans.writeNormal);
+ AbstractWriteResponseHandler<IMutation> responseHandler = rs.getWriteResponseHandler(replicaPlan, callback, writeType, queryStartNanoTime);
- // exit early if we can't fulfill the CL at this time
- responseHandler.assureSufficientLiveNodes();
-
- performer.apply(mutation, replicaLayout, responseHandler, localDataCenter);
+ performer.apply(mutation, replicaPlan, responseHandler, localDataCenter);
return responseHandler;
}
@@ -1118,8 +1115,8 @@ public class StorageProxy implements StorageProxyMBean
AbstractReplicationStrategy rs = keyspace.getReplicationStrategy();
Token tk = mutation.key().getToken();
- ReplicaLayout.ForToken replicaLayout = ReplicaLayout.forWrite(keyspace, consistencyLevel, tk, FailureDetector.instance::isAlive);
- AbstractWriteResponseHandler<IMutation> writeHandler = rs.getWriteResponseHandler(replicaLayout,null, writeType, queryStartNanoTime);
+ ReplicaPlan.ForTokenWrite replicaPlan = ReplicaPlans.forWrite(keyspace, consistencyLevel, tk, ReplicaPlans.writeNormal);
+ AbstractWriteResponseHandler<IMutation> writeHandler = rs.getWriteResponseHandler(replicaPlan,null, writeType, queryStartNanoTime);
BatchlogResponseHandler<IMutation> batchHandler = new BatchlogResponseHandler<>(writeHandler, batchConsistencyLevel.blockFor(keyspace), cleanup, queryStartNanoTime);
return new WriteResponseHandlerWrapper(batchHandler, mutation);
}
@@ -1140,10 +1137,11 @@ public class StorageProxy implements StorageProxyMBean
{
Keyspace keyspace = Keyspace.open(mutation.getKeyspaceName());
AbstractReplicationStrategy rs = keyspace.getReplicationStrategy();
- Token tk = mutation.key().getToken();
- ReplicaLayout.ForToken replicaLayout = ReplicaLayout.forWriteWithDownNodes(keyspace, consistencyLevel, tk, naturalEndpoints, pendingEndpoints);
- AbstractWriteResponseHandler<IMutation> writeHandler = rs.getWriteResponseHandler(replicaLayout, () -> {
+ ReplicaLayout.ForTokenWrite liveAndDown = ReplicaLayout.forTokenWrite(naturalEndpoints, pendingEndpoints);
+ ReplicaPlan.ForTokenWrite replicaPlan = ReplicaPlans.forWrite(keyspace, consistencyLevel, liveAndDown, ReplicaPlans.writeAll);
+
+ AbstractWriteResponseHandler<IMutation> writeHandler = rs.getWriteResponseHandler(replicaPlan, () -> {
long delay = Math.max(0, System.currentTimeMillis() - baseComplete.get());
viewWriteMetrics.viewWriteLatency.update(delay, TimeUnit.MILLISECONDS);
}, writeType, queryStartNanoTime);
@@ -1208,7 +1206,7 @@ public class StorageProxy implements StorageProxyMBean
* @throws OverloadedException if the hints cannot be written/enqueued
*/
public static void sendToHintedReplicas(final Mutation mutation,
- EndpointsForToken targets,
+ ReplicaPlan.ForTokenWrite plan,
AbstractWriteResponseHandler<IMutation> responseHandler,
String localDataCenter,
Stage stage)
@@ -1227,11 +1225,11 @@ public class StorageProxy implements StorageProxyMBean
List<InetAddressAndPort> backPressureHosts = null;
- for (Replica destination : targets)
+ for (Replica destination : plan.contacts())
{
checkHintOverload(destination);
- if (FailureDetector.instance.isAlive(destination.endpoint()))
+ if (plan.isAlive(destination))
{
if (destination.isLocal())
{
@@ -1251,7 +1249,7 @@ public class StorageProxy implements StorageProxyMBean
if (localDataCenter.equals(dc))
{
if (localDc == null)
- localDc = new ArrayList<>(targets.size());
+ localDc = new ArrayList<>(plan.contacts().size());
localDc.add(destination);
}
@@ -1268,7 +1266,7 @@ public class StorageProxy implements StorageProxyMBean
}
if (backPressureHosts == null)
- backPressureHosts = new ArrayList<>(targets.size());
+ backPressureHosts = new ArrayList<>(plan.contacts().size());
backPressureHosts.add(destination.endpoint());
}
@@ -1345,7 +1343,7 @@ public class StorageProxy implements StorageProxyMBean
message,
destination,
message.getTimeout(),
- handler.replicaLayout.consistencyLevel(),
+ handler.replicaPlan.consistencyLevel(),
true);
messageIds[idIdx++] = id;
logger.trace("Adding FWD message to {}@{}", id, destination);
@@ -1435,14 +1433,13 @@ public class StorageProxy implements StorageProxyMBean
// Exit now if we can't fulfill the CL here instead of forwarding to the leader replica
String keyspaceName = cm.getKeyspaceName();
Keyspace keyspace = Keyspace.open(keyspaceName);
- AbstractReplicationStrategy rs = Keyspace.open(keyspaceName).getReplicationStrategy();
Token tk = cm.key().getToken();
- ReplicaLayout.ForToken replicaLayout = ReplicaLayout.forWriteWithDownNodes(keyspace, cm.consistency(), tk);
- rs.getWriteResponseHandler(replicaLayout, null, WriteType.COUNTER, queryStartNanoTime).assureSufficientLiveNodes();
+ ReplicaPlans.forWrite(keyspace, cm.consistency(), tk, ReplicaPlans.writeAll)
+ .assureSufficientReplicas();
// Forward the actual update to the chosen leader replica
- AbstractWriteResponseHandler<IMutation> responseHandler = new WriteResponseHandler<>(ReplicaLayout.forCounterWrite(keyspace, tk, replica),
+ AbstractWriteResponseHandler<IMutation> responseHandler = new WriteResponseHandler<>(ReplicaPlans.forForwardingCounterWrite(keyspace, tk, replica),
WriteType.COUNTER, queryStartNanoTime);
Tracing.trace("Enqueuing counter update to {}", replica);
@@ -1465,7 +1462,7 @@ public class StorageProxy implements StorageProxyMBean
{
Keyspace keyspace = Keyspace.open(keyspaceName);
IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
- EndpointsForToken replicas = StorageService.instance.getLiveNaturalReplicasForToken(keyspace, key);
+ EndpointsForToken replicas = keyspace.getReplicationStrategy().getNaturalReplicasForToken(key);
// CASSANDRA-13043: filter out those endpoints not accepting clients yet, maybe because still bootstrapping
replicas = replicas.filter(replica -> StorageService.instance.isRpcReady(replica.endpoint()));
@@ -1511,7 +1508,7 @@ public class StorageProxy implements StorageProxyMBean
}
private static Runnable counterWriteTask(final IMutation mutation,
- final EndpointsForToken targets,
+ final ReplicaPlan.ForTokenWrite replicaPlan,
final AbstractWriteResponseHandler<IMutation> responseHandler,
final String localDataCenter)
{
@@ -1524,7 +1521,7 @@ public class StorageProxy implements StorageProxyMBean
Mutation result = ((CounterMutation) mutation).applyCounterMutation();
responseHandler.response(null);
- sendToHintedReplicas(result, targets, responseHandler, localDataCenter, Stage.COUNTER_MUTATION);
+ sendToHintedReplicas(result, replicaPlan, responseHandler, localDataCenter, Stage.COUNTER_MUTATION);
}
};
}
@@ -1592,7 +1589,7 @@ public class StorageProxy implements StorageProxyMBean
try
{
// make sure any in-progress paxos writes are done (i.e., committed to a majority of replicas), before performing a quorum read
- ReplicaLayout.ForPaxos replicaLayout = ReplicaLayout.forPaxos(Keyspace.open(metadata.keyspace), key, consistencyLevel);
+ ReplicaPlan.ForPaxosWrite replicaPlan = ReplicaPlans.forPaxos(Keyspace.open(metadata.keyspace), key, consistencyLevel);
// does the work of applying in-progress writes; throws UAE or timeout if it can't
final ConsistencyLevel consistencyForCommitOrFetch = consistencyLevel == ConsistencyLevel.LOCAL_SERIAL
@@ -1601,7 +1598,7 @@ public class StorageProxy implements StorageProxyMBean
try
{
- final PaxosBallotAndContention pair = beginAndRepairPaxos(start, key, metadata, replicaLayout, consistencyLevel, consistencyForCommitOrFetch, false, state);
+ final PaxosBallotAndContention pair = beginAndRepairPaxos(start, key, metadata, replicaPlan, consistencyLevel, consistencyForCommitOrFetch, false, state);
if (pair.contentions > 0)
casReadMetrics.contention.update(pair.contentions);
}
@@ -1850,21 +1847,6 @@ public class StorageProxy implements StorageProxyMBean
}
}
- public static EndpointsForToken getLiveSortedReplicasForToken(Keyspace keyspace, RingPosition pos)
- {
- return getLiveSortedReplicas(keyspace, pos).forToken(pos.getToken());
- }
-
- public static EndpointsForRange getLiveSortedReplicas(Keyspace keyspace, RingPosition pos)
- {
- EndpointsForRange liveReplicas = StorageService.instance.getLiveNaturalReplicas(keyspace, pos);
- // Replica availability is considered by the query path
- Preconditions.checkState(liveReplicas.isEmpty() || liveReplicas.stream().anyMatch(Replica::isFull),
- "At least one full replica required for reads: " + liveReplicas);
-
- return DatabaseDescriptor.getEndpointSnitch().sortedByProximity(FBUtilities.getBroadcastAddressAndPort(), liveReplicas);
- }
-
/**
* Estimate the number of result rows per range in the ring based on our local data.
* <p>
@@ -1883,7 +1865,7 @@ public class StorageProxy implements StorageProxyMBean
return (maxExpectedResults / DatabaseDescriptor.getNumTokens()) / keyspace.getReplicationStrategy().getReplicationFactor().allReplicas;
}
- private static class RangeIterator extends AbstractIterator<ReplicaLayout.ForRange>
+ private static class RangeIterator extends AbstractIterator<ReplicaPlan.ForRangeRead>
{
private final Keyspace keyspace;
private final ConsistencyLevel consistency;
@@ -1907,43 +1889,34 @@ public class StorageProxy implements StorageProxyMBean
return rangeCount;
}
- protected ReplicaLayout.ForRange computeNext()
+ protected ReplicaPlan.ForRangeRead computeNext()
{
if (!ranges.hasNext())
return endOfData();
- AbstractBounds<PartitionPosition> range = ranges.next();
- EndpointsForRange liveReplicas = getLiveSortedReplicas(keyspace, range.right);
-
- int blockFor = consistency.blockFor(keyspace);
- EndpointsForRange targetReplicas = consistency.filterForQuery(keyspace, liveReplicas);
- int minResponses = Math.min(targetReplicas.size(), blockFor);
-
- // Endpoints for range here as well
- return ReplicaLayout.forRangeRead(keyspace, consistency, range,
- liveReplicas, targetReplicas.subList(0, minResponses));
+ return ReplicaPlans.forRangeRead(keyspace, consistency, ranges.next());
}
}
- private static class RangeMerger extends AbstractIterator<ReplicaLayout.ForRange>
+ private static class RangeMerger extends AbstractIterator<ReplicaPlan.ForRangeRead>
{
private final Keyspace keyspace;
private final ConsistencyLevel consistency;
- private final PeekingIterator<ReplicaLayout.ForRange> ranges;
+ private final PeekingIterator<ReplicaPlan.ForRangeRead> ranges;
- private RangeMerger(Iterator<ReplicaLayout.ForRange> iterator, Keyspace keyspace, ConsistencyLevel consistency)
+ private RangeMerger(Iterator<ReplicaPlan.ForRangeRead> iterator, Keyspace keyspace, ConsistencyLevel consistency)
{
this.keyspace = keyspace;
this.consistency = consistency;
this.ranges = Iterators.peekingIterator(iterator);
}
- protected ReplicaLayout.ForRange computeNext()
+ protected ReplicaPlan.ForRangeRead computeNext()
{
if (!ranges.hasNext())
return endOfData();
- ReplicaLayout.ForRange current = ranges.next();
+ ReplicaPlan.ForRangeRead current = ranges.next();
// getRestrictedRange has broken the queried range into per-[vnode] token ranges, but this doesn't take
// the replication factor into account. If the intersection of live endpoints for 2 consecutive ranges
@@ -1955,25 +1928,15 @@ public class StorageProxy implements StorageProxyMBean
// Note: it would be slightly more efficient to have CFS.getRangeSlice on the destination nodes unwrap
// the range if necessary and deal with it. However, we can't start sending wrapped ranges without breaking
// wire compatibility, so it's likely easier not to bother.
- if (current.range.right.isMinimum())
+ if (current.range().right.isMinimum())
break;
- ReplicaLayout.ForRange next = ranges.peek();
-
- EndpointsForRange merged = current.all().keep(next.all().endpoints());
-
- // Check if there is enough endpoint for the merge to be possible.
- if (!consistency.isSufficientLiveNodesForRead(keyspace, merged))
- break;
-
- EndpointsForRange filteredMerged = consistency.filterForQuery(keyspace, merged);
-
- // Estimate whether merging will be a win or not
- if (!DatabaseDescriptor.getEndpointSnitch().isWorthMergingForRangeQuery(filteredMerged, current.selected(), next.selected()))
+ ReplicaPlan.ForRangeRead next = ranges.peek();
+ ReplicaPlan.ForRangeRead merged = ReplicaPlans.maybeMerge(keyspace, consistency, current, next);
+ if (merged == null)
break;
- // If we get there, merge this range and the next one
- current = ReplicaLayout.forRangeRead(keyspace, consistency, current.range.withNewRight(next.range.right), merged, filteredMerged);
+ current = merged;
ranges.next(); // consume the range we just merged since we've only peeked so far
}
return current;
@@ -2018,7 +1981,7 @@ public class StorageProxy implements StorageProxyMBean
private static class RangeCommandIterator extends AbstractIterator<RowIterator> implements PartitionIterator
{
- private final Iterator<ReplicaLayout.ForRange> ranges;
+ private final Iterator<ReplicaPlan.ForRangeRead> ranges;
private final int totalRangeCount;
private final PartitionRangeReadCommand command;
private final boolean enforceStrictLiveness;
@@ -2108,42 +2071,40 @@ public class StorageProxy implements StorageProxyMBean
/**
* Queries the provided sub-range.
*
- * @param replicaLayout the subRange to query.
+ * @param replicaPlan the subRange to query.
* @param isFirst in the case where multiple queries are sent in parallel, whether that's the first query on
* that batch or not. The reason it matters is that when paging queries, the command (more specifically the
* {@code DataLimits}) may have "state" information and that state may only be valid for the first query (in
* that it's the query that "continues" whatever we've previously queried).
*/
- private SingleRangeResponse query(ReplicaLayout.ForRange replicaLayout, boolean isFirst)
+ private SingleRangeResponse query(ReplicaPlan.ForRangeRead replicaPlan, boolean isFirst)
{
- PartitionRangeReadCommand rangeCommand = command.forSubRange(replicaLayout.range, isFirst);
- ReadRepair<EndpointsForRange, ReplicaLayout.ForRange> readRepair = ReadRepair.create(command, replicaLayout, queryStartNanoTime);
- DataResolver<EndpointsForRange, ReplicaLayout.ForRange> resolver = new DataResolver<>(rangeCommand, replicaLayout, readRepair, queryStartNanoTime);
- Keyspace keyspace = Keyspace.open(command.metadata().keyspace);
-
- ReadCallback<EndpointsForRange, ReplicaLayout.ForRange> handler = new ReadCallback<>(resolver,
- replicaLayout.consistencyLevel().blockFor(keyspace),
- rangeCommand,
- replicaLayout,
- queryStartNanoTime);
+ PartitionRangeReadCommand rangeCommand = command.forSubRange(replicaPlan.range(), isFirst);
+ ReplicaPlan.SharedForRangeRead sharedReplicaPlan = ReplicaPlan.shared(replicaPlan);
+ ReadRepair<EndpointsForRange, ReplicaPlan.ForRangeRead> readRepair
+ = ReadRepair.create(command, sharedReplicaPlan, queryStartNanoTime);
+ DataResolver<EndpointsForRange, ReplicaPlan.ForRangeRead> resolver
+ = new DataResolver<>(rangeCommand, sharedReplicaPlan, readRepair, queryStartNanoTime);
+ ReadCallback<EndpointsForRange, ReplicaPlan.ForRangeRead> handler
+ = new ReadCallback<>(resolver, rangeCommand, sharedReplicaPlan, queryStartNanoTime);
- handler.assureSufficientLiveNodes();
+ replicaPlan.assureSufficientReplicas();
// If enabled, request repaired data tracking info from full replicas but
// only if there are multiple full replicas to compare results from
if (DatabaseDescriptor.getRepairedDataTrackingForPartitionReadsEnabled()
- && replicaLayout.selected().filter(Replica::isFull).size() > 1)
+ && replicaPlan.contacts().filter(Replica::isFull).size() > 1)
{
command.trackRepairedStatus();
}
- if (replicaLayout.selected().size() == 1 && replicaLayout.selected().get(0).isLocal())
+ if (replicaPlan.contacts().size() == 1 && replicaPlan.contacts().get(0).isLocal())
{
StageManager.getStage(Stage.READ).execute(new LocalReadRunnable(rangeCommand, handler));
}
else
{
- for (Replica replica : replicaLayout.selected())
+ for (Replica replica : replicaPlan.contacts())
{
Tracing.trace("Enqueuing request to {}", replica);
PartitionRangeReadCommand command = replica.isFull() ? rangeCommand : rangeCommand.copyAsTransientQuery();
@@ -2486,7 +2447,7 @@ public class StorageProxy implements StorageProxyMBean
public interface WritePerformer
{
public void apply(IMutation mutation,
- ReplicaLayout.ForToken targets,
+ ReplicaPlan.ForTokenWrite targets,
AbstractWriteResponseHandler<IMutation> responseHandler,
String localDataCenter) throws OverloadedException;
}
@@ -2499,7 +2460,7 @@ public class StorageProxy implements StorageProxyMBean
public ViewWriteMetricsWrapped(AbstractWriteResponseHandler<IMutation> writeHandler, int i, BatchlogCleanup cleanup, long queryStartNanoTime)
{
super(writeHandler, i, cleanup, queryStartNanoTime);
- viewWriteMetrics.viewReplicasAttempted.inc(totalEndpoints());
+ viewWriteMetrics.viewReplicasAttempted.inc(candidateReplicaCount());
}
public void response(MessageIn<IMutation> msg)
@@ -2705,7 +2666,7 @@ public class StorageProxy implements StorageProxyMBean
HintsService.instance.write(hostIds, Hint.create(mutation, System.currentTimeMillis()));
validTargets.forEach(HintsService.instance.metrics::incrCreatedHints);
// Notify the handler only for CL == ANY
- if (responseHandler != null && responseHandler.replicaLayout.consistencyLevel() == ConsistencyLevel.ANY)
+ if (responseHandler != null && responseHandler.replicaPlan.consistencyLevel() == ConsistencyLevel.ANY)
responseHandler.response(null);
}
};
http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 7f4ae14..a979f1c 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -172,7 +172,8 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
public RangesAtEndpoint getLocalReplicas(String keyspaceName)
{
- return getReplicasForEndpoint(keyspaceName, FBUtilities.getBroadcastAddressAndPort());
+ return Keyspace.open(keyspaceName).getReplicationStrategy()
+ .getAddressReplicas(FBUtilities.getBroadcastAddressAndPort());
}
public List<Range<Token>> getLocalAndPendingRanges(String ks)
@@ -2015,11 +2016,10 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
*/
private EndpointsByRange constructRangeToEndpointMap(String keyspace, List<Range<Token>> ranges)
{
+ AbstractReplicationStrategy strategy = Keyspace.open(keyspace).getReplicationStrategy();
Map<Range<Token>, EndpointsForRange> rangeToEndpointMap = new HashMap<>(ranges.size());
for (Range<Token> range : ranges)
- {
- rangeToEndpointMap.put(range, Keyspace.open(keyspace).getReplicationStrategy().getNaturalReplicas(range.right));
- }
+ rangeToEndpointMap.put(range, strategy.getNaturalReplicas(range.right));
return new EndpointsByRange(rangeToEndpointMap);
}
@@ -3878,16 +3878,6 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
}
/**
- * Get all ranges an endpoint is responsible for (by keyspace)
- * @param ep endpoint we are interested in.
- * @return ranges for the specified endpoint.
- */
- RangesAtEndpoint getReplicasForEndpoint(String keyspaceName, InetAddressAndPort ep)
- {
- return Keyspace.open(keyspaceName).getReplicationStrategy().getAddressReplicas(ep);
- }
-
- /**
* Get all ranges that span the ring given a set
* of tokens. All ranges are in sorted order of
* ranges.
@@ -3936,11 +3926,10 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
return Replicas.stringify(getNaturalReplicasForToken(keyspaceName, cf, key), true);
}
-
@Deprecated
public List<InetAddress> getNaturalEndpoints(String keyspaceName, ByteBuffer key)
{
- EndpointsForToken replicas = getNaturalReplicasForToken(keyspaceName, tokenMetadata.partitioner.getToken(key));
+ EndpointsForToken replicas = getNaturalReplicasForToken(keyspaceName, key);
List<InetAddress> inetList = new ArrayList<>(replicas.size());
replicas.forEach(r -> inetList.add(r.endpoint().address));
return inetList;
@@ -3948,7 +3937,8 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
public List<String> getNaturalEndpointsWithPort(String keyspaceName, ByteBuffer key)
{
- return Replicas.stringify(getNaturalReplicasForToken(keyspaceName, tokenMetadata.partitioner.getToken(key)), true);
+ EndpointsForToken replicas = getNaturalReplicasForToken(keyspaceName, key);
+ return Replicas.stringify(replicas, true);
}
public List<String> getReplicas(String keyspaceName, String cf, String key)
@@ -3971,61 +3961,13 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
if (metadata == null)
throw new IllegalArgumentException("Unknown table '" + cf + "' in keyspace '" + keyspaceName + "'");
- return getNaturalReplicasForToken(keyspaceName, tokenMetadata.partitioner.getToken(metadata.partitionKeyType.fromString(key)));
- }
-
- /**
- * This method returns the N endpoints that are responsible for storing the
- * specified key i.e for replication.
- *
- * @param keyspaceName keyspace name also known as keyspace
- * @param pos position for which we need to find the endpoint
- * @return the endpoint responsible for this token
- */
- public static EndpointsForToken getNaturalReplicasForToken(String keyspaceName, RingPosition pos)
- {
- return Keyspace.open(keyspaceName).getReplicationStrategy().getNaturalReplicasForToken(pos);
+ return getNaturalReplicasForToken(keyspaceName, metadata.partitionKeyType.fromString(key));
}
- /**
- * Returns the endpoints currently responsible for storing the token plus pending ones
- */
- public EndpointsForToken getNaturalAndPendingReplicasForToken(String keyspaceName, Token token)
- {
- // TODO: race condition to fetch these. impliciations??
- EndpointsForToken natural = getNaturalReplicasForToken(keyspaceName, token);
- EndpointsForToken pending = tokenMetadata.pendingEndpointsForToken(token, keyspaceName);
- if (Endpoints.haveConflicts(natural, pending))
- {
- natural = Endpoints.resolveConflictsInNatural(natural, pending);
- pending = Endpoints.resolveConflictsInPending(natural, pending);
- }
- return Endpoints.concat(natural, pending);
- }
-
- /**
- * This method attempts to return N endpoints that are responsible for storing the
- * specified key i.e for replication.
- *
- * @param keyspace keyspace name also known as keyspace
- * @param pos position for which we need to find the endpoint
- */
- public EndpointsForToken getLiveNaturalReplicasForToken(Keyspace keyspace, RingPosition pos)
- {
- return getLiveNaturalReplicas(keyspace, pos).forToken(pos.getToken());
- }
-
- /**
- * This method attempts to return N endpoints that are responsible for storing the
- * specified key i.e for replication.
- *
- * @param keyspace keyspace name also known as keyspace
- * @param pos position for which we need to find the endpoint
- */
- public EndpointsForRange getLiveNaturalReplicas(Keyspace keyspace, RingPosition pos)
+ public EndpointsForToken getNaturalReplicasForToken(String keyspaceName, ByteBuffer key)
{
- EndpointsForRange replicas = keyspace.getReplicationStrategy().getNaturalReplicas(pos);
- return replicas.filter(r -> FailureDetector.instance.isAlive(r.endpoint()));
+ Token token = tokenMetadata.partitioner.getToken(key);
+ return Keyspace.open(keyspaceName).getReplicationStrategy().getNaturalReplicasForToken(token);
}
public void setLoggingLevel(String classQualifier, String rawLevel) throws Exception
@@ -4268,7 +4210,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
.filter(endpoint -> FailureDetector.instance.isAlive(endpoint) && !FBUtilities.getBroadcastAddressAndPort().equals(endpoint))
.collect(Collectors.toList());
- return EndpointsForRange.copyOf(SystemReplicas.getSystemReplicas(endpoints));
+ return SystemReplicas.getSystemReplicas(endpoints);
}
/**
* Find the best target to stream hints to. Currently the closest peer according to the snitch
http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/service/WriteResponseHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/WriteResponseHandler.java b/src/java/org/apache/cassandra/service/WriteResponseHandler.java
index a07aae6..f9bfedf 100644
--- a/src/java/org/apache/cassandra/service/WriteResponseHandler.java
+++ b/src/java/org/apache/cassandra/service/WriteResponseHandler.java
@@ -19,9 +19,7 @@ package org.apache.cassandra.service;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
-import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.locator.ReplicaLayout;
-
+import org.apache.cassandra.locator.ReplicaPlan;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -39,18 +37,18 @@ public class WriteResponseHandler<T> extends AbstractWriteResponseHandler<T>
private static final AtomicIntegerFieldUpdater<WriteResponseHandler> responsesUpdater
= AtomicIntegerFieldUpdater.newUpdater(WriteResponseHandler.class, "responses");
- public WriteResponseHandler(ReplicaLayout.ForToken replicaLayout,
+ public WriteResponseHandler(ReplicaPlan.ForTokenWrite replicaPlan,
Runnable callback,
WriteType writeType,
long queryStartNanoTime)
{
- super(replicaLayout, callback, writeType, queryStartNanoTime);
- responses = totalBlockFor();
+ super(replicaPlan, callback, writeType, queryStartNanoTime);
+ responses = blockFor();
}
- public WriteResponseHandler(ReplicaLayout.ForToken replicaLayout, WriteType writeType, long queryStartNanoTime)
+ public WriteResponseHandler(ReplicaPlan.ForTokenWrite replicaPlan, WriteType writeType, long queryStartNanoTime)
{
- this(replicaLayout, null, writeType, queryStartNanoTime);
+ this(replicaPlan, null, writeType, queryStartNanoTime);
}
public void response(MessageIn<T> m)
@@ -65,7 +63,7 @@ public class WriteResponseHandler<T> extends AbstractWriteResponseHandler<T>
protected int ackCount()
{
- return totalBlockFor() - responses;
+ return blockFor() - responses;
}
public boolean isLatencyForSnitch()
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org
[4/4] cassandra git commit: ReplicaPlan/Layout refactor
follow-up/completion
Posted by be...@apache.org.
ReplicaPlan/Layout refactor follow-up/completion
Finish much of the work to clarify endpoint selection
that was begun in Transient Replication (CASSANDRA-14404)
Also fixes:
- commitPaxos was incorrectly selecting only live nodes,
when it needed to include down nodes as well
- We were not writing to pending transient replicas
- On write, we were not hinting to full nodes with transient
replication
- rr.maybeSendAdditional{Reads,Writes} would only consult the
same node we may have speculated a read to
- transient->full movements mishandled consistency level upgrade by
retaining the 'full' pending variant, which increased CL requirement;
instead, the 'natural' replica is upgraded to 'full' for writes
patch by Benedict; reviewed by Alex Petrov and Ariel Weisberg for CASSANDRA-14705
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/047bcd7a
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/047bcd7a
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/047bcd7a
Branch: refs/heads/trunk
Commit: 047bcd7ad171d6a4aa89128c5e6c6ed5f012b1c0
Parents: 05dbb3e
Author: Benedict Elliott Smith <be...@apple.com>
Authored: Fri Sep 7 11:41:28 2018 +0100
Committer: Benedict Elliott Smith <be...@apple.com>
Committed: Fri Sep 14 10:14:37 2018 +0100
----------------------------------------------------------------------
.../cassandra/batchlog/BatchlogManager.java | 64 ++-
.../apache/cassandra/db/ConsistencyLevel.java | 26 +-
.../apache/cassandra/gms/FailureDetector.java | 4 +
.../apache/cassandra/hints/HintsService.java | 4 +-
.../locator/AbstractReplicaCollection.java | 67 +--
.../locator/AbstractReplicationStrategy.java | 20 +-
.../org/apache/cassandra/locator/Endpoints.java | 59 +--
.../cassandra/locator/EndpointsForRange.java | 2 +-
.../cassandra/locator/EndpointsForToken.java | 2 +-
.../cassandra/locator/RangesAtEndpoint.java | 2 +-
.../cassandra/locator/ReplicaCollection.java | 27 +-
.../apache/cassandra/locator/ReplicaLayout.java | 435 ++++++++-----------
.../apache/cassandra/locator/ReplicaPlan.java | 240 ++++++++++
.../apache/cassandra/locator/ReplicaPlans.java | 295 +++++++++++++
.../cassandra/locator/SystemReplicas.java | 12 +-
.../apache/cassandra/locator/TokenMetadata.java | 8 +-
.../apache/cassandra/net/IAsyncCallback.java | 10 -
.../service/AbstractWriteResponseHandler.java | 50 +--
.../cassandra/service/ActiveRepairService.java | 2 +-
.../service/BatchlogResponseHandler.java | 15 +-
.../DatacenterSyncWriteResponseHandler.java | 12 +-
.../service/DatacenterWriteResponseHandler.java | 10 +-
.../apache/cassandra/service/StorageProxy.java | 239 +++++-----
.../cassandra/service/StorageService.java | 82 +---
.../cassandra/service/WriteResponseHandler.java | 16 +-
.../service/reads/AbstractReadExecutor.java | 78 ++--
.../cassandra/service/reads/DataResolver.java | 26 +-
.../cassandra/service/reads/DigestResolver.java | 22 +-
.../cassandra/service/reads/ReadCallback.java | 62 ++-
.../service/reads/ResponseResolver.java | 22 +-
.../reads/ShortReadPartitionsProtection.java | 19 +-
.../reads/repair/AbstractReadRepair.java | 42 +-
.../reads/repair/BlockingPartitionRepair.java | 29 +-
.../reads/repair/BlockingReadRepair.java | 25 +-
.../service/reads/repair/NoopReadRepair.java | 11 +-
.../repair/PartitionIteratorMergeListener.java | 16 +-
.../reads/repair/ReadOnlyReadRepair.java | 13 +-
.../service/reads/repair/ReadRepair.java | 17 +-
.../reads/repair/ReadRepairDiagnostics.java | 11 +-
.../service/reads/repair/ReadRepairEvent.java | 2 +-
.../reads/repair/ReadRepairStrategy.java | 11 +-
.../reads/repair/RowIteratorMergeListener.java | 21 +-
.../locator/ReplicaCollectionTest.java | 89 ++--
.../service/WriteResponseHandlerTest.java | 4 +-
.../WriteResponseHandlerTransientTest.java | 71 ++-
.../service/reads/DataResolverTest.java | 21 +-
.../reads/DataResolverTransientTest.java | 227 ++++++++++
.../service/reads/DigestResolverTest.java | 5 +-
.../service/reads/ReadExecutorTest.java | 7 +-
.../reads/repair/AbstractReadRepairTest.java | 32 +-
.../reads/repair/BlockingReadRepairTest.java | 36 +-
.../DiagEventsBlockingReadRepairTest.java | 26 +-
.../reads/repair/InstrumentedReadRepair.java | 4 +-
.../reads/repair/ReadOnlyReadRepairTest.java | 24 +-
.../service/reads/repair/ReadRepairTest.java | 12 +-
.../reads/repair/TestableReadRepair.java | 18 +-
56 files changed, 1655 insertions(+), 1051 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/batchlog/BatchlogManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/batchlog/BatchlogManager.java b/src/java/org/apache/cassandra/batchlog/BatchlogManager.java
index 8dda54e..77f725c 100644
--- a/src/java/org/apache/cassandra/batchlog/BatchlogManager.java
+++ b/src/java/org/apache/cassandra/batchlog/BatchlogManager.java
@@ -26,12 +26,9 @@ import java.util.concurrent.*;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Predicates;
import com.google.common.collect.*;
import com.google.common.util.concurrent.RateLimiter;
-import org.apache.cassandra.locator.ReplicaLayout;
-import org.apache.cassandra.locator.EndpointsForToken;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -54,6 +51,8 @@ import org.apache.cassandra.hints.Hint;
import org.apache.cassandra.hints.HintsService;
import org.apache.cassandra.io.util.DataInputBuffer;
import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.locator.ReplicaLayout;
+import org.apache.cassandra.locator.ReplicaPlan;
import org.apache.cassandra.net.MessageIn;
import org.apache.cassandra.net.MessageOut;
import org.apache.cassandra.net.MessagingService;
@@ -459,36 +458,34 @@ public class BatchlogManager implements BatchlogManagerMBean
Keyspace keyspace = Keyspace.open(ks);
Token tk = mutation.key().getToken();
- EndpointsForToken replicas = StorageService.instance.getNaturalAndPendingReplicasForToken(ks, tk);
- Replicas.temporaryAssertFull(replicas); // TODO in CASSANDRA-14549
+ // TODO: this logic could do with revisiting at some point, as it is unclear what its rationale is
+ // we perform a local write, ignoring errors and inline in this thread (potentially slowing replay down)
+ // effectively bumping CL for locally owned writes and also potentially stalling log replay if an error occurs
+ // once we decide how it should work, it can also probably be simplified, and avoid constructing a ReplicaPlan directly
+ ReplicaLayout.ForTokenWrite liveAndDown = ReplicaLayout.forTokenWriteLiveAndDown(keyspace, tk);
+ Replicas.temporaryAssertFull(liveAndDown.all()); // TODO in CASSANDRA-14549
+
+ Replica selfReplica = liveAndDown.all().selfIfPresent();
+ if (selfReplica != null)
+ mutation.apply();
- EndpointsForToken.Builder liveReplicasBuilder = EndpointsForToken.builder(tk);
- for (Replica replica : replicas)
+ ReplicaLayout.ForTokenWrite liveRemoteOnly = liveAndDown.filter(
+ r -> FailureDetector.isReplicaAlive.test(r) && r != selfReplica);
+
+ for (Replica replica : liveAndDown.all())
{
- if (replica.isLocal())
- {
- mutation.apply();
- }
- else if (FailureDetector.instance.isAlive(replica.endpoint()))
- {
- liveReplicasBuilder.add(replica); // will try delivering directly instead of writing a hint.
- }
- else
- {
- hintedNodes.add(replica.endpoint());
- HintsService.instance.write(StorageService.instance.getHostIdForEndpoint(replica.endpoint()),
- Hint.create(mutation, writtenAt));
- }
+ if (replica == selfReplica || liveRemoteOnly.all().contains(replica))
+ continue;
+ hintedNodes.add(replica.endpoint());
+ HintsService.instance.write(StorageService.instance.getHostIdForEndpoint(replica.endpoint()),
+ Hint.create(mutation, writtenAt));
}
- EndpointsForToken liveReplicas = liveReplicasBuilder.build();
- if (liveReplicas.isEmpty())
- return null;
-
- Replicas.temporaryAssertFull(liveReplicas);
- ReplayWriteResponseHandler<Mutation> handler = new ReplayWriteResponseHandler<>(keyspace, liveReplicas, System.nanoTime());
+ ReplicaPlan.ForTokenWrite replicaPlan = new ReplicaPlan.ForTokenWrite(keyspace, ConsistencyLevel.ONE,
+ liveRemoteOnly.pending(), liveRemoteOnly.all(), liveRemoteOnly.all(), liveRemoteOnly.all());
+ ReplayWriteResponseHandler<Mutation> handler = new ReplayWriteResponseHandler<>(replicaPlan, System.nanoTime());
MessageOut<Mutation> message = mutation.createMessage();
- for (Replica replica : liveReplicas)
+ for (Replica replica : liveRemoteOnly.all())
MessagingService.instance().sendWriteRR(message, replica, handler, false);
return handler;
}
@@ -509,17 +506,16 @@ public class BatchlogManager implements BatchlogManagerMBean
{
private final Set<InetAddressAndPort> undelivered = Collections.newSetFromMap(new ConcurrentHashMap<>());
- ReplayWriteResponseHandler(Keyspace keyspace, EndpointsForToken writeReplicas, long queryStartNanoTime)
+ ReplayWriteResponseHandler(ReplicaPlan.ForTokenWrite replicaPlan, long queryStartNanoTime)
{
- super(ReplicaLayout.forWriteWithDownNodes(keyspace, null, writeReplicas.token(), writeReplicas, EndpointsForToken.empty(writeReplicas.token())),
- null, WriteType.UNLOGGED_BATCH, queryStartNanoTime);
- Iterables.addAll(undelivered, writeReplicas.endpoints());
+ super(replicaPlan, null, WriteType.UNLOGGED_BATCH, queryStartNanoTime);
+ Iterables.addAll(undelivered, replicaPlan.contacts().endpoints());
}
@Override
- protected int totalBlockFor()
+ protected int blockFor()
{
- return this.replicaLayout.selected().size();
+ return this.replicaPlan.contacts().size();
}
@Override
http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/db/ConsistencyLevel.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ConsistencyLevel.java b/src/java/org/apache/cassandra/db/ConsistencyLevel.java
index 35ba198..5a4baf7 100644
--- a/src/java/org/apache/cassandra/db/ConsistencyLevel.java
+++ b/src/java/org/apache/cassandra/db/ConsistencyLevel.java
@@ -251,16 +251,10 @@ public enum ConsistencyLevel
if (this == EACH_QUORUM && keyspace.getReplicationStrategy() instanceof NetworkTopologyStrategy)
return filterForEachQuorum(keyspace, liveReplicas);
- /*
- * Endpoints are expected to be restricted to live replicas, sorted by snitch preference.
- * For LOCAL_QUORUM, move local-DC replicas in front first as we need them there whether
- * we do read repair (since the first replica gets the data read) or not (since we'll take
- * the blockFor first ones).
- */
- if (isDCLocal)
- liveReplicas = liveReplicas.sorted(DatabaseDescriptor.getLocalComparator());
-
- return liveReplicas.subList(0, Math.min(liveReplicas.size(), blockFor(keyspace) + (alwaysSpeculate ? 1 : 0)));
+ int count = blockFor(keyspace) + (alwaysSpeculate ? 1 : 0);
+ return isDCLocal
+ ? liveReplicas.filter(ConsistencyLevel::isLocal, count)
+ : liveReplicas.subList(0, Math.min(liveReplicas.size(), count));
}
private <E extends Endpoints<E>> E filterForEachQuorum(Keyspace keyspace, E liveReplicas)
@@ -285,7 +279,7 @@ public enum ConsistencyLevel
});
}
- public boolean isSufficientLiveNodesForRead(Keyspace keyspace, Endpoints<?> liveReplicas)
+ public boolean isSufficientLiveReplicasForRead(Keyspace keyspace, Endpoints<?> liveReplicas)
{
switch (this)
{
@@ -316,15 +310,15 @@ public enum ConsistencyLevel
}
}
- public void assureSufficientLiveNodesForRead(Keyspace keyspace, Endpoints<?> liveReplicas) throws UnavailableException
+ public void assureSufficientLiveReplicasForRead(Keyspace keyspace, Endpoints<?> liveReplicas) throws UnavailableException
{
- assureSufficientLiveNodes(keyspace, liveReplicas, blockFor(keyspace), 1);
+ assureSufficientLiveReplicas(keyspace, liveReplicas, blockFor(keyspace), 1);
}
- public void assureSufficientLiveNodesForWrite(Keyspace keyspace, Endpoints<?> allLive, Endpoints<?> pendingWithDown) throws UnavailableException
+ public void assureSufficientLiveReplicasForWrite(Keyspace keyspace, Endpoints<?> allLive, Endpoints<?> pendingWithDown) throws UnavailableException
{
- assureSufficientLiveNodes(keyspace, allLive, blockForWrite(keyspace, pendingWithDown), 0);
+ assureSufficientLiveReplicas(keyspace, allLive, blockForWrite(keyspace, pendingWithDown), 0);
}
- public void assureSufficientLiveNodes(Keyspace keyspace, Endpoints<?> allLive, int blockFor, int blockForFullReplicas) throws UnavailableException
+ void assureSufficientLiveReplicas(Keyspace keyspace, Endpoints<?> allLive, int blockFor, int blockForFullReplicas) throws UnavailableException
{
switch (this)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/gms/FailureDetector.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/FailureDetector.java b/src/java/org/apache/cassandra/gms/FailureDetector.java
index e567b7b..d7f73ab 100644
--- a/src/java/org/apache/cassandra/gms/FailureDetector.java
+++ b/src/java/org/apache/cassandra/gms/FailureDetector.java
@@ -27,11 +27,13 @@ import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
+import java.util.function.Predicate;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import javax.management.openmbean.CompositeData;
import javax.management.openmbean.*;
+import org.apache.cassandra.locator.Replica;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -71,6 +73,8 @@ public class FailureDetector implements IFailureDetector, FailureDetectorMBean
}
public static final IFailureDetector instance = new FailureDetector();
+ public static final Predicate<InetAddressAndPort> isEndpointAlive = instance::isAlive;
+ public static final Predicate<Replica> isReplicaAlive = r -> isEndpointAlive.test(r.endpoint());
// this is useless except to provide backwards compatibility in phi_convict_threshold,
// because everyone seems pretty accustomed to the default of 8, and users who have
http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/hints/HintsService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/HintsService.java b/src/java/org/apache/cassandra/hints/HintsService.java
index c6ad3d9..73840d3 100644
--- a/src/java/org/apache/cassandra/hints/HintsService.java
+++ b/src/java/org/apache/cassandra/hints/HintsService.java
@@ -34,6 +34,8 @@ import javax.management.ObjectName;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.locator.ReplicaLayout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -185,7 +187,7 @@ public final class HintsService implements HintsServiceMBean
String keyspaceName = hint.mutation.getKeyspaceName();
Token token = hint.mutation.key().getToken();
- EndpointsForToken replicas = StorageService.instance.getNaturalAndPendingReplicasForToken(keyspaceName, token);
+ EndpointsForToken replicas = ReplicaLayout.forTokenWriteLiveAndDown(Keyspace.open(keyspaceName), token).all();
// judicious use of streams: eagerly materializing probably cheaper
// than performing filters / translations 2x extra via Iterables.filter/transform
http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/locator/AbstractReplicaCollection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/AbstractReplicaCollection.java b/src/java/org/apache/cassandra/locator/AbstractReplicaCollection.java
index 6a7a4ff..94ff991 100644
--- a/src/java/org/apache/cassandra/locator/AbstractReplicaCollection.java
+++ b/src/java/org/apache/cassandra/locator/AbstractReplicaCollection.java
@@ -75,7 +75,6 @@ public abstract class AbstractReplicaCollection<C extends AbstractReplicaCollect
*/
public abstract Mutable<C> newMutable(int initialCapacity);
-
public C snapshot()
{
return isSnapshot ? self()
@@ -83,6 +82,7 @@ public abstract class AbstractReplicaCollection<C extends AbstractReplicaCollect
: new ArrayList<>(list));
}
+ /** see {@link ReplicaCollection#subList(int, int)}*/
public final C subList(int start, int end)
{
List<Replica> subList;
@@ -100,11 +100,23 @@ public abstract class AbstractReplicaCollection<C extends AbstractReplicaCollect
return snapshot(subList);
}
+ /** see {@link ReplicaCollection#count(Predicate)}*/
+ public int count(Predicate<Replica> predicate)
+ {
+ int count = 0;
+ for (int i = 0 ; i < list.size() ; ++i)
+ if (predicate.test(list.get(i)))
+ ++count;
+ return count;
+ }
+
+ /** see {@link ReplicaCollection#filter(Predicate)}*/
public final C filter(Predicate<Replica> predicate)
{
return filter(predicate, Integer.MAX_VALUE);
}
+ /** see {@link ReplicaCollection#filter(Predicate, int)}*/
public final C filter(Predicate<Replica> predicate, int limit)
{
if (isEmpty())
@@ -148,53 +160,8 @@ public abstract class AbstractReplicaCollection<C extends AbstractReplicaCollect
return snapshot(copy);
}
- public final class Select
- {
- private final List<Replica> result;
- public Select(int expectedSize)
- {
- this.result = new ArrayList<>(expectedSize);
- }
-
- /**
- * Add matching replica to the result; this predicate should be mutually exclusive with all prior predicates.
- * Stop once we have targetSize replicas in total, including preceding calls
- */
- public Select add(Predicate<Replica> predicate, int targetSize)
- {
- assert !Iterables.any(result, predicate::test);
- for (int i = 0 ; result.size() < targetSize && i < list.size() ; ++i)
- if (predicate.test(list.get(i)))
- result.add(list.get(i));
- return this;
- }
- public Select add(Predicate<Replica> predicate)
- {
- return add(predicate, Integer.MAX_VALUE);
- }
- public C get()
- {
- return snapshot(result);
- }
- }
-
- /**
- * An efficient method for selecting a subset of replica via a sequence of filters.
- *
- * Example: select().add(filter1).add(filter2, 3).get();
- *
- * @return a Select object
- */
- public final Select select()
- {
- return select(list.size());
- }
- public final Select select(int expectedSize)
- {
- return new Select(expectedSize);
- }
-
- public final C sorted(Comparator<Replica> comparator)
+ /** see {@link ReplicaCollection#sorted(Comparator)}*/
+ public final C sorted(Comparator<Replica> comparator)
{
List<Replica> copy = new ArrayList<>(list);
copy.sort(comparator);
@@ -267,9 +234,9 @@ public abstract class AbstractReplicaCollection<C extends AbstractReplicaCollect
if (replicas.isEmpty())
return extraReplicas;
Mutable<C> mutable = replicas.newMutable(replicas.size() + extraReplicas.size());
- mutable.addAll(replicas);
+ mutable.addAll(replicas, Mutable.Conflict.NONE);
mutable.addAll(extraReplicas, ignoreConflicts);
- return mutable.asImmutableView();
+ return mutable.asSnapshot();
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java b/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
index d168052..bad736f 100644
--- a/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
+++ b/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
@@ -141,33 +141,33 @@ public abstract class AbstractReplicationStrategy
*/
public abstract EndpointsForRange calculateNaturalReplicas(Token searchToken, TokenMetadata tokenMetadata);
- public <T> AbstractWriteResponseHandler<T> getWriteResponseHandler(ReplicaLayout.ForToken replicaLayout,
+ public <T> AbstractWriteResponseHandler<T> getWriteResponseHandler(ReplicaPlan.ForTokenWrite replicaPlan,
Runnable callback,
WriteType writeType,
long queryStartNanoTime)
{
- return getWriteResponseHandler(replicaLayout, callback, writeType, queryStartNanoTime, DatabaseDescriptor.getIdealConsistencyLevel());
+ return getWriteResponseHandler(replicaPlan, callback, writeType, queryStartNanoTime, DatabaseDescriptor.getIdealConsistencyLevel());
}
- public <T> AbstractWriteResponseHandler<T> getWriteResponseHandler(ReplicaLayout.ForToken replicaLayout,
+ public <T> AbstractWriteResponseHandler<T> getWriteResponseHandler(ReplicaPlan.ForTokenWrite replicaPlan,
Runnable callback,
WriteType writeType,
long queryStartNanoTime,
ConsistencyLevel idealConsistencyLevel)
{
AbstractWriteResponseHandler resultResponseHandler;
- if (replicaLayout.consistencyLevel.isDatacenterLocal())
+ if (replicaPlan.consistencyLevel().isDatacenterLocal())
{
// block for in this context will be localnodes block.
- resultResponseHandler = new DatacenterWriteResponseHandler<T>(replicaLayout, callback, writeType, queryStartNanoTime);
+ resultResponseHandler = new DatacenterWriteResponseHandler<T>(replicaPlan, callback, writeType, queryStartNanoTime);
}
- else if (replicaLayout.consistencyLevel == ConsistencyLevel.EACH_QUORUM && (this instanceof NetworkTopologyStrategy))
+ else if (replicaPlan.consistencyLevel() == ConsistencyLevel.EACH_QUORUM && (this instanceof NetworkTopologyStrategy))
{
- resultResponseHandler = new DatacenterSyncWriteResponseHandler<T>(replicaLayout, callback, writeType, queryStartNanoTime);
+ resultResponseHandler = new DatacenterSyncWriteResponseHandler<T>(replicaPlan, callback, writeType, queryStartNanoTime);
}
else
{
- resultResponseHandler = new WriteResponseHandler<T>(replicaLayout, callback, writeType, queryStartNanoTime);
+ resultResponseHandler = new WriteResponseHandler<T>(replicaPlan, callback, writeType, queryStartNanoTime);
}
//Check if tracking the ideal consistency level is configured
@@ -176,14 +176,14 @@ public abstract class AbstractReplicationStrategy
//If ideal and requested are the same just use this handler to track the ideal consistency level
//This is also used so that the ideal consistency level handler when constructed knows it is the ideal
//one for tracking purposes
- if (idealConsistencyLevel == replicaLayout.consistencyLevel)
+ if (idealConsistencyLevel == replicaPlan.consistencyLevel())
{
resultResponseHandler.setIdealCLResponseHandler(resultResponseHandler);
}
else
{
//Construct a delegate response handler to use to track the ideal consistency level
- AbstractWriteResponseHandler idealHandler = getWriteResponseHandler(replicaLayout.withConsistencyLevel(idealConsistencyLevel),
+ AbstractWriteResponseHandler idealHandler = getWriteResponseHandler(replicaPlan.withConsistencyLevel(idealConsistencyLevel),
callback,
writeType,
queryStartNanoTime,
http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/locator/Endpoints.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/Endpoints.java b/src/java/org/apache/cassandra/locator/Endpoints.java
index 3d5faa4..28e578c 100644
--- a/src/java/org/apache/cassandra/locator/Endpoints.java
+++ b/src/java/org/apache/cassandra/locator/Endpoints.java
@@ -29,6 +29,11 @@ import java.util.Map;
import java.util.Objects;
import java.util.Set;
+/**
+ * A collection of Endpoints for a given ring position. This will typically reside in a ReplicaLayout,
+ * representing some subset of the endpoints for the Token or Range
+ * @param <E> The concrete type of Endpoints, that will be returned by the modifying methods
+ */
public abstract class Endpoints<E extends Endpoints<E>> extends AbstractReplicaCollection<E>
{
static final Map<InetAddressAndPort, Replica> EMPTY_MAP = Collections.unmodifiableMap(new LinkedHashMap<>());
@@ -89,17 +94,32 @@ public abstract class Endpoints<E extends Endpoints<E>> extends AbstractReplicaC
return filter(r -> !self.equals(r.endpoint()));
}
+ public Replica selfIfPresent()
+ {
+ InetAddressAndPort self = FBUtilities.getBroadcastAddressAndPort();
+ return byEndpoint().get(self);
+ }
+
+ /**
+ * @return a collection without the provided endpoints, otherwise in the same order as this collection
+ */
public E without(Set<InetAddressAndPort> remove)
{
return filter(r -> !remove.contains(r.endpoint()));
}
+ /**
+ * @return a collection with only the provided endpoints (ignoring any not present), otherwise in the same order as this collection
+ */
public E keep(Set<InetAddressAndPort> keep)
{
return filter(r -> keep.contains(r.endpoint()));
}
- public E keep(Iterable<InetAddressAndPort> endpoints)
+ /**
+ * @return a collection containing the Replica from this collection for the provided endpoints, in the order of the provided endpoints
+ */
+ public E select(Iterable<InetAddressAndPort> endpoints, boolean ignoreMissing)
{
ReplicaCollection.Mutable<E> copy = newMutable(
endpoints instanceof Collection<?>
@@ -109,10 +129,14 @@ public abstract class Endpoints<E extends Endpoints<E>> extends AbstractReplicaC
Map<InetAddressAndPort, Replica> byEndpoint = byEndpoint();
for (InetAddressAndPort endpoint : endpoints)
{
- Replica keep = byEndpoint.get(endpoint);
- if (keep == null)
+ Replica select = byEndpoint.get(endpoint);
+ if (select == null)
+ {
+ if (!ignoreMissing)
+ throw new IllegalArgumentException(endpoint + " is not present in " + this);
continue;
- copy.add(keep, ReplicaCollection.Mutable.Conflict.DUPLICATE);
+ }
+ copy.add(select, ReplicaCollection.Mutable.Conflict.DUPLICATE);
}
return copy.asSnapshot();
}
@@ -124,34 +148,19 @@ public abstract class Endpoints<E extends Endpoints<E>> extends AbstractReplicaC
* 2) because a movement that changes the type of replication from transient to full must be handled
* differently for reads and writes (with the reader treating it as transient, and writer as full)
*
- * The method haveConflicts() below, and resolveConflictsInX, are used to detect and resolve any issues
+ * The method {@link ReplicaLayout#haveWriteConflicts} can be used to detect and resolve any issues
*/
public static <E extends Endpoints<E>> E concat(E natural, E pending)
{
return AbstractReplicaCollection.concat(natural, pending, Conflict.NONE);
}
- public static <E extends Endpoints<E>> boolean haveConflicts(E natural, E pending)
- {
- Set<InetAddressAndPort> naturalEndpoints = natural.endpoints();
- for (InetAddressAndPort pendingEndpoint : pending.endpoints())
- {
- if (naturalEndpoints.contains(pendingEndpoint))
- return true;
- }
- return false;
- }
-
- // must apply first
- public static <E extends Endpoints<E>> E resolveConflictsInNatural(E natural, E pending)
- {
- return natural.filter(r -> !r.isTransient() || !pending.contains(r.endpoint(), true));
- }
-
- // must apply second
- public static <E extends Endpoints<E>> E resolveConflictsInPending(E natural, E pending)
+ public static <E extends Endpoints<E>> E append(E replicas, Replica extraReplica)
{
- return pending.without(natural.endpoints());
+ Mutable<E> mutable = replicas.newMutable(replicas.size() + 1);
+ mutable.addAll(replicas);
+ mutable.add(extraReplica, Conflict.NONE);
+ return mutable.asSnapshot();
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/locator/EndpointsForRange.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/EndpointsForRange.java b/src/java/org/apache/cassandra/locator/EndpointsForRange.java
index c2d8232..f812951 100644
--- a/src/java/org/apache/cassandra/locator/EndpointsForRange.java
+++ b/src/java/org/apache/cassandra/locator/EndpointsForRange.java
@@ -86,7 +86,7 @@ public class EndpointsForRange extends Endpoints<EndpointsForRange>
{
boolean hasSnapshot;
public Mutable(Range<Token> range) { this(range, 0); }
- public Mutable(Range<Token> range, int capacity) { super(range, new ArrayList<>(capacity), false, new LinkedHashMap<>()); }
+ public Mutable(Range<Token> range, int capacity) { super(range, new ArrayList<>(capacity), false, new LinkedHashMap<>(capacity)); }
public void add(Replica replica, Conflict ignoreConflict)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/locator/EndpointsForToken.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/EndpointsForToken.java b/src/java/org/apache/cassandra/locator/EndpointsForToken.java
index f24c615..3446dc9 100644
--- a/src/java/org/apache/cassandra/locator/EndpointsForToken.java
+++ b/src/java/org/apache/cassandra/locator/EndpointsForToken.java
@@ -77,7 +77,7 @@ public class EndpointsForToken extends Endpoints<EndpointsForToken>
{
boolean hasSnapshot;
public Mutable(Token token) { this(token, 0); }
- public Mutable(Token token, int capacity) { super(token, new ArrayList<>(capacity), false, new LinkedHashMap<>()); }
+ public Mutable(Token token, int capacity) { super(token, new ArrayList<>(capacity), false, new LinkedHashMap<>(capacity)); }
public void add(Replica replica, Conflict ignoreConflict)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/locator/RangesAtEndpoint.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/RangesAtEndpoint.java b/src/java/org/apache/cassandra/locator/RangesAtEndpoint.java
index 74828ad..1773173 100644
--- a/src/java/org/apache/cassandra/locator/RangesAtEndpoint.java
+++ b/src/java/org/apache/cassandra/locator/RangesAtEndpoint.java
@@ -173,7 +173,7 @@ public class RangesAtEndpoint extends AbstractReplicaCollection<RangesAtEndpoint
{
boolean hasSnapshot;
public Mutable(InetAddressAndPort endpoint) { this(endpoint, 0); }
- public Mutable(InetAddressAndPort endpoint, int capacity) { super(endpoint, new ArrayList<>(capacity), false, new LinkedHashMap<>()); }
+ public Mutable(InetAddressAndPort endpoint, int capacity) { super(endpoint, new ArrayList<>(capacity), false, new LinkedHashMap<>(capacity)); }
public void add(Replica replica, Conflict ignoreConflict)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/locator/ReplicaCollection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/ReplicaCollection.java b/src/java/org/apache/cassandra/locator/ReplicaCollection.java
index 6833f4b..d1006dc 100644
--- a/src/java/org/apache/cassandra/locator/ReplicaCollection.java
+++ b/src/java/org/apache/cassandra/locator/ReplicaCollection.java
@@ -62,6 +62,11 @@ public interface ReplicaCollection<C extends ReplicaCollection<C>> extends Itera
public abstract boolean contains(Replica replica);
/**
+ * @return the number of replicas that match the predicate
+ */
+ public abstract int count(Predicate<Replica> predicate);
+
+ /**
* @return a *eagerly constructed* copy of this collection containing the Replica that match the provided predicate.
* An effort will be made to either return ourself, or a subList, where possible.
* It is guaranteed that no changes to any upstream Mutable will affect the state of the result.
@@ -108,16 +113,30 @@ public interface ReplicaCollection<C extends ReplicaCollection<C>> extends Itera
C asImmutableView();
/**
- * @return an Immutable clone that assumes this Mutable will never be modified again.
- * If this is not true, behaviour is undefined.
+ * @return an Immutable clone that assumes this Mutable will never be modified again,
+ * so its contents can be reused.
+ *
+ * This Mutable should enforce that it is no longer modified.
*/
C asSnapshot();
- enum Conflict { NONE, DUPLICATE, ALL}
+ /**
+ * Passed to add() and addAll() as ignoreConflicts parameter. The meaning of conflict varies by collection type
+ * (for Endpoints, it is a duplicate InetAddressAndPort; for RangesAtEndpoint it is a duplicate Range).
+ */
+ enum Conflict
+ {
+ /** fail on addition of any such conflict */
+ NONE,
+ /** fail on addition of any such conflict where the contents differ (first occurrence and position wins) */
+ DUPLICATE,
+ /** ignore all conflicts (the first occurrence and position wins) */
+ ALL
+ }
/**
* @param replica add this replica to the end of the collection
- * @param ignoreConflict if false, fail on any conflicting additions (as defined by C's semantics)
+ * @param ignoreConflict conflicts to ignore, see {@link Conflict}
*/
void add(Replica replica, Conflict ignoreConflict);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/locator/ReplicaLayout.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/ReplicaLayout.java b/src/java/org/apache/cassandra/locator/ReplicaLayout.java
index 946a7f8..f48c989 100644
--- a/src/java/org/apache/cassandra/locator/ReplicaLayout.java
+++ b/src/java/org/apache/cassandra/locator/ReplicaLayout.java
@@ -18,364 +18,299 @@
package org.apache.cassandra.locator;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.function.Predicate;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Predicates;
-import com.google.common.collect.Iterables;
-
import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.ConsistencyLevel;
-import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.PartitionPosition;
import org.apache.cassandra.dht.AbstractBounds;
import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.exceptions.UnavailableException;
import org.apache.cassandra.gms.FailureDetector;
-import org.apache.cassandra.net.IAsyncCallback;
-import org.apache.cassandra.service.StorageProxy;
import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.service.reads.AlwaysSpeculativeRetryPolicy;
-import org.apache.cassandra.service.reads.SpeculativeRetryPolicy;
import org.apache.cassandra.utils.FBUtilities;
-import static com.google.common.collect.Iterables.any;
+import java.util.Set;
+import java.util.function.Predicate;
/**
- * Encapsulates knowledge about the ring necessary for performing a specific operation, with static accessors
- * for building the relevant layout.
+ * The relevant replicas for an operation over a given range or token.
*
- * Constitutes:
- * - the 'natural' replicas replicating the range or token relevant for the operation
- * - if for performing a write, any 'pending' replicas that are taking ownership of the range, and must receive updates
- * - the 'selected' replicas, those that should be targeted for any operation
- * - 'all' replicas represents natural+pending
- *
- * @param <E> the type of Endpoints this ReplayLayout holds (either EndpointsForToken or EndpointsForRange)
- * @param <L> the type of itself, including its type parameters, for return type of modifying methods
+ * @param <E> the type of Endpoints this ReplicaLayout holds (either EndpointsForToken or EndpointsForRange)
*/
-public abstract class ReplicaLayout<E extends Endpoints<E>, L extends ReplicaLayout<E, L>>
+public abstract class ReplicaLayout<E extends Endpoints<E>>
{
- private volatile E all;
- protected final E natural;
- protected final E pending;
- protected final E selected;
-
- protected final Keyspace keyspace;
- protected final ConsistencyLevel consistencyLevel;
-
- private ReplicaLayout(Keyspace keyspace, ConsistencyLevel consistencyLevel, E natural, E pending, E selected)
- {
- this(keyspace, consistencyLevel, natural, pending, selected, null);
- }
+ private final E natural;
- private ReplicaLayout(Keyspace keyspace, ConsistencyLevel consistencyLevel, E natural, E pending, E selected, E all)
+ ReplicaLayout(E natural)
{
- assert selected != null;
- assert pending == null || !Endpoints.haveConflicts(natural, pending);
- this.keyspace = keyspace;
- this.consistencyLevel = consistencyLevel;
this.natural = natural;
- this.pending = pending;
- this.selected = selected;
- // if we logically have no pending endpoints (they are null), then 'all' our endpoints are natural
- if (all == null && pending == null)
- all = natural;
- this.all = all;
}
- public Replica getReplicaFor(InetAddressAndPort endpoint)
- {
- return natural.byEndpoint().get(endpoint);
- }
-
- public E natural()
+ /**
+ * The 'natural' owners of the ring position(s), as implied by the current ring layout.
+ * This excludes any pending owners, i.e. those that are in the process of taking ownership of a range, but
+ * have not yet finished obtaining their view of the range.
+ */
+ public final E natural()
{
return natural;
}
- public E all()
- {
- E result = all;
- if (result == null)
- all = result = Endpoints.concat(natural, pending);
- return result;
- }
-
- public E selected()
- {
- return selected;
- }
-
/**
- * @return the pending replicas - will be null for read layouts
- * TODO: ideally we would enforce at compile time that read layouts have no pending to access
+ * All relevant owners of the ring position(s) for this operation, as implied by the current ring layout.
+ * For writes, this will include pending owners, and for reads it will be equivalent to natural()
*/
- public E pending()
- {
- return pending;
- }
-
- public int blockFor()
+ public E all()
{
- return pending == null
- ? consistencyLevel.blockFor(keyspace)
- : consistencyLevel.blockForWrite(keyspace, pending);
+ return natural;
}
- public Keyspace keyspace()
+ public String toString()
{
- return keyspace;
+ return "ReplicaLayout [ natural: " + natural + " ]";
}
- public ConsistencyLevel consistencyLevel()
+ public static class ForTokenRead extends ReplicaLayout<EndpointsForToken> implements ForToken
{
- return consistencyLevel;
- }
-
- abstract public L withSelected(E replicas);
-
- abstract public L withConsistencyLevel(ConsistencyLevel cl);
-
- public L forNaturalUncontacted()
- {
- E more;
- if (consistencyLevel.isDatacenterLocal() && keyspace.getReplicationStrategy() instanceof NetworkTopologyStrategy)
+ public ForTokenRead(EndpointsForToken natural)
{
- IEndpointSnitch snitch = keyspace.getReplicationStrategy().snitch;
- String localDC = DatabaseDescriptor.getLocalDataCenter();
+ super(natural);
+ }
- more = natural.filter(replica -> !selected.contains(replica) &&
- snitch.getDatacenter(replica).equals(localDC));
- } else
+ @Override
+ public Token token()
{
- more = natural.filter(replica -> !selected.contains(replica));
+ return natural().token();
}
- return withSelected(more);
+ public ReplicaLayout.ForTokenRead filter(Predicate<Replica> filter)
+ {
+ EndpointsForToken filtered = natural().filter(filter);
+ // AbstractReplicaCollection.filter returns itself if all elements match the filter
+ if (filtered == natural()) return this;
+ return new ReplicaLayout.ForTokenRead(filtered);
+ }
}
- public static class ForRange extends ReplicaLayout<EndpointsForRange, ForRange>
+ public static class ForRangeRead extends ReplicaLayout<EndpointsForRange> implements ForRange
{
- public final AbstractBounds<PartitionPosition> range;
+ final AbstractBounds<PartitionPosition> range;
- @VisibleForTesting
- public ForRange(Keyspace keyspace, ConsistencyLevel consistencyLevel, AbstractBounds<PartitionPosition> range, EndpointsForRange natural, EndpointsForRange selected)
+ public ForRangeRead(AbstractBounds<PartitionPosition> range, EndpointsForRange natural)
{
- // Range queries do not contact pending replicas
- super(keyspace, consistencyLevel, natural, null, selected);
+ super(natural);
this.range = range;
}
@Override
- public ForRange withSelected(EndpointsForRange newSelected)
+ public AbstractBounds<PartitionPosition> range()
{
- return new ForRange(keyspace, consistencyLevel, range, natural, newSelected);
+ return range;
}
- @Override
- public ForRange withConsistencyLevel(ConsistencyLevel cl)
+ public ReplicaLayout.ForRangeRead filter(Predicate<Replica> filter)
{
- return new ForRange(keyspace, cl, range, natural, selected);
+ EndpointsForRange filtered = natural().filter(filter);
+ // AbstractReplicaCollection.filter returns itself if all elements match the filter
+ if (filtered == natural()) return this;
+ return new ReplicaLayout.ForRangeRead(range(), filtered);
}
}
- public static class ForToken extends ReplicaLayout<EndpointsForToken, ForToken>
+ public static class ForWrite<E extends Endpoints<E>> extends ReplicaLayout<E>
{
- public final Token token;
+ final E all;
+ final E pending;
- @VisibleForTesting
- public ForToken(Keyspace keyspace, ConsistencyLevel consistencyLevel, Token token, EndpointsForToken natural, EndpointsForToken pending, EndpointsForToken selected)
+ ForWrite(E natural, E pending, E all)
{
- super(keyspace, consistencyLevel, natural, pending, selected);
- this.token = token;
+ super(natural);
+ assert pending != null && !haveWriteConflicts(natural, pending);
+ if (all == null)
+ all = Endpoints.concat(natural, pending);
+ this.all = all;
+ this.pending = pending;
}
- public ForToken(Keyspace keyspace, ConsistencyLevel consistencyLevel, Token token, EndpointsForToken natural, EndpointsForToken pending, EndpointsForToken selected, EndpointsForToken all)
+ public final E all()
{
- super(keyspace, consistencyLevel, natural, pending, selected, all);
- this.token = token;
+ return all;
}
- public ForToken withSelected(EndpointsForToken newSelected)
+ public final E pending()
{
- return new ForToken(keyspace, consistencyLevel, token, natural, pending, newSelected);
+ return pending;
}
- @Override
- public ForToken withConsistencyLevel(ConsistencyLevel cl)
+ public String toString()
{
- return new ForToken(keyspace, cl, token, natural, pending, selected);
+ return "ReplicaLayout [ natural: " + natural() + ", pending: " + pending + " ]";
}
}
- public static class ForPaxos extends ForToken
+ public static class ForTokenWrite extends ForWrite<EndpointsForToken> implements ForToken
{
- private final int requiredParticipants;
-
- private ForPaxos(Keyspace keyspace, ConsistencyLevel consistencyLevel, Token token, int requiredParticipants, EndpointsForToken natural, EndpointsForToken pending, EndpointsForToken selected, EndpointsForToken all)
+ public ForTokenWrite(EndpointsForToken natural, EndpointsForToken pending)
{
- super(keyspace, consistencyLevel, token, natural, pending, selected, all);
- this.requiredParticipants = requiredParticipants;
+ this(natural, pending, null);
}
-
- public int getRequiredParticipants()
+ public ForTokenWrite(EndpointsForToken natural, EndpointsForToken pending, EndpointsForToken all)
{
- return requiredParticipants;
+ super(natural, pending, all);
}
- }
- public static ForToken forSingleReplica(Keyspace keyspace, Token token, Replica replica)
- {
- EndpointsForToken singleReplica = EndpointsForToken.of(token, replica);
- return new ForToken(keyspace, ConsistencyLevel.ONE, token, singleReplica, EndpointsForToken.empty(token), singleReplica, singleReplica);
- }
-
- public static ForRange forSingleReplica(Keyspace keyspace, AbstractBounds<PartitionPosition> range, Replica replica)
- {
- EndpointsForRange singleReplica = EndpointsForRange.of(replica);
- return new ForRange(keyspace, ConsistencyLevel.ONE, range, singleReplica, singleReplica);
- }
+ @Override
+ public Token token() { return natural().token(); }
- public static ForToken forCounterWrite(Keyspace keyspace, Token token, Replica replica)
- {
- return forSingleReplica(keyspace, token, replica);
+ public ReplicaLayout.ForTokenWrite filter(Predicate<Replica> filter)
+ {
+ EndpointsForToken filtered = all().filter(filter);
+ // AbstractReplicaCollection.filter returns itself if all elements match the filter
+ if (filtered == all()) return this;
+ // unique by endpoint, so can for efficiency filter only on endpoint
+ return new ReplicaLayout.ForTokenWrite(
+ natural().keep(filtered.endpoints()),
+ pending().keep(filtered.endpoints()),
+ filtered
+ );
+ }
}
- public static ForToken forBatchlogWrite(Keyspace keyspace, Collection<InetAddressAndPort> endpoints) throws UnavailableException
+ public interface ForRange
{
- // A single case we write not for range or token, but multiple mutations to many tokens
- Token token = DatabaseDescriptor.getPartitioner().getMinimumToken();
- EndpointsForToken natural = EndpointsForToken.copyOf(token, SystemReplicas.getSystemReplicas(endpoints));
- EndpointsForToken pending = EndpointsForToken.empty(token);
- ConsistencyLevel consistencyLevel = natural.size() == 1 ? ConsistencyLevel.ONE : ConsistencyLevel.TWO;
-
- return forWriteWithDownNodes(keyspace, consistencyLevel, token, natural, pending);
+ public AbstractBounds<PartitionPosition> range();
}
- public static ForToken forWriteWithDownNodes(Keyspace keyspace, ConsistencyLevel consistencyLevel, Token token) throws UnavailableException
+ public interface ForToken
{
- return forWrite(keyspace, consistencyLevel, token, Predicates.alwaysTrue());
+ public Token token();
}
- public static ForToken forWrite(Keyspace keyspace, ConsistencyLevel consistencyLevel, Token token, Predicate<InetAddressAndPort> isAlive) throws UnavailableException
+ /**
+ * Gets the 'natural' and 'pending' replicas that own a given token, with no filtering or processing.
+ *
+ * Since a write is intended for all nodes (except, unless necessary, transient replicas), this method's
+ * only responsibility is to fetch the 'natural' and 'pending' replicas, then resolve any conflicts
+ * (see {@link ReplicaLayout#haveWriteConflicts(Endpoints, Endpoints)}).
+ */
+ public static ReplicaLayout.ForTokenWrite forTokenWriteLiveAndDown(Keyspace keyspace, Token token)
{
- EndpointsForToken natural = StorageService.getNaturalReplicasForToken(keyspace.getName(), token);
+ // TODO: race condition to fetch these. implications??
+ EndpointsForToken natural = keyspace.getReplicationStrategy().getNaturalReplicasForToken(token);
EndpointsForToken pending = StorageService.instance.getTokenMetadata().pendingEndpointsForToken(token, keyspace.getName());
- return forWrite(keyspace, consistencyLevel, token, natural, pending, isAlive);
- }
-
- public static ForToken forWriteWithDownNodes(Keyspace keyspace, ConsistencyLevel consistencyLevel, Token token, EndpointsForToken natural, EndpointsForToken pending) throws UnavailableException
- {
- return forWrite(keyspace, consistencyLevel, token, natural, pending, Predicates.alwaysTrue());
+ return forTokenWrite(natural, pending);
}
- public static ForToken forWrite(Keyspace keyspace, ConsistencyLevel consistencyLevel, Token token, EndpointsForToken natural, EndpointsForToken pending, Predicate<InetAddressAndPort> isAlive) throws UnavailableException
+ public static ReplicaLayout.ForTokenWrite forTokenWrite(EndpointsForToken natural, EndpointsForToken pending)
{
- if (Endpoints.haveConflicts(natural, pending))
+ if (haveWriteConflicts(natural, pending))
{
- natural = Endpoints.resolveConflictsInNatural(natural, pending);
- pending = Endpoints.resolveConflictsInPending(natural, pending);
+ natural = resolveWriteConflictsInNatural(natural, pending);
+ pending = resolveWriteConflictsInPending(natural, pending);
}
-
- if (!any(natural, Replica::isTransient) && !any(pending, Replica::isTransient))
- {
- EndpointsForToken selected = Endpoints.concat(natural, pending).filter(r -> isAlive.test(r.endpoint()));
- return new ForToken(keyspace, consistencyLevel, token, natural, pending, selected);
- }
-
- return forWrite(keyspace, consistencyLevel, token, consistencyLevel.blockForWrite(keyspace, pending), natural, pending, isAlive);
+ return new ReplicaLayout.ForTokenWrite(natural, pending);
}
- public static ReplicaLayout.ForPaxos forPaxos(Keyspace keyspace, DecoratedKey key, ConsistencyLevel consistencyForPaxos) throws UnavailableException
+ /**
+ * Detect if we have any endpoint in both pending and full; this can occur either due to races (there is no isolation)
+ * or because an endpoint is transitioning between full and transient replication status.
+ *
+ * We essentially always prefer the full version for writes, because this is stricter.
+ *
+ * For transient->full transitions:
+ *
+ * Since we always write to any pending transient replica, effectively upgrading it to full for the transition duration,
+ * it might at first seem to be OK to continue treating the conflict replica as its 'natural' transient form,
+ * as there is always a quorum of nodes receiving the write. However, ring ownership changes are not atomic or
+ * consistent across the cluster, and it is possible for writers to see different ring states.
+ *
+ * Furthermore, an operator would expect that the full node has received all writes, with no extra need for repair
+ * (as the normal contract dictates) when it completes its transition.
+ *
+ * While we cannot completely eliminate risks due to ring inconsistencies, this approach is the most conservative
+ * available to us today to mitigate, and (we think) the easiest to reason about.
+ *
+ * For full->transient transitions:
+ *
+ * In this case, things are dicier, because in theory we can trigger this change instantly. All we need to do is
+ * drop some data, surely?
+ *
+ * Ring movements can put us in a pickle; any other node could believe us to be full when we have become transient,
+ * and perform a full data request to us that we believe ourselves capable of answering, but that we are not.
+ * If the ring is inconsistent, it's even feasible that a transient request would be made to the node that is losing
+ * its transient status, that also does not know it has yet done so, resulting in all involved nodes being unaware
+ * of the data inconsistency.
+ *
+ * This happens because ring ownership changes are implied by a single node; not all owning nodes get a say in when
+ * the transition takes effect. As such, a node can hold an incorrect belief about its own ownership ranges.
+ *
+ * This race condition is somewhat inherent in present day Cassandra, and there's actually a limit to what we can do about it.
+ * It is a little more dangerous with transient replication, however, because we can completely answer a request without
+ * ever touching a digest, meaning we are less likely to attempt to repair any inconsistency.
+ *
+ * We aren't guaranteed to contact any different nodes for the data requests, of course, though we at least have a chance.
+ *
+ * Note: If we have any pending transient->full movement, we need to move the full replica to our 'natural' bucket
+ * to avoid corrupting our count. This is fine for writes, all we're doing is ensuring we always write to the node,
+ * instead of selectively.
+ *
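+ * @param natural the natural replicas for the ring position(s)
+ * @param pending the pending replicas for the ring position(s)
+ * @param <E> the type of Endpoints being compared
+ * @return true if any endpoint occurs in both natural and pending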
+ */
+ static <E extends Endpoints<E>> boolean haveWriteConflicts(E natural, E pending)
{
- Token tk = key.getToken();
- EndpointsForToken natural = StorageService.getNaturalReplicasForToken(keyspace.getName(), tk);
- EndpointsForToken pending = StorageService.instance.getTokenMetadata().pendingEndpointsForToken(tk, keyspace.getName());
- if (Endpoints.haveConflicts(natural, pending))
+ Set<InetAddressAndPort> naturalEndpoints = natural.endpoints();
+ for (InetAddressAndPort pendingEndpoint : pending.endpoints())
{
- natural = Endpoints.resolveConflictsInNatural(natural, pending);
- pending = Endpoints.resolveConflictsInPending(natural, pending);
+ if (naturalEndpoints.contains(pendingEndpoint))
+ return true;
}
-
- // TODO CASSANDRA-14547
- Replicas.temporaryAssertFull(natural);
- Replicas.temporaryAssertFull(pending);
-
- if (consistencyForPaxos == ConsistencyLevel.LOCAL_SERIAL)
- {
- // Restrict natural and pending to node in the local DC only
- String localDc = DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddressAndPort());
- IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
- Predicate<Replica> isLocalDc = replica -> localDc.equals(snitch.getDatacenter(replica));
-
- natural = natural.filter(isLocalDc);
- pending = pending.filter(isLocalDc);
- }
-
- int participants = pending.size() + natural.size();
- int requiredParticipants = participants / 2 + 1; // See CASSANDRA-8346, CASSANDRA-833
-
- EndpointsForToken all = Endpoints.concat(natural, pending);
- EndpointsForToken selected = all.filter(IAsyncCallback.isReplicaAlive);
- if (selected.size() < requiredParticipants)
- throw UnavailableException.create(consistencyForPaxos, requiredParticipants, selected.size());
-
- // We cannot allow CAS operations with 2 or more pending endpoints, see #8346.
- // Note that we fake an impossible number of required nodes in the unavailable exception
- // to nail home the point that it's an impossible operation no matter how many nodes are live.
- if (pending.size() > 1)
- throw new UnavailableException(String.format("Cannot perform LWT operation as there is more than one (%d) pending range movement", pending.size()),
- consistencyForPaxos,
- participants + 1,
- selected.size());
-
- return new ReplicaLayout.ForPaxos(keyspace, consistencyForPaxos, key.getToken(), requiredParticipants, natural, pending, selected, all);
+ return false;
}
/**
- * We want to send mutations to as many full replicas as we can, and just as many transient replicas
- * as we need to meet blockFor.
+ * MUST APPLY FIRST
+ * See {@link ReplicaLayout#haveWriteConflicts}
+ * @return a 'natural' replica collection, that has had its conflicts with pending repaired
*/
- @VisibleForTesting
- public static ForToken forWrite(Keyspace keyspace, ConsistencyLevel consistencyLevel, Token token, int blockFor, EndpointsForToken natural, EndpointsForToken pending, Predicate<InetAddressAndPort> livePredicate) throws UnavailableException
+ private static <E extends Endpoints<E>> E resolveWriteConflictsInNatural(E natural, E pending)
{
- EndpointsForToken all = Endpoints.concat(natural, pending);
- EndpointsForToken selected = all
- .select()
- .add(r -> r.isFull() && livePredicate.test(r.endpoint()))
- .add(r -> r.isTransient() && livePredicate.test(r.endpoint()), blockFor)
- .get();
-
- consistencyLevel.assureSufficientLiveNodesForWrite(keyspace, selected, pending);
-
- return new ForToken(keyspace, consistencyLevel, token, natural, pending, selected, all);
+ return natural.filter(r -> !r.isTransient() || !pending.contains(r.endpoint(), true));
}
- public static ForToken forRead(Keyspace keyspace, Token token, ConsistencyLevel consistencyLevel, SpeculativeRetryPolicy retry)
+ /**
+ * MUST APPLY SECOND
+ * See {@link ReplicaLayout#haveWriteConflicts}
+ * @return a 'pending' replica collection, that has had its conflicts with natural repaired
+ */
+ private static <E extends Endpoints<E>> E resolveWriteConflictsInPending(E natural, E pending)
{
- EndpointsForToken natural = StorageProxy.getLiveSortedReplicasForToken(keyspace, token);
- EndpointsForToken selected = consistencyLevel.filterForQuery(keyspace, natural, retry.equals(AlwaysSpeculativeRetryPolicy.INSTANCE));
-
- // Throw UAE early if we don't have enough replicas.
- consistencyLevel.assureSufficientLiveNodesForRead(keyspace, selected);
-
- return new ForToken(keyspace, consistencyLevel, token, natural, null, selected);
+ return pending.without(natural.endpoints());
}
- public static ForRange forRangeRead(Keyspace keyspace, ConsistencyLevel consistencyLevel, AbstractBounds<PartitionPosition> range, EndpointsForRange natural, EndpointsForRange selected)
+ /**
+ * @return the read layout for a token - this includes only live natural replicas, i.e. those that are not pending
+ * and not marked down by the failure detector. these are reverse sorted by the badness score of the configured snitch
+ */
+ static ReplicaLayout.ForTokenRead forTokenReadLiveSorted(Keyspace keyspace, Token token)
{
- return new ForRange(keyspace, consistencyLevel, range, natural, selected);
+ EndpointsForToken replicas = keyspace.getReplicationStrategy().getNaturalReplicasForToken(token);
+ replicas = DatabaseDescriptor.getEndpointSnitch().sortedByProximity(FBUtilities.getBroadcastAddressAndPort(), replicas);
+ replicas = replicas.filter(FailureDetector.isReplicaAlive);
+ return new ReplicaLayout.ForTokenRead(replicas);
}
- public String toString()
+ /**
+ * TODO: we should really double check that the provided range does not overlap multiple token ring regions
+ * @return the read layout for a range - this includes only live natural replicas, i.e. those that are not pending
+ * and not marked down by the failure detector. these are reverse sorted by the badness score of the configured snitch
+ */
+ static ReplicaLayout.ForRangeRead forRangeReadLiveSorted(Keyspace keyspace, AbstractBounds<PartitionPosition> range)
{
- return "ReplicaLayout [ CL: " + consistencyLevel + " keyspace: " + keyspace + " natural: " + natural + "pending: " + pending + " selected: " + selected + " ]";
+ EndpointsForRange replicas = keyspace.getReplicationStrategy().getNaturalReplicas(range.right);
+ replicas = DatabaseDescriptor.getEndpointSnitch().sortedByProximity(FBUtilities.getBroadcastAddressAndPort(), replicas);
+ replicas = replicas.filter(FailureDetector.isReplicaAlive);
+ return new ReplicaLayout.ForRangeRead(range, replicas);
}
-}
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/locator/ReplicaPlan.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/ReplicaPlan.java b/src/java/org/apache/cassandra/locator/ReplicaPlan.java
new file mode 100644
index 0000000..4d6127b
--- /dev/null
+++ b/src/java/org/apache/cassandra/locator/ReplicaPlan.java
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.locator;
+
+import com.google.common.collect.Iterables;
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.PartitionPosition;
+import org.apache.cassandra.dht.AbstractBounds;
+
+import java.util.function.Predicate;
+
+public abstract class ReplicaPlan<E extends Endpoints<E>>
+{
+ protected final Keyspace keyspace;
+ protected final ConsistencyLevel consistencyLevel;
+
+ // all nodes we will contact via any mechanism, including hints
+ // i.e., for:
+ // - reads, only live natural replicas
+ // ==> live.natural().subList(0, blockFor + initial speculate)
+ // - writes, includes all full, and any pending replicas, (and only any necessary transient ones to make up the difference)
+ // ==> liveAndDown.natural().filter(isFull) ++ liveAndDown.pending() ++ live.natural.filter(isTransient, req)
+ // - paxos, includes all live replicas (natural+pending), for this DC if LOCAL_SERIAL
+ // ==> live.all() (if consistencyLevel.isDCLocal(), then .filter(consistencyLevel.isLocal))
+ private final E contacts;
+
+ ReplicaPlan(Keyspace keyspace, ConsistencyLevel consistencyLevel, E contacts)
+ {
+ assert contacts != null;
+ this.keyspace = keyspace;
+ this.consistencyLevel = consistencyLevel;
+ this.contacts = contacts;
+ }
+
+ public abstract int blockFor();
+ public abstract void assureSufficientReplicas();
+
+ public E contacts() { return contacts; }
+ public boolean contacts(Replica replica) { return contacts.contains(replica); }
+ public Keyspace keyspace() { return keyspace; }
+ public ConsistencyLevel consistencyLevel() { return consistencyLevel; }
+
+ public static abstract class ForRead<E extends Endpoints<E>> extends ReplicaPlan<E>
+ {
+ // all nodes we *could* contact; typically all natural replicas that are believed to be alive
+ // we will consult this collection to find uncontacted nodes we might contact if we doubt we will meet consistency level
+ private final E candidates;
+
+ ForRead(Keyspace keyspace, ConsistencyLevel consistencyLevel, E candidates, E contact)
+ {
+ super(keyspace, consistencyLevel, contact);
+ this.candidates = candidates;
+ }
+
+ public int blockFor() { return consistencyLevel.blockFor(keyspace); }
+ public void assureSufficientReplicas() { consistencyLevel.assureSufficientLiveReplicasForRead(keyspace, candidates()); }
+
+ public E candidates() { return candidates; }
+
+ public E uncontactedCandidates()
+ {
+ return candidates().filter(r -> !contacts(r));
+ }
+
+ public Replica firstUncontactedCandidate(Predicate<Replica> extraPredicate)
+ {
+ return Iterables.tryFind(candidates(), r -> extraPredicate.test(r) && !contacts(r)).orNull();
+ }
+
+ public Replica getReplicaFor(InetAddressAndPort endpoint)
+ {
+ return candidates().byEndpoint().get(endpoint);
+ }
+
+ public String toString()
+ {
+ return "ReplicaPlan.ForRead [ CL: " + consistencyLevel + " keyspace: " + keyspace + " candidates: " + candidates + " contacts: " + contacts() + " ]";
+ }
+ }
+
+ public static class ForTokenRead extends ForRead<EndpointsForToken>
+ {
+ public ForTokenRead(Keyspace keyspace, ConsistencyLevel consistencyLevel, EndpointsForToken candidates, EndpointsForToken contact)
+ {
+ super(keyspace, consistencyLevel, candidates, contact);
+ }
+
+ ForTokenRead withContact(EndpointsForToken newContact)
+ {
+ return new ForTokenRead(keyspace, consistencyLevel, candidates(), newContact);
+ }
+ }
+
+ public static class ForRangeRead extends ForRead<EndpointsForRange>
+ {
+ final AbstractBounds<PartitionPosition> range;
+
+ public ForRangeRead(Keyspace keyspace, ConsistencyLevel consistencyLevel, AbstractBounds<PartitionPosition> range, EndpointsForRange candidates, EndpointsForRange contact)
+ {
+ super(keyspace, consistencyLevel, candidates, contact);
+ this.range = range;
+ }
+
+ public AbstractBounds<PartitionPosition> range() { return range; }
+
+ ForRangeRead withContact(EndpointsForRange newContact)
+ {
+ return new ForRangeRead(keyspace, consistencyLevel, range, candidates(), newContact);
+ }
+ }
+
+ /**
+  * Base class for write plans: alongside the contacts held by the parent, it records the pending replicas,
+  * the full set of live-and-down replicas, and the subset of those considered live.
+  */
+ public static abstract class ForWrite<E extends Endpoints<E>> extends ReplicaPlan<E>
+ {
+     // TODO: this is only needed because of poor isolation of concerns elsewhere - we can remove it soon, and will do so in a follow-up patch
+     final E pending;
+     final E liveAndDown;
+     final E live;
+
+     ForWrite(Keyspace keyspace, ConsistencyLevel consistencyLevel, E pending, E liveAndDown, E live, E contact)
+     {
+         super(keyspace, consistencyLevel, contact);
+         this.pending = pending;
+         this.liveAndDown = liveAndDown;
+         this.live = live;
+     }
+
+     /** Delegates to the consistency level's blockForWrite, passing this plan's keyspace and pending replicas */
+     public int blockFor()
+     {
+         return consistencyLevel.blockForWrite(keyspace, pending());
+     }
+
+     /** Delegates to the consistency level's assureSufficientLiveReplicasForWrite, passing the keyspace, live and pending replicas */
+     public void assureSufficientReplicas()
+     {
+         consistencyLevel.assureSufficientLiveReplicasForWrite(keyspace, live(), pending());
+     }
+
+     /** Replicas that a region of the ring is moving to; not yet ready to serve reads, but should receive writes */
+     public E pending()
+     {
+         return pending;
+     }
+
+     /** Replicas that can participate in the write - this always includes all nodes (pending and natural) in all DCs, except for paxos LOCAL_QUORUM (which is local DC only) */
+     public E liveAndDown()
+     {
+         return liveAndDown;
+     }
+
+     /** The live replicas present in liveAndDown, usually derived from FailureDetector.isReplicaAlive */
+     public E live()
+     {
+         return live;
+     }
+
+     /** Calculate which live endpoints we could have contacted, but chose not to */
+     public E liveUncontacted()
+     {
+         return live().filter(candidate -> !contacts(candidate));
+     }
+
+     /** Test liveness, consistent with the upfront analysis done for this operation (i.e. test membership of live()) */
+     public boolean isAlive(Replica replica)
+     {
+         return live.endpoints().contains(replica.endpoint());
+     }
+
+     @Override
+     public String toString()
+     {
+         return "ReplicaPlan.ForWrite [ CL: " + consistencyLevel
+                + " keyspace: " + keyspace
+                + " liveAndDown: " + liveAndDown
+                + " live: " + live
+                + " contacts: " + contacts()
+                + " ]";
+     }
+ }
+
+ /**
+  * A write plan expressed in terms of {@link EndpointsForToken}, with copy-style helpers for
+  * deriving a plan that differs only in consistency level or in contacts.
+  */
+ public static class ForTokenWrite extends ForWrite<EndpointsForToken>
+ {
+     public ForTokenWrite(Keyspace keyspace,
+                          ConsistencyLevel consistencyLevel,
+                          EndpointsForToken pending,
+                          EndpointsForToken liveAndDown,
+                          EndpointsForToken live,
+                          EndpointsForToken contact)
+     {
+         super(keyspace, consistencyLevel, pending, liveAndDown, live, contact);
+     }
+
+     /** New plan carrying over keyspace, pending, liveAndDown and live, with the supplied consistency level and contacts */
+     private ReplicaPlan.ForTokenWrite copy(ConsistencyLevel newConsistencyLevel, EndpointsForToken newContact)
+     {
+         return new ReplicaPlan.ForTokenWrite(keyspace,
+                                              newConsistencyLevel,
+                                              pending(),
+                                              liveAndDown(),
+                                              live(),
+                                              newContact);
+     }
+
+     /** Same plan and contacts, different consistency level */
+     ForTokenWrite withConsistencyLevel(ConsistencyLevel newConsistencylevel)
+     {
+         return copy(newConsistencylevel, contacts());
+     }
+
+     /** Same plan and consistency level, different contacts */
+     public ForTokenWrite withContact(EndpointsForToken newContact)
+     {
+         return copy(consistencyLevel, newContact);
+     }
+ }
+
+ /**
+  * A token write plan that also carries the participant count supplied at construction.
+  */
+ public static class ForPaxosWrite extends ForWrite<EndpointsForToken>
+ {
+     final int requiredParticipants;
+
+     ForPaxosWrite(Keyspace keyspace,
+                   ConsistencyLevel consistencyLevel,
+                   EndpointsForToken pending,
+                   EndpointsForToken liveAndDown,
+                   EndpointsForToken live,
+                   EndpointsForToken contact,
+                   int requiredParticipants)
+     {
+         super(keyspace, consistencyLevel, pending, liveAndDown, live, contact);
+         this.requiredParticipants = requiredParticipants;
+     }
+
+     /** The participant count given to the constructor */
+     public int requiredParticipants()
+     {
+         return requiredParticipants;
+     }
+ }
+
+ /**
+  * A holder through which AbstractReadExecutor, {Data,Digest}Resolver and ReadRepair share a single ReplicaPlan,
+  * whose 'contacts' are progressively extended by the various forms of speculation
+  * (initial speculation, rr-read and rr-write).
+  *
+  * Implementations keep a plain, non-volatile reference even though it is shared between threads.
+  * The reference passed to the constructor is expected to become visible through the usual means of
+  * handing data between threads (i.e. executors, etc). Later updates may or may not be observed, and perhaps
+  * not promptly, but never incompletely: the contained ReplicaPlan has only final member properties,
+  * so it cannot be seen partially initialised.
+  */
+ public interface Shared<E extends Endpoints<E>, P extends ReplicaPlan<E>>
+ {
+     /**
+      * Add the provided replica to this shared plan, by updating the internal reference.
+      */
+     void addToContacts(Replica replica);
+
+     /**
+      * Get the shared replica plan; non-volatile (so maybe stale), but with no risk of being partially initialised.
+      */
+     P get();
+
+     /**
+      * Get the shared replica plan; non-volatile (so maybe stale), but with no risk of being partially initialised,
+      * with its 'contacts' replaced by those provided.
+      */
+     P getWithContacts(E endpoints);
+ }
+
+ /**
+  * {@link Shared} holder for a {@link ForTokenRead} plan.
+  *
+  * The plan is held in a plain (non-volatile) field that is reassigned whenever a contact is added.
+  */
+ public static class SharedForTokenRead implements Shared<EndpointsForToken, ForTokenRead>
+ {
+     private ForTokenRead replicaPlan;
+
+     SharedForTokenRead(ForTokenRead replicaPlan) { this.replicaPlan = replicaPlan; }
+
+     /** Replace the held plan with a copy whose contacts have {@code replica} appended */
+     public void addToContacts(Replica replica) { replicaPlan = replicaPlan.withContact(Endpoints.append(replicaPlan.contacts(), replica)); }
+
+     /** The currently held plan */
+     public ForTokenRead get() { return replicaPlan; }
+
+     /** A copy of the currently held plan using {@code newContact} as its contacts; the held plan is not modified */
+     public ForTokenRead getWithContacts(EndpointsForToken newContact) { return replicaPlan.withContact(newContact); }
+
+     /**
+      * Render the held plan. Without this, logging the holder itself (as callers that only hold a
+      * {@link Shared} reference do) would print an identity hash instead of the plan contents.
+      */
+     @Override
+     public String toString() { return String.valueOf(replicaPlan); }
+ }
+
+ /**
+  * {@link Shared} holder for a {@link ForRangeRead} plan.
+  *
+  * The plan is held in a plain (non-volatile) field that is reassigned whenever a contact is added.
+  */
+ public static class SharedForRangeRead implements Shared<EndpointsForRange, ForRangeRead>
+ {
+     private ForRangeRead replicaPlan;
+
+     SharedForRangeRead(ForRangeRead replicaPlan) { this.replicaPlan = replicaPlan; }
+
+     /** Replace the held plan with a copy whose contacts have {@code replica} appended */
+     public void addToContacts(Replica replica) { replicaPlan = replicaPlan.withContact(Endpoints.append(replicaPlan.contacts(), replica)); }
+
+     /** The currently held plan */
+     public ForRangeRead get() { return replicaPlan; }
+
+     /** A copy of the currently held plan using {@code newContact} as its contacts; the held plan is not modified */
+     public ForRangeRead getWithContacts(EndpointsForRange newContact) { return replicaPlan.withContact(newContact); }
+
+     /**
+      * Render the held plan. Without this, logging the holder itself (as callers that only hold a
+      * {@link Shared} reference do) would print an identity hash instead of the plan contents.
+      */
+     @Override
+     public String toString() { return String.valueOf(replicaPlan); }
+ }
+
+ public static SharedForTokenRead shared(ForTokenRead replicaPlan) { return new SharedForTokenRead(replicaPlan); }
+ public static SharedForRangeRead shared(ForRangeRead replicaPlan) { return new SharedForRangeRead(replicaPlan); }
+
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org
[2/4] cassandra git commit: ReplicaPlan/Layout refactor
follow-up/completion
Posted by be...@apache.org.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java b/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java
index 75885ae..c296cba 100644
--- a/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java
+++ b/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java
@@ -21,8 +21,9 @@ import java.util.concurrent.TimeUnit;
import com.google.common.base.Preconditions;
-import org.apache.cassandra.locator.ReplicaLayout;
-
+import com.google.common.base.Predicates;
+import org.apache.cassandra.locator.ReplicaPlan;
+import org.apache.cassandra.locator.ReplicaPlans;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -50,7 +51,6 @@ import org.apache.cassandra.tracing.TraceState;
import org.apache.cassandra.tracing.Tracing;
import static com.google.common.collect.Iterables.all;
-import static com.google.common.collect.Iterables.tryFind;
/**
* Sends a read request to the replicas needed to satisfy a given ConsistencyLevel.
@@ -65,24 +65,25 @@ public abstract class AbstractReadExecutor
private static final Logger logger = LoggerFactory.getLogger(AbstractReadExecutor.class);
protected final ReadCommand command;
- private final ReplicaLayout.ForToken replicaLayout;
- protected final ReadRepair<EndpointsForToken, ReplicaLayout.ForToken> readRepair;
- protected final DigestResolver<EndpointsForToken, ReplicaLayout.ForToken> digestResolver;
- protected final ReadCallback<EndpointsForToken, ReplicaLayout.ForToken> handler;
+ private final ReplicaPlan.SharedForTokenRead replicaPlan;
+ protected final ReadRepair<EndpointsForToken, ReplicaPlan.ForTokenRead> readRepair;
+ protected final DigestResolver<EndpointsForToken, ReplicaPlan.ForTokenRead> digestResolver;
+ protected final ReadCallback<EndpointsForToken, ReplicaPlan.ForTokenRead> handler;
protected final TraceState traceState;
protected final ColumnFamilyStore cfs;
protected final long queryStartNanoTime;
private final int initialDataRequestCount;
protected volatile PartitionIterator result = null;
- AbstractReadExecutor(ColumnFamilyStore cfs, ReadCommand command, ReplicaLayout.ForToken replicaLayout, int initialDataRequestCount, long queryStartNanoTime)
+ AbstractReadExecutor(ColumnFamilyStore cfs, ReadCommand command, ReplicaPlan.ForTokenRead replicaPlan, int initialDataRequestCount, long queryStartNanoTime)
{
this.command = command;
- this.replicaLayout = replicaLayout;
+ this.replicaPlan = ReplicaPlan.shared(replicaPlan);
this.initialDataRequestCount = initialDataRequestCount;
- this.readRepair = ReadRepair.create(command, replicaLayout, queryStartNanoTime);
- this.digestResolver = new DigestResolver<>(command, replicaLayout, queryStartNanoTime);
- this.handler = new ReadCallback<>(digestResolver, replicaLayout.consistencyLevel().blockFor(replicaLayout.keyspace()), command, replicaLayout, queryStartNanoTime);
+ // the ReadRepair and DigestResolver both need to see our updates to the replica plan, so they are handed the shared holder rather than the plan itself
+ this.readRepair = ReadRepair.create(command, this.replicaPlan, queryStartNanoTime);
+ this.digestResolver = new DigestResolver<>(command, this.replicaPlan, queryStartNanoTime);
+ this.handler = new ReadCallback<>(digestResolver, command, this.replicaPlan, queryStartNanoTime);
this.cfs = cfs;
this.traceState = Tracing.instance.get();
this.queryStartNanoTime = queryStartNanoTime;
@@ -93,7 +94,7 @@ public abstract class AbstractReadExecutor
// TODO: we need this when talking with pre-3.0 nodes. So if we preserve the digest format moving forward, we can get rid of this once
// we stop being compatible with pre-3.0 nodes.
int digestVersion = MessagingService.current_version;
- for (Replica replica : replicaLayout.selected())
+ for (Replica replica : replicaPlan.contacts())
digestVersion = Math.min(digestVersion, MessagingService.instance().getVersion(replica.endpoint()));
command.setDigestVersion(digestVersion);
}
@@ -168,7 +169,7 @@ public abstract class AbstractReadExecutor
*/
public void executeAsync()
{
- EndpointsForToken selected = replicaLayout().selected();
+ EndpointsForToken selected = replicaPlan().contacts();
EndpointsForToken fullDataRequests = selected.filter(Replica::isFull, initialDataRequestCount);
makeFullDataRequests(fullDataRequests);
makeTransientDataRequests(selected.filter(Replica::isTransient));
@@ -184,30 +185,25 @@ public abstract class AbstractReadExecutor
ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(command.metadata().id);
SpeculativeRetryPolicy retry = cfs.metadata().params.speculativeRetry;
- // Endpoints for Token
- ReplicaLayout.ForToken replicaLayout = ReplicaLayout.forRead(keyspace, command.partitionKey().getToken(), consistencyLevel, retry);
+ ReplicaPlan.ForTokenRead replicaPlan = ReplicaPlans.forRead(keyspace, command.partitionKey().getToken(), consistencyLevel, retry);
// Speculative retry is disabled *OR*
// 11980: Disable speculative retry if using EACH_QUORUM in order to prevent miscounting DC responses
if (retry.equals(NeverSpeculativeRetryPolicy.INSTANCE) || consistencyLevel == ConsistencyLevel.EACH_QUORUM)
- // TODO Looks like we might want to move speculation into the replica layout, but that might be a story for post-4.0
- return new NeverSpeculatingReadExecutor(cfs, command, replicaLayout, queryStartNanoTime, false);
+ return new NeverSpeculatingReadExecutor(cfs, command, replicaPlan, queryStartNanoTime, false);
// There are simply no extra replicas to speculate.
// Handle this separately so it can record failed attempts to speculate due to lack of replicas
- if (replicaLayout.selected().size() == replicaLayout.all().size())
+ if (replicaPlan.contacts().size() == replicaPlan.candidates().size())
{
boolean recordFailedSpeculation = consistencyLevel != ConsistencyLevel.ALL;
- return new NeverSpeculatingReadExecutor(cfs, command, replicaLayout, queryStartNanoTime, recordFailedSpeculation);
+ return new NeverSpeculatingReadExecutor(cfs, command, replicaPlan, queryStartNanoTime, recordFailedSpeculation);
}
- // If CL.ALL, upgrade to AlwaysSpeculating;
- // If We are going to contact every node anyway, ask for 2 full data requests instead of 1, for redundancy
- // (same amount of requests in total, but we turn 1 digest request into a full blown data request)
if (retry.equals(AlwaysSpeculativeRetryPolicy.INSTANCE))
- return new AlwaysSpeculatingReadExecutor(cfs, command, replicaLayout, queryStartNanoTime);
+ return new AlwaysSpeculatingReadExecutor(cfs, command, replicaPlan, queryStartNanoTime);
else // PERCENTILE or CUSTOM.
- return new SpeculatingReadExecutor(cfs, command, replicaLayout, queryStartNanoTime);
+ return new SpeculatingReadExecutor(cfs, command, replicaPlan, queryStartNanoTime);
}
/**
@@ -223,9 +219,9 @@ public abstract class AbstractReadExecutor
return !handler.await(cfs.sampleReadLatencyNanos, TimeUnit.NANOSECONDS);
}
- ReplicaLayout.ForToken replicaLayout()
+ ReplicaPlan.ForTokenRead replicaPlan()
{
- return replicaLayout;
+ return replicaPlan.get();
}
void onReadTimeout() {}
@@ -239,9 +235,9 @@ public abstract class AbstractReadExecutor
*/
private final boolean logFailedSpeculation;
- public NeverSpeculatingReadExecutor(ColumnFamilyStore cfs, ReadCommand command, ReplicaLayout.ForToken replicaLayout, long queryStartNanoTime, boolean logFailedSpeculation)
+ public NeverSpeculatingReadExecutor(ColumnFamilyStore cfs, ReadCommand command, ReplicaPlan.ForTokenRead replicaPlan, long queryStartNanoTime, boolean logFailedSpeculation)
{
- super(cfs, command, replicaLayout, 1, queryStartNanoTime);
+ super(cfs, command, replicaPlan, 1, queryStartNanoTime);
this.logFailedSpeculation = logFailedSpeculation;
}
@@ -260,13 +256,13 @@ public abstract class AbstractReadExecutor
public SpeculatingReadExecutor(ColumnFamilyStore cfs,
ReadCommand command,
- ReplicaLayout.ForToken replicaLayout,
+ ReplicaPlan.ForTokenRead replicaPlan,
long queryStartNanoTime)
{
// We're hitting additional targets for read repair (??). Since our "extra" replica is the least-
// preferred by the snitch, we do an extra data read to start with against a replica more
// likely to reply; better to let RR fail than the entire query.
- super(cfs, command, replicaLayout, replicaLayout.blockFor() < replicaLayout.selected().size() ? 2 : 1, queryStartNanoTime);
+ super(cfs, command, replicaPlan, replicaPlan.blockFor() < replicaPlan.contacts().size() ? 2 : 1, queryStartNanoTime);
}
public void maybeTryAdditionalReplicas()
@@ -277,12 +273,12 @@ public abstract class AbstractReadExecutor
cfs.metric.speculativeRetries.inc();
speculated = true;
+ ReplicaPlan.ForTokenRead replicaPlan = replicaPlan();
ReadCommand retryCommand = command;
Replica extraReplica;
if (handler.resolver.isDataPresent())
{
- extraReplica = tryFind(replicaLayout().all(),
- r -> !replicaLayout().selected().contains(r)).orNull();
+ extraReplica = replicaPlan.firstUncontactedCandidate(Predicates.alwaysTrue());
// we should only use a SpeculatingReadExecutor if we have an extra replica to speculate against
assert extraReplica != null;
@@ -293,8 +289,7 @@ public abstract class AbstractReadExecutor
}
else
{
- extraReplica = tryFind(replicaLayout().all(),
- r -> r.isFull() && !replicaLayout().selected().contains(r)).orNull();
+ extraReplica = replicaPlan.firstUncontactedCandidate(Replica::isFull);
if (extraReplica == null)
{
cfs.metric.speculativeInsufficientReplicas.inc();
@@ -304,6 +299,11 @@ public abstract class AbstractReadExecutor
}
}
+ // we must update the plan to include this new node, else when we come to read-repair, we may not include this
+ // speculated response in the data requests we make again, and we will not be able to 'speculate' an extra repair read,
+ // nor would we be able to speculate a new 'write' if the repair writes are insufficient
+ super.replicaPlan.addToContacts(extraReplica);
+
if (traceState != null)
traceState.trace("speculating read retry on {}", extraReplica);
logger.trace("speculating read retry on {}", extraReplica);
@@ -325,10 +325,12 @@ public abstract class AbstractReadExecutor
{
public AlwaysSpeculatingReadExecutor(ColumnFamilyStore cfs,
ReadCommand command,
- ReplicaLayout.ForToken replicaLayout,
+ ReplicaPlan.ForTokenRead replicaPlan,
long queryStartNanoTime)
{
- super(cfs, command, replicaLayout, replicaLayout.selected().size() > 1 ? 2 : 1, queryStartNanoTime);
+ // presumably, we speculate an extra data request here in case it is our data request that fails to respond,
+ // and there are no more nodes to consult
+ super(cfs, command, replicaPlan, replicaPlan.contacts().size() > 1 ? 2 : 1, queryStartNanoTime);
}
public void maybeTryAdditionalReplicas()
@@ -403,7 +405,7 @@ public abstract class AbstractReadExecutor
logger.trace("Timed out waiting on digest mismatch repair requests");
// the caught exception here will have CL.ALL from the repair command,
// not whatever CL the initial command was at (CASSANDRA-7947)
- throw new ReadTimeoutException(replicaLayout().consistencyLevel(), handler.blockfor - 1, handler.blockfor, true);
+ throw new ReadTimeoutException(replicaPlan().consistencyLevel(), handler.blockFor - 1, handler.blockFor, true);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/service/reads/DataResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/reads/DataResolver.java b/src/java/org/apache/cassandra/service/reads/DataResolver.java
index a6901b2..db5f3c8 100644
--- a/src/java/org/apache/cassandra/service/reads/DataResolver.java
+++ b/src/java/org/apache/cassandra/service/reads/DataResolver.java
@@ -42,7 +42,7 @@ import org.apache.cassandra.db.transform.Filter;
import org.apache.cassandra.db.transform.FilteredPartitions;
import org.apache.cassandra.db.transform.Transformation;
import org.apache.cassandra.locator.Endpoints;
-import org.apache.cassandra.locator.ReplicaLayout;
+import org.apache.cassandra.locator.ReplicaPlan;
import org.apache.cassandra.net.MessageIn;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.service.reads.repair.ReadRepair;
@@ -52,14 +52,14 @@ import org.apache.cassandra.service.reads.repair.RepairedDataVerifier;
import static com.google.common.collect.Iterables.*;
import static org.apache.cassandra.db.partitions.UnfilteredPartitionIterators.MergeListener;
-public class DataResolver<E extends Endpoints<E>, L extends ReplicaLayout<E, L>> extends ResponseResolver<E, L>
+public class DataResolver<E extends Endpoints<E>, P extends ReplicaPlan.ForRead<E>> extends ResponseResolver<E, P>
{
private final boolean enforceStrictLiveness;
- private final ReadRepair<E, L> readRepair;
+ private final ReadRepair<E, P> readRepair;
- public DataResolver(ReadCommand command, L replicaLayout, ReadRepair<E, L> readRepair, long queryStartNanoTime)
+ public DataResolver(ReadCommand command, ReplicaPlan.Shared<E, P> replicaPlan, ReadRepair<E, P> readRepair, long queryStartNanoTime)
{
- super(command, replicaLayout, queryStartNanoTime);
+ super(command, replicaPlan, queryStartNanoTime);
this.enforceStrictLiveness = command.metadata().enforceStrictLiveness();
this.readRepair = readRepair;
}
@@ -83,7 +83,7 @@ public class DataResolver<E extends Endpoints<E>, L extends ReplicaLayout<E, L>>
Collection<MessageIn<ReadResponse>> messages = responses.snapshot();
assert !any(messages, msg -> msg.payload.isDigestResponse());
- E replicas = replicaLayout.all().keep(transform(messages, msg -> msg.from));
+ E replicas = replicaPlan().candidates().select(transform(messages, msg -> msg.from), false);
List<UnfilteredPartitionIterator> iters = new ArrayList<>(
Collections2.transform(messages, msg -> msg.payload.makeIterator(command)));
assert replicas.size() == iters.size();
@@ -121,7 +121,7 @@ public class DataResolver<E extends Endpoints<E>, L extends ReplicaLayout<E, L>>
command.limits().newCounter(command.nowInSec(), true, command.selectsFullPartition(), enforceStrictLiveness);
UnfilteredPartitionIterator merged = mergeWithShortReadProtection(iters,
- replicaLayout.withSelected(replicas),
+ replicaPlan.getWithContacts(replicas),
mergedResultCounter,
repairedDataTracker);
FilteredPartitions filtered = FilteredPartitions.filter(merged, new Filter(command.nowInSec(), command.metadata().enforceStrictLiveness()));
@@ -135,7 +135,7 @@ public class DataResolver<E extends Endpoints<E>, L extends ReplicaLayout<E, L>>
}
private UnfilteredPartitionIterator mergeWithShortReadProtection(List<UnfilteredPartitionIterator> results,
- L sources,
+ P sources,
DataLimits.Counter mergedResultCounter,
RepairedDataTracker repairedDataTracker)
{
@@ -150,7 +150,7 @@ public class DataResolver<E extends Endpoints<E>, L extends ReplicaLayout<E, L>>
*/
if (!command.limits().isUnlimited())
for (int i = 0; i < results.size(); i++)
- results.set(i, ShortReadProtection.extend(sources.selected().get(i), results.get(i), command, mergedResultCounter, queryStartNanoTime, enforceStrictLiveness));
+ results.set(i, ShortReadProtection.extend(sources.contacts().get(i), results.get(i), command, mergedResultCounter, queryStartNanoTime, enforceStrictLiveness));
return UnfilteredPartitionIterators.merge(results, wrapMergeListener(readRepair.getMergeListener(sources), sources, repairedDataTracker));
}
@@ -161,7 +161,7 @@ public class DataResolver<E extends Endpoints<E>, L extends ReplicaLayout<E, L>>
}
private UnfilteredPartitionIterators.MergeListener wrapMergeListener(UnfilteredPartitionIterators.MergeListener partitionListener,
- L sources,
+ P sources,
RepairedDataTracker repairedDataTracker)
{
// Avoid wrapping no-op listeners as it doesn't throw
@@ -191,7 +191,7 @@ public class DataResolver<E extends Endpoints<E>, L extends ReplicaLayout<E, L>>
table,
mergedDeletion == null ? "null" : mergedDeletion.toString(),
'[' + Joiner.on(", ").join(transform(Arrays.asList(versions), rt -> rt == null ? "null" : rt.toString())) + ']',
- sources.selected(),
+ sources.contacts(),
makeResponsesDebugString(partitionKey));
throw new AssertionError(details, e);
}
@@ -212,7 +212,7 @@ public class DataResolver<E extends Endpoints<E>, L extends ReplicaLayout<E, L>>
table,
merged == null ? "null" : merged.toString(table),
'[' + Joiner.on(", ").join(transform(Arrays.asList(versions), rt -> rt == null ? "null" : rt.toString(table))) + ']',
- sources.selected(),
+ sources.contacts(),
makeResponsesDebugString(partitionKey));
throw new AssertionError(details, e);
}
@@ -238,7 +238,7 @@ public class DataResolver<E extends Endpoints<E>, L extends ReplicaLayout<E, L>>
table,
merged == null ? "null" : merged.toString(table),
'[' + Joiner.on(", ").join(transform(Arrays.asList(versions), rt -> rt == null ? "null" : rt.toString(table))) + ']',
- sources.selected(),
+ sources.contacts(),
makeResponsesDebugString(partitionKey));
throw new AssertionError(details, e);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/service/reads/DigestResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/reads/DigestResolver.java b/src/java/org/apache/cassandra/service/reads/DigestResolver.java
index 28c2117..0dcae95 100644
--- a/src/java/org/apache/cassandra/service/reads/DigestResolver.java
+++ b/src/java/org/apache/cassandra/service/reads/DigestResolver.java
@@ -29,10 +29,10 @@ import org.apache.cassandra.db.ReadResponse;
import org.apache.cassandra.db.SinglePartitionReadCommand;
import org.apache.cassandra.db.partitions.PartitionIterator;
import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators;
-import org.apache.cassandra.locator.ReplicaLayout;
import org.apache.cassandra.locator.Endpoints;
import org.apache.cassandra.locator.Replica;
import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.ReplicaPlan;
import org.apache.cassandra.net.MessageIn;
import org.apache.cassandra.service.reads.repair.NoopReadRepair;
import org.apache.cassandra.service.reads.repair.ReadRepair;
@@ -40,13 +40,13 @@ import org.apache.cassandra.utils.ByteBufferUtil;
import static com.google.common.collect.Iterables.any;
-public class DigestResolver<E extends Endpoints<E>, L extends ReplicaLayout<E, L>> extends ResponseResolver<E, L>
+public class DigestResolver<E extends Endpoints<E>, P extends ReplicaPlan.ForRead<E>> extends ResponseResolver<E, P>
{
private volatile MessageIn<ReadResponse> dataResponse;
- public DigestResolver(ReadCommand command, L replicas, long queryStartNanoTime)
+ public DigestResolver(ReadCommand command, ReplicaPlan.Shared<E, P> replicaPlan, long queryStartNanoTime)
{
- super(command, replicas, queryStartNanoTime);
+ super(command, replicaPlan, queryStartNanoTime);
Preconditions.checkArgument(command instanceof SinglePartitionReadCommand,
"DigestResolver can only be used with SinglePartitionReadCommand commands");
}
@@ -55,7 +55,7 @@ public class DigestResolver<E extends Endpoints<E>, L extends ReplicaLayout<E, L
public void preprocess(MessageIn<ReadResponse> message)
{
super.preprocess(message);
- Replica replica = replicaLayout.getReplicaFor(message.from);
+ Replica replica = replicaPlan().getReplicaFor(message.from);
if (dataResponse == null && !message.payload.isDigestResponse() && replica.isFull())
{
dataResponse = message;
@@ -76,7 +76,7 @@ public class DigestResolver<E extends Endpoints<E>, L extends ReplicaLayout<E, L
{
return any(responses,
msg -> !msg.payload.isDigestResponse()
- && replicaLayout.getReplicaFor(msg.from).isTransient());
+ && replicaPlan().getReplicaFor(msg.from).isTransient());
}
public PartitionIterator getData()
@@ -93,16 +93,14 @@ public class DigestResolver<E extends Endpoints<E>, L extends ReplicaLayout<E, L
{
// This path can be triggered only if we've got responses from full replicas and they match, but
// transient replica response still contains data, which needs to be reconciled.
- DataResolver<E, L> dataResolver = new DataResolver<>(command,
- replicaLayout,
- NoopReadRepair.instance,
- queryStartNanoTime);
+ DataResolver<E, P> dataResolver
+ = new DataResolver<>(command, replicaPlan, NoopReadRepair.instance, queryStartNanoTime);
dataResolver.preprocess(dataResponse);
// Reconcile with transient replicas
for (MessageIn<ReadResponse> response : responses)
{
- Replica replica = replicaLayout.getReplicaFor(response.from);
+ Replica replica = replicaPlan().getReplicaFor(response.from);
if (replica.isTransient())
dataResolver.preprocess(response);
}
@@ -119,7 +117,7 @@ public class DigestResolver<E extends Endpoints<E>, L extends ReplicaLayout<E, L
ByteBuffer digest = null;
for (MessageIn<ReadResponse> message : responses.snapshot())
{
- if (replicaLayout.getReplicaFor(message.from).isTransient())
+ if (replicaPlan().getReplicaFor(message.from).isTransient())
continue;
ByteBuffer newDigest = message.payload.digest(command);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/service/reads/ReadCallback.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/reads/ReadCallback.java b/src/java/org/apache/cassandra/service/reads/ReadCallback.java
index 3d39377..7a2385c 100644
--- a/src/java/org/apache/cassandra/service/reads/ReadCallback.java
+++ b/src/java/org/apache/cassandra/service/reads/ReadCallback.java
@@ -23,19 +23,18 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.locator.ReplicaPlan;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.PartitionRangeReadCommand;
import org.apache.cassandra.db.ReadCommand;
import org.apache.cassandra.db.ReadResponse;
import org.apache.cassandra.exceptions.ReadFailureException;
import org.apache.cassandra.exceptions.ReadTimeoutException;
import org.apache.cassandra.exceptions.RequestFailureReason;
-import org.apache.cassandra.exceptions.UnavailableException;
import org.apache.cassandra.locator.Endpoints;
-import org.apache.cassandra.locator.ReplicaLayout;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.net.IAsyncCallbackWithFailure;
import org.apache.cassandra.net.MessageIn;
@@ -44,16 +43,17 @@ import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.concurrent.SimpleCondition;
-public class ReadCallback<E extends Endpoints<E>, L extends ReplicaLayout<E, L>> implements IAsyncCallbackWithFailure<ReadResponse>
+public class ReadCallback<E extends Endpoints<E>, P extends ReplicaPlan.ForRead<E>> implements IAsyncCallbackWithFailure<ReadResponse>
{
protected static final Logger logger = LoggerFactory.getLogger( ReadCallback.class );
public final ResponseResolver resolver;
final SimpleCondition condition = new SimpleCondition();
private final long queryStartNanoTime;
- // TODO: move to replica layout as well?
- final int blockfor;
- final L replicaLayout;
+ final int blockFor; // TODO: move to replica plan as well?
+ // this uses a plain reference, but is initialised before handoff to any other threads; the later updates
+ // may not be visible to the threads immediately, but ReplicaPlan only contains final fields, so they will never see an uninitialised object
+ final ReplicaPlan.Shared<E, P> replicaPlan;
private final ReadCommand command;
private static final AtomicIntegerFieldUpdater<ReadCallback> recievedUpdater
= AtomicIntegerFieldUpdater.newUpdater(ReadCallback.class, "received");
@@ -63,19 +63,24 @@ public class ReadCallback<E extends Endpoints<E>, L extends ReplicaLayout<E, L>>
private volatile int failures = 0;
private final Map<InetAddressAndPort, RequestFailureReason> failureReasonByEndpoint;
- public ReadCallback(ResponseResolver resolver, int blockfor, ReadCommand command, L replicaLayout, long queryStartNanoTime)
+ public ReadCallback(ResponseResolver resolver, ReadCommand command, ReplicaPlan.Shared<E, P> replicaPlan, long queryStartNanoTime)
{
this.command = command;
- this.blockfor = blockfor;
this.resolver = resolver;
this.queryStartNanoTime = queryStartNanoTime;
- this.replicaLayout = replicaLayout;
+ this.replicaPlan = replicaPlan;
+ this.blockFor = replicaPlan.get().blockFor();
this.failureReasonByEndpoint = new ConcurrentHashMap<>();
// we don't support read repair (or rapid read protection) for range scans yet (CASSANDRA-6897)
- assert !(command instanceof PartitionRangeReadCommand) || blockfor >= replicaLayout.selected().size();
+ assert !(command instanceof PartitionRangeReadCommand) || blockFor >= replicaPlan().contacts().size();
if (logger.isTraceEnabled())
- logger.trace("Blockfor is {}; setting up requests to {}", blockfor, this.replicaLayout);
+ logger.trace("Blockfor is {}; setting up requests to {}", blockFor, this.replicaPlan);
+ }
+
+ protected P replicaPlan()
+ {
+ return replicaPlan.get();
}
public boolean await(long timePastStart, TimeUnit unit)
@@ -94,30 +99,30 @@ public class ReadCallback<E extends Endpoints<E>, L extends ReplicaLayout<E, L>>
public void awaitResults() throws ReadFailureException, ReadTimeoutException
{
boolean signaled = await(command.getTimeout(), TimeUnit.MILLISECONDS);
- boolean failed = failures > 0 && blockfor + failures > replicaLayout.selected().size();
+ boolean failed = failures > 0 && blockFor + failures > replicaPlan().contacts().size();
if (signaled && !failed)
return;
if (Tracing.isTracing())
{
String gotData = received > 0 ? (resolver.isDataPresent() ? " (including data)" : " (only digests)") : "";
- Tracing.trace("{}; received {} of {} responses{}", new Object[]{ (failed ? "Failed" : "Timed out"), received, blockfor, gotData });
+ Tracing.trace("{}; received {} of {} responses{}", new Object[]{ (failed ? "Failed" : "Timed out"), received, blockFor, gotData });
}
else if (logger.isDebugEnabled())
{
String gotData = received > 0 ? (resolver.isDataPresent() ? " (including data)" : " (only digests)") : "";
- logger.debug("{}; received {} of {} responses{}", new Object[]{ (failed ? "Failed" : "Timed out"), received, blockfor, gotData });
+ logger.debug("{}; received {} of {} responses{}", new Object[]{ (failed ? "Failed" : "Timed out"), received, blockFor, gotData });
}
// Same as for writes, see AbstractWriteResponseHandler
throw failed
- ? new ReadFailureException(replicaLayout.consistencyLevel(), received, blockfor, resolver.isDataPresent(), failureReasonByEndpoint)
- : new ReadTimeoutException(replicaLayout.consistencyLevel(), received, blockfor, resolver.isDataPresent());
+ ? new ReadFailureException(replicaPlan().consistencyLevel(), received, blockFor, resolver.isDataPresent(), failureReasonByEndpoint)
+ : new ReadTimeoutException(replicaPlan().consistencyLevel(), received, blockFor, resolver.isDataPresent());
}
public int blockFor()
{
- return blockfor;
+ return blockFor;
}
public void response(MessageIn<ReadResponse> message)
@@ -127,24 +132,16 @@ public class ReadCallback<E extends Endpoints<E>, L extends ReplicaLayout<E, L>>
? recievedUpdater.incrementAndGet(this)
: received;
- if (n >= blockfor && resolver.isDataPresent())
+ if (n >= blockFor && resolver.isDataPresent())
condition.signalAll();
}
/**
- * @return true if the message counts towards the blockfor threshold
+ * @return true if the message counts towards the blockFor threshold
*/
private boolean waitingFor(InetAddressAndPort from)
{
- return !replicaLayout.consistencyLevel().isDatacenterLocal() || DatabaseDescriptor.getLocalDataCenter().equals(DatabaseDescriptor.getEndpointSnitch().getDatacenter(from));
- }
-
- /**
- * @return the current number of received responses
- */
- public int getReceivedCount()
- {
- return received;
+ return !replicaPlan().consistencyLevel().isDatacenterLocal() || DatabaseDescriptor.getLocalDataCenter().equals(DatabaseDescriptor.getEndpointSnitch().getDatacenter(from));
}
public void response(ReadResponse result)
@@ -157,11 +154,6 @@ public class ReadCallback<E extends Endpoints<E>, L extends ReplicaLayout<E, L>>
response(message);
}
- public void assureSufficientLiveNodes() throws UnavailableException
- {
- replicaLayout.consistencyLevel().assureSufficientLiveNodesForRead(replicaLayout.keyspace(), replicaLayout.selected());
- }
-
public boolean isLatencyForSnitch()
{
return true;
@@ -176,7 +168,7 @@ public class ReadCallback<E extends Endpoints<E>, L extends ReplicaLayout<E, L>>
failureReasonByEndpoint.put(from, failureReason);
- if (blockfor + n > replicaLayout.selected().size())
+ if (blockFor + n > replicaPlan().contacts().size())
condition.signalAll();
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/service/reads/ResponseResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/reads/ResponseResolver.java b/src/java/org/apache/cassandra/service/reads/ResponseResolver.java
index 298f843..0c1e1ba 100644
--- a/src/java/org/apache/cassandra/service/reads/ResponseResolver.java
+++ b/src/java/org/apache/cassandra/service/reads/ResponseResolver.java
@@ -23,30 +23,34 @@ import org.slf4j.LoggerFactory;
import org.apache.cassandra.db.ReadCommand;
import org.apache.cassandra.db.ReadResponse;
import org.apache.cassandra.locator.Endpoints;
-import org.apache.cassandra.locator.ReplicaLayout;
+import org.apache.cassandra.locator.ReplicaPlan;
import org.apache.cassandra.net.MessageIn;
import org.apache.cassandra.utils.concurrent.Accumulator;
-public abstract class ResponseResolver<E extends Endpoints<E>, L extends ReplicaLayout<E, L>>
+public abstract class ResponseResolver<E extends Endpoints<E>, P extends ReplicaPlan.ForRead<E>>
{
protected static final Logger logger = LoggerFactory.getLogger(ResponseResolver.class);
protected final ReadCommand command;
- protected final L replicaLayout;
+ protected final ReplicaPlan.Shared<E, P> replicaPlan;
// Accumulator gives us non-blocking thread-safety with optimal algorithmic constraints
protected final Accumulator<MessageIn<ReadResponse>> responses;
protected final long queryStartNanoTime;
- public ResponseResolver(ReadCommand command, L replicaLayout, long queryStartNanoTime)
+ public ResponseResolver(ReadCommand command, ReplicaPlan.Shared<E, P> replicaPlan, long queryStartNanoTime)
{
this.command = command;
- this.replicaLayout = replicaLayout;
- // TODO: calculate max possible replicas for the query (e.g. local dc queries won't contact remotes)
- this.responses = new Accumulator<>(replicaLayout.all().size());
+ this.replicaPlan = replicaPlan;
+ this.responses = new Accumulator<>(replicaPlan.get().candidates().size());
this.queryStartNanoTime = queryStartNanoTime;
}
+ protected P replicaPlan()
+ {
+ return replicaPlan.get();
+ }
+
public abstract boolean isDataPresent();
public void preprocess(MessageIn<ReadResponse> message)
@@ -57,8 +61,8 @@ public abstract class ResponseResolver<E extends Endpoints<E>, L extends Replica
}
catch (IllegalStateException e)
{
- logger.error("Encountered error while trying to preprocess the message {}, in command {}, replica layout: {}",
- message, command, replicaLayout);
+ logger.error("Encountered error while trying to preprocess the message {}, in command {}, replica plan: {}",
+ message, command, replicaPlan);
throw e;
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/service/reads/ShortReadPartitionsProtection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/reads/ShortReadPartitionsProtection.java b/src/java/org/apache/cassandra/service/reads/ShortReadPartitionsProtection.java
index 580b790..b16d105 100644
--- a/src/java/org/apache/cassandra/service/reads/ShortReadPartitionsProtection.java
+++ b/src/java/org/apache/cassandra/service/reads/ShortReadPartitionsProtection.java
@@ -19,6 +19,8 @@
package org.apache.cassandra.service.reads;
import org.apache.cassandra.locator.Endpoints;
+import org.apache.cassandra.locator.ReplicaPlan;
+import org.apache.cassandra.locator.ReplicaPlans;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -40,7 +42,6 @@ import org.apache.cassandra.db.transform.Transformation;
import org.apache.cassandra.dht.AbstractBounds;
import org.apache.cassandra.dht.ExcludingBounds;
import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.locator.ReplicaLayout;
import org.apache.cassandra.locator.Replica;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.service.reads.repair.NoopReadRepair;
@@ -87,10 +88,11 @@ public class ShortReadPartitionsProtection extends Transformation<UnfilteredRowI
* If we don't apply the transformation *after* extending the partition with MoreRows,
* applyToRow() method of protection will not be called on the first row of the new extension iterator.
*/
- ReplicaLayout.ForToken replicaLayout = ReplicaLayout.forSingleReplica(Keyspace.open(command.metadata().keyspace), partition.partitionKey().getToken(), source);
+ ReplicaPlan.ForTokenRead replicaPlan = ReplicaPlans.forSingleReplicaRead(Keyspace.open(command.metadata().keyspace), partition.partitionKey().getToken(), source);
+ ReplicaPlan.SharedForTokenRead sharedReplicaPlan = ReplicaPlan.shared(replicaPlan);
ShortReadRowsProtection protection = new ShortReadRowsProtection(partition.partitionKey(),
command, source,
- (cmd) -> executeReadCommand(cmd, replicaLayout),
+ (cmd) -> executeReadCommand(cmd, sharedReplicaPlan),
singleResultCounter,
mergedResultCounter);
return Transformation.apply(MoreRows.extend(partition, protection), protection);
@@ -169,14 +171,15 @@ public class ShortReadPartitionsProtection extends Transformation<UnfilteredRowI
: new ExcludingBounds<>(lastPartitionKey, bounds.right);
DataRange newDataRange = cmd.dataRange().forSubRange(newBounds);
- ReplicaLayout.ForRange replicaLayout = ReplicaLayout.forSingleReplica(Keyspace.open(command.metadata().keyspace), cmd.dataRange().keyRange(), source);
- return executeReadCommand(cmd.withUpdatedLimitsAndDataRange(newLimits, newDataRange), replicaLayout);
+ ReplicaPlan.ForRangeRead replicaPlan = ReplicaPlans.forSingleReplicaRead(Keyspace.open(command.metadata().keyspace), cmd.dataRange().keyRange(), source);
+ return executeReadCommand(cmd.withUpdatedLimitsAndDataRange(newLimits, newDataRange), ReplicaPlan.shared(replicaPlan));
}
- private <E extends Endpoints<E>, L extends ReplicaLayout<E, L>> UnfilteredPartitionIterator executeReadCommand(ReadCommand cmd, L replicaLayout)
+ private <E extends Endpoints<E>, P extends ReplicaPlan.ForRead<E>>
+ UnfilteredPartitionIterator executeReadCommand(ReadCommand cmd, ReplicaPlan.Shared<E, P> replicaPlan)
{
- DataResolver<E, L> resolver = new DataResolver<>(cmd, replicaLayout, (NoopReadRepair<E, L>)NoopReadRepair.instance, queryStartNanoTime);
- ReadCallback<E, L> handler = new ReadCallback<>(resolver, replicaLayout.consistencyLevel().blockFor(replicaLayout.keyspace()), cmd, replicaLayout, queryStartNanoTime);
+ DataResolver<E, P> resolver = new DataResolver<>(cmd, replicaPlan, (NoopReadRepair<E, P>)NoopReadRepair.instance, queryStartNanoTime);
+ ReadCallback<E, P> handler = new ReadCallback<>(resolver, cmd, replicaPlan, queryStartNanoTime);
if (source.isLocal())
StageManager.getStage(Stage.READ).maybeExecuteImmediately(new StorageProxy.LocalReadRunnable(cmd, handler));
http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/service/reads/repair/AbstractReadRepair.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/reads/repair/AbstractReadRepair.java b/src/java/org/apache/cassandra/service/reads/repair/AbstractReadRepair.java
index 528d31b..1b213ff 100644
--- a/src/java/org/apache/cassandra/service/reads/repair/AbstractReadRepair.java
+++ b/src/java/org/apache/cassandra/service/reads/repair/AbstractReadRepair.java
@@ -24,6 +24,7 @@ import java.util.function.Consumer;
import com.google.common.base.Preconditions;
import com.codahale.metrics.Meter;
+import com.google.common.base.Predicates;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.ConsistencyLevel;
@@ -34,7 +35,7 @@ import org.apache.cassandra.db.partitions.PartitionIterator;
import org.apache.cassandra.exceptions.ReadTimeoutException;
import org.apache.cassandra.locator.Endpoints;
import org.apache.cassandra.locator.Replica;
-import org.apache.cassandra.locator.ReplicaLayout;
+import org.apache.cassandra.locator.ReplicaPlan;
import org.apache.cassandra.metrics.ReadRepairMetrics;
import org.apache.cassandra.net.MessageOut;
import org.apache.cassandra.net.MessagingService;
@@ -44,11 +45,12 @@ import org.apache.cassandra.service.reads.DigestResolver;
import org.apache.cassandra.service.reads.ReadCallback;
import org.apache.cassandra.tracing.Tracing;
-public abstract class AbstractReadRepair<E extends Endpoints<E>, L extends ReplicaLayout<E, L>> implements ReadRepair<E, L>
+public abstract class AbstractReadRepair<E extends Endpoints<E>, P extends ReplicaPlan.ForRead<E>>
+ implements ReadRepair<E, P>
{
protected final ReadCommand command;
protected final long queryStartNanoTime;
- protected final L replicaLayout;
+ protected final ReplicaPlan.Shared<E, P> replicaPlan;
protected final ColumnFamilyStore cfs;
private volatile DigestRepair digestRepair = null;
@@ -68,15 +70,20 @@ public abstract class AbstractReadRepair<E extends Endpoints<E>, L extends Repli
}
public AbstractReadRepair(ReadCommand command,
- L replicaLayout,
+ ReplicaPlan.Shared<E, P> replicaPlan,
long queryStartNanoTime)
{
this.command = command;
this.queryStartNanoTime = queryStartNanoTime;
- this.replicaLayout = replicaLayout;
+ this.replicaPlan = replicaPlan;
this.cfs = Keyspace.openAndGetStore(command.metadata());
}
+ protected P replicaPlan()
+ {
+ return replicaPlan.get();
+ }
+
void sendReadCommand(Replica to, ReadCallback readCallback)
{
MessageOut<ReadCommand> message = command.createMessage();
@@ -90,14 +97,13 @@ public abstract class AbstractReadRepair<E extends Endpoints<E>, L extends Repli
abstract Meter getRepairMeter();
// digestResolver isn't used here because we resend read requests to all participants
- public void startRepair(DigestResolver<E, L> digestResolver, Consumer<PartitionIterator> resultConsumer)
+ public void startRepair(DigestResolver<E, P> digestResolver, Consumer<PartitionIterator> resultConsumer)
{
getRepairMeter().mark();
// Do a full data read to resolve the correct response (and repair nodes as needed)
- DataResolver<E, L> resolver = new DataResolver<>(command, replicaLayout, this, queryStartNanoTime);
- ReadCallback<E, L> readCallback = new ReadCallback<>(resolver, replicaLayout.consistencyLevel().blockFor(cfs.keyspace),
- command, replicaLayout, queryStartNanoTime);
+ DataResolver<E, P> resolver = new DataResolver<>(command, replicaPlan, this, queryStartNanoTime);
+ ReadCallback<E, P> readCallback = new ReadCallback<>(resolver, command, replicaPlan, queryStartNanoTime);
digestRepair = new DigestRepair(resolver, readCallback, resultConsumer);
@@ -105,12 +111,12 @@ public abstract class AbstractReadRepair<E extends Endpoints<E>, L extends Repli
if (DatabaseDescriptor.getRepairedDataTrackingForPartitionReadsEnabled())
command.trackRepairedStatus();
- for (Replica replica : replicaLayout.selected())
+ for (Replica replica : replicaPlan().contacts())
{
Tracing.trace("Enqueuing full data read to {}", replica);
sendReadCommand(replica, readCallback);
}
- ReadRepairDiagnostics.startRepair(this, replicaLayout, digestResolver);
+ ReadRepairDiagnostics.startRepair(this, replicaPlan(), digestResolver);
}
public void awaitReads() throws ReadTimeoutException
@@ -125,7 +131,7 @@ public abstract class AbstractReadRepair<E extends Endpoints<E>, L extends Repli
private boolean shouldSpeculate()
{
- ConsistencyLevel consistency = replicaLayout.consistencyLevel();
+ ConsistencyLevel consistency = replicaPlan().consistencyLevel();
ConsistencyLevel speculativeCL = consistency.isDatacenterLocal() ? ConsistencyLevel.LOCAL_QUORUM : ConsistencyLevel.QUORUM;
return consistency != ConsistencyLevel.EACH_QUORUM
&& consistency.satisfies(speculativeCL, cfs.keyspace)
@@ -142,15 +148,15 @@ public abstract class AbstractReadRepair<E extends Endpoints<E>, L extends Repli
if (shouldSpeculate() && !repair.readCallback.await(cfs.sampleReadLatencyNanos, TimeUnit.NANOSECONDS))
{
- L uncontacted = replicaLayout.forNaturalUncontacted();
- if (uncontacted.selected().isEmpty())
+ Replica uncontacted = replicaPlan().firstUncontactedCandidate(Predicates.alwaysTrue());
+ if (uncontacted == null)
return;
- Replica replica = uncontacted.selected().iterator().next();
- Tracing.trace("Enqueuing speculative full data read to {}", replica);
- sendReadCommand(replica, repair.readCallback);
+ replicaPlan.addToContacts(uncontacted);
+ Tracing.trace("Enqueuing speculative full data read to {}", uncontacted);
+ sendReadCommand(uncontacted, repair.readCallback);
ReadRepairMetrics.speculatedRead.mark();
- ReadRepairDiagnostics.speculatedRead(this, replica.endpoint(), uncontacted);
+ ReadRepairDiagnostics.speculatedRead(this, uncontacted.endpoint(), replicaPlan());
}
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/service/reads/repair/BlockingPartitionRepair.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/reads/repair/BlockingPartitionRepair.java b/src/java/org/apache/cassandra/service/reads/repair/BlockingPartitionRepair.java
index 54af2cf..f536ea8 100644
--- a/src/java/org/apache/cassandra/service/reads/repair/BlockingPartitionRepair.java
+++ b/src/java/org/apache/cassandra/service/reads/repair/BlockingPartitionRepair.java
@@ -37,9 +37,9 @@ import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.Mutation;
import org.apache.cassandra.db.partitions.PartitionUpdate;
import org.apache.cassandra.locator.Endpoints;
-import org.apache.cassandra.locator.ReplicaLayout;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.locator.Replica;
+import org.apache.cassandra.locator.ReplicaPlan;
import org.apache.cassandra.locator.Replicas;
import org.apache.cassandra.metrics.ReadRepairMetrics;
import org.apache.cassandra.net.IAsyncCallback;
@@ -49,25 +49,26 @@ import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.schema.TableId;
import org.apache.cassandra.tracing.Tracing;
-public class BlockingPartitionRepair<E extends Endpoints<E>, L extends ReplicaLayout<E, L>> extends AbstractFuture<Object> implements IAsyncCallback<Object>
+public class BlockingPartitionRepair<E extends Endpoints<E>, P extends ReplicaPlan.ForRead<E>>
+ extends AbstractFuture<Object> implements IAsyncCallback<Object>
{
private final DecoratedKey key;
- private final L replicaLayout;
+ private final P replicaPlan;
private final Map<Replica, Mutation> pendingRepairs;
private final CountDownLatch latch;
private volatile long mutationsSentTime;
- public BlockingPartitionRepair(DecoratedKey key, Map<Replica, Mutation> repairs, int maxBlockFor, L replicaLayout)
+ public BlockingPartitionRepair(DecoratedKey key, Map<Replica, Mutation> repairs, int maxBlockFor, P replicaPlan)
{
this.key = key;
this.pendingRepairs = new ConcurrentHashMap<>(repairs);
- this.replicaLayout = replicaLayout;
+ this.replicaPlan = replicaPlan;
// here we remove empty repair mutations from the block for total, since
// we're not sending them mutations
int blockFor = maxBlockFor;
- for (Replica participant: replicaLayout.selected())
+ for (Replica participant: replicaPlan.contacts())
{
// remote dcs can sometimes get involved in dc-local reads. We want to repair
// them if they do, but they shouldn't interfere with blocking the client read.
@@ -97,7 +98,7 @@ public class BlockingPartitionRepair<E extends Endpoints<E>, L extends ReplicaLa
private boolean shouldBlockOn(InetAddressAndPort endpoint)
{
- return !replicaLayout.consistencyLevel().isDatacenterLocal() || isLocal(endpoint);
+ return !replicaPlan.consistencyLevel().isDatacenterLocal() || isLocal(endpoint);
}
@VisibleForTesting
@@ -105,7 +106,7 @@ public class BlockingPartitionRepair<E extends Endpoints<E>, L extends ReplicaLa
{
if (shouldBlockOn(from))
{
- pendingRepairs.remove(replicaLayout.getReplicaFor(from));
+ pendingRepairs.remove(replicaPlan.getReplicaFor(from));
latch.countDown();
}
}
@@ -198,8 +199,8 @@ public class BlockingPartitionRepair<E extends Endpoints<E>, L extends ReplicaLa
if (awaitRepairs(timeout, timeoutUnit))
return;
- L newCandidates = replicaLayout.forNaturalUncontacted();
- if (newCandidates.selected().isEmpty())
+ E newCandidates = replicaPlan.uncontactedCandidates();
+ if (newCandidates.isEmpty())
return;
PartitionUpdate update = mergeUnackedUpdates();
@@ -212,7 +213,7 @@ public class BlockingPartitionRepair<E extends Endpoints<E>, L extends ReplicaLa
Mutation[] versionedMutations = new Mutation[msgVersionIdx(MessagingService.current_version) + 1];
- for (Replica replica : newCandidates.selected())
+ for (Replica replica : newCandidates)
{
int versionIdx = msgVersionIdx(MessagingService.instance().getVersion(replica.endpoint()));
@@ -220,7 +221,7 @@ public class BlockingPartitionRepair<E extends Endpoints<E>, L extends ReplicaLa
if (mutation == null)
{
- mutation = BlockingReadRepairs.createRepairMutation(update, replicaLayout.consistencyLevel(), replica.endpoint(), true);
+ mutation = BlockingReadRepairs.createRepairMutation(update, replicaPlan.consistencyLevel(), replica.endpoint(), true);
versionedMutations[versionIdx] = mutation;
}
@@ -239,7 +240,7 @@ public class BlockingPartitionRepair<E extends Endpoints<E>, L extends ReplicaLa
Keyspace getKeyspace()
{
- return replicaLayout.keyspace();
+ return replicaPlan.keyspace();
}
DecoratedKey getKey()
@@ -249,6 +250,6 @@ public class BlockingPartitionRepair<E extends Endpoints<E>, L extends ReplicaLa
ConsistencyLevel getConsistency()
{
- return replicaLayout.consistencyLevel();
+ return replicaPlan.consistencyLevel();
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/service/reads/repair/BlockingReadRepair.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/reads/repair/BlockingReadRepair.java b/src/java/org/apache/cassandra/service/reads/repair/BlockingReadRepair.java
index 402aed0..938abaf 100644
--- a/src/java/org/apache/cassandra/service/reads/repair/BlockingReadRepair.java
+++ b/src/java/org/apache/cassandra/service/reads/repair/BlockingReadRepair.java
@@ -24,7 +24,6 @@ import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.locator.Endpoints;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -34,8 +33,9 @@ import org.apache.cassandra.db.Mutation;
import org.apache.cassandra.db.ReadCommand;
import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators;
import org.apache.cassandra.exceptions.ReadTimeoutException;
+import org.apache.cassandra.locator.Endpoints;
import org.apache.cassandra.locator.Replica;
-import org.apache.cassandra.locator.ReplicaLayout;
+import org.apache.cassandra.locator.ReplicaPlan;
import org.apache.cassandra.metrics.ReadRepairMetrics;
import org.apache.cassandra.tracing.Tracing;
@@ -44,22 +44,23 @@ import org.apache.cassandra.tracing.Tracing;
* updates have been written to nodes needing correction. Breaks write
* atomicity in some situations
*/
-public class BlockingReadRepair<E extends Endpoints<E>, L extends ReplicaLayout<E, L>> extends AbstractReadRepair<E, L>
+public class BlockingReadRepair<E extends Endpoints<E>, P extends ReplicaPlan.ForRead<E>>
+ extends AbstractReadRepair<E, P>
{
private static final Logger logger = LoggerFactory.getLogger(BlockingReadRepair.class);
protected final Queue<BlockingPartitionRepair> repairs = new ConcurrentLinkedQueue<>();
private final int blockFor;
- BlockingReadRepair(ReadCommand command, L replicaLayout, long queryStartNanoTime)
+ BlockingReadRepair(ReadCommand command, ReplicaPlan.Shared<E, P> replicaPlan, long queryStartNanoTime)
{
- super(command, replicaLayout, queryStartNanoTime);
- this.blockFor = replicaLayout.consistencyLevel().blockFor(cfs.keyspace);
+ super(command, replicaPlan, queryStartNanoTime);
+ this.blockFor = replicaPlan().consistencyLevel().blockFor(cfs.keyspace);
}
- public UnfilteredPartitionIterators.MergeListener getMergeListener(L replicaLayout)
+ public UnfilteredPartitionIterators.MergeListener getMergeListener(P replicaPlan)
{
- return new PartitionIteratorMergeListener(replicaLayout, command, this.replicaLayout.consistencyLevel(), this);
+ return new PartitionIteratorMergeListener<>(replicaPlan, command, this);
}
@Override
@@ -91,20 +92,20 @@ public class BlockingReadRepair<E extends Endpoints<E>, L extends ReplicaLayout<
if (timedOut)
{
// We got all responses, but timed out while repairing
- int blockFor = replicaLayout.consistencyLevel().blockFor(cfs.keyspace);
+ int blockFor = replicaPlan().blockFor();
if (Tracing.isTracing())
Tracing.trace("Timed out while read-repairing after receiving all {} data and digest responses", blockFor);
else
logger.debug("Timeout while read-repairing after receiving all {} data and digest responses", blockFor);
- throw new ReadTimeoutException(replicaLayout.consistencyLevel(), blockFor - 1, blockFor, true);
+ throw new ReadTimeoutException(replicaPlan().consistencyLevel(), blockFor - 1, blockFor, true);
}
}
@Override
- public void repairPartition(DecoratedKey partitionKey, Map<Replica, Mutation> mutations, L replicaLayout)
+ public void repairPartition(DecoratedKey partitionKey, Map<Replica, Mutation> mutations, P replicaPlan)
{
- BlockingPartitionRepair<E, L> blockingRepair = new BlockingPartitionRepair<>(partitionKey, mutations, blockFor, replicaLayout);
+ BlockingPartitionRepair<E, P> blockingRepair = new BlockingPartitionRepair<>(partitionKey, mutations, blockFor, replicaPlan);
blockingRepair.sendInitialRepairs();
repairs.add(blockingRepair);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/service/reads/repair/NoopReadRepair.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/reads/repair/NoopReadRepair.java b/src/java/org/apache/cassandra/service/reads/repair/NoopReadRepair.java
index 4af4a92..6aa6ece 100644
--- a/src/java/org/apache/cassandra/service/reads/repair/NoopReadRepair.java
+++ b/src/java/org/apache/cassandra/service/reads/repair/NoopReadRepair.java
@@ -26,28 +26,29 @@ import org.apache.cassandra.db.Mutation;
import org.apache.cassandra.db.partitions.PartitionIterator;
import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators;
import org.apache.cassandra.exceptions.ReadTimeoutException;
-import org.apache.cassandra.locator.ReplicaLayout;
import org.apache.cassandra.locator.Endpoints;
import org.apache.cassandra.locator.Replica;
+import org.apache.cassandra.locator.ReplicaPlan;
import org.apache.cassandra.service.reads.DigestResolver;
/**
* Bypasses the read repair path for short read protection and testing
*/
-public class NoopReadRepair<E extends Endpoints<E>, L extends ReplicaLayout<E, L>> implements ReadRepair<E, L>
+public class NoopReadRepair<E extends Endpoints<E>, P extends ReplicaPlan.ForRead<E>>
+ implements ReadRepair<E, P>
{
public static final NoopReadRepair instance = new NoopReadRepair();
private NoopReadRepair() {}
@Override
- public UnfilteredPartitionIterators.MergeListener getMergeListener(L replicas)
+ public UnfilteredPartitionIterators.MergeListener getMergeListener(P replicas)
{
return UnfilteredPartitionIterators.MergeListener.NOOP;
}
@Override
- public void startRepair(DigestResolver<E, L> digestResolver, Consumer<PartitionIterator> resultConsumer)
+ public void startRepair(DigestResolver<E, P> digestResolver, Consumer<PartitionIterator> resultConsumer)
{
resultConsumer.accept(digestResolver.getData());
}
@@ -75,7 +76,7 @@ public class NoopReadRepair<E extends Endpoints<E>, L extends ReplicaLayout<E, L
}
@Override
- public void repairPartition(DecoratedKey partitionKey, Map<Replica, Mutation> mutations, L replicaLayout)
+ public void repairPartition(DecoratedKey partitionKey, Map<Replica, Mutation> mutations, P replicaPlan)
{
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/service/reads/repair/PartitionIteratorMergeListener.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/reads/repair/PartitionIteratorMergeListener.java b/src/java/org/apache/cassandra/service/reads/repair/PartitionIteratorMergeListener.java
index 4cae3ae..7247704 100644
--- a/src/java/org/apache/cassandra/service/reads/repair/PartitionIteratorMergeListener.java
+++ b/src/java/org/apache/cassandra/service/reads/repair/PartitionIteratorMergeListener.java
@@ -28,26 +28,26 @@ import org.apache.cassandra.db.RegularAndStaticColumns;
import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators;
import org.apache.cassandra.db.rows.UnfilteredRowIterator;
import org.apache.cassandra.db.rows.UnfilteredRowIterators;
-import org.apache.cassandra.locator.ReplicaLayout;
+import org.apache.cassandra.locator.Endpoints;
+import org.apache.cassandra.locator.ReplicaPlan;
-public class PartitionIteratorMergeListener implements UnfilteredPartitionIterators.MergeListener
+public class PartitionIteratorMergeListener<E extends Endpoints<E>>
+ implements UnfilteredPartitionIterators.MergeListener
{
- private final ReplicaLayout replicaLayout;
+ private final ReplicaPlan.ForRead<E> replicaPlan;
private final ReadCommand command;
- private final ConsistencyLevel consistency;
private final ReadRepair readRepair;
- public PartitionIteratorMergeListener(ReplicaLayout replicaLayout, ReadCommand command, ConsistencyLevel consistency, ReadRepair readRepair)
+ public PartitionIteratorMergeListener(ReplicaPlan.ForRead<E> replicaPlan, ReadCommand command, ReadRepair readRepair)
{
- this.replicaLayout = replicaLayout;
+ this.replicaPlan = replicaPlan;
this.command = command;
- this.consistency = consistency;
this.readRepair = readRepair;
}
public UnfilteredRowIterators.MergeListener getRowMergeListener(DecoratedKey partitionKey, List<UnfilteredRowIterator> versions)
{
- return new RowIteratorMergeListener(partitionKey, columns(versions), isReversed(versions), replicaLayout, command, consistency, readRepair);
+ return new RowIteratorMergeListener<>(partitionKey, columns(versions), isReversed(versions), replicaPlan, command, readRepair);
}
protected RegularAndStaticColumns columns(List<UnfilteredRowIterator> versions)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/service/reads/repair/ReadOnlyReadRepair.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/reads/repair/ReadOnlyReadRepair.java b/src/java/org/apache/cassandra/service/reads/repair/ReadOnlyReadRepair.java
index c13e2d6..64bfec2 100644
--- a/src/java/org/apache/cassandra/service/reads/repair/ReadOnlyReadRepair.java
+++ b/src/java/org/apache/cassandra/service/reads/repair/ReadOnlyReadRepair.java
@@ -27,22 +27,23 @@ import org.apache.cassandra.db.ReadCommand;
import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators;
import org.apache.cassandra.locator.Endpoints;
import org.apache.cassandra.locator.Replica;
-import org.apache.cassandra.locator.ReplicaLayout;
+import org.apache.cassandra.locator.ReplicaPlan;
import org.apache.cassandra.metrics.ReadRepairMetrics;
/**
* Only performs the collection of data responses and reconciliation of them, doesn't send repair mutations
* to replicas. This preserves write atomicity, but doesn't provide monotonic quorum reads
*/
-public class ReadOnlyReadRepair<E extends Endpoints<E>, L extends ReplicaLayout<E, L>> extends AbstractReadRepair<E, L>
+public class ReadOnlyReadRepair<E extends Endpoints<E>, P extends ReplicaPlan.ForRead<E>>
+ extends AbstractReadRepair<E, P>
{
- ReadOnlyReadRepair(ReadCommand command, L replicaLayout, long queryStartNanoTime)
+ ReadOnlyReadRepair(ReadCommand command, ReplicaPlan.Shared<E, P> replicaPlan, long queryStartNanoTime)
{
- super(command, replicaLayout, queryStartNanoTime);
+ super(command, replicaPlan, queryStartNanoTime);
}
@Override
- public UnfilteredPartitionIterators.MergeListener getMergeListener(L replicaLayout)
+ public UnfilteredPartitionIterators.MergeListener getMergeListener(P replicaPlan)
{
return UnfilteredPartitionIterators.MergeListener.NOOP;
}
@@ -60,7 +61,7 @@ public class ReadOnlyReadRepair<E extends Endpoints<E>, L extends ReplicaLayout<
}
@Override
- public void repairPartition(DecoratedKey partitionKey, Map<Replica, Mutation> mutations, L replicaLayout)
+ public void repairPartition(DecoratedKey partitionKey, Map<Replica, Mutation> mutations, P replicaPlan)
{
throw new UnsupportedOperationException("ReadOnlyReadRepair shouldn't be trying to repair partitions");
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/service/reads/repair/ReadRepair.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/reads/repair/ReadRepair.java b/src/java/org/apache/cassandra/service/reads/repair/ReadRepair.java
index 168f003..9441945 100644
--- a/src/java/org/apache/cassandra/service/reads/repair/ReadRepair.java
+++ b/src/java/org/apache/cassandra/service/reads/repair/ReadRepair.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.service.reads.repair;
import java.util.Map;
import java.util.function.Consumer;
+import java.util.function.Supplier;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.locator.Endpoints;
@@ -29,17 +30,19 @@ import org.apache.cassandra.db.partitions.PartitionIterator;
import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators;
import org.apache.cassandra.exceptions.ReadTimeoutException;
import org.apache.cassandra.locator.Replica;
-import org.apache.cassandra.locator.ReplicaLayout;
+import org.apache.cassandra.locator.ReplicaPlan;
import org.apache.cassandra.service.reads.DigestResolver;
-public interface ReadRepair<E extends Endpoints<E>, L extends ReplicaLayout<E, L>>
+public interface ReadRepair<E extends Endpoints<E>, P extends ReplicaPlan.ForRead<E>>
{
public interface Factory
{
- <E extends Endpoints<E>, L extends ReplicaLayout<E, L>> ReadRepair<E, L> create(ReadCommand command, L replicaLayout, long queryStartNanoTime);
+ <E extends Endpoints<E>, P extends ReplicaPlan.ForRead<E>>
+ ReadRepair<E, P> create(ReadCommand command, ReplicaPlan.Shared<E, P> replicaPlan, long queryStartNanoTime);
}
- static <E extends Endpoints<E>, L extends ReplicaLayout<E, L>> ReadRepair<E, L> create(ReadCommand command, L replicaPlan, long queryStartNanoTime)
+ static <E extends Endpoints<E>, P extends ReplicaPlan.ForRead<E>>
+ ReadRepair<E, P> create(ReadCommand command, ReplicaPlan.Shared<E, P> replicaPlan, long queryStartNanoTime)
{
return command.metadata().params.readRepair.create(command, replicaPlan, queryStartNanoTime);
}
@@ -47,7 +50,7 @@ public interface ReadRepair<E extends Endpoints<E>, L extends ReplicaLayout<E, L
/**
* Used by DataResolver to generate corrections as the partition iterator is consumed
*/
- UnfilteredPartitionIterators.MergeListener getMergeListener(L replicaLayout);
+ UnfilteredPartitionIterators.MergeListener getMergeListener(P replicaPlan);
/**
* Called when the digests from the initial read don't match. Reads may block on the
@@ -55,7 +58,7 @@ public interface ReadRepair<E extends Endpoints<E>, L extends ReplicaLayout<E, L
* @param digestResolver supplied so we can get the original data response
* @param resultConsumer hook for the repair to set it's result on completion
*/
- public void startRepair(DigestResolver<E, L> digestResolver, Consumer<PartitionIterator> resultConsumer);
+ public void startRepair(DigestResolver<E, P> digestResolver, Consumer<PartitionIterator> resultConsumer);
/**
* Block on the reads (or timeout) sent out in {@link ReadRepair#startRepair}
@@ -90,5 +93,5 @@ public interface ReadRepair<E extends Endpoints<E>, L extends ReplicaLayout<E, L
* Repairs a partition _after_ receiving data responses. This method receives replica list, since
* we will block repair only on the replicas that have responded.
*/
- void repairPartition(DecoratedKey partitionKey, Map<Replica, Mutation> mutations, L replicaLayout);
+ void repairPartition(DecoratedKey partitionKey, Map<Replica, Mutation> mutations, P replicaPlan);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/service/reads/repair/ReadRepairDiagnostics.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/reads/repair/ReadRepairDiagnostics.java b/src/java/org/apache/cassandra/service/reads/repair/ReadRepairDiagnostics.java
index 4c74a89..b9167bd 100644
--- a/src/java/org/apache/cassandra/service/reads/repair/ReadRepairDiagnostics.java
+++ b/src/java/org/apache/cassandra/service/reads/repair/ReadRepairDiagnostics.java
@@ -26,6 +26,7 @@ import org.apache.cassandra.db.Mutation;
import org.apache.cassandra.diag.DiagnosticEventService;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.locator.ReplicaLayout;
+import org.apache.cassandra.locator.ReplicaPlan;
import org.apache.cassandra.service.reads.DigestResolver;
import org.apache.cassandra.service.reads.repair.PartitionRepairEvent.PartitionRepairEventType;
import org.apache.cassandra.service.reads.repair.ReadRepairEvent.ReadRepairEventType;
@@ -38,22 +39,22 @@ final class ReadRepairDiagnostics
{
}
- static void startRepair(AbstractReadRepair readRepair, ReplicaLayout<?, ?> layout, DigestResolver digestResolver)
+ static void startRepair(AbstractReadRepair readRepair, ReplicaPlan.ForRead<?> fullPlan, DigestResolver digestResolver)
{
if (service.isEnabled(ReadRepairEvent.class, ReadRepairEventType.START_REPAIR))
service.publish(new ReadRepairEvent(ReadRepairEventType.START_REPAIR,
readRepair,
- layout.selected().endpoints(),
- layout.all().endpoints(), digestResolver));
+ fullPlan.contacts().endpoints(),
+ fullPlan.candidates().endpoints(), digestResolver));
}
static void speculatedRead(AbstractReadRepair readRepair, InetAddressAndPort endpoint,
- ReplicaLayout<?, ?> replicaLayout)
+ ReplicaPlan.ForRead<?> fullPlan)
{
if (service.isEnabled(ReadRepairEvent.class, ReadRepairEventType.SPECULATED_READ))
service.publish(new ReadRepairEvent(ReadRepairEventType.SPECULATED_READ,
readRepair, Collections.singletonList(endpoint),
- Lists.newArrayList(replicaLayout.all().endpoints()), null));
+ Lists.newArrayList(fullPlan.candidates().endpoints()), null));
}
static void sendInitialRepair(BlockingPartitionRepair partitionRepair, InetAddressAndPort destination, Mutation mutation)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/service/reads/repair/ReadRepairEvent.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/reads/repair/ReadRepairEvent.java b/src/java/org/apache/cassandra/service/reads/repair/ReadRepairEvent.java
index 9e14362..5cec802 100644
--- a/src/java/org/apache/cassandra/service/reads/repair/ReadRepairEvent.java
+++ b/src/java/org/apache/cassandra/service/reads/repair/ReadRepairEvent.java
@@ -67,7 +67,7 @@ final class ReadRepairEvent extends DiagnosticEvent
this.keyspace = readRepair.cfs.keyspace;
this.tableName = readRepair.cfs.getTableName();
this.cqlCommand = readRepair.command.toCQLString();
- this.consistency = readRepair.replicaLayout.consistencyLevel();
+ this.consistency = readRepair.replicaPlan().consistencyLevel();
this.speculativeRetry = readRepair.cfs.metadata().params.speculativeRetry.kind();
this.destinations = destinations;
this.allEndpoints = allEndpoints;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/service/reads/repair/ReadRepairStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/reads/repair/ReadRepairStrategy.java b/src/java/org/apache/cassandra/service/reads/repair/ReadRepairStrategy.java
index 28c0e9e..7a4b795 100644
--- a/src/java/org/apache/cassandra/service/reads/repair/ReadRepairStrategy.java
+++ b/src/java/org/apache/cassandra/service/reads/repair/ReadRepairStrategy.java
@@ -21,22 +21,25 @@ package org.apache.cassandra.service.reads.repair;
import org.apache.cassandra.db.ReadCommand;
import org.apache.cassandra.locator.Endpoints;
import org.apache.cassandra.locator.ReplicaLayout;
+import org.apache.cassandra.locator.ReplicaPlan;
public enum ReadRepairStrategy implements ReadRepair.Factory
{
NONE
{
- public <E extends Endpoints<E>, L extends ReplicaLayout<E, L>> ReadRepair<E, L> create(ReadCommand command, L replicaLayout, long queryStartNanoTime)
+ public <E extends Endpoints<E>, P extends ReplicaPlan.ForRead<E>>
+ ReadRepair<E, P> create(ReadCommand command, ReplicaPlan.Shared<E, P> replicaPlan, long queryStartNanoTime)
{
- return new ReadOnlyReadRepair<>(command, replicaLayout, queryStartNanoTime);
+ return new ReadOnlyReadRepair<>(command, replicaPlan, queryStartNanoTime);
}
},
BLOCKING
{
- public <E extends Endpoints<E>, L extends ReplicaLayout<E, L>> ReadRepair<E, L> create(ReadCommand command, L replicaLayout, long queryStartNanoTime)
+ public <E extends Endpoints<E>, P extends ReplicaPlan.ForRead<E>>
+ ReadRepair<E, P> create(ReadCommand command, ReplicaPlan.Shared<E, P> replicaPlan, long queryStartNanoTime)
{
- return new BlockingReadRepair<>(command, replicaLayout, queryStartNanoTime);
+ return new BlockingReadRepair<>(command, replicaPlan, queryStartNanoTime);
}
};
http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/service/reads/repair/RowIteratorMergeListener.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/reads/repair/RowIteratorMergeListener.java b/src/java/org/apache/cassandra/service/reads/repair/RowIteratorMergeListener.java
index 7fe797a..60e0d41 100644
--- a/src/java/org/apache/cassandra/service/reads/repair/RowIteratorMergeListener.java
+++ b/src/java/org/apache/cassandra/service/reads/repair/RowIteratorMergeListener.java
@@ -46,21 +46,21 @@ import org.apache.cassandra.db.rows.Rows;
import org.apache.cassandra.db.rows.UnfilteredRowIterators;
import org.apache.cassandra.locator.Endpoints;
import org.apache.cassandra.locator.Replica;
-import org.apache.cassandra.locator.ReplicaLayout;
+import org.apache.cassandra.locator.ReplicaPlan;
import org.apache.cassandra.schema.ColumnMetadata;
-public class RowIteratorMergeListener implements UnfilteredRowIterators.MergeListener
+public class RowIteratorMergeListener<E extends Endpoints<E>>
+ implements UnfilteredRowIterators.MergeListener
{
private final DecoratedKey partitionKey;
private final RegularAndStaticColumns columns;
private final boolean isReversed;
private final ReadCommand command;
- private final ConsistencyLevel consistency;
private final PartitionUpdate.Builder[] repairs;
private final Row.Builder[] currentRows;
private final RowDiffListener diffListener;
- private final ReplicaLayout layout;
+ private final ReplicaPlan.ForRead<E> replicaPlan;
// The partition level deletion for the merge row.
private DeletionTime partitionLevelDeletion;
@@ -73,19 +73,18 @@ public class RowIteratorMergeListener implements UnfilteredRowIterators.MergeLis
private final ReadRepair readRepair;
- public RowIteratorMergeListener(DecoratedKey partitionKey, RegularAndStaticColumns columns, boolean isReversed, ReplicaLayout layout, ReadCommand command, ConsistencyLevel consistency, ReadRepair readRepair)
+ public RowIteratorMergeListener(DecoratedKey partitionKey, RegularAndStaticColumns columns, boolean isReversed, ReplicaPlan.ForRead<E> replicaPlan, ReadCommand command, ReadRepair readRepair)
{
this.partitionKey = partitionKey;
this.columns = columns;
this.isReversed = isReversed;
- this.layout = layout;
- int size = layout.selected().size();
+ this.replicaPlan = replicaPlan;
+ int size = replicaPlan.contacts().size();
repairs = new PartitionUpdate.Builder[size];
currentRows = new Row.Builder[size];
sourceDeletionTime = new DeletionTime[size];
markerToRepair = new ClusteringBound[size];
this.command = command;
- this.consistency = consistency;
this.readRepair = readRepair;
this.diffListener = new RowDiffListener()
@@ -310,7 +309,7 @@ public class RowIteratorMergeListener implements UnfilteredRowIterators.MergeLis
public void close()
{
Map<Replica, Mutation> mutations = null;
- Endpoints<?> sources = layout.selected();
+ Endpoints<?> sources = replicaPlan.contacts();
for (int i = 0; i < repairs.length; i++)
{
if (repairs[i] == null)
@@ -318,7 +317,7 @@ public class RowIteratorMergeListener implements UnfilteredRowIterators.MergeLis
Replica source = sources.get(i);
- Mutation mutation = BlockingReadRepairs.createRepairMutation(repairs[i].build(), consistency, source.endpoint(), false);
+ Mutation mutation = BlockingReadRepairs.createRepairMutation(repairs[i].build(), replicaPlan.consistencyLevel(), source.endpoint(), false);
if (mutation == null)
continue;
@@ -330,7 +329,7 @@ public class RowIteratorMergeListener implements UnfilteredRowIterators.MergeLis
if (mutations != null)
{
- readRepair.repairPartition(partitionKey, mutations, layout);
+ readRepair.repairPartition(partitionKey, mutations, replicaPlan);
}
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/test/unit/org/apache/cassandra/locator/ReplicaCollectionTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/locator/ReplicaCollectionTest.java b/test/unit/org/apache/cassandra/locator/ReplicaCollectionTest.java
index 66eff23..f937f96 100644
--- a/test/unit/org/apache/cassandra/locator/ReplicaCollectionTest.java
+++ b/test/unit/org/apache/cassandra/locator/ReplicaCollectionTest.java
@@ -22,7 +22,6 @@ import com.google.common.base.Predicates;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Multimap;
import org.apache.cassandra.dht.Murmur3Partitioner;
@@ -34,13 +33,14 @@ import org.junit.Assert;
import org.junit.Test;
import java.net.UnknownHostException;
-import java.util.Collections;
import java.util.Comparator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;
+import static com.google.common.collect.Iterables.*;
+import static com.google.common.collect.Iterables.filter;
import static org.apache.cassandra.locator.Replica.fullReplica;
import static org.apache.cassandra.locator.Replica.transientReplica;
@@ -111,7 +111,7 @@ public class ReplicaCollectionTest
void testEquals()
{
- Assert.assertTrue(Iterables.elementsEqual(canonicalList, test));
+ Assert.assertTrue(elementsEqual(canonicalList, test));
}
void testEndpoints()
@@ -144,28 +144,6 @@ public class ReplicaCollectionTest
Assert.assertEquals(new LinkedHashSet<>(Lists.transform(canonicalList, Replica::endpoint)), test.endpoints());
}
- void testSelect(int subListDepth, int filterDepth, int sortDepth, int selectDepth)
- {
- TestCase<C> allMatchZeroCapacity = new TestCase<>(test.select().add(Predicates.alwaysTrue(), 0).get(), Collections.emptyList());
- allMatchZeroCapacity.testAll(subListDepth, filterDepth, sortDepth, selectDepth - 1);
-
- TestCase<C> noMatchFullCapacity = new TestCase<>(test.select().add(Predicates.alwaysFalse(), canonicalList.size()).get(), Collections.emptyList());
- noMatchFullCapacity.testAll(subListDepth, filterDepth, sortDepth,selectDepth - 1);
-
- if (canonicalList.size() <= 2)
- return;
-
- List<Replica> newOrderList = ImmutableList.of(canonicalList.get(2), canonicalList.get(1), canonicalList.get(0));
- TestCase<C> newOrder = new TestCase<>(
- test.select()
- .add(r -> r == newOrderList.get(0), 3)
- .add(r -> r == newOrderList.get(1), 3)
- .add(r -> r == newOrderList.get(2), 3)
- .get(), newOrderList
- );
- newOrder.testAll(subListDepth, filterDepth, sortDepth,selectDepth - 1);
- }
-
private void assertSubList(C subCollection, int from, int to)
{
Assert.assertTrue(subCollection.isSnapshot);
@@ -182,7 +160,7 @@ public class ReplicaCollectionTest
}
}
- void testSubList(int subListDepth, int filterDepth, int sortDepth, int selectDepth)
+ void testSubList(int subListDepth, int filterDepth, int sortDepth)
{
if (test.isSnapshot)
Assert.assertSame(test, test.subList(0, test.size()));
@@ -192,34 +170,62 @@ public class ReplicaCollectionTest
TestCase<C> skipFront = new TestCase<>(test.subList(1, test.size()), canonicalList.subList(1, canonicalList.size()));
assertSubList(skipFront.test, 1, canonicalList.size());
- skipFront.testAll(subListDepth - 1, filterDepth, sortDepth, selectDepth);
+ skipFront.testAll(subListDepth - 1, filterDepth, sortDepth);
TestCase<C> skipBack = new TestCase<>(test.subList(0, test.size() - 1), canonicalList.subList(0, canonicalList.size() - 1));
assertSubList(skipBack.test, 0, canonicalList.size() - 1);
- skipBack.testAll(subListDepth - 1, filterDepth, sortDepth, selectDepth);
+ skipBack.testAll(subListDepth - 1, filterDepth, sortDepth);
}
- void testFilter(int subListDepth, int filterDepth, int sortDepth, int selectDepth)
+ void testFilter(int subListDepth, int filterDepth, int sortDepth)
{
if (test.isSnapshot)
Assert.assertSame(test, test.filter(Predicates.alwaysTrue()));
if (test.isEmpty())
return;
+
// remove start
// we recurse on the same subset in testSubList, so just corroborate we have the correct list here
- assertSubList(test.filter(r -> r != canonicalList.get(0)), 1, canonicalList.size());
+ {
+ Predicate<Replica> removeFirst = r -> r != canonicalList.get(0);
+ assertSubList(test.filter(removeFirst), 1, canonicalList.size());
+ assertSubList(test.filter(removeFirst, 1), 1, Math.min(canonicalList.size(), 2));
+ }
if (test.size() <= 1)
return;
+
// remove end
// we recurse on the same subset in testSubList, so just corroborate we have the correct list here
- assertSubList(test.filter(r -> r != canonicalList.get(canonicalList.size() - 1)), 0, canonicalList.size() - 1);
+ {
+ int last = canonicalList.size() - 1;
+ Predicate<Replica> removeLast = r -> r != canonicalList.get(last);
+ assertSubList(test.filter(removeLast), 0, last);
+ }
if (test.size() <= 2)
return;
+
Predicate<Replica> removeMiddle = r -> r != canonicalList.get(canonicalList.size() / 2);
- TestCase<C> filtered = new TestCase<>(test.filter(removeMiddle), ImmutableList.copyOf(Iterables.filter(canonicalList, removeMiddle::test)));
- filtered.testAll(subListDepth, filterDepth - 1, sortDepth, selectDepth);
+ TestCase<C> filtered = new TestCase<>(test.filter(removeMiddle), ImmutableList.copyOf(filter(canonicalList, removeMiddle::test)));
+ filtered.testAll(subListDepth, filterDepth - 1, sortDepth);
+ }
+
+ void testCount()
+ {
+ Assert.assertEquals(0, test.count(Predicates.alwaysFalse()));
+
+ if (test.isEmpty())
+ {
+ Assert.assertEquals(0, test.count(Predicates.alwaysTrue()));
+ return;
+ }
+
+ for (int i = 0 ; i < canonicalList.size() ; ++i)
+ {
+ Replica discount = canonicalList.get(i);
+ Assert.assertEquals(canonicalList.size() - 1, test.count(r -> r != discount));
+ }
}
void testContains()
@@ -235,7 +241,7 @@ public class ReplicaCollectionTest
Assert.assertEquals(canonicalList.get(i), test.get(i));
}
- void testSort(int subListDepth, int filterDepth, int sortDepth, int selectDepth)
+ void testSort(int subListDepth, int filterDepth, int sortDepth)
{
final Comparator<Replica> comparator = (o1, o2) ->
{
@@ -244,10 +250,10 @@ public class ReplicaCollectionTest
return f1 == f2 ? 0 : f1 ? 1 : -1;
};
TestCase<C> sorted = new TestCase<>(test.sorted(comparator), ImmutableList.sortedCopyOf(comparator, canonicalList));
- sorted.testAll(subListDepth, filterDepth, sortDepth - 1, selectDepth);
+ sorted.testAll(subListDepth, filterDepth, sortDepth - 1);
}
- private void testAll(int subListDepth, int filterDepth, int sortDepth, int selectDepth)
+ private void testAll(int subListDepth, int filterDepth, int sortDepth)
{
testEndpoints();
testOrderOfIteration();
@@ -255,19 +261,18 @@ public class ReplicaCollectionTest
testGet();
testEquals();
testSize();
+ testCount();
if (subListDepth > 0)
- testSubList(subListDepth, filterDepth, sortDepth, selectDepth);
+ testSubList(subListDepth, filterDepth, sortDepth);
if (filterDepth > 0)
- testFilter(subListDepth, filterDepth, sortDepth, selectDepth);
+ testFilter(subListDepth, filterDepth, sortDepth);
if (sortDepth > 0)
- testSort(subListDepth, filterDepth, sortDepth, selectDepth);
- if (selectDepth > 0)
- testSelect(subListDepth, filterDepth, sortDepth, selectDepth);
+ testSort(subListDepth, filterDepth, sortDepth);
}
public void testAll()
{
- testAll(2, 2, 2, 2);
+ testAll(2, 2, 2);
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org