You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by be...@apache.org on 2018/09/14 09:14:54 UTC
[4/4] cassandra git commit: ReplicaPlan/Layout refactor
follow-up/completion
ReplicaPlan/Layout refactor follow-up/completion
Finish much of the work to clarify endpoint selection
that was begun in Transient Replication (CASSANDRA-14404)
Also fixes:
- commitPaxos was incorrectly selecting only live nodes,
when it needed to include down nodes as well
- We were not writing to pending transient replicas
- On write, we were not hinting to full nodes with transient
replication
- rr.maybeSendAdditional{Reads,Writes} would only consult the
same node we may have speculated a read to
- transient->full movements mishandled consistency level upgrade by
retaining the 'full' pending variant, which increased CL requirement;
instead, the 'natural' replica is upgraded to 'full' for writes
patch by Benedict; reviewed by Alex Petrov and Ariel Weisberg for CASSANDRA-14705
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/047bcd7a
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/047bcd7a
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/047bcd7a
Branch: refs/heads/trunk
Commit: 047bcd7ad171d6a4aa89128c5e6c6ed5f012b1c0
Parents: 05dbb3e
Author: Benedict Elliott Smith <be...@apple.com>
Authored: Fri Sep 7 11:41:28 2018 +0100
Committer: Benedict Elliott Smith <be...@apple.com>
Committed: Fri Sep 14 10:14:37 2018 +0100
----------------------------------------------------------------------
.../cassandra/batchlog/BatchlogManager.java | 64 ++-
.../apache/cassandra/db/ConsistencyLevel.java | 26 +-
.../apache/cassandra/gms/FailureDetector.java | 4 +
.../apache/cassandra/hints/HintsService.java | 4 +-
.../locator/AbstractReplicaCollection.java | 67 +--
.../locator/AbstractReplicationStrategy.java | 20 +-
.../org/apache/cassandra/locator/Endpoints.java | 59 +--
.../cassandra/locator/EndpointsForRange.java | 2 +-
.../cassandra/locator/EndpointsForToken.java | 2 +-
.../cassandra/locator/RangesAtEndpoint.java | 2 +-
.../cassandra/locator/ReplicaCollection.java | 27 +-
.../apache/cassandra/locator/ReplicaLayout.java | 435 ++++++++-----------
.../apache/cassandra/locator/ReplicaPlan.java | 240 ++++++++++
.../apache/cassandra/locator/ReplicaPlans.java | 295 +++++++++++++
.../cassandra/locator/SystemReplicas.java | 12 +-
.../apache/cassandra/locator/TokenMetadata.java | 8 +-
.../apache/cassandra/net/IAsyncCallback.java | 10 -
.../service/AbstractWriteResponseHandler.java | 50 +--
.../cassandra/service/ActiveRepairService.java | 2 +-
.../service/BatchlogResponseHandler.java | 15 +-
.../DatacenterSyncWriteResponseHandler.java | 12 +-
.../service/DatacenterWriteResponseHandler.java | 10 +-
.../apache/cassandra/service/StorageProxy.java | 239 +++++-----
.../cassandra/service/StorageService.java | 82 +---
.../cassandra/service/WriteResponseHandler.java | 16 +-
.../service/reads/AbstractReadExecutor.java | 78 ++--
.../cassandra/service/reads/DataResolver.java | 26 +-
.../cassandra/service/reads/DigestResolver.java | 22 +-
.../cassandra/service/reads/ReadCallback.java | 62 ++-
.../service/reads/ResponseResolver.java | 22 +-
.../reads/ShortReadPartitionsProtection.java | 19 +-
.../reads/repair/AbstractReadRepair.java | 42 +-
.../reads/repair/BlockingPartitionRepair.java | 29 +-
.../reads/repair/BlockingReadRepair.java | 25 +-
.../service/reads/repair/NoopReadRepair.java | 11 +-
.../repair/PartitionIteratorMergeListener.java | 16 +-
.../reads/repair/ReadOnlyReadRepair.java | 13 +-
.../service/reads/repair/ReadRepair.java | 17 +-
.../reads/repair/ReadRepairDiagnostics.java | 11 +-
.../service/reads/repair/ReadRepairEvent.java | 2 +-
.../reads/repair/ReadRepairStrategy.java | 11 +-
.../reads/repair/RowIteratorMergeListener.java | 21 +-
.../locator/ReplicaCollectionTest.java | 89 ++--
.../service/WriteResponseHandlerTest.java | 4 +-
.../WriteResponseHandlerTransientTest.java | 71 ++-
.../service/reads/DataResolverTest.java | 21 +-
.../reads/DataResolverTransientTest.java | 227 ++++++++++
.../service/reads/DigestResolverTest.java | 5 +-
.../service/reads/ReadExecutorTest.java | 7 +-
.../reads/repair/AbstractReadRepairTest.java | 32 +-
.../reads/repair/BlockingReadRepairTest.java | 36 +-
.../DiagEventsBlockingReadRepairTest.java | 26 +-
.../reads/repair/InstrumentedReadRepair.java | 4 +-
.../reads/repair/ReadOnlyReadRepairTest.java | 24 +-
.../service/reads/repair/ReadRepairTest.java | 12 +-
.../reads/repair/TestableReadRepair.java | 18 +-
56 files changed, 1655 insertions(+), 1051 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/batchlog/BatchlogManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/batchlog/BatchlogManager.java b/src/java/org/apache/cassandra/batchlog/BatchlogManager.java
index 8dda54e..77f725c 100644
--- a/src/java/org/apache/cassandra/batchlog/BatchlogManager.java
+++ b/src/java/org/apache/cassandra/batchlog/BatchlogManager.java
@@ -26,12 +26,9 @@ import java.util.concurrent.*;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Predicates;
import com.google.common.collect.*;
import com.google.common.util.concurrent.RateLimiter;
-import org.apache.cassandra.locator.ReplicaLayout;
-import org.apache.cassandra.locator.EndpointsForToken;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -54,6 +51,8 @@ import org.apache.cassandra.hints.Hint;
import org.apache.cassandra.hints.HintsService;
import org.apache.cassandra.io.util.DataInputBuffer;
import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.locator.ReplicaLayout;
+import org.apache.cassandra.locator.ReplicaPlan;
import org.apache.cassandra.net.MessageIn;
import org.apache.cassandra.net.MessageOut;
import org.apache.cassandra.net.MessagingService;
@@ -459,36 +458,34 @@ public class BatchlogManager implements BatchlogManagerMBean
Keyspace keyspace = Keyspace.open(ks);
Token tk = mutation.key().getToken();
- EndpointsForToken replicas = StorageService.instance.getNaturalAndPendingReplicasForToken(ks, tk);
- Replicas.temporaryAssertFull(replicas); // TODO in CASSANDRA-14549
+ // TODO: this logic could do with revisiting at some point, as it is unclear what its rationale is
+ // we perform a local write, ignoring errors and inline in this thread (potentially slowing replay down)
+ // effectively bumping CL for locally owned writes and also potentially stalling log replay if an error occurs
+ // once we decide how it should work, it can also probably be simplified, and avoid constructing a ReplicaPlan directly
+ ReplicaLayout.ForTokenWrite liveAndDown = ReplicaLayout.forTokenWriteLiveAndDown(keyspace, tk);
+ Replicas.temporaryAssertFull(liveAndDown.all()); // TODO in CASSANDRA-14549
+
+ Replica selfReplica = liveAndDown.all().selfIfPresent();
+ if (selfReplica != null)
+ mutation.apply();
- EndpointsForToken.Builder liveReplicasBuilder = EndpointsForToken.builder(tk);
- for (Replica replica : replicas)
+ ReplicaLayout.ForTokenWrite liveRemoteOnly = liveAndDown.filter(
+ r -> FailureDetector.isReplicaAlive.test(r) && r != selfReplica);
+
+ for (Replica replica : liveAndDown.all())
{
- if (replica.isLocal())
- {
- mutation.apply();
- }
- else if (FailureDetector.instance.isAlive(replica.endpoint()))
- {
- liveReplicasBuilder.add(replica); // will try delivering directly instead of writing a hint.
- }
- else
- {
- hintedNodes.add(replica.endpoint());
- HintsService.instance.write(StorageService.instance.getHostIdForEndpoint(replica.endpoint()),
- Hint.create(mutation, writtenAt));
- }
+ if (replica == selfReplica || liveRemoteOnly.all().contains(replica))
+ continue;
+ hintedNodes.add(replica.endpoint());
+ HintsService.instance.write(StorageService.instance.getHostIdForEndpoint(replica.endpoint()),
+ Hint.create(mutation, writtenAt));
}
- EndpointsForToken liveReplicas = liveReplicasBuilder.build();
- if (liveReplicas.isEmpty())
- return null;
-
- Replicas.temporaryAssertFull(liveReplicas);
- ReplayWriteResponseHandler<Mutation> handler = new ReplayWriteResponseHandler<>(keyspace, liveReplicas, System.nanoTime());
+ ReplicaPlan.ForTokenWrite replicaPlan = new ReplicaPlan.ForTokenWrite(keyspace, ConsistencyLevel.ONE,
+ liveRemoteOnly.pending(), liveRemoteOnly.all(), liveRemoteOnly.all(), liveRemoteOnly.all());
+ ReplayWriteResponseHandler<Mutation> handler = new ReplayWriteResponseHandler<>(replicaPlan, System.nanoTime());
MessageOut<Mutation> message = mutation.createMessage();
- for (Replica replica : liveReplicas)
+ for (Replica replica : liveRemoteOnly.all())
MessagingService.instance().sendWriteRR(message, replica, handler, false);
return handler;
}
@@ -509,17 +506,16 @@ public class BatchlogManager implements BatchlogManagerMBean
{
private final Set<InetAddressAndPort> undelivered = Collections.newSetFromMap(new ConcurrentHashMap<>());
- ReplayWriteResponseHandler(Keyspace keyspace, EndpointsForToken writeReplicas, long queryStartNanoTime)
+ ReplayWriteResponseHandler(ReplicaPlan.ForTokenWrite replicaPlan, long queryStartNanoTime)
{
- super(ReplicaLayout.forWriteWithDownNodes(keyspace, null, writeReplicas.token(), writeReplicas, EndpointsForToken.empty(writeReplicas.token())),
- null, WriteType.UNLOGGED_BATCH, queryStartNanoTime);
- Iterables.addAll(undelivered, writeReplicas.endpoints());
+ super(replicaPlan, null, WriteType.UNLOGGED_BATCH, queryStartNanoTime);
+ Iterables.addAll(undelivered, replicaPlan.contacts().endpoints());
}
@Override
- protected int totalBlockFor()
+ protected int blockFor()
{
- return this.replicaLayout.selected().size();
+ return this.replicaPlan.contacts().size();
}
@Override
http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/db/ConsistencyLevel.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ConsistencyLevel.java b/src/java/org/apache/cassandra/db/ConsistencyLevel.java
index 35ba198..5a4baf7 100644
--- a/src/java/org/apache/cassandra/db/ConsistencyLevel.java
+++ b/src/java/org/apache/cassandra/db/ConsistencyLevel.java
@@ -251,16 +251,10 @@ public enum ConsistencyLevel
if (this == EACH_QUORUM && keyspace.getReplicationStrategy() instanceof NetworkTopologyStrategy)
return filterForEachQuorum(keyspace, liveReplicas);
- /*
- * Endpoints are expected to be restricted to live replicas, sorted by snitch preference.
- * For LOCAL_QUORUM, move local-DC replicas in front first as we need them there whether
- * we do read repair (since the first replica gets the data read) or not (since we'll take
- * the blockFor first ones).
- */
- if (isDCLocal)
- liveReplicas = liveReplicas.sorted(DatabaseDescriptor.getLocalComparator());
-
- return liveReplicas.subList(0, Math.min(liveReplicas.size(), blockFor(keyspace) + (alwaysSpeculate ? 1 : 0)));
+ int count = blockFor(keyspace) + (alwaysSpeculate ? 1 : 0);
+ return isDCLocal
+ ? liveReplicas.filter(ConsistencyLevel::isLocal, count)
+ : liveReplicas.subList(0, Math.min(liveReplicas.size(), count));
}
private <E extends Endpoints<E>> E filterForEachQuorum(Keyspace keyspace, E liveReplicas)
@@ -285,7 +279,7 @@ public enum ConsistencyLevel
});
}
- public boolean isSufficientLiveNodesForRead(Keyspace keyspace, Endpoints<?> liveReplicas)
+ public boolean isSufficientLiveReplicasForRead(Keyspace keyspace, Endpoints<?> liveReplicas)
{
switch (this)
{
@@ -316,15 +310,15 @@ public enum ConsistencyLevel
}
}
- public void assureSufficientLiveNodesForRead(Keyspace keyspace, Endpoints<?> liveReplicas) throws UnavailableException
+ public void assureSufficientLiveReplicasForRead(Keyspace keyspace, Endpoints<?> liveReplicas) throws UnavailableException
{
- assureSufficientLiveNodes(keyspace, liveReplicas, blockFor(keyspace), 1);
+ assureSufficientLiveReplicas(keyspace, liveReplicas, blockFor(keyspace), 1);
}
- public void assureSufficientLiveNodesForWrite(Keyspace keyspace, Endpoints<?> allLive, Endpoints<?> pendingWithDown) throws UnavailableException
+ public void assureSufficientLiveReplicasForWrite(Keyspace keyspace, Endpoints<?> allLive, Endpoints<?> pendingWithDown) throws UnavailableException
{
- assureSufficientLiveNodes(keyspace, allLive, blockForWrite(keyspace, pendingWithDown), 0);
+ assureSufficientLiveReplicas(keyspace, allLive, blockForWrite(keyspace, pendingWithDown), 0);
}
- public void assureSufficientLiveNodes(Keyspace keyspace, Endpoints<?> allLive, int blockFor, int blockForFullReplicas) throws UnavailableException
+ void assureSufficientLiveReplicas(Keyspace keyspace, Endpoints<?> allLive, int blockFor, int blockForFullReplicas) throws UnavailableException
{
switch (this)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/gms/FailureDetector.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/FailureDetector.java b/src/java/org/apache/cassandra/gms/FailureDetector.java
index e567b7b..d7f73ab 100644
--- a/src/java/org/apache/cassandra/gms/FailureDetector.java
+++ b/src/java/org/apache/cassandra/gms/FailureDetector.java
@@ -27,11 +27,13 @@ import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
+import java.util.function.Predicate;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import javax.management.openmbean.CompositeData;
import javax.management.openmbean.*;
+import org.apache.cassandra.locator.Replica;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -71,6 +73,8 @@ public class FailureDetector implements IFailureDetector, FailureDetectorMBean
}
public static final IFailureDetector instance = new FailureDetector();
+ public static final Predicate<InetAddressAndPort> isEndpointAlive = instance::isAlive;
+ public static final Predicate<Replica> isReplicaAlive = r -> isEndpointAlive.test(r.endpoint());
// this is useless except to provide backwards compatibility in phi_convict_threshold,
// because everyone seems pretty accustomed to the default of 8, and users who have
http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/hints/HintsService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/HintsService.java b/src/java/org/apache/cassandra/hints/HintsService.java
index c6ad3d9..73840d3 100644
--- a/src/java/org/apache/cassandra/hints/HintsService.java
+++ b/src/java/org/apache/cassandra/hints/HintsService.java
@@ -34,6 +34,8 @@ import javax.management.ObjectName;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.locator.ReplicaLayout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -185,7 +187,7 @@ public final class HintsService implements HintsServiceMBean
String keyspaceName = hint.mutation.getKeyspaceName();
Token token = hint.mutation.key().getToken();
- EndpointsForToken replicas = StorageService.instance.getNaturalAndPendingReplicasForToken(keyspaceName, token);
+ EndpointsForToken replicas = ReplicaLayout.forTokenWriteLiveAndDown(Keyspace.open(keyspaceName), token).all();
// judicious use of streams: eagerly materializing probably cheaper
// than performing filters / translations 2x extra via Iterables.filter/transform
http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/locator/AbstractReplicaCollection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/AbstractReplicaCollection.java b/src/java/org/apache/cassandra/locator/AbstractReplicaCollection.java
index 6a7a4ff..94ff991 100644
--- a/src/java/org/apache/cassandra/locator/AbstractReplicaCollection.java
+++ b/src/java/org/apache/cassandra/locator/AbstractReplicaCollection.java
@@ -75,7 +75,6 @@ public abstract class AbstractReplicaCollection<C extends AbstractReplicaCollect
*/
public abstract Mutable<C> newMutable(int initialCapacity);
-
public C snapshot()
{
return isSnapshot ? self()
@@ -83,6 +82,7 @@ public abstract class AbstractReplicaCollection<C extends AbstractReplicaCollect
: new ArrayList<>(list));
}
+ /** see {@link ReplicaCollection#subList(int, int)}*/
public final C subList(int start, int end)
{
List<Replica> subList;
@@ -100,11 +100,23 @@ public abstract class AbstractReplicaCollection<C extends AbstractReplicaCollect
return snapshot(subList);
}
+ /** see {@link ReplicaCollection#count(Predicate)}*/
+ public int count(Predicate<Replica> predicate)
+ {
+ int count = 0;
+ for (int i = 0 ; i < list.size() ; ++i)
+ if (predicate.test(list.get(i)))
+ ++count;
+ return count;
+ }
+
+ /** see {@link ReplicaCollection#filter(Predicate)}*/
public final C filter(Predicate<Replica> predicate)
{
return filter(predicate, Integer.MAX_VALUE);
}
+ /** see {@link ReplicaCollection#filter(Predicate, int)}*/
public final C filter(Predicate<Replica> predicate, int limit)
{
if (isEmpty())
@@ -148,53 +160,8 @@ public abstract class AbstractReplicaCollection<C extends AbstractReplicaCollect
return snapshot(copy);
}
- public final class Select
- {
- private final List<Replica> result;
- public Select(int expectedSize)
- {
- this.result = new ArrayList<>(expectedSize);
- }
-
- /**
- * Add matching replica to the result; this predicate should be mutually exclusive with all prior predicates.
- * Stop once we have targetSize replicas in total, including preceding calls
- */
- public Select add(Predicate<Replica> predicate, int targetSize)
- {
- assert !Iterables.any(result, predicate::test);
- for (int i = 0 ; result.size() < targetSize && i < list.size() ; ++i)
- if (predicate.test(list.get(i)))
- result.add(list.get(i));
- return this;
- }
- public Select add(Predicate<Replica> predicate)
- {
- return add(predicate, Integer.MAX_VALUE);
- }
- public C get()
- {
- return snapshot(result);
- }
- }
-
- /**
- * An efficient method for selecting a subset of replica via a sequence of filters.
- *
- * Example: select().add(filter1).add(filter2, 3).get();
- *
- * @return a Select object
- */
- public final Select select()
- {
- return select(list.size());
- }
- public final Select select(int expectedSize)
- {
- return new Select(expectedSize);
- }
-
- public final C sorted(Comparator<Replica> comparator)
+ /** see {@link ReplicaCollection#sorted(Comparator)}*/
+ public final C sorted(Comparator<Replica> comparator)
{
List<Replica> copy = new ArrayList<>(list);
copy.sort(comparator);
@@ -267,9 +234,9 @@ public abstract class AbstractReplicaCollection<C extends AbstractReplicaCollect
if (replicas.isEmpty())
return extraReplicas;
Mutable<C> mutable = replicas.newMutable(replicas.size() + extraReplicas.size());
- mutable.addAll(replicas);
+ mutable.addAll(replicas, Mutable.Conflict.NONE);
mutable.addAll(extraReplicas, ignoreConflicts);
- return mutable.asImmutableView();
+ return mutable.asSnapshot();
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java b/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
index d168052..bad736f 100644
--- a/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
+++ b/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
@@ -141,33 +141,33 @@ public abstract class AbstractReplicationStrategy
*/
public abstract EndpointsForRange calculateNaturalReplicas(Token searchToken, TokenMetadata tokenMetadata);
- public <T> AbstractWriteResponseHandler<T> getWriteResponseHandler(ReplicaLayout.ForToken replicaLayout,
+ public <T> AbstractWriteResponseHandler<T> getWriteResponseHandler(ReplicaPlan.ForTokenWrite replicaPlan,
Runnable callback,
WriteType writeType,
long queryStartNanoTime)
{
- return getWriteResponseHandler(replicaLayout, callback, writeType, queryStartNanoTime, DatabaseDescriptor.getIdealConsistencyLevel());
+ return getWriteResponseHandler(replicaPlan, callback, writeType, queryStartNanoTime, DatabaseDescriptor.getIdealConsistencyLevel());
}
- public <T> AbstractWriteResponseHandler<T> getWriteResponseHandler(ReplicaLayout.ForToken replicaLayout,
+ public <T> AbstractWriteResponseHandler<T> getWriteResponseHandler(ReplicaPlan.ForTokenWrite replicaPlan,
Runnable callback,
WriteType writeType,
long queryStartNanoTime,
ConsistencyLevel idealConsistencyLevel)
{
AbstractWriteResponseHandler resultResponseHandler;
- if (replicaLayout.consistencyLevel.isDatacenterLocal())
+ if (replicaPlan.consistencyLevel().isDatacenterLocal())
{
// block for in this context will be localnodes block.
- resultResponseHandler = new DatacenterWriteResponseHandler<T>(replicaLayout, callback, writeType, queryStartNanoTime);
+ resultResponseHandler = new DatacenterWriteResponseHandler<T>(replicaPlan, callback, writeType, queryStartNanoTime);
}
- else if (replicaLayout.consistencyLevel == ConsistencyLevel.EACH_QUORUM && (this instanceof NetworkTopologyStrategy))
+ else if (replicaPlan.consistencyLevel() == ConsistencyLevel.EACH_QUORUM && (this instanceof NetworkTopologyStrategy))
{
- resultResponseHandler = new DatacenterSyncWriteResponseHandler<T>(replicaLayout, callback, writeType, queryStartNanoTime);
+ resultResponseHandler = new DatacenterSyncWriteResponseHandler<T>(replicaPlan, callback, writeType, queryStartNanoTime);
}
else
{
- resultResponseHandler = new WriteResponseHandler<T>(replicaLayout, callback, writeType, queryStartNanoTime);
+ resultResponseHandler = new WriteResponseHandler<T>(replicaPlan, callback, writeType, queryStartNanoTime);
}
//Check if tracking the ideal consistency level is configured
@@ -176,14 +176,14 @@ public abstract class AbstractReplicationStrategy
//If ideal and requested are the same just use this handler to track the ideal consistency level
//This is also used so that the ideal consistency level handler when constructed knows it is the ideal
//one for tracking purposes
- if (idealConsistencyLevel == replicaLayout.consistencyLevel)
+ if (idealConsistencyLevel == replicaPlan.consistencyLevel())
{
resultResponseHandler.setIdealCLResponseHandler(resultResponseHandler);
}
else
{
//Construct a delegate response handler to use to track the ideal consistency level
- AbstractWriteResponseHandler idealHandler = getWriteResponseHandler(replicaLayout.withConsistencyLevel(idealConsistencyLevel),
+ AbstractWriteResponseHandler idealHandler = getWriteResponseHandler(replicaPlan.withConsistencyLevel(idealConsistencyLevel),
callback,
writeType,
queryStartNanoTime,
http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/locator/Endpoints.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/Endpoints.java b/src/java/org/apache/cassandra/locator/Endpoints.java
index 3d5faa4..28e578c 100644
--- a/src/java/org/apache/cassandra/locator/Endpoints.java
+++ b/src/java/org/apache/cassandra/locator/Endpoints.java
@@ -29,6 +29,11 @@ import java.util.Map;
import java.util.Objects;
import java.util.Set;
+/**
+ * A collection of Endpoints for a given ring position. This will typically reside in a ReplicaLayout,
+ * representing some subset of the endpoints for the Token or Range
+ * @param <E> The concrete type of Endpoints, that will be returned by the modifying methods
+ */
public abstract class Endpoints<E extends Endpoints<E>> extends AbstractReplicaCollection<E>
{
static final Map<InetAddressAndPort, Replica> EMPTY_MAP = Collections.unmodifiableMap(new LinkedHashMap<>());
@@ -89,17 +94,32 @@ public abstract class Endpoints<E extends Endpoints<E>> extends AbstractReplicaC
return filter(r -> !self.equals(r.endpoint()));
}
+ public Replica selfIfPresent()
+ {
+ InetAddressAndPort self = FBUtilities.getBroadcastAddressAndPort();
+ return byEndpoint().get(self);
+ }
+
+ /**
+ * @return a collection without the provided endpoints, otherwise in the same order as this collection
+ */
public E without(Set<InetAddressAndPort> remove)
{
return filter(r -> !remove.contains(r.endpoint()));
}
+ /**
+ * @return a collection with only the provided endpoints (ignoring any not present), otherwise in the same order as this collection
+ */
public E keep(Set<InetAddressAndPort> keep)
{
return filter(r -> keep.contains(r.endpoint()));
}
- public E keep(Iterable<InetAddressAndPort> endpoints)
+ /**
+ * @return a collection containing the Replica from this collection for the provided endpoints, in the order of the provided endpoints
+ */
+ public E select(Iterable<InetAddressAndPort> endpoints, boolean ignoreMissing)
{
ReplicaCollection.Mutable<E> copy = newMutable(
endpoints instanceof Collection<?>
@@ -109,10 +129,14 @@ public abstract class Endpoints<E extends Endpoints<E>> extends AbstractReplicaC
Map<InetAddressAndPort, Replica> byEndpoint = byEndpoint();
for (InetAddressAndPort endpoint : endpoints)
{
- Replica keep = byEndpoint.get(endpoint);
- if (keep == null)
+ Replica select = byEndpoint.get(endpoint);
+ if (select == null)
+ {
+ if (!ignoreMissing)
+ throw new IllegalArgumentException(endpoint + " is not present in " + this);
continue;
- copy.add(keep, ReplicaCollection.Mutable.Conflict.DUPLICATE);
+ }
+ copy.add(select, ReplicaCollection.Mutable.Conflict.DUPLICATE);
}
return copy.asSnapshot();
}
@@ -124,34 +148,19 @@ public abstract class Endpoints<E extends Endpoints<E>> extends AbstractReplicaC
* 2) because a movement that changes the type of replication from transient to full must be handled
* differently for reads and writes (with the reader treating it as transient, and writer as full)
*
- * The method haveConflicts() below, and resolveConflictsInX, are used to detect and resolve any issues
+ * The method {@link ReplicaLayout#haveWriteConflicts} can be used to detect and resolve any issues
*/
public static <E extends Endpoints<E>> E concat(E natural, E pending)
{
return AbstractReplicaCollection.concat(natural, pending, Conflict.NONE);
}
- public static <E extends Endpoints<E>> boolean haveConflicts(E natural, E pending)
- {
- Set<InetAddressAndPort> naturalEndpoints = natural.endpoints();
- for (InetAddressAndPort pendingEndpoint : pending.endpoints())
- {
- if (naturalEndpoints.contains(pendingEndpoint))
- return true;
- }
- return false;
- }
-
- // must apply first
- public static <E extends Endpoints<E>> E resolveConflictsInNatural(E natural, E pending)
- {
- return natural.filter(r -> !r.isTransient() || !pending.contains(r.endpoint(), true));
- }
-
- // must apply second
- public static <E extends Endpoints<E>> E resolveConflictsInPending(E natural, E pending)
+ public static <E extends Endpoints<E>> E append(E replicas, Replica extraReplica)
{
- return pending.without(natural.endpoints());
+ Mutable<E> mutable = replicas.newMutable(replicas.size() + 1);
+ mutable.addAll(replicas);
+ mutable.add(extraReplica, Conflict.NONE);
+ return mutable.asSnapshot();
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/locator/EndpointsForRange.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/EndpointsForRange.java b/src/java/org/apache/cassandra/locator/EndpointsForRange.java
index c2d8232..f812951 100644
--- a/src/java/org/apache/cassandra/locator/EndpointsForRange.java
+++ b/src/java/org/apache/cassandra/locator/EndpointsForRange.java
@@ -86,7 +86,7 @@ public class EndpointsForRange extends Endpoints<EndpointsForRange>
{
boolean hasSnapshot;
public Mutable(Range<Token> range) { this(range, 0); }
- public Mutable(Range<Token> range, int capacity) { super(range, new ArrayList<>(capacity), false, new LinkedHashMap<>()); }
+ public Mutable(Range<Token> range, int capacity) { super(range, new ArrayList<>(capacity), false, new LinkedHashMap<>(capacity)); }
public void add(Replica replica, Conflict ignoreConflict)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/locator/EndpointsForToken.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/EndpointsForToken.java b/src/java/org/apache/cassandra/locator/EndpointsForToken.java
index f24c615..3446dc9 100644
--- a/src/java/org/apache/cassandra/locator/EndpointsForToken.java
+++ b/src/java/org/apache/cassandra/locator/EndpointsForToken.java
@@ -77,7 +77,7 @@ public class EndpointsForToken extends Endpoints<EndpointsForToken>
{
boolean hasSnapshot;
public Mutable(Token token) { this(token, 0); }
- public Mutable(Token token, int capacity) { super(token, new ArrayList<>(capacity), false, new LinkedHashMap<>()); }
+ public Mutable(Token token, int capacity) { super(token, new ArrayList<>(capacity), false, new LinkedHashMap<>(capacity)); }
public void add(Replica replica, Conflict ignoreConflict)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/locator/RangesAtEndpoint.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/RangesAtEndpoint.java b/src/java/org/apache/cassandra/locator/RangesAtEndpoint.java
index 74828ad..1773173 100644
--- a/src/java/org/apache/cassandra/locator/RangesAtEndpoint.java
+++ b/src/java/org/apache/cassandra/locator/RangesAtEndpoint.java
@@ -173,7 +173,7 @@ public class RangesAtEndpoint extends AbstractReplicaCollection<RangesAtEndpoint
{
boolean hasSnapshot;
public Mutable(InetAddressAndPort endpoint) { this(endpoint, 0); }
- public Mutable(InetAddressAndPort endpoint, int capacity) { super(endpoint, new ArrayList<>(capacity), false, new LinkedHashMap<>()); }
+ public Mutable(InetAddressAndPort endpoint, int capacity) { super(endpoint, new ArrayList<>(capacity), false, new LinkedHashMap<>(capacity)); }
public void add(Replica replica, Conflict ignoreConflict)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/locator/ReplicaCollection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/ReplicaCollection.java b/src/java/org/apache/cassandra/locator/ReplicaCollection.java
index 6833f4b..d1006dc 100644
--- a/src/java/org/apache/cassandra/locator/ReplicaCollection.java
+++ b/src/java/org/apache/cassandra/locator/ReplicaCollection.java
@@ -62,6 +62,11 @@ public interface ReplicaCollection<C extends ReplicaCollection<C>> extends Itera
public abstract boolean contains(Replica replica);
/**
+ * @return the number of replicas that match the predicate
+ */
+ public abstract int count(Predicate<Replica> predicate);
+
+ /**
* @return a *eagerly constructed* copy of this collection containing the Replica that match the provided predicate.
* An effort will be made to either return ourself, or a subList, where possible.
* It is guaranteed that no changes to any upstream Mutable will affect the state of the result.
@@ -108,16 +113,30 @@ public interface ReplicaCollection<C extends ReplicaCollection<C>> extends Itera
C asImmutableView();
/**
- * @return an Immutable clone that assumes this Mutable will never be modified again.
- * If this is not true, behaviour is undefined.
+ * @return an Immutable clone that assumes this Mutable will never be modified again,
+ * so its contents can be reused.
+ *
+ * This Mutable should enforce that it is no longer modified.
*/
C asSnapshot();
- enum Conflict { NONE, DUPLICATE, ALL}
+ /**
+ * Passed to add() and addAll() as ignoreConflicts parameter. The meaning of conflict varies by collection type
+ * (for Endpoints, it is a duplicate InetAddressAndPort; for RangesAtEndpoint it is a duplicate Range).
+ */
+ enum Conflict
+ {
+ /** fail on addition of any such conflict */
+ NONE,
+ /** fail on addition of any such conflict where the contents differ (first occurrence and position wins) */
+ DUPLICATE,
+ /** ignore all conflicts (the first occurrence and position wins) */
+ ALL
+ }
/**
* @param replica add this replica to the end of the collection
- * @param ignoreConflict if false, fail on any conflicting additions (as defined by C's semantics)
+ * @param ignoreConflict conflicts to ignore, see {@link Conflict}
*/
void add(Replica replica, Conflict ignoreConflict);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/locator/ReplicaLayout.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/ReplicaLayout.java b/src/java/org/apache/cassandra/locator/ReplicaLayout.java
index 946a7f8..f48c989 100644
--- a/src/java/org/apache/cassandra/locator/ReplicaLayout.java
+++ b/src/java/org/apache/cassandra/locator/ReplicaLayout.java
@@ -18,364 +18,299 @@
package org.apache.cassandra.locator;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.function.Predicate;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Predicates;
-import com.google.common.collect.Iterables;
-
import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.ConsistencyLevel;
-import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.PartitionPosition;
import org.apache.cassandra.dht.AbstractBounds;
import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.exceptions.UnavailableException;
import org.apache.cassandra.gms.FailureDetector;
-import org.apache.cassandra.net.IAsyncCallback;
-import org.apache.cassandra.service.StorageProxy;
import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.service.reads.AlwaysSpeculativeRetryPolicy;
-import org.apache.cassandra.service.reads.SpeculativeRetryPolicy;
import org.apache.cassandra.utils.FBUtilities;
-import static com.google.common.collect.Iterables.any;
+import java.util.Set;
+import java.util.function.Predicate;
/**
- * Encapsulates knowledge about the ring necessary for performing a specific operation, with static accessors
- * for building the relevant layout.
+ * The relevant replicas for an operation over a given range or token.
*
- * Constitutes:
- * - the 'natural' replicas replicating the range or token relevant for the operation
- * - if for performing a write, any 'pending' replicas that are taking ownership of the range, and must receive updates
- * - the 'selected' replicas, those that should be targeted for any operation
- * - 'all' replicas represents natural+pending
- *
- * @param <E> the type of Endpoints this ReplayLayout holds (either EndpointsForToken or EndpointsForRange)
- * @param <L> the type of itself, including its type parameters, for return type of modifying methods
+ * @param <E> the type of Endpoints this ReplicaLayout holds (either EndpointsForToken or EndpointsForRange)
*/
-public abstract class ReplicaLayout<E extends Endpoints<E>, L extends ReplicaLayout<E, L>>
+public abstract class ReplicaLayout<E extends Endpoints<E>>
{
- private volatile E all;
- protected final E natural;
- protected final E pending;
- protected final E selected;
-
- protected final Keyspace keyspace;
- protected final ConsistencyLevel consistencyLevel;
-
- private ReplicaLayout(Keyspace keyspace, ConsistencyLevel consistencyLevel, E natural, E pending, E selected)
- {
- this(keyspace, consistencyLevel, natural, pending, selected, null);
- }
+ private final E natural;
- private ReplicaLayout(Keyspace keyspace, ConsistencyLevel consistencyLevel, E natural, E pending, E selected, E all)
+ ReplicaLayout(E natural)
{
- assert selected != null;
- assert pending == null || !Endpoints.haveConflicts(natural, pending);
- this.keyspace = keyspace;
- this.consistencyLevel = consistencyLevel;
this.natural = natural;
- this.pending = pending;
- this.selected = selected;
- // if we logically have no pending endpoints (they are null), then 'all' our endpoints are natural
- if (all == null && pending == null)
- all = natural;
- this.all = all;
}
- public Replica getReplicaFor(InetAddressAndPort endpoint)
- {
- return natural.byEndpoint().get(endpoint);
- }
-
- public E natural()
+ /**
+ * The 'natural' owners of the ring position(s), as implied by the current ring layout.
+ * This excludes any pending owners, i.e. those that are in the process of taking ownership of a range, but
+ * have not yet finished obtaining their view of the range.
+ */
+ public final E natural()
{
return natural;
}
- public E all()
- {
- E result = all;
- if (result == null)
- all = result = Endpoints.concat(natural, pending);
- return result;
- }
-
- public E selected()
- {
- return selected;
- }
-
/**
- * @return the pending replicas - will be null for read layouts
- * TODO: ideally we would enforce at compile time that read layouts have no pending to access
+ * All relevant owners of the ring position(s) for this operation, as implied by the current ring layout.
+ * For writes, this will include pending owners, and for reads it will be equivalent to natural()
*/
- public E pending()
- {
- return pending;
- }
-
- public int blockFor()
+ public E all()
{
- return pending == null
- ? consistencyLevel.blockFor(keyspace)
- : consistencyLevel.blockForWrite(keyspace, pending);
+ return natural;
}
- public Keyspace keyspace()
+ public String toString()
{
- return keyspace;
+ return "ReplicaLayout [ natural: " + natural + " ]";
}
- public ConsistencyLevel consistencyLevel()
+ public static class ForTokenRead extends ReplicaLayout<EndpointsForToken> implements ForToken
{
- return consistencyLevel;
- }
-
- abstract public L withSelected(E replicas);
-
- abstract public L withConsistencyLevel(ConsistencyLevel cl);
-
- public L forNaturalUncontacted()
- {
- E more;
- if (consistencyLevel.isDatacenterLocal() && keyspace.getReplicationStrategy() instanceof NetworkTopologyStrategy)
+ public ForTokenRead(EndpointsForToken natural)
{
- IEndpointSnitch snitch = keyspace.getReplicationStrategy().snitch;
- String localDC = DatabaseDescriptor.getLocalDataCenter();
+ super(natural);
+ }
- more = natural.filter(replica -> !selected.contains(replica) &&
- snitch.getDatacenter(replica).equals(localDC));
- } else
+ @Override
+ public Token token()
{
- more = natural.filter(replica -> !selected.contains(replica));
+ return natural().token();
}
- return withSelected(more);
+ public ReplicaLayout.ForTokenRead filter(Predicate<Replica> filter)
+ {
+ EndpointsForToken filtered = natural().filter(filter);
+ // AbstractReplicaCollection.filter returns itself if all elements match the filter
+ if (filtered == natural()) return this;
+ return new ReplicaLayout.ForTokenRead(filtered);
+ }
}
- public static class ForRange extends ReplicaLayout<EndpointsForRange, ForRange>
+ public static class ForRangeRead extends ReplicaLayout<EndpointsForRange> implements ForRange
{
- public final AbstractBounds<PartitionPosition> range;
+ final AbstractBounds<PartitionPosition> range;
- @VisibleForTesting
- public ForRange(Keyspace keyspace, ConsistencyLevel consistencyLevel, AbstractBounds<PartitionPosition> range, EndpointsForRange natural, EndpointsForRange selected)
+ public ForRangeRead(AbstractBounds<PartitionPosition> range, EndpointsForRange natural)
{
- // Range queries do not contact pending replicas
- super(keyspace, consistencyLevel, natural, null, selected);
+ super(natural);
this.range = range;
}
@Override
- public ForRange withSelected(EndpointsForRange newSelected)
+ public AbstractBounds<PartitionPosition> range()
{
- return new ForRange(keyspace, consistencyLevel, range, natural, newSelected);
+ return range;
}
- @Override
- public ForRange withConsistencyLevel(ConsistencyLevel cl)
+ public ReplicaLayout.ForRangeRead filter(Predicate<Replica> filter)
{
- return new ForRange(keyspace, cl, range, natural, selected);
+ EndpointsForRange filtered = natural().filter(filter);
+ // AbstractReplicaCollection.filter returns itself if all elements match the filter
+ if (filtered == natural()) return this;
+ return new ReplicaLayout.ForRangeRead(range(), filtered);
}
}
- public static class ForToken extends ReplicaLayout<EndpointsForToken, ForToken>
+ public static class ForWrite<E extends Endpoints<E>> extends ReplicaLayout<E>
{
- public final Token token;
+ final E all;
+ final E pending;
- @VisibleForTesting
- public ForToken(Keyspace keyspace, ConsistencyLevel consistencyLevel, Token token, EndpointsForToken natural, EndpointsForToken pending, EndpointsForToken selected)
+ ForWrite(E natural, E pending, E all)
{
- super(keyspace, consistencyLevel, natural, pending, selected);
- this.token = token;
+ super(natural);
+ assert pending != null && !haveWriteConflicts(natural, pending);
+ if (all == null)
+ all = Endpoints.concat(natural, pending);
+ this.all = all;
+ this.pending = pending;
}
- public ForToken(Keyspace keyspace, ConsistencyLevel consistencyLevel, Token token, EndpointsForToken natural, EndpointsForToken pending, EndpointsForToken selected, EndpointsForToken all)
+ public final E all()
{
- super(keyspace, consistencyLevel, natural, pending, selected, all);
- this.token = token;
+ return all;
}
- public ForToken withSelected(EndpointsForToken newSelected)
+ public final E pending()
{
- return new ForToken(keyspace, consistencyLevel, token, natural, pending, newSelected);
+ return pending;
}
- @Override
- public ForToken withConsistencyLevel(ConsistencyLevel cl)
+ public String toString()
{
- return new ForToken(keyspace, cl, token, natural, pending, selected);
+ return "ReplicaLayout [ natural: " + natural() + ", pending: " + pending + " ]";
}
}
- public static class ForPaxos extends ForToken
+ public static class ForTokenWrite extends ForWrite<EndpointsForToken> implements ForToken
{
- private final int requiredParticipants;
-
- private ForPaxos(Keyspace keyspace, ConsistencyLevel consistencyLevel, Token token, int requiredParticipants, EndpointsForToken natural, EndpointsForToken pending, EndpointsForToken selected, EndpointsForToken all)
+ public ForTokenWrite(EndpointsForToken natural, EndpointsForToken pending)
{
- super(keyspace, consistencyLevel, token, natural, pending, selected, all);
- this.requiredParticipants = requiredParticipants;
+ this(natural, pending, null);
}
-
- public int getRequiredParticipants()
+ public ForTokenWrite(EndpointsForToken natural, EndpointsForToken pending, EndpointsForToken all)
{
- return requiredParticipants;
+ super(natural, pending, all);
}
- }
- public static ForToken forSingleReplica(Keyspace keyspace, Token token, Replica replica)
- {
- EndpointsForToken singleReplica = EndpointsForToken.of(token, replica);
- return new ForToken(keyspace, ConsistencyLevel.ONE, token, singleReplica, EndpointsForToken.empty(token), singleReplica, singleReplica);
- }
-
- public static ForRange forSingleReplica(Keyspace keyspace, AbstractBounds<PartitionPosition> range, Replica replica)
- {
- EndpointsForRange singleReplica = EndpointsForRange.of(replica);
- return new ForRange(keyspace, ConsistencyLevel.ONE, range, singleReplica, singleReplica);
- }
+ @Override
+ public Token token() { return natural().token(); }
- public static ForToken forCounterWrite(Keyspace keyspace, Token token, Replica replica)
- {
- return forSingleReplica(keyspace, token, replica);
+ public ReplicaLayout.ForTokenWrite filter(Predicate<Replica> filter)
+ {
+ EndpointsForToken filtered = all().filter(filter);
+ // AbstractReplicaCollection.filter returns itself if all elements match the filter
+ if (filtered == all()) return this;
+ // unique by endpoint, so can for efficiency filter only on endpoint
+ return new ReplicaLayout.ForTokenWrite(
+ natural().keep(filtered.endpoints()),
+ pending().keep(filtered.endpoints()),
+ filtered
+ );
+ }
}
- public static ForToken forBatchlogWrite(Keyspace keyspace, Collection<InetAddressAndPort> endpoints) throws UnavailableException
+ public interface ForRange
{
- // A single case we write not for range or token, but multiple mutations to many tokens
- Token token = DatabaseDescriptor.getPartitioner().getMinimumToken();
- EndpointsForToken natural = EndpointsForToken.copyOf(token, SystemReplicas.getSystemReplicas(endpoints));
- EndpointsForToken pending = EndpointsForToken.empty(token);
- ConsistencyLevel consistencyLevel = natural.size() == 1 ? ConsistencyLevel.ONE : ConsistencyLevel.TWO;
-
- return forWriteWithDownNodes(keyspace, consistencyLevel, token, natural, pending);
+ public AbstractBounds<PartitionPosition> range();
}
- public static ForToken forWriteWithDownNodes(Keyspace keyspace, ConsistencyLevel consistencyLevel, Token token) throws UnavailableException
+ public interface ForToken
{
- return forWrite(keyspace, consistencyLevel, token, Predicates.alwaysTrue());
+ public Token token();
}
- public static ForToken forWrite(Keyspace keyspace, ConsistencyLevel consistencyLevel, Token token, Predicate<InetAddressAndPort> isAlive) throws UnavailableException
+ /**
+ * Gets the 'natural' and 'pending' replicas that own a given token, with no filtering or processing.
+ *
+ * Since a write is intended for all nodes (except, unless necessary, transient replicas), this method's
+ * only responsibility is to fetch the 'natural' and 'pending' replicas, then resolve any conflicts
+ * {@link ReplicaLayout#haveWriteConflicts(Endpoints, Endpoints)}
+ */
+ public static ReplicaLayout.ForTokenWrite forTokenWriteLiveAndDown(Keyspace keyspace, Token token)
{
- EndpointsForToken natural = StorageService.getNaturalReplicasForToken(keyspace.getName(), token);
+ // TODO: race condition to fetch these. implications??
+ EndpointsForToken natural = keyspace.getReplicationStrategy().getNaturalReplicasForToken(token);
EndpointsForToken pending = StorageService.instance.getTokenMetadata().pendingEndpointsForToken(token, keyspace.getName());
- return forWrite(keyspace, consistencyLevel, token, natural, pending, isAlive);
- }
-
- public static ForToken forWriteWithDownNodes(Keyspace keyspace, ConsistencyLevel consistencyLevel, Token token, EndpointsForToken natural, EndpointsForToken pending) throws UnavailableException
- {
- return forWrite(keyspace, consistencyLevel, token, natural, pending, Predicates.alwaysTrue());
+ return forTokenWrite(natural, pending);
}
- public static ForToken forWrite(Keyspace keyspace, ConsistencyLevel consistencyLevel, Token token, EndpointsForToken natural, EndpointsForToken pending, Predicate<InetAddressAndPort> isAlive) throws UnavailableException
+ public static ReplicaLayout.ForTokenWrite forTokenWrite(EndpointsForToken natural, EndpointsForToken pending)
{
- if (Endpoints.haveConflicts(natural, pending))
+ if (haveWriteConflicts(natural, pending))
{
- natural = Endpoints.resolveConflictsInNatural(natural, pending);
- pending = Endpoints.resolveConflictsInPending(natural, pending);
+ natural = resolveWriteConflictsInNatural(natural, pending);
+ pending = resolveWriteConflictsInPending(natural, pending);
}
-
- if (!any(natural, Replica::isTransient) && !any(pending, Replica::isTransient))
- {
- EndpointsForToken selected = Endpoints.concat(natural, pending).filter(r -> isAlive.test(r.endpoint()));
- return new ForToken(keyspace, consistencyLevel, token, natural, pending, selected);
- }
-
- return forWrite(keyspace, consistencyLevel, token, consistencyLevel.blockForWrite(keyspace, pending), natural, pending, isAlive);
+ return new ReplicaLayout.ForTokenWrite(natural, pending);
}
- public static ReplicaLayout.ForPaxos forPaxos(Keyspace keyspace, DecoratedKey key, ConsistencyLevel consistencyForPaxos) throws UnavailableException
+ /**
+ * Detect if we have any endpoint in both natural and pending; this can occur either due to races (there is no isolation)
+ * or because an endpoint is transitioning between full and transient replication status.
+ *
+ * We essentially always prefer the full version for writes, because this is stricter.
+ *
+ * For transient->full transitions:
+ *
+ * Since we always write to any pending transient replica, effectively upgrading it to full for the transition duration,
+ * it might at first seem to be OK to continue treating the conflict replica as its 'natural' transient form,
+ * as there is always a quorum of nodes receiving the write. However, ring ownership changes are not atomic or
+ * consistent across the cluster, and it is possible for writers to see different ring states.
+ *
+ * Furthermore, an operator would expect that the full node has received all writes, with no extra need for repair
+ * (as the normal contract dictates) when it completes its transition.
+ *
+ * While we cannot completely eliminate risks due to ring inconsistencies, this approach is the most conservative
+ * available to us today to mitigate, and (we think) the easiest to reason about.
+ *
+ * For full->transient transitions:
+ *
+ * In this case, things are dicier, because in theory we can trigger this change instantly. All we need to do is
+ * drop some data, surely?
+ *
+ * Ring movements can put us in a pickle; any other node could believe us to be full when we have become transient,
+ * and perform a full data request to us that we believe ourselves capable of answering, but that we are not.
+ * If the ring is inconsistent, it's even feasible that a transient request would be made to the node that is losing
+ * its transient status, that also does not know it has yet done so, resulting in all involved nodes being unaware
+ * of the data inconsistency.
+ *
+ * This happens because ring ownership changes are implied by a single node; not all owning nodes get a say in when
+ * the transition takes effect. As such, a node can hold an incorrect belief about its own ownership ranges.
+ *
+ * This race condition is somewhat inherent in present day Cassandra, and there's actually a limit to what we can do about it.
+ * It is a little more dangerous with transient replication, however, because we can completely answer a request without
+ * ever touching a digest, meaning we are less likely to attempt to repair any inconsistency.
+ *
+ * We aren't guaranteed to contact any different nodes for the data requests, of course, though we at least have a chance.
+ *
+ * Note: If we have any pending transient->full movement, we need to move the full replica to our 'natural' bucket
+ * to avoid corrupting our count. This is fine for writes, all we're doing is ensuring we always write to the node,
+ * instead of selectively.
+ *
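+ * @param natural the natural replicas for the ring position(s)
+ * @param pending the pending replicas for the same ring position(s)
+ * @param <E> the type of Endpoints being compared
+ * @return true if any endpoint appears in both natural and pending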
+ */
+ static <E extends Endpoints<E>> boolean haveWriteConflicts(E natural, E pending)
{
- Token tk = key.getToken();
- EndpointsForToken natural = StorageService.getNaturalReplicasForToken(keyspace.getName(), tk);
- EndpointsForToken pending = StorageService.instance.getTokenMetadata().pendingEndpointsForToken(tk, keyspace.getName());
- if (Endpoints.haveConflicts(natural, pending))
+ Set<InetAddressAndPort> naturalEndpoints = natural.endpoints();
+ for (InetAddressAndPort pendingEndpoint : pending.endpoints())
{
- natural = Endpoints.resolveConflictsInNatural(natural, pending);
- pending = Endpoints.resolveConflictsInPending(natural, pending);
+ if (naturalEndpoints.contains(pendingEndpoint))
+ return true;
}
-
- // TODO CASSANDRA-14547
- Replicas.temporaryAssertFull(natural);
- Replicas.temporaryAssertFull(pending);
-
- if (consistencyForPaxos == ConsistencyLevel.LOCAL_SERIAL)
- {
- // Restrict natural and pending to node in the local DC only
- String localDc = DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddressAndPort());
- IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
- Predicate<Replica> isLocalDc = replica -> localDc.equals(snitch.getDatacenter(replica));
-
- natural = natural.filter(isLocalDc);
- pending = pending.filter(isLocalDc);
- }
-
- int participants = pending.size() + natural.size();
- int requiredParticipants = participants / 2 + 1; // See CASSANDRA-8346, CASSANDRA-833
-
- EndpointsForToken all = Endpoints.concat(natural, pending);
- EndpointsForToken selected = all.filter(IAsyncCallback.isReplicaAlive);
- if (selected.size() < requiredParticipants)
- throw UnavailableException.create(consistencyForPaxos, requiredParticipants, selected.size());
-
- // We cannot allow CAS operations with 2 or more pending endpoints, see #8346.
- // Note that we fake an impossible number of required nodes in the unavailable exception
- // to nail home the point that it's an impossible operation no matter how many nodes are live.
- if (pending.size() > 1)
- throw new UnavailableException(String.format("Cannot perform LWT operation as there is more than one (%d) pending range movement", pending.size()),
- consistencyForPaxos,
- participants + 1,
- selected.size());
-
- return new ReplicaLayout.ForPaxos(keyspace, consistencyForPaxos, key.getToken(), requiredParticipants, natural, pending, selected, all);
+ return false;
}
/**
- * We want to send mutations to as many full replicas as we can, and just as many transient replicas
- * as we need to meet blockFor.
+ * MUST APPLY FIRST
+ * See {@link ReplicaLayout#haveWriteConflicts}
+ * @return a 'natural' replica collection, that has had its conflicts with pending repaired
*/
- @VisibleForTesting
- public static ForToken forWrite(Keyspace keyspace, ConsistencyLevel consistencyLevel, Token token, int blockFor, EndpointsForToken natural, EndpointsForToken pending, Predicate<InetAddressAndPort> livePredicate) throws UnavailableException
+ private static <E extends Endpoints<E>> E resolveWriteConflictsInNatural(E natural, E pending)
{
- EndpointsForToken all = Endpoints.concat(natural, pending);
- EndpointsForToken selected = all
- .select()
- .add(r -> r.isFull() && livePredicate.test(r.endpoint()))
- .add(r -> r.isTransient() && livePredicate.test(r.endpoint()), blockFor)
- .get();
-
- consistencyLevel.assureSufficientLiveNodesForWrite(keyspace, selected, pending);
-
- return new ForToken(keyspace, consistencyLevel, token, natural, pending, selected, all);
+ return natural.filter(r -> !r.isTransient() || !pending.contains(r.endpoint(), true));
}
- public static ForToken forRead(Keyspace keyspace, Token token, ConsistencyLevel consistencyLevel, SpeculativeRetryPolicy retry)
+ /**
+ * MUST APPLY SECOND
+ * See {@link ReplicaLayout#haveWriteConflicts}
+ * @return a 'pending' replica collection, that has had its conflicts with natural repaired
+ */
+ private static <E extends Endpoints<E>> E resolveWriteConflictsInPending(E natural, E pending)
{
- EndpointsForToken natural = StorageProxy.getLiveSortedReplicasForToken(keyspace, token);
- EndpointsForToken selected = consistencyLevel.filterForQuery(keyspace, natural, retry.equals(AlwaysSpeculativeRetryPolicy.INSTANCE));
-
- // Throw UAE early if we don't have enough replicas.
- consistencyLevel.assureSufficientLiveNodesForRead(keyspace, selected);
-
- return new ForToken(keyspace, consistencyLevel, token, natural, null, selected);
+ return pending.without(natural.endpoints());
}
- public static ForRange forRangeRead(Keyspace keyspace, ConsistencyLevel consistencyLevel, AbstractBounds<PartitionPosition> range, EndpointsForRange natural, EndpointsForRange selected)
+ /**
+ * @return the read layout for a token - this includes only live natural replicas, i.e. those that are not pending
+ * and not marked down by the failure detector. these are reverse sorted by the badness score of the configured snitch
+ */
+ static ReplicaLayout.ForTokenRead forTokenReadLiveSorted(Keyspace keyspace, Token token)
{
- return new ForRange(keyspace, consistencyLevel, range, natural, selected);
+ EndpointsForToken replicas = keyspace.getReplicationStrategy().getNaturalReplicasForToken(token);
+ replicas = DatabaseDescriptor.getEndpointSnitch().sortedByProximity(FBUtilities.getBroadcastAddressAndPort(), replicas);
+ replicas = replicas.filter(FailureDetector.isReplicaAlive);
+ return new ReplicaLayout.ForTokenRead(replicas);
}
- public String toString()
+ /**
+ * TODO: we should really double check that the provided range does not overlap multiple token ring regions
+ * @return the read layout for a range - this includes only live natural replicas, i.e. those that are not pending
+ * and not marked down by the failure detector. these are reverse sorted by the badness score of the configured snitch
+ */
+ static ReplicaLayout.ForRangeRead forRangeReadLiveSorted(Keyspace keyspace, AbstractBounds<PartitionPosition> range)
{
- return "ReplicaLayout [ CL: " + consistencyLevel + " keyspace: " + keyspace + " natural: " + natural + "pending: " + pending + " selected: " + selected + " ]";
+ EndpointsForRange replicas = keyspace.getReplicationStrategy().getNaturalReplicas(range.right);
+ replicas = DatabaseDescriptor.getEndpointSnitch().sortedByProximity(FBUtilities.getBroadcastAddressAndPort(), replicas);
+ replicas = replicas.filter(FailureDetector.isReplicaAlive);
+ return new ReplicaLayout.ForRangeRead(range, replicas);
}
-}
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/locator/ReplicaPlan.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/ReplicaPlan.java b/src/java/org/apache/cassandra/locator/ReplicaPlan.java
new file mode 100644
index 0000000..4d6127b
--- /dev/null
+++ b/src/java/org/apache/cassandra/locator/ReplicaPlan.java
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.locator;
+
+import com.google.common.collect.Iterables;
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.PartitionPosition;
+import org.apache.cassandra.dht.AbstractBounds;
+
+import java.util.function.Predicate;
+
+public abstract class ReplicaPlan<E extends Endpoints<E>>
+{
+ protected final Keyspace keyspace;
+ protected final ConsistencyLevel consistencyLevel;
+
+ // all nodes we will contact via any mechanism, including hints
+ // i.e., for:
+ // - reads, only live natural replicas
+ // ==> live.natural().subList(0, blockFor + initial speculate)
+ // - writes, includes all full, and any pending replicas, (and only any necessary transient ones to make up the difference)
+ // ==> liveAndDown.natural().filter(isFull) ++ liveAndDown.pending() ++ live.natural.filter(isTransient, req)
+ // - paxos, includes all live replicas (natural+pending), for this DC if LOCAL_SERIAL
+ // ==> live.all() (if consistencyLevel.isDCLocal(), then .filter(consistencyLevel.isLocal))
+ private final E contacts;
+
+ ReplicaPlan(Keyspace keyspace, ConsistencyLevel consistencyLevel, E contacts)
+ {
+ assert contacts != null;
+ this.keyspace = keyspace;
+ this.consistencyLevel = consistencyLevel;
+ this.contacts = contacts;
+ }
+
+ public abstract int blockFor();
+ public abstract void assureSufficientReplicas();
+
+ public E contacts() { return contacts; }
+ public boolean contacts(Replica replica) { return contacts.contains(replica); }
+ public Keyspace keyspace() { return keyspace; }
+ public ConsistencyLevel consistencyLevel() { return consistencyLevel; }
+
+ public static abstract class ForRead<E extends Endpoints<E>> extends ReplicaPlan<E>
+ {
+ // all nodes we *could* contact; typically all natural replicas that are believed to be alive
+ // we will consult this collection to find uncontacted nodes we might contact if we doubt we will meet consistency level
+ private final E candidates;
+
+ ForRead(Keyspace keyspace, ConsistencyLevel consistencyLevel, E candidates, E contact)
+ {
+ super(keyspace, consistencyLevel, contact);
+ this.candidates = candidates;
+ }
+
+ public int blockFor() { return consistencyLevel.blockFor(keyspace); }
+ public void assureSufficientReplicas() { consistencyLevel.assureSufficientLiveReplicasForRead(keyspace, candidates()); }
+
+ public E candidates() { return candidates; }
+
+ public E uncontactedCandidates()
+ {
+ return candidates().filter(r -> !contacts(r));
+ }
+
+ public Replica firstUncontactedCandidate(Predicate<Replica> extraPredicate)
+ {
+ return Iterables.tryFind(candidates(), r -> extraPredicate.test(r) && !contacts(r)).orNull();
+ }
+
+ public Replica getReplicaFor(InetAddressAndPort endpoint)
+ {
+ return candidates().byEndpoint().get(endpoint);
+ }
+
+ public String toString()
+ {
+ return "ReplicaPlan.ForRead [ CL: " + consistencyLevel + " keyspace: " + keyspace + " candidates: " + candidates + " contacts: " + contacts() + " ]";
+ }
+ }
+
+ public static class ForTokenRead extends ForRead<EndpointsForToken>
+ {
+ public ForTokenRead(Keyspace keyspace, ConsistencyLevel consistencyLevel, EndpointsForToken candidates, EndpointsForToken contact)
+ {
+ super(keyspace, consistencyLevel, candidates, contact);
+ }
+
+ ForTokenRead withContact(EndpointsForToken newContact)
+ {
+ return new ForTokenRead(keyspace, consistencyLevel, candidates(), newContact);
+ }
+ }
+
+ public static class ForRangeRead extends ForRead<EndpointsForRange>
+ {
+ final AbstractBounds<PartitionPosition> range;
+
+ public ForRangeRead(Keyspace keyspace, ConsistencyLevel consistencyLevel, AbstractBounds<PartitionPosition> range, EndpointsForRange candidates, EndpointsForRange contact)
+ {
+ super(keyspace, consistencyLevel, candidates, contact);
+ this.range = range;
+ }
+
+ public AbstractBounds<PartitionPosition> range() { return range; }
+
+ ForRangeRead withContact(EndpointsForRange newContact)
+ {
+ return new ForRangeRead(keyspace, consistencyLevel, range, candidates(), newContact);
+ }
+ }
+
+ /**
+  * Base class for write plans. On top of what {@code ReplicaPlan} holds, it records the
+  * pending, live-and-down and live replica collections supplied at construction.
+  */
+ public static abstract class ForWrite<E extends Endpoints<E>> extends ReplicaPlan<E>
+ {
+     // TODO: this is only needed because of poor isolation of concerns elsewhere - we can remove it soon, and will do so in a follow-up patch
+     final E pending;
+     final E liveAndDown;
+     final E live;
+
+     /** Forwards keyspace, consistency level and contacts to {@code ReplicaPlan}; stores the other three collections. */
+     ForWrite(Keyspace keyspace, ConsistencyLevel consistencyLevel, E pending, E liveAndDown, E live, E contact)
+     {
+         super(keyspace, consistencyLevel, contact);
+         this.live = live;
+         this.liveAndDown = liveAndDown;
+         this.pending = pending;
+     }
+
+     /** Delegates to {@code consistencyLevel.blockForWrite} with this plan's keyspace and pending replicas. */
+     public int blockFor()
+     {
+         return consistencyLevel.blockForWrite(keyspace, pending());
+     }
+
+     /** Delegates to {@code consistencyLevel.assureSufficientLiveReplicasForWrite} with the keyspace, live and pending replicas. */
+     public void assureSufficientReplicas()
+     {
+         consistencyLevel.assureSufficientLiveReplicasForWrite(keyspace, live(), pending());
+     }
+
+     /** Replicas that a region of the ring is moving to; they cannot serve reads yet, but should receive writes. */
+     public E pending()
+     {
+         return pending;
+     }
+
+     /**
+      * Replicas that can participate in the write. This always includes all nodes (pending and natural)
+      * in all DCs, except for paxos LOCAL_QUORUM (which is local DC only).
+      */
+     public E liveAndDown()
+     {
+         return liveAndDown;
+     }
+
+     /** The live replicas present in liveAndDown, usually derived from FailureDetector.isReplicaAlive. */
+     public E live()
+     {
+         return live;
+     }
+
+     /** The live endpoints that we could have contacted, but chose not to. */
+     public E liveUncontacted()
+     {
+         final E liveReplicas = live();
+         return liveReplicas.filter(candidate -> !contacts(candidate));
+     }
+
+     /** Tests liveness consistently with the upfront analysis done for this operation, i.e. by membership of the live collection. */
+     public boolean isAlive(Replica replica)
+     {
+         return live.endpoints().contains(replica.endpoint());
+     }
+
+     @Override
+     public String toString()
+     {
+         return "ReplicaPlan.ForWrite [ CL: " + consistencyLevel
+                + " keyspace: " + keyspace
+                + " liveAndDown: " + liveAndDown
+                + " live: " + live
+                + " contacts: " + contacts()
+                + " ]";
+     }
+ }
+
+ /**
+  * A write plan over {@link EndpointsForToken}, with helpers that derive a copy differing
+  * only in consistency level or in contacts.
+  */
+ public static class ForTokenWrite extends ForWrite<EndpointsForToken>
+ {
+     /** Hands every argument, unchanged, to the {@code ForWrite} constructor. */
+     public ForTokenWrite(Keyspace keyspace, ConsistencyLevel consistencyLevel, EndpointsForToken pending, EndpointsForToken liveAndDown, EndpointsForToken live, EndpointsForToken contact)
+     {
+         super(keyspace, consistencyLevel, pending, liveAndDown, live, contact);
+     }
+
+     /** Creates a new plan with this plan's keyspace, pending, liveAndDown and live, and the given consistency level and contacts. */
+     private ReplicaPlan.ForTokenWrite copy(ConsistencyLevel newConsistencyLevel, EndpointsForToken newContact)
+     {
+         return new ReplicaPlan.ForTokenWrite(keyspace,
+                                              newConsistencyLevel,
+                                              pending(),
+                                              liveAndDown(),
+                                              live(),
+                                              newContact);
+     }
+
+     /** Returns a copy of this plan with the current contacts and {@code newConsistencylevel}. */
+     ForTokenWrite withConsistencyLevel(ConsistencyLevel newConsistencylevel)
+     {
+         return copy(newConsistencylevel, contacts());
+     }
+
+     /** Returns a copy of this plan with the current consistency level and {@code newContact} as contacts. */
+     public ForTokenWrite withContact(EndpointsForToken newContact)
+     {
+         return copy(consistencyLevel, newContact);
+     }
+ }
+
+ /**
+  * A token write plan that additionally carries a required-participants count.
+  */
+ public static class ForPaxosWrite extends ForWrite<EndpointsForToken>
+ {
+     final int requiredParticipants;
+
+     /** Stores {@code requiredParticipants} and forwards the remaining arguments to the {@code ForWrite} constructor. */
+     ForPaxosWrite(Keyspace keyspace, ConsistencyLevel consistencyLevel, EndpointsForToken pending, EndpointsForToken liveAndDown, EndpointsForToken live, EndpointsForToken contact, int requiredParticipants)
+     {
+         super(keyspace, consistencyLevel, pending, liveAndDown, live, contact);
+         this.requiredParticipants = requiredParticipants;
+     }
+
+     /** The participant count supplied at construction. */
+     public int requiredParticipants()
+     {
+         return this.requiredParticipants;
+     }
+ }
+
+ /**
+  * A holder through which AbstractReadExecutor, {Data,Digest}Resolver and ReadRepair share one ReplicaPlan,
+  * whose 'contacts' replicas are progressively modified by the various forms of speculation
+  * (initial speculation, rr-read and rr-write).
+  *
+  * Although the holder is shared between threads, its internal reference is not volatile. The reference
+  * passed to the constructor should become visible through the normal process of sharing data between
+  * threads (i.e. executors, etc). Later updates will either be seen or not seen, perhaps not promptly,
+  * but never incompletely: the contained ReplicaPlan has only final member properties, so it cannot be
+  * observed partially initialised.
+  */
+ public interface Shared<E extends Endpoints<E>, P extends ReplicaPlan<E>>
+ {
+     /**
+      * Adds the provided replica to this shared plan by updating the internal reference.
+      */
+     void addToContacts(Replica replica);
+
+     /**
+      * Returns the shared replica plan; the read is non-volatile (so the result may be stale),
+      * but there is no risk of it being partially initialised.
+      */
+     P get();
+
+     /**
+      * Returns the shared replica plan with its 'contacts' replaced by those provided; the read is
+      * non-volatile (so the result may be stale), but there is no risk of it being partially initialised.
+      */
+     P getWithContacts(E endpoints);
+ }
+
+ public static class SharedForTokenRead implements Shared<EndpointsForToken, ForTokenRead>
+ {
+ private ForTokenRead replicaPlan;
+ SharedForTokenRead(ForTokenRead replicaPlan) { this.replicaPlan = replicaPlan; }
+ public void addToContacts(Replica replica) { replicaPlan = replicaPlan.withContact(Endpoints.append(replicaPlan.contacts(), replica)); }
+ public ForTokenRead get() { return replicaPlan; }
+ public ForTokenRead getWithContacts(EndpointsForToken newContact) { return replicaPlan.withContact(newContact); }
+ }
+
+ public static class SharedForRangeRead implements Shared<EndpointsForRange, ForRangeRead>
+ {
+ private ForRangeRead replicaPlan;
+ SharedForRangeRead(ForRangeRead replicaPlan) { this.replicaPlan = replicaPlan; }
+ public void addToContacts(Replica replica) { replicaPlan = replicaPlan.withContact(Endpoints.append(replicaPlan.contacts(), replica)); }
+ public ForRangeRead get() { return replicaPlan; }
+ public ForRangeRead getWithContacts(EndpointsForRange newContact) { return replicaPlan.withContact(newContact); }
+ }
+
+ public static SharedForTokenRead shared(ForTokenRead replicaPlan) { return new SharedForTokenRead(replicaPlan); }
+ public static SharedForRangeRead shared(ForRangeRead replicaPlan) { return new SharedForRangeRead(replicaPlan); }
+
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org