Posted to commits@cassandra.apache.org by be...@apache.org on 2018/09/14 09:14:53 UTC
[3/4] cassandra git commit: ReplicaPlan/Layout refactor
follow-up/completion
http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/locator/ReplicaPlans.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/ReplicaPlans.java b/src/java/org/apache/cassandra/locator/ReplicaPlans.java
new file mode 100644
index 0000000..25f42c3
--- /dev/null
+++ b/src/java/org/apache/cassandra/locator/ReplicaPlans.java
@@ -0,0 +1,295 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.locator;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.PartitionPosition;
+import org.apache.cassandra.dht.AbstractBounds;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.exceptions.UnavailableException;
+import org.apache.cassandra.gms.FailureDetector;
+import org.apache.cassandra.service.reads.AlwaysSpeculativeRetryPolicy;
+import org.apache.cassandra.service.reads.SpeculativeRetryPolicy;
+import org.apache.cassandra.utils.FBUtilities;
+
+import java.util.Collection;
+import java.util.function.Predicate;
+
+import static com.google.common.collect.Iterables.any;
+import static com.google.common.collect.Iterables.filter;
+import static com.google.common.collect.Iterables.limit;
+
+public class ReplicaPlans
+{
+
+ /**
+ * Construct a ReplicaPlan for writing to exactly one node, with CL.ONE. This node is *assumed* to be alive.
+ */
+ public static ReplicaPlan.ForTokenWrite forSingleReplicaWrite(Keyspace keyspace, Token token, Replica replica)
+ {
+ EndpointsForToken one = EndpointsForToken.of(token, replica);
+ EndpointsForToken empty = EndpointsForToken.empty(token);
+ return new ReplicaPlan.ForTokenWrite(keyspace, ConsistencyLevel.ONE, empty, one, one, one);
+ }
+
+ /**
+ * A forwarding counter write is always sent to a single owning coordinator for the range, by the original coordinator
+ * (if it is not itself an owner)
+ */
+ public static ReplicaPlan.ForTokenWrite forForwardingCounterWrite(Keyspace keyspace, Token token, Replica replica)
+ {
+ return forSingleReplicaWrite(keyspace, token, replica);
+ }
+
+ /**
+ * Requires that the provided endpoints are alive. Converts them to their relevant system replicas.
+ * Note that both the liveAndDown and live collections are equal to the provided endpoints.
+ *
+ * The semantics are a bit weird: CL is ONE iff exactly one endpoint is provided, and TWO otherwise.
+ * Why these CLs were chosen, and why the CL is lowered when only one live node is available, are both unclear.
+ */
+ public static ReplicaPlan.ForTokenWrite forBatchlogWrite(Keyspace keyspace, Collection<InetAddressAndPort> endpoints) throws UnavailableException
+ {
+ // A special case: we write not for a single range or token, but multiple mutations to many tokens
+ Token token = DatabaseDescriptor.getPartitioner().getMinimumToken();
+
+ ReplicaLayout.ForTokenWrite liveAndDown = ReplicaLayout.forTokenWrite(
+ SystemReplicas.getSystemReplicas(endpoints).forToken(token),
+ EndpointsForToken.empty(token)
+ );
+ ConsistencyLevel consistencyLevel = liveAndDown.all().size() == 1 ? ConsistencyLevel.ONE : ConsistencyLevel.TWO;
+
+ // assume that we have already been given live endpoints, and skip applying the failure detector
+ return forWrite(keyspace, consistencyLevel, liveAndDown, liveAndDown, writeAll);
+ }
+
+ public static ReplicaPlan.ForTokenWrite forWrite(Keyspace keyspace, ConsistencyLevel consistencyLevel, Token token, Selector selector) throws UnavailableException
+ {
+ return forWrite(keyspace, consistencyLevel, ReplicaLayout.forTokenWriteLiveAndDown(keyspace, token), selector);
+ }
+
+ @VisibleForTesting
+ public static ReplicaPlan.ForTokenWrite forWrite(Keyspace keyspace, ConsistencyLevel consistencyLevel, EndpointsForToken natural, EndpointsForToken pending, Predicate<Replica> isAlive, Selector selector) throws UnavailableException
+ {
+ return forWrite(keyspace, consistencyLevel, ReplicaLayout.forTokenWrite(natural, pending), isAlive, selector);
+ }
+
+ public static ReplicaPlan.ForTokenWrite forWrite(Keyspace keyspace, ConsistencyLevel consistencyLevel, ReplicaLayout.ForTokenWrite liveAndDown, Selector selector) throws UnavailableException
+ {
+ return forWrite(keyspace, consistencyLevel, liveAndDown, FailureDetector.isReplicaAlive, selector);
+ }
+
+ @VisibleForTesting
+ public static ReplicaPlan.ForTokenWrite forWrite(Keyspace keyspace, ConsistencyLevel consistencyLevel, ReplicaLayout.ForTokenWrite liveAndDown, Predicate<Replica> isAlive, Selector selector) throws UnavailableException
+ {
+ ReplicaLayout.ForTokenWrite live = liveAndDown.filter(isAlive);
+ return forWrite(keyspace, consistencyLevel, liveAndDown, live, selector);
+ }
+
+ public static ReplicaPlan.ForTokenWrite forWrite(Keyspace keyspace, ConsistencyLevel consistencyLevel, ReplicaLayout.ForTokenWrite liveAndDown, ReplicaLayout.ForTokenWrite live, Selector selector) throws UnavailableException
+ {
+ EndpointsForToken contacts = selector.select(keyspace, consistencyLevel, liveAndDown, live);
+ ReplicaPlan.ForTokenWrite result = new ReplicaPlan.ForTokenWrite(keyspace, consistencyLevel, liveAndDown.pending(), liveAndDown.all(), live.all(), contacts);
+ result.assureSufficientReplicas();
+ return result;
+ }
+
+ public interface Selector
+ {
+ <E extends Endpoints<E>, L extends ReplicaLayout.ForWrite<E>>
+ E select(Keyspace keyspace, ConsistencyLevel consistencyLevel, L liveAndDown, L live);
+ }
+
+ /**
+ * Select all nodes, transient or otherwise, as targets for the operation.
+ *
+ * This may no longer be useful once we finish implementing transient replication support; however,
+ * it can still be of value for a call site to stipulate that it writes to all nodes without regard to transient status.
+ */
+ public static final Selector writeAll = new Selector()
+ {
+ @Override
+ public <E extends Endpoints<E>, L extends ReplicaLayout.ForWrite<E>>
+ E select(Keyspace keyspace, ConsistencyLevel consistencyLevel, L liveAndDown, L live)
+ {
+ return liveAndDown.all();
+ }
+ };
+
+ /**
+ * Select all full nodes, live or down, as write targets. If there are insufficient nodes to complete the write,
+ * but there are live transient nodes, select a sufficient number of these to reach our consistency level.
+ *
+ * Pending nodes are always contacted, whether or not they are full. If a transient replica is undergoing
+ * a pending move to a new node and we were to write (transiently) to it, that write would not be replicated
+ * to the pending transient node, and so on completing the move the write could effectively have failed to
+ * reach the promised consistency level.
+ */
+ public static final Selector writeNormal = new Selector()
+ {
+ @Override
+ public <E extends Endpoints<E>, L extends ReplicaLayout.ForWrite<E>>
+ E select(Keyspace keyspace, ConsistencyLevel consistencyLevel, L liveAndDown, L live)
+ {
+ if (!any(liveAndDown.all(), Replica::isTransient))
+ return liveAndDown.all();
+
+ assert consistencyLevel != ConsistencyLevel.EACH_QUORUM;
+
+ ReplicaCollection.Mutable<E> contacts = liveAndDown.all().newMutable(liveAndDown.all().size());
+ contacts.addAll(filter(liveAndDown.natural(), Replica::isFull));
+ contacts.addAll(liveAndDown.pending());
+
+ // TODO: this doesn't correctly handle LOCAL_QUORUM (or EACH_QUORUM at all)
+ int liveCount = contacts.count(live.all()::contains);
+ int requiredTransientCount = consistencyLevel.blockForWrite(keyspace, liveAndDown.pending()) - liveCount;
+ if (requiredTransientCount > 0)
+ contacts.addAll(limit(filter(live.natural(), Replica::isTransient), requiredTransientCount));
+ return contacts.asSnapshot();
+ }
+ };
+
+ /**
+ * Construct the plan for a paxos round - NOT for the consistency level of the associated write or
+ * read/comparison, but for the paxos linearisation agreement itself.
+ *
+ * This will select all live nodes as the candidates for the operation; only the required number of
+ * participants (a simple majority of all natural and pending replicas) must be live for the round to proceed.
+ */
+ public static ReplicaPlan.ForPaxosWrite forPaxos(Keyspace keyspace, DecoratedKey key, ConsistencyLevel consistencyForPaxos) throws UnavailableException
+ {
+ Token tk = key.getToken();
+ ReplicaLayout.ForTokenWrite liveAndDown = ReplicaLayout.forTokenWriteLiveAndDown(keyspace, tk);
+
+ Replicas.temporaryAssertFull(liveAndDown.all()); // TODO CASSANDRA-14547
+
+ if (consistencyForPaxos == ConsistencyLevel.LOCAL_SERIAL)
+ {
+ // TODO: we should clean up our semantics here, as we're filtering ALL nodes down to the local DC, which is unexpected for ReplicaPlan
+ // Restrict natural and pending to nodes in the local DC only
+ String localDc = DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddressAndPort());
+ IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
+ Predicate<Replica> isLocalDc = replica -> localDc.equals(snitch.getDatacenter(replica));
+
+ liveAndDown = liveAndDown.filter(isLocalDc);
+ }
+
+ ReplicaLayout.ForTokenWrite live = liveAndDown.filter(FailureDetector.isReplicaAlive);
+
+ // TODO: this should use assureSufficientReplicas
+ int participants = liveAndDown.all().size();
+ int requiredParticipants = participants / 2 + 1; // See CASSANDRA-8346, CASSANDRA-833
+
+ EndpointsForToken contacts = live.all();
+ if (contacts.size() < requiredParticipants)
+ throw UnavailableException.create(consistencyForPaxos, requiredParticipants, contacts.size());
+
+ // We cannot allow CAS operations with 2 or more pending endpoints, see #8346.
+ // Note that we fake an impossible number of required nodes in the unavailable exception
+ // to nail home the point that it's an impossible operation no matter how many nodes are live.
+ if (liveAndDown.pending().size() > 1)
+ throw new UnavailableException(String.format("Cannot perform LWT operation as there is more than one (%d) pending range movement", liveAndDown.pending().size()),
+ consistencyForPaxos,
+ participants + 1,
+ contacts.size());
+
+ return new ReplicaPlan.ForPaxosWrite(keyspace, consistencyForPaxos, liveAndDown.pending(), liveAndDown.all(), live.all(), contacts, requiredParticipants);
+ }
+
+ /**
+ * Construct a plan for reading from a single node - this permits no speculation or read-repair
+ */
+ public static ReplicaPlan.ForTokenRead forSingleReplicaRead(Keyspace keyspace, Token token, Replica replica)
+ {
+ EndpointsForToken one = EndpointsForToken.of(token, replica);
+ return new ReplicaPlan.ForTokenRead(keyspace, ConsistencyLevel.ONE, one, one);
+ }
+
+ /**
+ * Construct a plan for reading from a single node - this permits no speculation or read-repair
+ */
+ public static ReplicaPlan.ForRangeRead forSingleReplicaRead(Keyspace keyspace, AbstractBounds<PartitionPosition> range, Replica replica)
+ {
+ // TODO: this is unsafe, as one.range() may be inconsistent with our supplied range; should refactor Range/AbstractBounds to a single class
+ EndpointsForRange one = EndpointsForRange.of(replica);
+ return new ReplicaPlan.ForRangeRead(keyspace, ConsistencyLevel.ONE, range, one, one);
+ }
+
+ /**
+ * Construct a plan for reading the provided token at the provided consistency level. This translates to a collection of
+ * - candidates who are: alive, replicate the token, and are sorted by their snitch scores
+ * - contacts who are: the first blockFor + (retry == ALWAYS ? 1 : 0) candidates
+ *
+ * The candidate collection can be used for speculation, although at present doing so without
+ * further filtering would break LOCAL_QUORUM and EACH_QUORUM
+ */
+ public static ReplicaPlan.ForTokenRead forRead(Keyspace keyspace, Token token, ConsistencyLevel consistencyLevel, SpeculativeRetryPolicy retry)
+ {
+ ReplicaLayout.ForTokenRead candidates = ReplicaLayout.forTokenReadLiveSorted(keyspace, token);
+ EndpointsForToken contacts = consistencyLevel.filterForQuery(keyspace, candidates.natural(),
+ retry.equals(AlwaysSpeculativeRetryPolicy.INSTANCE));
+
+ ReplicaPlan.ForTokenRead result = new ReplicaPlan.ForTokenRead(keyspace, consistencyLevel, candidates.natural(), contacts);
+ result.assureSufficientReplicas(); // Throw UAE early if we don't have enough replicas.
+ return result;
+ }
+
+ /**
+ * Construct a plan for reading the provided range at the provided consistency level. This translates to a collection of
+ * - candidates who are: alive, replicate the range, and are sorted by their snitch scores
+ * - contacts who are: the first blockFor candidates
+ *
+ * There is no speculation for range read queries at present, so we never 'always speculate' here, and a failed response fails the query.
+ */
+ public static ReplicaPlan.ForRangeRead forRangeRead(Keyspace keyspace, ConsistencyLevel consistencyLevel, AbstractBounds<PartitionPosition> range)
+ {
+ ReplicaLayout.ForRangeRead candidates = ReplicaLayout.forRangeReadLiveSorted(keyspace, range);
+ EndpointsForRange contacts = consistencyLevel.filterForQuery(keyspace, candidates.natural());
+
+ ReplicaPlan.ForRangeRead result = new ReplicaPlan.ForRangeRead(keyspace, consistencyLevel, range, candidates.natural(), contacts);
+ result.assureSufficientReplicas();
+ return result;
+ }
+
+ /**
+ * Take two range read plans for adjacent ranges, and check if it is OK (and worthwhile) to combine them into a single plan
+ */
+ public static ReplicaPlan.ForRangeRead maybeMerge(Keyspace keyspace, ConsistencyLevel consistencyLevel, ReplicaPlan.ForRangeRead left, ReplicaPlan.ForRangeRead right)
+ {
+ // TODO: should we be asserting that the ranges are adjacent?
+ AbstractBounds<PartitionPosition> newRange = left.range().withNewRight(right.range().right);
+ EndpointsForRange mergedCandidates = left.candidates().keep(right.candidates().endpoints());
+
+ // Check if there are enough shared endpoints for the merge to be possible.
+ if (!consistencyLevel.isSufficientLiveReplicasForRead(keyspace, mergedCandidates))
+ return null;
+
+ EndpointsForRange contacts = consistencyLevel.filterForQuery(keyspace, mergedCandidates);
+
+ // Estimate whether merging will be a win or not
+ if (!DatabaseDescriptor.getEndpointSnitch().isWorthMergingForRangeQuery(contacts, left.contacts(), right.contacts()))
+ return null;
+
+ // If we get here, merge this range and the next one
+ return new ReplicaPlan.ForRangeRead(keyspace, consistencyLevel, newRange, mergedCandidates, contacts);
+ }
+}
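
To illustrate the paxos participant arithmetic above: forPaxos counts every natural and pending
replica as a participant and demands a simple majority (participants / 2 + 1) of them to be live.
A minimal standalone sketch of that arithmetic, using plain ints rather than the Replica types
in this patch (PaxosParticipantsSketch is a hypothetical name, not part of the commit):

    // Models the majority calculation in ReplicaPlans.forPaxos; see CASSANDRA-8346.
    public class PaxosParticipantsSketch
    {
        static int requiredParticipants(int participants)
        {
            return participants / 2 + 1; // simple majority
        }

        public static void main(String[] args)
        {
            System.out.println(requiredParticipants(3)); // RF=3, no pending: 2 of 3 must be live
            System.out.println(requiredParticipants(4)); // RF=3 plus one pending participant: 3 of 4
        }
    }

Note that a single pending endpoint raises the bar for the whole round, which is why more than
one pending range movement is rejected outright above.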
http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/locator/SystemReplicas.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/SystemReplicas.java b/src/java/org/apache/cassandra/locator/SystemReplicas.java
index 13a9d74..0d1fc8d 100644
--- a/src/java/org/apache/cassandra/locator/SystemReplicas.java
+++ b/src/java/org/apache/cassandra/locator/SystemReplicas.java
@@ -18,12 +18,11 @@
package org.apache.cassandra.locator;
-import java.util.ArrayList;
import java.util.Collection;
-import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import com.google.common.collect.Collections2;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
@@ -50,13 +49,8 @@ public class SystemReplicas
return systemReplicas.computeIfAbsent(endpoint, SystemReplicas::createSystemReplica);
}
- public static Collection<Replica> getSystemReplicas(Collection<InetAddressAndPort> endpoints)
+ public static EndpointsForRange getSystemReplicas(Collection<InetAddressAndPort> endpoints)
{
- List<Replica> replicas = new ArrayList<>(endpoints.size());
- for (InetAddressAndPort endpoint: endpoints)
- {
- replicas.add(getSystemReplica(endpoint));
- }
- return replicas;
+ return EndpointsForRange.copyOf(Collections2.transform(endpoints, SystemReplicas::getSystemReplica));
}
}
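
For reference, Collections2.transform (used above) returns a lazy live view of the input
collection; EndpointsForRange.copyOf then snapshots that view into an immutable collection.
A small self-contained example of the Guava call, outside this patch:

    import com.google.common.collect.Collections2;

    import java.util.Arrays;
    import java.util.Collection;

    public class TransformExample
    {
        public static void main(String[] args)
        {
            Collection<String> endpoints = Arrays.asList("10.0.0.1", "10.0.0.2");
            // The view is evaluated on iteration, not when transform() is called
            Collection<Integer> lengths = Collections2.transform(endpoints, String::length);
            System.out.println(lengths); // [8, 8]
        }
    }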
http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/locator/TokenMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/TokenMetadata.java b/src/java/org/apache/cassandra/locator/TokenMetadata.java
index 4ab34db..ad40d7b 100644
--- a/src/java/org/apache/cassandra/locator/TokenMetadata.java
+++ b/src/java/org/apache/cassandra/locator/TokenMetadata.java
@@ -1224,15 +1224,11 @@ public class TokenMetadata
/**
* @deprecated retained for benefit of old tests
*/
+ @Deprecated
public EndpointsForToken getWriteEndpoints(Token token, String keyspaceName, EndpointsForToken natural)
{
EndpointsForToken pending = pendingEndpointsForToken(token, keyspaceName);
- if (Endpoints.haveConflicts(natural, pending))
- {
- natural = Endpoints.resolveConflictsInNatural(natural, pending);
- pending = Endpoints.resolveConflictsInPending(natural, pending);
- }
- return Endpoints.concat(natural, pending);
+ return ReplicaLayout.forTokenWrite(natural, pending).all();
}
/** @return an endpoint to token multimap representation of tokenToEndpointMap (a copy) */
http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/net/IAsyncCallback.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/IAsyncCallback.java b/src/java/org/apache/cassandra/net/IAsyncCallback.java
index 253b412..ceaf072 100644
--- a/src/java/org/apache/cassandra/net/IAsyncCallback.java
+++ b/src/java/org/apache/cassandra/net/IAsyncCallback.java
@@ -17,12 +17,6 @@
*/
package org.apache.cassandra.net;
-import com.google.common.base.Predicate;
-
-import org.apache.cassandra.gms.FailureDetector;
-import org.apache.cassandra.locator.InetAddressAndPort;
-import org.apache.cassandra.locator.Replica;
-
/**
* implementors of IAsyncCallback need to make sure that any public methods
* are threadsafe with respect to response() being called from the message
@@ -31,10 +25,6 @@ import org.apache.cassandra.locator.Replica;
*/
public interface IAsyncCallback<T>
{
- final Predicate<InetAddressAndPort> isAlive = FailureDetector.instance::isAlive;
-
- final Predicate<Replica> isReplicaAlive = replica -> isAlive.apply(replica.endpoint());
-
/**
* @param msg response received.
*/
http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java b/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
index e817cc8..7f51ae7 100644
--- a/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
+++ b/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
@@ -26,8 +26,9 @@ import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.stream.Collectors;
import org.apache.cassandra.db.ConsistencyLevel;
-import org.apache.cassandra.locator.ReplicaLayout;
+import org.apache.cassandra.locator.EndpointsForToken;
+import org.apache.cassandra.locator.ReplicaPlan;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -36,7 +37,6 @@ import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.IMutation;
import org.apache.cassandra.db.WriteType;
import org.apache.cassandra.exceptions.RequestFailureReason;
-import org.apache.cassandra.exceptions.UnavailableException;
import org.apache.cassandra.exceptions.WriteFailureException;
import org.apache.cassandra.exceptions.WriteTimeoutException;
import org.apache.cassandra.locator.InetAddressAndPort;
@@ -53,7 +53,7 @@ public abstract class AbstractWriteResponseHandler<T> implements IAsyncCallbackW
// Count down until all responses and expirations have occurred before deciding whether the ideal CL was reached.
private AtomicInteger responsesAndExpirations;
private final SimpleCondition condition = new SimpleCondition();
- protected final ReplicaLayout.ForToken replicaLayout;
+ protected final ReplicaPlan.ForTokenWrite replicaPlan;
protected final Runnable callback;
protected final WriteType writeType;
@@ -76,12 +76,12 @@ public abstract class AbstractWriteResponseHandler<T> implements IAsyncCallbackW
* @param callback A callback to be called when the write is successful.
* @param queryStartNanoTime
*/
- protected AbstractWriteResponseHandler(ReplicaLayout.ForToken replicaLayout,
+ protected AbstractWriteResponseHandler(ReplicaPlan.ForTokenWrite replicaPlan,
Runnable callback,
WriteType writeType,
long queryStartNanoTime)
{
- this.replicaLayout = replicaLayout;
+ this.replicaPlan = replicaPlan;
this.callback = callback;
this.writeType = writeType;
this.failureReasonByEndpoint = new ConcurrentHashMap<>();
@@ -104,19 +104,19 @@ public abstract class AbstractWriteResponseHandler<T> implements IAsyncCallbackW
if (!success)
{
- int blockedFor = totalBlockFor();
+ int blockedFor = blockFor();
int acks = ackCount();
// It's pretty unlikely, but we can race between exiting await above and here, so
// that we could now have enough acks. In that case, we "lie" on the acks count to
// avoid sending confusing info to the user (see CASSANDRA-6491).
if (acks >= blockedFor)
acks = blockedFor - 1;
- throw new WriteTimeoutException(writeType, replicaLayout.consistencyLevel(), acks, blockedFor);
+ throw new WriteTimeoutException(writeType, replicaPlan.consistencyLevel(), acks, blockedFor);
}
- if (totalBlockFor() + failures > totalEndpoints())
+ if (blockFor() + failures > candidateReplicaCount())
{
- throw new WriteFailureException(replicaLayout.consistencyLevel(), ackCount(), totalBlockFor(), writeType, failureReasonByEndpoint);
+ throw new WriteFailureException(replicaPlan.consistencyLevel(), ackCount(), blockFor(), writeType, failureReasonByEndpoint);
}
}
@@ -135,7 +135,7 @@ public abstract class AbstractWriteResponseHandler<T> implements IAsyncCallbackW
public void setIdealCLResponseHandler(AbstractWriteResponseHandler handler)
{
this.idealCLDelegate = handler;
- idealCLDelegate.responsesAndExpirations = new AtomicInteger(replicaLayout.selected().size());
+ idealCLDelegate.responsesAndExpirations = new AtomicInteger(replicaPlan.contacts().size());
}
/**
@@ -189,28 +189,30 @@ public abstract class AbstractWriteResponseHandler<T> implements IAsyncCallbackW
/**
* @return the minimum number of endpoints that must reply.
*/
- protected int totalBlockFor()
+ protected int blockFor()
{
// During bootstrap, we have to include the pending endpoints or we may fail the consistency level
// guarantees (see #833)
- return replicaLayout.consistencyLevel().blockForWrite(replicaLayout.keyspace(), replicaLayout.pending());
+ return replicaPlan.blockFor();
}
/**
+ * TODO: this method is brittle for its purpose of deciding when we should fail a query;
+ * this needs to be aware of the CL, and of which nodes are live/down
* @return the total number of endpoints the request can been sent to.
*/
- protected int totalEndpoints()
+ protected int candidateReplicaCount()
{
- return replicaLayout.all().size();
+ return replicaPlan.liveAndDown().size();
}
public ConsistencyLevel consistencyLevel()
{
- return replicaLayout.consistencyLevel();
+ return replicaPlan.consistencyLevel();
}
/**
- * @return true if the message counts towards the totalBlockFor() threshold
+ * @return true if the message counts towards the blockFor() threshold
*/
protected boolean waitingFor(InetAddressAndPort from)
{
@@ -227,11 +229,6 @@ public abstract class AbstractWriteResponseHandler<T> implements IAsyncCallbackW
*/
public abstract void response(MessageIn<T> msg);
- public void assureSufficientLiveNodes() throws UnavailableException
- {
- replicaLayout.consistencyLevel().assureSufficientLiveNodesForWrite(replicaLayout.keyspace(), replicaLayout.all().filter(isReplicaAlive), replicaLayout.pending());
- }
-
protected void signal()
{
condition.signalAll();
@@ -250,7 +247,7 @@ public abstract class AbstractWriteResponseHandler<T> implements IAsyncCallbackW
failureReasonByEndpoint.put(from, failureReason);
- if (totalBlockFor() + n > totalEndpoints())
+ if (blockFor() + n > candidateReplicaCount())
signal();
}
@@ -278,11 +275,11 @@ public abstract class AbstractWriteResponseHandler<T> implements IAsyncCallbackW
//The condition being signaled is a valid proxy for the CL being achieved
if (!condition.isSignaled())
{
- replicaLayout.keyspace().metric.writeFailedIdealCL.inc();
+ replicaPlan.keyspace().metric.writeFailedIdealCL.inc();
}
else
{
- replicaLayout.keyspace().metric.idealCLWriteLatency.addNano(System.nanoTime() - queryStartNanoTime);
+ replicaPlan.keyspace().metric.idealCLWriteLatency.addNano(System.nanoTime() - queryStartNanoTime);
}
}
}
@@ -292,7 +289,8 @@ public abstract class AbstractWriteResponseHandler<T> implements IAsyncCallbackW
*/
public void maybeTryAdditionalReplicas(IMutation mutation, StorageProxy.WritePerformer writePerformer, String localDC)
{
- if (replicaLayout.all().size() == replicaLayout.selected().size())
+ EndpointsForToken uncontacted = replicaPlan.liveUncontacted();
+ if (uncontacted.isEmpty())
return;
long timeout = Long.MAX_VALUE;
@@ -313,7 +311,7 @@ public abstract class AbstractWriteResponseHandler<T> implements IAsyncCallbackW
for (ColumnFamilyStore cf : cfs)
cf.metric.speculativeWrites.inc();
- writePerformer.apply(mutation, replicaLayout.forNaturalUncontacted(),
+ writePerformer.apply(mutation, replicaPlan.withContact(uncontacted),
(AbstractWriteResponseHandler<IMutation>) this,
localDC);
}
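
The renamed blockFor()/candidateReplicaCount() pair drives the early-failure check seen twice in
this file: once blockFor() acks can no longer be gathered from the remaining candidates, the
handler fails the write immediately rather than waiting out the timeout. A minimal standalone
sketch of that arithmetic (hypothetical names, not part of the commit):

    public class EarlyFailureSketch
    {
        static boolean cannotMeetBlockFor(int blockFor, int failures, int candidates)
        {
            // Mirrors: blockFor() + failures > candidateReplicaCount()
            return blockFor + failures > candidates;
        }

        public static void main(String[] args)
        {
            // QUORUM over RF=3: blockFor=2, 3 candidates - one failure is survivable...
            System.out.println(cannotMeetBlockFor(2, 1, 3)); // false
            // ...but a second failure makes the CL unreachable, so fail fast.
            System.out.println(cannotMeetBlockFor(2, 2, 3)); // true
        }
    }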
http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/service/ActiveRepairService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java b/src/java/org/apache/cassandra/service/ActiveRepairService.java
index 8ffca6a..b32f67e 100644
--- a/src/java/org/apache/cassandra/service/ActiveRepairService.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java
@@ -331,7 +331,7 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
TokenMetadata.Topology topology = ss.getTokenMetadata().cloneOnlyTokenMap().getTopology();
Multimap<String, InetAddressAndPort> dcEndpointsMap = topology.getDatacenterEndpoints();
Iterable<InetAddressAndPort> dcEndpoints = concat(transform(dataCenters, dcEndpointsMap::get));
- return neighbors.keep(dcEndpoints);
+ return neighbors.select(dcEndpoints, true);
}
else if (hosts != null && !hosts.isEmpty())
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/service/BatchlogResponseHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/BatchlogResponseHandler.java b/src/java/org/apache/cassandra/service/BatchlogResponseHandler.java
index ee74df5..63fbc72 100644
--- a/src/java/org/apache/cassandra/service/BatchlogResponseHandler.java
+++ b/src/java/org/apache/cassandra/service/BatchlogResponseHandler.java
@@ -36,7 +36,7 @@ public class BatchlogResponseHandler<T> extends AbstractWriteResponseHandler<T>
public BatchlogResponseHandler(AbstractWriteResponseHandler<T> wrapped, int requiredBeforeFinish, BatchlogCleanup cleanup, long queryStartNanoTime)
{
- super(wrapped.replicaLayout, wrapped.callback, wrapped.writeType, queryStartNanoTime);
+ super(wrapped.replicaPlan, wrapped.callback, wrapped.writeType, queryStartNanoTime);
this.wrapped = wrapped;
this.requiredBeforeFinish = requiredBeforeFinish;
this.cleanup = cleanup;
@@ -64,24 +64,19 @@ public class BatchlogResponseHandler<T> extends AbstractWriteResponseHandler<T>
wrapped.onFailure(from, failureReason);
}
- public void assureSufficientLiveNodes()
- {
- wrapped.assureSufficientLiveNodes();
- }
-
public void get() throws WriteTimeoutException, WriteFailureException
{
wrapped.get();
}
- protected int totalBlockFor()
+ protected int blockFor()
{
- return wrapped.totalBlockFor();
+ return wrapped.blockFor();
}
- protected int totalEndpoints()
+ protected int candidateReplicaCount()
{
- return wrapped.totalEndpoints();
+ return wrapped.candidateReplicaCount();
}
protected boolean waitingFor(InetAddressAndPort from)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java b/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java
index d4cdcc6..4c892ff 100644
--- a/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java
+++ b/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java
@@ -22,10 +22,10 @@ import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.locator.ReplicaLayout;
import org.apache.cassandra.locator.IEndpointSnitch;
import org.apache.cassandra.locator.NetworkTopologyStrategy;
import org.apache.cassandra.locator.Replica;
+import org.apache.cassandra.locator.ReplicaPlan;
import org.apache.cassandra.net.MessageIn;
import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.db.WriteType;
@@ -40,16 +40,16 @@ public class DatacenterSyncWriteResponseHandler<T> extends AbstractWriteResponse
private final Map<String, AtomicInteger> responses = new HashMap<String, AtomicInteger>();
private final AtomicInteger acks = new AtomicInteger(0);
- public DatacenterSyncWriteResponseHandler(ReplicaLayout.ForToken replicaLayout,
+ public DatacenterSyncWriteResponseHandler(ReplicaPlan.ForTokenWrite replicaPlan,
Runnable callback,
WriteType writeType,
long queryStartNanoTime)
{
// Responses are being managed by the map, so make it 1 for the superclass.
- super(replicaLayout, callback, writeType, queryStartNanoTime);
- assert replicaLayout.consistencyLevel() == ConsistencyLevel.EACH_QUORUM;
+ super(replicaPlan, callback, writeType, queryStartNanoTime);
+ assert replicaPlan.consistencyLevel() == ConsistencyLevel.EACH_QUORUM;
- NetworkTopologyStrategy strategy = (NetworkTopologyStrategy) replicaLayout.keyspace().getReplicationStrategy();
+ NetworkTopologyStrategy strategy = (NetworkTopologyStrategy) replicaPlan.keyspace().getReplicationStrategy();
for (String dc : strategy.getDatacenters())
{
@@ -59,7 +59,7 @@ public class DatacenterSyncWriteResponseHandler<T> extends AbstractWriteResponse
// During bootstrap, we have to include the pending endpoints or we may fail the consistency level
// guarantees (see #833)
- for (Replica pending : replicaLayout.pending())
+ for (Replica pending : replicaPlan.pending())
{
responses.get(snitch.getDatacenter(pending)).incrementAndGet();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java b/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java
index b458a71..a3ef76f 100644
--- a/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java
+++ b/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java
@@ -19,7 +19,7 @@ package org.apache.cassandra.service;
import org.apache.cassandra.db.WriteType;
import org.apache.cassandra.locator.InetAddressAndPort;
-import org.apache.cassandra.locator.ReplicaLayout;
+import org.apache.cassandra.locator.ReplicaPlan;
import org.apache.cassandra.net.MessageIn;
/**
@@ -27,13 +27,13 @@ import org.apache.cassandra.net.MessageIn;
*/
public class DatacenterWriteResponseHandler<T> extends WriteResponseHandler<T>
{
- public DatacenterWriteResponseHandler(ReplicaLayout.ForToken replicaLayout,
+ public DatacenterWriteResponseHandler(ReplicaPlan.ForTokenWrite replicaPlan,
Runnable callback,
WriteType writeType,
long queryStartNanoTime)
{
- super(replicaLayout, callback, writeType, queryStartNanoTime);
- assert replicaLayout.consistencyLevel().isDatacenterLocal();
+ super(replicaPlan, callback, writeType, queryStartNanoTime);
+ assert replicaPlan.consistencyLevel().isDatacenterLocal();
}
@Override
@@ -54,6 +54,6 @@ public class DatacenterWriteResponseHandler<T> extends WriteResponseHandler<T>
@Override
protected boolean waitingFor(InetAddressAndPort from)
{
- return replicaLayout.consistencyLevel().isLocal(from);
+ return replicaPlan.consistencyLevel().isLocal(from);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 5eb43cf..fc49330 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -60,7 +60,6 @@ import org.apache.cassandra.db.rows.RowIterator;
import org.apache.cassandra.db.view.ViewUtils;
import org.apache.cassandra.dht.*;
import org.apache.cassandra.exceptions.*;
-import org.apache.cassandra.gms.FailureDetector;
import org.apache.cassandra.gms.Gossiper;
import org.apache.cassandra.hints.Hint;
import org.apache.cassandra.hints.HintsService;
@@ -135,7 +134,7 @@ public class StorageProxy implements StorageProxyMBean
standardWritePerformer = (mutation, targets, responseHandler, localDataCenter) ->
{
assert mutation instanceof Mutation;
- sendToHintedReplicas((Mutation) mutation, targets.selected(), responseHandler, localDataCenter, Stage.MUTATION);
+ sendToHintedReplicas((Mutation) mutation, targets, responseHandler, localDataCenter, Stage.MUTATION);
};
/*
@@ -146,17 +145,17 @@ public class StorageProxy implements StorageProxyMBean
*/
counterWritePerformer = (mutation, targets, responseHandler, localDataCenter) ->
{
- EndpointsForToken selected = targets.selected().withoutSelf();
+ EndpointsForToken selected = targets.contacts().withoutSelf();
Replicas.temporaryAssertFull(selected); // TODO CASSANDRA-14548
- counterWriteTask(mutation, selected, responseHandler, localDataCenter).run();
+ counterWriteTask(mutation, targets.withContact(selected), responseHandler, localDataCenter).run();
};
counterWriteOnCoordinatorPerformer = (mutation, targets, responseHandler, localDataCenter) ->
{
- EndpointsForToken selected = targets.selected().withoutSelf();
+ EndpointsForToken selected = targets.contacts().withoutSelf();
Replicas.temporaryAssertFull(selected); // TODO CASSANDRA-14548
StageManager.getStage(Stage.COUNTER_MUTATION)
- .execute(counterWriteTask(mutation, selected, responseHandler, localDataCenter));
+ .execute(counterWriteTask(mutation, targets.withContact(selected), responseHandler, localDataCenter));
};
for(ConsistencyLevel level : ConsistencyLevel.values())
@@ -232,9 +231,9 @@ public class StorageProxy implements StorageProxyMBean
while (System.nanoTime() - queryStartNanoTime < timeout)
{
// for simplicity, we'll do a single liveness check at the start of each attempt
- ReplicaLayout.ForPaxos replicaLayout = ReplicaLayout.forPaxos(Keyspace.open(keyspaceName), key, consistencyForPaxos);
+ ReplicaPlan.ForPaxosWrite replicaPlan = ReplicaPlans.forPaxos(Keyspace.open(keyspaceName), key, consistencyForPaxos);
- final PaxosBallotAndContention pair = beginAndRepairPaxos(queryStartNanoTime, key, metadata, replicaLayout, consistencyForPaxos, consistencyForCommit, true, state);
+ final PaxosBallotAndContention pair = beginAndRepairPaxos(queryStartNanoTime, key, metadata, replicaPlan, consistencyForPaxos, consistencyForCommit, true, state);
final UUID ballot = pair.ballot;
contentions += pair.contentions;
@@ -276,7 +275,7 @@ public class StorageProxy implements StorageProxyMBean
Commit proposal = Commit.newProposal(ballot, updates);
Tracing.trace("CAS precondition is met; proposing client-requested updates for {}", ballot);
- if (proposePaxos(proposal, replicaLayout, true, queryStartNanoTime))
+ if (proposePaxos(proposal, replicaPlan, true, queryStartNanoTime))
{
commitPaxos(proposal, consistencyForCommit, true, queryStartNanoTime);
Tracing.trace("CAS successful");
@@ -334,7 +333,7 @@ public class StorageProxy implements StorageProxyMBean
private static PaxosBallotAndContention beginAndRepairPaxos(long queryStartNanoTime,
DecoratedKey key,
TableMetadata metadata,
- ReplicaLayout.ForPaxos replicaLayout,
+ ReplicaPlan.ForPaxosWrite paxosPlan,
ConsistencyLevel consistencyForPaxos,
ConsistencyLevel consistencyForCommit,
final boolean isWrite,
@@ -360,7 +359,7 @@ public class StorageProxy implements StorageProxyMBean
// prepare
Tracing.trace("Preparing {}", ballot);
Commit toPrepare = Commit.newPrepare(key, metadata, ballot);
- summary = preparePaxos(toPrepare, replicaLayout, queryStartNanoTime);
+ summary = preparePaxos(toPrepare, paxosPlan, queryStartNanoTime);
if (!summary.promised)
{
Tracing.trace("Some replicas have already promised a higher ballot than ours; aborting");
@@ -383,7 +382,7 @@ public class StorageProxy implements StorageProxyMBean
else
casReadMetrics.unfinishedCommit.inc();
Commit refreshedInProgress = Commit.newProposal(ballot, inProgress.update);
- if (proposePaxos(refreshedInProgress, replicaLayout, false, queryStartNanoTime))
+ if (proposePaxos(refreshedInProgress, paxosPlan, false, queryStartNanoTime))
{
try
{
@@ -440,12 +439,12 @@ public class StorageProxy implements StorageProxyMBean
MessagingService.instance().sendOneWay(message, target);
}
- private static PrepareCallback preparePaxos(Commit toPrepare, ReplicaLayout.ForPaxos replicaLayout, long queryStartNanoTime)
+ private static PrepareCallback preparePaxos(Commit toPrepare, ReplicaPlan.ForPaxosWrite replicaPlan, long queryStartNanoTime)
throws WriteTimeoutException
{
- PrepareCallback callback = new PrepareCallback(toPrepare.update.partitionKey(), toPrepare.update.metadata(), replicaLayout.getRequiredParticipants(), replicaLayout.consistencyLevel(), queryStartNanoTime);
+ PrepareCallback callback = new PrepareCallback(toPrepare.update.partitionKey(), toPrepare.update.metadata(), replicaPlan.requiredParticipants(), replicaPlan.consistencyLevel(), queryStartNanoTime);
MessageOut<Commit> message = new MessageOut<Commit>(MessagingService.Verb.PAXOS_PREPARE, toPrepare, Commit.serializer);
- for (Replica replica: replicaLayout.selected())
+ for (Replica replica: replicaPlan.contacts())
{
if (replica.isLocal())
{
@@ -478,12 +477,12 @@ public class StorageProxy implements StorageProxyMBean
return callback;
}
- private static boolean proposePaxos(Commit proposal, ReplicaLayout.ForPaxos replicaLayout, boolean timeoutIfPartial, long queryStartNanoTime)
+ private static boolean proposePaxos(Commit proposal, ReplicaPlan.ForPaxosWrite replicaPlan, boolean timeoutIfPartial, long queryStartNanoTime)
throws WriteTimeoutException
{
- ProposeCallback callback = new ProposeCallback(replicaLayout.selected().size(), replicaLayout.getRequiredParticipants(), !timeoutIfPartial, replicaLayout.consistencyLevel(), queryStartNanoTime);
+ ProposeCallback callback = new ProposeCallback(replicaPlan.contacts().size(), replicaPlan.requiredParticipants(), !timeoutIfPartial, replicaPlan.consistencyLevel(), queryStartNanoTime);
MessageOut<Commit> message = new MessageOut<Commit>(MessagingService.Verb.PAXOS_PROPOSE, proposal, Commit.serializer);
- for (Replica replica : replicaLayout.selected())
+ for (Replica replica : replicaPlan.contacts())
{
if (replica.isLocal())
{
@@ -518,7 +517,7 @@ public class StorageProxy implements StorageProxyMBean
return true;
if (timeoutIfPartial && !callback.isFullyRefused())
- throw new WriteTimeoutException(WriteType.CAS, replicaLayout.consistencyLevel(), callback.getAcceptCount(), replicaLayout.getRequiredParticipants());
+ throw new WriteTimeoutException(WriteType.CAS, replicaPlan.consistencyLevel(), callback.getAcceptCount(), replicaPlan.requiredParticipants());
return false;
}
@@ -531,21 +530,22 @@ public class StorageProxy implements StorageProxyMBean
Token tk = proposal.update.partitionKey().getToken();
AbstractWriteResponseHandler<Commit> responseHandler = null;
- ReplicaLayout.ForToken replicaLayout = ReplicaLayout.forWrite(keyspace, consistencyLevel, tk, FailureDetector.instance::isAlive);
+ // NOTE: this ReplicaPlan is a lie; this usage of ReplicaPlan could do with being clarified - the contacts() collection is essentially (I think) never used
+ ReplicaPlan.ForTokenWrite replicaPlan = ReplicaPlans.forWrite(keyspace, consistencyLevel, tk, ReplicaPlans.writeAll);
if (shouldBlock)
{
AbstractReplicationStrategy rs = keyspace.getReplicationStrategy();
- responseHandler = rs.getWriteResponseHandler(replicaLayout, null, WriteType.SIMPLE, queryStartNanoTime);
+ responseHandler = rs.getWriteResponseHandler(replicaPlan, null, WriteType.SIMPLE, queryStartNanoTime);
responseHandler.setSupportsBackPressure(false);
}
- MessageOut<Commit> message = new MessageOut<Commit>(MessagingService.Verb.PAXOS_COMMIT, proposal, Commit.serializer);
- for (Replica replica : replicaLayout.all())
+ MessageOut<Commit> message = new MessageOut<>(MessagingService.Verb.PAXOS_COMMIT, proposal, Commit.serializer);
+ for (Replica replica : replicaPlan.liveAndDown())
{
InetAddressAndPort destination = replica.endpoint();
checkHintOverload(replica);
- if (FailureDetector.instance.isAlive(destination))
+ if (replicaPlan.isAlive(replica))
{
if (shouldBlock)
{
@@ -616,10 +616,10 @@ public class StorageProxy implements StorageProxyMBean
* the data across to some other replica.
*
* @param mutations the mutations to be applied across the replicas
- * @param consistency_level the consistency level for the operation
+ * @param consistencyLevel the consistency level for the operation
* @param queryStartNanoTime the value of System.nanoTime() when the query started to be processed
*/
- public static void mutate(List<? extends IMutation> mutations, ConsistencyLevel consistency_level, long queryStartNanoTime)
+ public static void mutate(List<? extends IMutation> mutations, ConsistencyLevel consistencyLevel, long queryStartNanoTime)
throws UnavailableException, OverloadedException, WriteTimeoutException, WriteFailureException
{
Tracing.trace("Determining replicas for mutation");
@@ -637,7 +637,7 @@ public class StorageProxy implements StorageProxyMBean
if (mutation instanceof CounterMutation)
responseHandlers.add(mutateCounter((CounterMutation)mutation, localDataCenter, queryStartNanoTime));
else
- responseHandlers.add(performWrite(mutation, consistency_level, localDataCenter, standardWritePerformer, null, plainWriteType, queryStartNanoTime));
+ responseHandlers.add(performWrite(mutation, consistencyLevel, localDataCenter, standardWritePerformer, null, plainWriteType, queryStartNanoTime));
}
// upgrade to full quorum any failed cheap quorums
@@ -653,7 +653,7 @@ public class StorageProxy implements StorageProxyMBean
}
catch (WriteTimeoutException|WriteFailureException ex)
{
- if (consistency_level == ConsistencyLevel.ANY)
+ if (consistencyLevel == ConsistencyLevel.ANY)
{
hintMutations(mutations);
}
@@ -662,7 +662,7 @@ public class StorageProxy implements StorageProxyMBean
if (ex instanceof WriteFailureException)
{
writeMetrics.failures.mark();
- writeMetricsMap.get(consistency_level).failures.mark();
+ writeMetricsMap.get(consistencyLevel).failures.mark();
WriteFailureException fe = (WriteFailureException)ex;
Tracing.trace("Write failure; received {} of {} required replies, failed {} requests",
fe.received, fe.blockFor, fe.failureReasonByEndpoint.size());
@@ -670,7 +670,7 @@ public class StorageProxy implements StorageProxyMBean
else
{
writeMetrics.timeouts.mark();
- writeMetricsMap.get(consistency_level).timeouts.mark();
+ writeMetricsMap.get(consistencyLevel).timeouts.mark();
WriteTimeoutException te = (WriteTimeoutException)ex;
Tracing.trace("Write timeout; received {} of {} required replies", te.received, te.blockFor);
}
@@ -680,14 +680,14 @@ public class StorageProxy implements StorageProxyMBean
catch (UnavailableException e)
{
writeMetrics.unavailables.mark();
- writeMetricsMap.get(consistency_level).unavailables.mark();
+ writeMetricsMap.get(consistencyLevel).unavailables.mark();
Tracing.trace("Unavailable");
throw e;
}
catch (OverloadedException e)
{
writeMetrics.unavailables.mark();
- writeMetricsMap.get(consistency_level).unavailables.mark();
+ writeMetricsMap.get(consistencyLevel).unavailables.mark();
Tracing.trace("Overloaded");
throw e;
}
@@ -695,7 +695,7 @@ public class StorageProxy implements StorageProxyMBean
{
long latency = System.nanoTime() - startTime;
writeMetrics.addNano(latency);
- writeMetricsMap.get(consistency_level).addNano(latency);
+ writeMetricsMap.get(consistencyLevel).addNano(latency);
updateCoordinatorWriteLatencyTableMetric(mutations, latency);
}
}
@@ -725,7 +725,8 @@ public class StorageProxy implements StorageProxyMBean
// local writes can time out, but cannot be dropped (see LocalMutationRunnable and CASSANDRA-6510),
// so there is no need to hint or retry.
- EndpointsForToken replicasToHint = StorageService.instance.getNaturalAndPendingReplicasForToken(keyspaceName, token)
+ EndpointsForToken replicasToHint = ReplicaLayout.forTokenWriteLiveAndDown(Keyspace.open(keyspaceName), token)
+ .all()
.filter(StorageProxy::shouldHint);
submitHint(mutation, replicasToHint, null);
@@ -737,8 +738,8 @@ public class StorageProxy implements StorageProxyMBean
Token token = mutation.key().getToken();
InetAddressAndPort local = FBUtilities.getBroadcastAddressAndPort();
- return StorageService.instance.getNaturalReplicasForToken(keyspaceName, token).endpoints().contains(local)
- || StorageService.instance.getTokenMetadata().pendingEndpointsForToken(token, keyspaceName).endpoints().contains(local);
+ return ReplicaLayout.forTokenWriteLiveAndDown(Keyspace.open(keyspaceName), token)
+ .all().endpoints().contains(local);
}
/**
@@ -934,7 +935,6 @@ public class StorageProxy implements StorageProxyMBean
cleanup,
queryStartNanoTime);
// exit early if we can't fulfill the CL at this time.
- wrapper.handler.assureSufficientLiveNodes();
wrappers.add(wrapper);
}
@@ -1002,14 +1002,14 @@ public class StorageProxy implements StorageProxyMBean
throws WriteTimeoutException, WriteFailureException
{
Keyspace systemKeypsace = Keyspace.open(SchemaConstants.SYSTEM_KEYSPACE_NAME);
- ReplicaLayout.ForToken replicaLayout = ReplicaLayout.forBatchlogWrite(systemKeypsace, endpoints);
- WriteResponseHandler<?> handler = new WriteResponseHandler(replicaLayout,
+ ReplicaPlan.ForTokenWrite replicaPlan = ReplicaPlans.forBatchlogWrite(systemKeypsace, endpoints);
+ WriteResponseHandler<?> handler = new WriteResponseHandler(replicaPlan,
WriteType.BATCH_LOG,
queryStartNanoTime);
Batch batch = Batch.createLocal(uuid, FBUtilities.timestampMicros(), mutations);
MessageOut<Batch> message = new MessageOut<>(MessagingService.Verb.BATCH_STORE, batch, Batch.serializer);
- for (Replica replica : replicaLayout.all())
+ for (Replica replica : replicaPlan.liveAndDown())
{
logger.trace("Sending batchlog store request {} to {} for {} mutations", batch.id, replica, batch.size());
@@ -1040,12 +1040,12 @@ public class StorageProxy implements StorageProxyMBean
{
for (WriteResponseHandlerWrapper wrapper : wrappers)
{
- Replicas.temporaryAssertFull(wrapper.handler.replicaLayout.all()); // TODO: CASSANDRA-14549
- ReplicaLayout.ForToken replicas = wrapper.handler.replicaLayout.withSelected(wrapper.handler.replicaLayout.all());
+ Replicas.temporaryAssertFull(wrapper.handler.replicaPlan.liveAndDown()); // TODO: CASSANDRA-14549
+ ReplicaPlan.ForTokenWrite replicas = wrapper.handler.replicaPlan.withContact(wrapper.handler.replicaPlan.liveAndDown());
try
{
- sendToHintedReplicas(wrapper.mutation, replicas.selected(), wrapper.handler, localDataCenter, stage);
+ sendToHintedReplicas(wrapper.mutation, replicas, wrapper.handler, localDataCenter, stage);
}
catch (OverloadedException | WriteTimeoutException e)
{
@@ -1059,11 +1059,11 @@ public class StorageProxy implements StorageProxyMBean
{
for (WriteResponseHandlerWrapper wrapper : wrappers)
{
- Replicas.temporaryAssertFull(wrapper.handler.replicaLayout.all()); // TODO: CASSANDRA-14549
- sendToHintedReplicas(wrapper.mutation, wrapper.handler.replicaLayout.all(), wrapper.handler, localDataCenter, stage);
+ EndpointsForToken sendTo = wrapper.handler.replicaPlan.liveAndDown();
+ Replicas.temporaryAssertFull(sendTo); // TODO: CASSANDRA-14549
+ sendToHintedReplicas(wrapper.mutation, wrapper.handler.replicaPlan.withContact(sendTo), wrapper.handler, localDataCenter, stage);
}
-
for (WriteResponseHandlerWrapper wrapper : wrappers)
wrapper.handler.get();
}
@@ -1096,13 +1096,10 @@ public class StorageProxy implements StorageProxyMBean
Token tk = mutation.key().getToken();
- ReplicaLayout.ForToken replicaLayout = ReplicaLayout.forWriteWithDownNodes(keyspace, consistencyLevel, tk);
- AbstractWriteResponseHandler<IMutation> responseHandler = rs.getWriteResponseHandler(replicaLayout, callback, writeType, queryStartNanoTime);
+ ReplicaPlan.ForTokenWrite replicaPlan = ReplicaPlans.forWrite(keyspace, consistencyLevel, tk, ReplicaPlans.writeNormal);
+ AbstractWriteResponseHandler<IMutation> responseHandler = rs.getWriteResponseHandler(replicaPlan, callback, writeType, queryStartNanoTime);
- // exit early if we can't fulfill the CL at this time
- responseHandler.assureSufficientLiveNodes();
-
- performer.apply(mutation, replicaLayout, responseHandler, localDataCenter);
+ performer.apply(mutation, replicaPlan, responseHandler, localDataCenter);
return responseHandler;
}
@@ -1118,8 +1115,8 @@ public class StorageProxy implements StorageProxyMBean
AbstractReplicationStrategy rs = keyspace.getReplicationStrategy();
Token tk = mutation.key().getToken();
- ReplicaLayout.ForToken replicaLayout = ReplicaLayout.forWrite(keyspace, consistencyLevel, tk, FailureDetector.instance::isAlive);
- AbstractWriteResponseHandler<IMutation> writeHandler = rs.getWriteResponseHandler(replicaLayout,null, writeType, queryStartNanoTime);
+ ReplicaPlan.ForTokenWrite replicaPlan = ReplicaPlans.forWrite(keyspace, consistencyLevel, tk, ReplicaPlans.writeNormal);
+ AbstractWriteResponseHandler<IMutation> writeHandler = rs.getWriteResponseHandler(replicaPlan,null, writeType, queryStartNanoTime);
BatchlogResponseHandler<IMutation> batchHandler = new BatchlogResponseHandler<>(writeHandler, batchConsistencyLevel.blockFor(keyspace), cleanup, queryStartNanoTime);
return new WriteResponseHandlerWrapper(batchHandler, mutation);
}
@@ -1140,10 +1137,11 @@ public class StorageProxy implements StorageProxyMBean
{
Keyspace keyspace = Keyspace.open(mutation.getKeyspaceName());
AbstractReplicationStrategy rs = keyspace.getReplicationStrategy();
- Token tk = mutation.key().getToken();
- ReplicaLayout.ForToken replicaLayout = ReplicaLayout.forWriteWithDownNodes(keyspace, consistencyLevel, tk, naturalEndpoints, pendingEndpoints);
- AbstractWriteResponseHandler<IMutation> writeHandler = rs.getWriteResponseHandler(replicaLayout, () -> {
+ ReplicaLayout.ForTokenWrite liveAndDown = ReplicaLayout.forTokenWrite(naturalEndpoints, pendingEndpoints);
+ ReplicaPlan.ForTokenWrite replicaPlan = ReplicaPlans.forWrite(keyspace, consistencyLevel, liveAndDown, ReplicaPlans.writeAll);
+
+ AbstractWriteResponseHandler<IMutation> writeHandler = rs.getWriteResponseHandler(replicaPlan, () -> {
long delay = Math.max(0, System.currentTimeMillis() - baseComplete.get());
viewWriteMetrics.viewWriteLatency.update(delay, TimeUnit.MILLISECONDS);
}, writeType, queryStartNanoTime);
@@ -1208,7 +1206,7 @@ public class StorageProxy implements StorageProxyMBean
* @throws OverloadedException if the hints cannot be written/enqueued
*/
public static void sendToHintedReplicas(final Mutation mutation,
- EndpointsForToken targets,
+ ReplicaPlan.ForTokenWrite plan,
AbstractWriteResponseHandler<IMutation> responseHandler,
String localDataCenter,
Stage stage)
@@ -1227,11 +1225,11 @@ public class StorageProxy implements StorageProxyMBean
List<InetAddressAndPort> backPressureHosts = null;
- for (Replica destination : targets)
+ for (Replica destination : plan.contacts())
{
checkHintOverload(destination);
- if (FailureDetector.instance.isAlive(destination.endpoint()))
+ if (plan.isAlive(destination))
{
if (destination.isLocal())
{
@@ -1251,7 +1249,7 @@ public class StorageProxy implements StorageProxyMBean
if (localDataCenter.equals(dc))
{
if (localDc == null)
- localDc = new ArrayList<>(targets.size());
+ localDc = new ArrayList<>(plan.contacts().size());
localDc.add(destination);
}
@@ -1268,7 +1266,7 @@ public class StorageProxy implements StorageProxyMBean
}
if (backPressureHosts == null)
- backPressureHosts = new ArrayList<>(targets.size());
+ backPressureHosts = new ArrayList<>(plan.contacts().size());
backPressureHosts.add(destination.endpoint());
}
@@ -1345,7 +1343,7 @@ public class StorageProxy implements StorageProxyMBean
message,
destination,
message.getTimeout(),
- handler.replicaLayout.consistencyLevel(),
+ handler.replicaPlan.consistencyLevel(),
true);
messageIds[idIdx++] = id;
logger.trace("Adding FWD message to {}@{}", id, destination);
@@ -1435,14 +1433,13 @@ public class StorageProxy implements StorageProxyMBean
// Exit now if we can't fulfill the CL here instead of forwarding to the leader replica
String keyspaceName = cm.getKeyspaceName();
Keyspace keyspace = Keyspace.open(keyspaceName);
- AbstractReplicationStrategy rs = Keyspace.open(keyspaceName).getReplicationStrategy();
Token tk = cm.key().getToken();
- ReplicaLayout.ForToken replicaLayout = ReplicaLayout.forWriteWithDownNodes(keyspace, cm.consistency(), tk);
- rs.getWriteResponseHandler(replicaLayout, null, WriteType.COUNTER, queryStartNanoTime).assureSufficientLiveNodes();
+ ReplicaPlans.forWrite(keyspace, cm.consistency(), tk, ReplicaPlans.writeAll)
+ .assureSufficientReplicas();
// Forward the actual update to the chosen leader replica
- AbstractWriteResponseHandler<IMutation> responseHandler = new WriteResponseHandler<>(ReplicaLayout.forCounterWrite(keyspace, tk, replica),
+ AbstractWriteResponseHandler<IMutation> responseHandler = new WriteResponseHandler<>(ReplicaPlans.forForwardingCounterWrite(keyspace, tk, replica),
WriteType.COUNTER, queryStartNanoTime);
Tracing.trace("Enqueuing counter update to {}", replica);
@@ -1465,7 +1462,7 @@ public class StorageProxy implements StorageProxyMBean
{
Keyspace keyspace = Keyspace.open(keyspaceName);
IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
- EndpointsForToken replicas = StorageService.instance.getLiveNaturalReplicasForToken(keyspace, key);
+ EndpointsForToken replicas = keyspace.getReplicationStrategy().getNaturalReplicasForToken(key);
// CASSANDRA-13043: filter out those endpoints not accepting clients yet, maybe because still bootstrapping
replicas = replicas.filter(replica -> StorageService.instance.isRpcReady(replica.endpoint()));
@@ -1511,7 +1508,7 @@ public class StorageProxy implements StorageProxyMBean
}
private static Runnable counterWriteTask(final IMutation mutation,
- final EndpointsForToken targets,
+ final ReplicaPlan.ForTokenWrite replicaPlan,
final AbstractWriteResponseHandler<IMutation> responseHandler,
final String localDataCenter)
{
@@ -1524,7 +1521,7 @@ public class StorageProxy implements StorageProxyMBean
Mutation result = ((CounterMutation) mutation).applyCounterMutation();
responseHandler.response(null);
- sendToHintedReplicas(result, targets, responseHandler, localDataCenter, Stage.COUNTER_MUTATION);
+ sendToHintedReplicas(result, replicaPlan, responseHandler, localDataCenter, Stage.COUNTER_MUTATION);
}
};
}
@@ -1592,7 +1589,7 @@ public class StorageProxy implements StorageProxyMBean
try
{
// make sure any in-progress paxos writes are done (i.e., committed to a majority of replicas), before performing a quorum read
- ReplicaLayout.ForPaxos replicaLayout = ReplicaLayout.forPaxos(Keyspace.open(metadata.keyspace), key, consistencyLevel);
+ ReplicaPlan.ForPaxosWrite replicaPlan = ReplicaPlans.forPaxos(Keyspace.open(metadata.keyspace), key, consistencyLevel);
// does the work of applying in-progress writes; throws UAE or timeout if it can't
final ConsistencyLevel consistencyForCommitOrFetch = consistencyLevel == ConsistencyLevel.LOCAL_SERIAL
@@ -1601,7 +1598,7 @@ public class StorageProxy implements StorageProxyMBean
try
{
- final PaxosBallotAndContention pair = beginAndRepairPaxos(start, key, metadata, replicaLayout, consistencyLevel, consistencyForCommitOrFetch, false, state);
+ final PaxosBallotAndContention pair = beginAndRepairPaxos(start, key, metadata, replicaPlan, consistencyLevel, consistencyForCommitOrFetch, false, state);
if (pair.contentions > 0)
casReadMetrics.contention.update(pair.contentions);
}
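For orientation, the SERIAL read path after this change: build a paxos plan, finish any in-progress rounds, then read at the corresponding quorum level. A sketch assembled from this and the neighbouring hunks; the LOCAL_SERIAL to LOCAL_QUORUM mapping is our reconstruction of the line truncated by the hunk boundary:

    ReplicaPlan.ForPaxosWrite replicaPlan =
        ReplicaPlans.forPaxos(Keyspace.open(metadata.keyspace), key, consistencyLevel);
    ConsistencyLevel consistencyForCommitOrFetch = consistencyLevel == ConsistencyLevel.LOCAL_SERIAL
                                                 ? ConsistencyLevel.LOCAL_QUORUM
                                                 : ConsistencyLevel.QUORUM;
    // Commits any in-progress paxos writes to a majority before the quorum
    // read; throws UnavailableException or a timeout if it can't.
    PaxosBallotAndContention pair = beginAndRepairPaxos(start, key, metadata, replicaPlan,
                                                        consistencyLevel, consistencyForCommitOrFetch,
                                                        false, state);
    if (pair.contentions > 0)
        casReadMetrics.contention.update(pair.contentions);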
@@ -1850,21 +1847,6 @@ public class StorageProxy implements StorageProxyMBean
}
}
- public static EndpointsForToken getLiveSortedReplicasForToken(Keyspace keyspace, RingPosition pos)
- {
- return getLiveSortedReplicas(keyspace, pos).forToken(pos.getToken());
- }
-
- public static EndpointsForRange getLiveSortedReplicas(Keyspace keyspace, RingPosition pos)
- {
- EndpointsForRange liveReplicas = StorageService.instance.getLiveNaturalReplicas(keyspace, pos);
- // Replica availability is considered by the query path
- Preconditions.checkState(liveReplicas.isEmpty() || liveReplicas.stream().anyMatch(Replica::isFull),
- "At least one full replica required for reads: " + liveReplicas);
-
- return DatabaseDescriptor.getEndpointSnitch().sortedByProximity(FBUtilities.getBroadcastAddressAndPort(), liveReplicas);
- }
-
/**
* Estimate the number of result rows per range in the ring based on our local data.
* <p>
@@ -1883,7 +1865,7 @@ public class StorageProxy implements StorageProxyMBean
return (maxExpectedResults / DatabaseDescriptor.getNumTokens()) / keyspace.getReplicationStrategy().getReplicationFactor().allReplicas;
}
- private static class RangeIterator extends AbstractIterator<ReplicaLayout.ForRange>
+ private static class RangeIterator extends AbstractIterator<ReplicaPlan.ForRangeRead>
{
private final Keyspace keyspace;
private final ConsistencyLevel consistency;
@@ -1907,43 +1889,34 @@ public class StorageProxy implements StorageProxyMBean
return rangeCount;
}
- protected ReplicaLayout.ForRange computeNext()
+ protected ReplicaPlan.ForRangeRead computeNext()
{
if (!ranges.hasNext())
return endOfData();
- AbstractBounds<PartitionPosition> range = ranges.next();
- EndpointsForRange liveReplicas = getLiveSortedReplicas(keyspace, range.right);
-
- int blockFor = consistency.blockFor(keyspace);
- EndpointsForRange targetReplicas = consistency.filterForQuery(keyspace, liveReplicas);
- int minResponses = Math.min(targetReplicas.size(), blockFor);
-
- // Endpoints for range here as well
- return ReplicaLayout.forRangeRead(keyspace, consistency, range,
- liveReplicas, targetReplicas.subList(0, minResponses));
+ return ReplicaPlans.forRangeRead(keyspace, consistency, ranges.next());
}
}
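The deleted lines are the logic that ReplicaPlans.forRangeRead now owns. Their core selection rule, isolated as a self-contained sketch (a plain List stands in for EndpointsForRange, which offers the same size/subList operations):

    import java.util.List;

    // Contact only as many of the live, filtered replicas as the consistency
    // level requires (blockFor); if fewer are alive, contact what exists and
    // let assureSufficientReplicas() decide whether that is acceptable.
    static <E> List<E> selectContacts(List<E> filteredLiveReplicas, int blockFor)
    {
        return filteredLiveReplicas.subList(0, Math.min(filteredLiveReplicas.size(), blockFor));
    }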
- private static class RangeMerger extends AbstractIterator<ReplicaLayout.ForRange>
+ private static class RangeMerger extends AbstractIterator<ReplicaPlan.ForRangeRead>
{
private final Keyspace keyspace;
private final ConsistencyLevel consistency;
- private final PeekingIterator<ReplicaLayout.ForRange> ranges;
+ private final PeekingIterator<ReplicaPlan.ForRangeRead> ranges;
- private RangeMerger(Iterator<ReplicaLayout.ForRange> iterator, Keyspace keyspace, ConsistencyLevel consistency)
+ private RangeMerger(Iterator<ReplicaPlan.ForRangeRead> iterator, Keyspace keyspace, ConsistencyLevel consistency)
{
this.keyspace = keyspace;
this.consistency = consistency;
this.ranges = Iterators.peekingIterator(iterator);
}
- protected ReplicaLayout.ForRange computeNext()
+ protected ReplicaPlan.ForRangeRead computeNext()
{
if (!ranges.hasNext())
return endOfData();
- ReplicaLayout.ForRange current = ranges.next();
+ ReplicaPlan.ForRangeRead current = ranges.next();
// getRestrictedRange has broken the queried range into per-[vnode] token ranges, but this doesn't take
// the replication factor into account. If the intersection of live endpoints for 2 consecutive ranges
@@ -1955,25 +1928,15 @@ public class StorageProxy implements StorageProxyMBean
// Note: it would be slightly more efficient to have CFS.getRangeSlice on the destination nodes unwrap
// the range if necessary and deal with it. However, we can't start sending wrapped ranges without breaking
// wire compatibility, so it's likely easier not to bother.
- if (current.range.right.isMinimum())
+ if (current.range().right.isMinimum())
break;
- ReplicaLayout.ForRange next = ranges.peek();
-
- EndpointsForRange merged = current.all().keep(next.all().endpoints());
-
- // Check if there is enough endpoint for the merge to be possible.
- if (!consistency.isSufficientLiveNodesForRead(keyspace, merged))
- break;
-
- EndpointsForRange filteredMerged = consistency.filterForQuery(keyspace, merged);
-
- // Estimate whether merging will be a win or not
- if (!DatabaseDescriptor.getEndpointSnitch().isWorthMergingForRangeQuery(filteredMerged, current.selected(), next.selected()))
+ ReplicaPlan.ForRangeRead next = ranges.peek();
+ ReplicaPlan.ForRangeRead merged = ReplicaPlans.maybeMerge(keyspace, consistency, current, next);
+ if (merged == null)
break;
- // If we get there, merge this range and the next one
- current = ReplicaLayout.forRangeRead(keyspace, consistency, current.range.withNewRight(next.range.right), merged, filteredMerged);
+ current = merged;
ranges.next(); // consume the range we just merged since we've only peeked so far
}
return current;
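The three deleted checks now sit behind ReplicaPlans.maybeMerge, which this loop reads as "null means stop merging". A plausible reconstruction of the helper from the deleted lines; the constructor shape and the candidates() accessor are assumptions, and the shipped body lives in ReplicaPlans.java:

    static ReplicaPlan.ForRangeRead maybeMerge(Keyspace keyspace, ConsistencyLevel consistency,
                                               ReplicaPlan.ForRangeRead left, ReplicaPlan.ForRangeRead right)
    {
        // 1. Only replicas that serve *both* ranges can serve the merged range.
        EndpointsForRange merged = left.candidates().keep(right.candidates().endpoints());
        // 2. The merged candidate set must still satisfy the consistency level.
        if (!consistency.isSufficientLiveNodesForRead(keyspace, merged))
            return null;
        EndpointsForRange contacts = consistency.filterForQuery(keyspace, merged);
        // 3. Ask the snitch whether one larger query beats two smaller ones.
        if (!DatabaseDescriptor.getEndpointSnitch().isWorthMergingForRangeQuery(contacts, left.contacts(), right.contacts()))
            return null;
        return new ReplicaPlan.ForRangeRead(keyspace, consistency,
                                            left.range().withNewRight(right.range().right),
                                            merged, contacts);
    }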
@@ -2018,7 +1981,7 @@ public class StorageProxy implements StorageProxyMBean
private static class RangeCommandIterator extends AbstractIterator<RowIterator> implements PartitionIterator
{
- private final Iterator<ReplicaLayout.ForRange> ranges;
+ private final Iterator<ReplicaPlan.ForRangeRead> ranges;
private final int totalRangeCount;
private final PartitionRangeReadCommand command;
private final boolean enforceStrictLiveness;
@@ -2108,42 +2071,40 @@ public class StorageProxy implements StorageProxyMBean
/**
* Queries the provided sub-range.
*
- * @param replicaLayout the subRange to query.
+ * @param replicaPlan the replica plan for the sub-range to query.
* @param isFirst in the case where multiple queries are sent in parallel, whether that's the first query on
* that batch or not. The reason it matters is that when paging queries, the command (more specifically the
* {@code DataLimits}) may have "state" information and that state may only be valid for the first query (in
* that it's the query that "continues" whatever we previously queried).
*/
- private SingleRangeResponse query(ReplicaLayout.ForRange replicaLayout, boolean isFirst)
+ private SingleRangeResponse query(ReplicaPlan.ForRangeRead replicaPlan, boolean isFirst)
{
- PartitionRangeReadCommand rangeCommand = command.forSubRange(replicaLayout.range, isFirst);
- ReadRepair<EndpointsForRange, ReplicaLayout.ForRange> readRepair = ReadRepair.create(command, replicaLayout, queryStartNanoTime);
- DataResolver<EndpointsForRange, ReplicaLayout.ForRange> resolver = new DataResolver<>(rangeCommand, replicaLayout, readRepair, queryStartNanoTime);
- Keyspace keyspace = Keyspace.open(command.metadata().keyspace);
-
- ReadCallback<EndpointsForRange, ReplicaLayout.ForRange> handler = new ReadCallback<>(resolver,
- replicaLayout.consistencyLevel().blockFor(keyspace),
- rangeCommand,
- replicaLayout,
- queryStartNanoTime);
+ PartitionRangeReadCommand rangeCommand = command.forSubRange(replicaPlan.range(), isFirst);
+ ReplicaPlan.SharedForRangeRead sharedReplicaPlan = ReplicaPlan.shared(replicaPlan);
+ ReadRepair<EndpointsForRange, ReplicaPlan.ForRangeRead> readRepair
+ = ReadRepair.create(command, sharedReplicaPlan, queryStartNanoTime);
+ DataResolver<EndpointsForRange, ReplicaPlan.ForRangeRead> resolver
+ = new DataResolver<>(rangeCommand, sharedReplicaPlan, readRepair, queryStartNanoTime);
+ ReadCallback<EndpointsForRange, ReplicaPlan.ForRangeRead> handler
+ = new ReadCallback<>(resolver, rangeCommand, sharedReplicaPlan, queryStartNanoTime);
- handler.assureSufficientLiveNodes();
+ replicaPlan.assureSufficientReplicas();
// If enabled, request repaired data tracking info from full replicas but
// only if there are multiple full replicas to compare results from
if (DatabaseDescriptor.getRepairedDataTrackingForPartitionReadsEnabled()
- && replicaLayout.selected().filter(Replica::isFull).size() > 1)
+ && replicaPlan.contacts().filter(Replica::isFull).size() > 1)
{
command.trackRepairedStatus();
}
- if (replicaLayout.selected().size() == 1 && replicaLayout.selected().get(0).isLocal())
+ if (replicaPlan.contacts().size() == 1 && replicaPlan.contacts().get(0).isLocal())
{
StageManager.getStage(Stage.READ).execute(new LocalReadRunnable(rangeCommand, handler));
}
else
{
- for (Replica replica : replicaLayout.selected())
+ for (Replica replica : replicaPlan.contacts())
{
Tracing.trace("Enqueuing request to {}", replica);
PartitionRangeReadCommand command = replica.isFull() ? rangeCommand : rangeCommand.copyAsTransientQuery();
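One detail worth calling out in this hunk: the plan is wrapped via ReplicaPlan.shared() before being handed to the resolver, the read repair and the callback, so all three observe a single view of which replicas are being contacted (useful if speculative retry widens the contact set; that motivation is our reading, not stated in the hunk). The wiring, restated compactly:

    // The single shared view threads through all three collaborators:
    ReplicaPlan.SharedForRangeRead shared = ReplicaPlan.shared(replicaPlan);
    ReadRepair<EndpointsForRange, ReplicaPlan.ForRangeRead> repair =
        ReadRepair.create(command, shared, queryStartNanoTime);
    DataResolver<EndpointsForRange, ReplicaPlan.ForRangeRead> resolver =
        new DataResolver<>(rangeCommand, shared, repair, queryStartNanoTime);
    ReadCallback<EndpointsForRange, ReplicaPlan.ForRangeRead> callback =
        new ReadCallback<>(resolver, rangeCommand, shared, queryStartNanoTime);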
@@ -2486,7 +2447,7 @@ public class StorageProxy implements StorageProxyMBean
public interface WritePerformer
{
public void apply(IMutation mutation,
- ReplicaLayout.ForToken targets,
+ ReplicaPlan.ForTokenWrite targets,
AbstractWriteResponseHandler<IMutation> responseHandler,
String localDataCenter) throws OverloadedException;
}
@@ -2499,7 +2460,7 @@ public class StorageProxy implements StorageProxyMBean
public ViewWriteMetricsWrapped(AbstractWriteResponseHandler<IMutation> writeHandler, int i, BatchlogCleanup cleanup, long queryStartNanoTime)
{
super(writeHandler, i, cleanup, queryStartNanoTime);
- viewWriteMetrics.viewReplicasAttempted.inc(totalEndpoints());
+ viewWriteMetrics.viewReplicasAttempted.inc(candidateReplicaCount());
}
public void response(MessageIn<IMutation> msg)
@@ -2705,7 +2666,7 @@ public class StorageProxy implements StorageProxyMBean
HintsService.instance.write(hostIds, Hint.create(mutation, System.currentTimeMillis()));
validTargets.forEach(HintsService.instance.metrics::incrCreatedHints);
// Notify the handler only for CL == ANY
- if (responseHandler != null && responseHandler.replicaLayout.consistencyLevel() == ConsistencyLevel.ANY)
+ if (responseHandler != null && responseHandler.replicaPlan.consistencyLevel() == ConsistencyLevel.ANY)
responseHandler.response(null);
}
};
http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 7f4ae14..a979f1c 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -172,7 +172,8 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
public RangesAtEndpoint getLocalReplicas(String keyspaceName)
{
- return getReplicasForEndpoint(keyspaceName, FBUtilities.getBroadcastAddressAndPort());
+ return Keyspace.open(keyspaceName).getReplicationStrategy()
+ .getAddressReplicas(FBUtilities.getBroadcastAddressAndPort());
}
public List<Range<Token>> getLocalAndPendingRanges(String ks)
@@ -2015,11 +2016,10 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
*/
private EndpointsByRange constructRangeToEndpointMap(String keyspace, List<Range<Token>> ranges)
{
+ AbstractReplicationStrategy strategy = Keyspace.open(keyspace).getReplicationStrategy();
Map<Range<Token>, EndpointsForRange> rangeToEndpointMap = new HashMap<>(ranges.size());
for (Range<Token> range : ranges)
- {
- rangeToEndpointMap.put(range, Keyspace.open(keyspace).getReplicationStrategy().getNaturalReplicas(range.right));
- }
+ rangeToEndpointMap.put(range, strategy.getNaturalReplicas(range.right));
return new EndpointsByRange(rangeToEndpointMap);
}
@@ -3878,16 +3878,6 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
}
/**
- * Get all ranges an endpoint is responsible for (by keyspace)
- * @param ep endpoint we are interested in.
- * @return ranges for the specified endpoint.
- */
- RangesAtEndpoint getReplicasForEndpoint(String keyspaceName, InetAddressAndPort ep)
- {
- return Keyspace.open(keyspaceName).getReplicationStrategy().getAddressReplicas(ep);
- }
-
- /**
* Get all ranges that span the ring given a set
* of tokens. All ranges are in sorted order of
* ranges.
@@ -3936,11 +3926,10 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
return Replicas.stringify(getNaturalReplicasForToken(keyspaceName, cf, key), true);
}
-
@Deprecated
public List<InetAddress> getNaturalEndpoints(String keyspaceName, ByteBuffer key)
{
- EndpointsForToken replicas = getNaturalReplicasForToken(keyspaceName, tokenMetadata.partitioner.getToken(key));
+ EndpointsForToken replicas = getNaturalReplicasForToken(keyspaceName, key);
List<InetAddress> inetList = new ArrayList<>(replicas.size());
replicas.forEach(r -> inetList.add(r.endpoint().address));
return inetList;
@@ -3948,7 +3937,8 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
public List<String> getNaturalEndpointsWithPort(String keyspaceName, ByteBuffer key)
{
- return Replicas.stringify(getNaturalReplicasForToken(keyspaceName, tokenMetadata.partitioner.getToken(key)), true);
+ EndpointsForToken replicas = getNaturalReplicasForToken(keyspaceName, key);
+ return Replicas.stringify(replicas, true);
}
public List<String> getReplicas(String keyspaceName, String cf, String key)
@@ -3971,61 +3961,13 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
if (metadata == null)
throw new IllegalArgumentException("Unknown table '" + cf + "' in keyspace '" + keyspaceName + "'");
- return getNaturalReplicasForToken(keyspaceName, tokenMetadata.partitioner.getToken(metadata.partitionKeyType.fromString(key)));
- }
-
- /**
- * This method returns the N endpoints that are responsible for storing the
- * specified key i.e for replication.
- *
- * @param keyspaceName keyspace name also known as keyspace
- * @param pos position for which we need to find the endpoint
- * @return the endpoint responsible for this token
- */
- public static EndpointsForToken getNaturalReplicasForToken(String keyspaceName, RingPosition pos)
- {
- return Keyspace.open(keyspaceName).getReplicationStrategy().getNaturalReplicasForToken(pos);
+ return getNaturalReplicasForToken(keyspaceName, metadata.partitionKeyType.fromString(key));
}
- /**
- * Returns the endpoints currently responsible for storing the token plus pending ones
- */
- public EndpointsForToken getNaturalAndPendingReplicasForToken(String keyspaceName, Token token)
- {
- // TODO: race condition to fetch these. impliciations??
- EndpointsForToken natural = getNaturalReplicasForToken(keyspaceName, token);
- EndpointsForToken pending = tokenMetadata.pendingEndpointsForToken(token, keyspaceName);
- if (Endpoints.haveConflicts(natural, pending))
- {
- natural = Endpoints.resolveConflictsInNatural(natural, pending);
- pending = Endpoints.resolveConflictsInPending(natural, pending);
- }
- return Endpoints.concat(natural, pending);
- }
-
- /**
- * This method attempts to return N endpoints that are responsible for storing the
- * specified key i.e for replication.
- *
- * @param keyspace keyspace name also known as keyspace
- * @param pos position for which we need to find the endpoint
- */
- public EndpointsForToken getLiveNaturalReplicasForToken(Keyspace keyspace, RingPosition pos)
- {
- return getLiveNaturalReplicas(keyspace, pos).forToken(pos.getToken());
- }
-
- /**
- * This method attempts to return N endpoints that are responsible for storing the
- * specified key i.e for replication.
- *
- * @param keyspace keyspace name also known as keyspace
- * @param pos position for which we need to find the endpoint
- */
- public EndpointsForRange getLiveNaturalReplicas(Keyspace keyspace, RingPosition pos)
+ public EndpointsForToken getNaturalReplicasForToken(String keyspaceName, ByteBuffer key)
{
- EndpointsForRange replicas = keyspace.getReplicationStrategy().getNaturalReplicas(pos);
- return replicas.filter(r -> FailureDetector.instance.isAlive(r.endpoint()));
+ Token token = tokenMetadata.partitioner.getToken(key);
+ return Keyspace.open(keyspaceName).getReplicationStrategy().getNaturalReplicasForToken(token);
}
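With the overloads above deleted, every key-to-replicas path funnels through this single ByteBuffer entry point. A hypothetical caller, mirroring the deprecated getNaturalEndpoints shim earlier in the diff:

    import java.net.InetAddress;
    import java.nio.ByteBuffer;
    import java.util.ArrayList;
    import java.util.List;
    import org.apache.cassandra.locator.EndpointsForToken;
    import org.apache.cassandra.service.StorageService;

    static List<InetAddress> ownersOf(String keyspaceName, ByteBuffer key)
    {
        // key -> token -> natural replicas; liveness filtering now happens
        // only at the call sites that actually need it.
        EndpointsForToken replicas = StorageService.instance.getNaturalReplicasForToken(keyspaceName, key);
        List<InetAddress> owners = new ArrayList<>(replicas.size());
        replicas.forEach(r -> owners.add(r.endpoint().address));
        return owners;
    }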
public void setLoggingLevel(String classQualifier, String rawLevel) throws Exception
@@ -4268,7 +4210,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
.filter(endpoint -> FailureDetector.instance.isAlive(endpoint) && !FBUtilities.getBroadcastAddressAndPort().equals(endpoint))
.collect(Collectors.toList());
- return EndpointsForRange.copyOf(SystemReplicas.getSystemReplicas(endpoints));
+ return SystemReplicas.getSystemReplicas(endpoints);
}
/**
* Find the best target to stream hints to. Currently the closest peer according to the snitch
http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/service/WriteResponseHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/WriteResponseHandler.java b/src/java/org/apache/cassandra/service/WriteResponseHandler.java
index a07aae6..f9bfedf 100644
--- a/src/java/org/apache/cassandra/service/WriteResponseHandler.java
+++ b/src/java/org/apache/cassandra/service/WriteResponseHandler.java
@@ -19,9 +19,7 @@ package org.apache.cassandra.service;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
-import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.locator.ReplicaLayout;
-
+import org.apache.cassandra.locator.ReplicaPlan;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -39,18 +37,18 @@ public class WriteResponseHandler<T> extends AbstractWriteResponseHandler<T>
private static final AtomicIntegerFieldUpdater<WriteResponseHandler> responsesUpdater
= AtomicIntegerFieldUpdater.newUpdater(WriteResponseHandler.class, "responses");
- public WriteResponseHandler(ReplicaLayout.ForToken replicaLayout,
+ public WriteResponseHandler(ReplicaPlan.ForTokenWrite replicaPlan,
Runnable callback,
WriteType writeType,
long queryStartNanoTime)
{
- super(replicaLayout, callback, writeType, queryStartNanoTime);
- responses = totalBlockFor();
+ super(replicaPlan, callback, writeType, queryStartNanoTime);
+ responses = blockFor();
}
- public WriteResponseHandler(ReplicaLayout.ForToken replicaLayout, WriteType writeType, long queryStartNanoTime)
+ public WriteResponseHandler(ReplicaPlan.ForTokenWrite replicaPlan, WriteType writeType, long queryStartNanoTime)
{
- this(replicaLayout, null, writeType, queryStartNanoTime);
+ this(replicaPlan, null, writeType, queryStartNanoTime);
}
public void response(MessageIn<T> m)
@@ -65,7 +63,7 @@ public class WriteResponseHandler<T> extends AbstractWriteResponseHandler<T>
protected int ackCount()
{
- return totalBlockFor() - responses;
+ return blockFor() - responses;
}
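The rename from totalBlockFor() to blockFor() doesn't change the mechanism, but the diff only shows it in fragments; here is the countdown isolated as a self-contained, runnable model (names are illustrative, not Cassandra's):

    import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

    class CountdownModel
    {
        // Lock-free countdown: start at blockFor, decrement once per replica
        // response, and treat hitting zero as "consistency level satisfied".
        private static final AtomicIntegerFieldUpdater<CountdownModel> responsesUpdater =
            AtomicIntegerFieldUpdater.newUpdater(CountdownModel.class, "responses");

        private final int blockFor;
        private volatile int responses;

        CountdownModel(int blockFor)
        {
            this.blockFor = blockFor;
            this.responses = blockFor;
        }

        void onResponse()
        {
            if (responsesUpdater.decrementAndGet(this) == 0)
                signal(); // enough acks; unblock the waiting coordinator
        }

        int ackCount()
        {
            // Mirrors the patched ackCount(): acks received so far.
            return blockFor - responses;
        }

        void signal() { /* wake the thread blocked on this write */ }
    }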
public boolean isLatencyForSnitch()